/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_service.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/cdc/cdc_service.h" |
14 | | |
15 | | #include <chrono> |
16 | | #include <memory> |
17 | | |
18 | | #include <boost/multi_index/hashed_index.hpp> |
19 | | #include <boost/multi_index/mem_fun.hpp> |
20 | | #include <boost/multi_index/member.hpp> |
21 | | #include <boost/multi_index_container.hpp> |
22 | | |
23 | | #include "yb/cdc/cdc_producer.h" |
24 | | #include "yb/cdc/cdc_rpc.h" |
25 | | #include "yb/cdc/cdc_service.proxy.h" |
26 | | |
27 | | #include "yb/client/client.h" |
28 | | #include "yb/client/meta_cache.h" |
29 | | #include "yb/client/schema.h" |
30 | | #include "yb/client/session.h" |
31 | | #include "yb/client/table.h" |
32 | | #include "yb/client/table_alterer.h" |
33 | | #include "yb/client/table_handle.h" |
34 | | #include "yb/client/yb_op.h" |
35 | | #include "yb/client/yb_table_name.h" |
36 | | |
37 | | #include "yb/common/entity_ids.h" |
38 | | #include "yb/common/pg_system_attr.h" |
39 | | #include "yb/common/ql_expr.h" |
40 | | #include "yb/common/ql_value.h" |
41 | | #include "yb/common/schema.h" |
42 | | #include "yb/common/wire_protocol.h" |
43 | | |
44 | | #include "yb/consensus/log.h" |
45 | | #include "yb/consensus/raft_consensus.h" |
46 | | #include "yb/consensus/replicate_msgs_holder.h" |
47 | | |
48 | | #include "yb/gutil/dynamic_annotations.h" |
49 | | #include "yb/gutil/strings/join.h" |
50 | | |
51 | | #include "yb/master/master_client.pb.h" |
52 | | #include "yb/master/master_ddl.pb.h" |
53 | | #include "yb/master/master_defaults.h" |
54 | | |
55 | | #include "yb/rpc/rpc_context.h" |
56 | | #include "yb/rpc/rpc_controller.h" |
57 | | |
58 | | #include "yb/tablet/tablet_metadata.h" |
59 | | #include "yb/tablet/tablet_peer.h" |
60 | | #include "yb/tablet/transaction_participant.h" |
61 | | |
62 | | #include "yb/tserver/tablet_server.h" |
63 | | #include "yb/tserver/ts_tablet_manager.h" |
64 | | |
65 | | #include "yb/util/debug/trace_event.h" |
66 | | #include "yb/util/flag_tags.h" |
67 | | #include "yb/util/format.h" |
68 | | #include "yb/util/logging.h" |
69 | | #include "yb/util/metrics.h" |
70 | | #include "yb/util/monotime.h" |
71 | | #include "yb/util/scope_exit.h" |
72 | | #include "yb/util/shared_lock.h" |
73 | | #include "yb/util/status_format.h" |
74 | | #include "yb/util/status_log.h" |
75 | | #include "yb/util/trace.h" |
76 | | |
77 | | #include "yb/yql/cql/ql/util/statement_result.h" |
78 | | |
// Default interval (15 seconds, expressed in milliseconds) shared by the
// cdc_state_checkpoint_update_interval_ms and update_metrics_interval_ms flags below.
79 | | constexpr uint32_t kUpdateIntervalMs = 15 * 1000; |
80 | | |
// Runtime flags defined by this file.
81 | | DEFINE_int32(cdc_read_rpc_timeout_ms, 30 * 1000, |
82 | | "Timeout used for CDC read rpc calls. Reads normally occur cross-cluster."); |
83 | | TAG_FLAG(cdc_read_rpc_timeout_ms, advanced); |
84 | | |
85 | | DEFINE_int32(cdc_write_rpc_timeout_ms, 30 * 1000, |
86 | | "Timeout used for CDC write rpc calls. Writes normally occur intra-cluster."); |
87 | | TAG_FLAG(cdc_write_rpc_timeout_ms, advanced); |
88 | | |
89 | | DEFINE_int32(cdc_ybclient_reactor_threads, 50, |
90 | | "The number of reactor threads to be used for processing ybclient " |
91 | | "requests for CDC."); |
92 | | TAG_FLAG(cdc_ybclient_reactor_threads, advanced); |
93 | | |
94 | | DEFINE_int32(cdc_state_checkpoint_update_interval_ms, kUpdateIntervalMs, |
95 | | "Rate at which CDC state's checkpoint is updated."); |
96 | | |
97 | | DEFINE_string(certs_for_cdc_dir, "", |
98 | | "The parent directory of where all certificates for xCluster producer universes will " |
99 | | "be stored, for when the producer and consumer clusters use different certificates. " |
100 | | "Place the certificates for each producer cluster in " |
101 | | "<certs_for_cdc_dir>/<producer_cluster_id>/*."); |
102 | | |
103 | | DEFINE_int32(update_min_cdc_indices_interval_secs, 60, |
104 | | "How often to read cdc_state table to get the minimum applied index for each tablet " |
105 | | "across all streams. This information is used to correctly keep log files that " |
106 | | "contain unapplied entries. This is also the rate at which a tablet's minimum " |
107 | | "replicated index across all streams is sent to the other peers in the configuration. " |
108 | | "If flag enable_log_retention_by_op_idx is disabled, this flag has no effect."); |
109 | | |
110 | | DEFINE_int32(update_metrics_interval_ms, kUpdateIntervalMs, |
111 | | "How often to update xDC cluster metrics."); |
112 | | |
113 | | DEFINE_bool(enable_cdc_state_table_caching, true, "Enable caching the cdc_state table schema."); |
114 | | |
115 | | DEFINE_bool(enable_collect_cdc_metrics, true, "Enable collecting cdc metrics."); |
116 | | |
// NOTE(review): the help text below talks about the master heartbeat deadline and tablet report
// processing, which does not match the flag's name (a CDC read deadline ratio). It looks
// copy-pasted from another flag -- confirm how the flag is consumed and reword the description.
117 | | DEFINE_double(cdc_read_safe_deadline_ratio, .10, |
118 | | "When the heartbeat deadline has this percentage of time remaining, " |
119 | | "the master should halt tablet report processing so it can respond in time."); |
120 | | |
// Flags that are only declared here; their definitions are not in this file.
121 | | DECLARE_bool(enable_log_retention_by_op_idx); |
122 | | |
123 | | DECLARE_int32(cdc_checkpoint_opid_interval_ms); |
124 | | |
// Defines the "cdc" metric entity type.
125 | | METRIC_DEFINE_entity(cdc); |
126 | | |
127 | | namespace yb { |
128 | | namespace cdc { |
129 | | |
130 | | using namespace std::literals; |
131 | | |
132 | | using rpc::RpcContext; |
133 | | using tserver::TSTabletManager; |
134 | | using client::internal::RemoteTabletServer; |
135 | | |
// Upper bound used for tablet lookups.
// NOTE(review): the unit is not visible in this part of the file -- confirm at the use site.
136 | | constexpr int kMaxDurationForTabletLookup = 50; |
// Fully qualified name of the cdc_state table: a CQL table in the system namespace.
137 | | const client::YBTableName kCdcStateTableName( |
138 | | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
139 | | |
140 | | namespace { |
141 | | |
142 | | // These are guarded by lock_. |
143 | | // Map of checkpoints that have been sent to CDC consumer and stored in cdc_state. |
144 | | struct TabletCheckpointInfo { |
145 | | public: |
146 | | ProducerTabletInfo producer_tablet_info; |
147 | | |
148 | | mutable TabletCheckpoint cdc_state_checkpoint; |
149 | | mutable TabletCheckpoint sent_checkpoint; |
150 | | mutable MemTrackerPtr mem_tracker; |
151 | | |
152 | 17.4k | const TabletId& tablet_id() const { |
153 | 17.4k | return producer_tablet_info.tablet_id; |
154 | 17.4k | } |
155 | | |
156 | 15.5k | const CDCStreamId& stream_id() const { |
157 | 15.5k | return producer_tablet_info.stream_id; |
158 | 15.5k | } |
159 | | }; |
160 | | |
161 | | struct CDCStateMetadataInfo { |
162 | | ProducerTabletInfo producer_tablet_info; |
163 | | |
164 | | mutable std::string commit_timestamp; |
165 | | mutable std::shared_ptr<Schema> current_schema; |
166 | | mutable OpId last_streamed_op_id; |
167 | | |
168 | | std::shared_ptr<MemTracker> mem_tracker; |
169 | | |
170 | 463 | const TableId& tablet_id() const { |
171 | 463 | return producer_tablet_info.tablet_id; |
172 | 463 | } |
173 | | |
174 | 499 | const CDCStreamId& stream_id() const { |
175 | 499 | return producer_tablet_info.stream_id; |
176 | 499 | } |
177 | | |
178 | | }; |
179 | | |
180 | | class TabletTag; |
181 | | class StreamTag; |
182 | | |
183 | | using TabletCheckpoints = boost::multi_index_container < |
184 | | TabletCheckpointInfo, |
185 | | boost::multi_index::indexed_by < |
186 | | boost::multi_index::hashed_unique < |
187 | | boost::multi_index::member < |
188 | | TabletCheckpointInfo, ProducerTabletInfo, |
189 | | &TabletCheckpointInfo::producer_tablet_info |
190 | | > |
191 | | >, |
192 | | boost::multi_index::hashed_non_unique < |
193 | | boost::multi_index::tag <TabletTag>, |
194 | | boost::multi_index::const_mem_fun < |
195 | | TabletCheckpointInfo, const TabletId&, &TabletCheckpointInfo::tablet_id |
196 | | > |
197 | | >, |
198 | | boost::multi_index::hashed_non_unique < |
199 | | boost::multi_index::tag <StreamTag>, |
200 | | boost::multi_index::const_mem_fun < |
201 | | TabletCheckpointInfo, const CDCStreamId&, &TabletCheckpointInfo::stream_id |
202 | | > |
203 | | > |
204 | | > |
205 | | >; |
206 | | |
207 | | using CDCStateMetadata = boost::multi_index_container < |
208 | | CDCStateMetadataInfo, |
209 | | boost::multi_index::indexed_by < |
210 | | boost::multi_index::hashed_unique < |
211 | | boost::multi_index::member < |
212 | | CDCStateMetadataInfo, ProducerTabletInfo, |
213 | | &CDCStateMetadataInfo::producer_tablet_info> |
214 | | >, |
215 | | boost::multi_index::hashed_non_unique < |
216 | | boost::multi_index::tag <TabletTag>, |
217 | | boost::multi_index::const_mem_fun < |
218 | | CDCStateMetadataInfo, const TabletId&, &CDCStateMetadataInfo::tablet_id |
219 | | > |
220 | | >, |
221 | | boost::multi_index::hashed_non_unique < |
222 | | boost::multi_index::tag <StreamTag>, |
223 | | boost::multi_index::const_mem_fun < |
224 | | CDCStateMetadataInfo, const CDCStreamId&, &CDCStateMetadataInfo::stream_id |
225 | | > |
226 | | > |
227 | | > |
228 | | >; |
229 | | |
230 | | } // namespace |
231 | | |
232 | | class CDCServiceImpl::Impl { |
233 | | public: |
234 | 8.74k | explicit Impl(TSTabletManager* tablet_manager, rw_spinlock* mutex) : mutex_(*mutex) { |
235 | 8.74k | const auto server = tablet_manager->server(); |
236 | 8.74k | async_client_init_.emplace( |
237 | 8.74k | "cdc_client", FLAGS_cdc_ybclient_reactor_threads, FLAGS_cdc_read_rpc_timeout_ms / 1000, |
238 | 8.74k | server->permanent_uuid(), &server->options(), server->metric_entity(), |
239 | 8.74k | server->mem_tracker(), server->messenger()); |
240 | 8.74k | async_client_init_->Start(); |
241 | 8.74k | } |
242 | | |
243 | | void UpdateCDCStateMetadata( |
244 | | const ProducerTabletInfo& producer_tablet, |
245 | | const std::string& timestamp, |
246 | | const std::shared_ptr<Schema>& schema, |
247 | 641 | const OpId& op_id) { |
248 | 641 | std::lock_guard<decltype(mutex_)> l(mutex_); |
249 | 641 | auto it = cdc_state_metadata_.find(producer_tablet); |
250 | 641 | if (it == cdc_state_metadata_.end()) { |
251 | 0 | LOG(DFATAL) << "Failed to update the cdc state metadata for tablet id: " |
252 | 0 | << producer_tablet.tablet_id; |
253 | 0 | return; |
254 | 0 | } |
255 | 641 | it->commit_timestamp = timestamp; |
256 | 641 | it->current_schema = schema; |
257 | 641 | it->last_streamed_op_id = op_id; |
258 | 641 | } |
259 | | |
260 | 641 | std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo& producer_tablet) { |
261 | 641 | std::lock_guard<decltype(mutex_)> l(mutex_); |
262 | 641 | auto it = cdc_state_metadata_.find(producer_tablet); |
263 | | |
264 | 641 | if (it != cdc_state_metadata_.end()) { |
265 | 320 | return it->current_schema; |
266 | 320 | } |
267 | 321 | CDCStateMetadataInfo info = CDCStateMetadataInfo { |
268 | 321 | .producer_tablet_info = producer_tablet, |
269 | 321 | .current_schema = std::make_shared<Schema>() |
270 | 321 | }; |
271 | 321 | cdc_state_metadata_.emplace(info); |
272 | 321 | return info.current_schema; |
273 | 641 | } |
274 | | |
275 | | void AddTabletCheckpoint( |
276 | | OpId op_id, |
277 | | const CDCStreamId& stream_id, |
278 | | const TabletId& tablet_id, |
279 | 5.21k | std::vector<ProducerTabletInfo>* producer_entries_modified) { |
280 | 5.21k | ProducerTabletInfo producer_tablet{ |
281 | 5.21k | .universe_uuid = "", |
282 | 5.21k | .stream_id = stream_id, |
283 | 5.21k | .tablet_id = tablet_id |
284 | 5.21k | }; |
285 | 5.21k | CoarseTimePoint time; |
286 | 5.21k | if (producer_entries_modified) { |
287 | 5.21k | producer_entries_modified->push_back(producer_tablet); |
288 | 5.21k | time = CoarseMonoClock::Now(); |
289 | 5.21k | } else { |
290 | 0 | time = CoarseTimePoint::min(); |
291 | 0 | } |
292 | 5.21k | std::lock_guard<decltype(mutex_)> l(mutex_); |
293 | 5.21k | if (!producer_entries_modified && tablet_checkpoints_.count(producer_tablet)0 ) { |
294 | 0 | return; |
295 | 0 | } |
296 | 5.21k | tablet_checkpoints_.emplace(TabletCheckpointInfo { |
297 | 5.21k | .producer_tablet_info = producer_tablet, |
298 | 5.21k | .cdc_state_checkpoint = {op_id, time}, |
299 | 5.21k | .sent_checkpoint = {op_id, time}, |
300 | 5.21k | }); |
301 | 5.21k | } |
302 | | |
303 | | void EraseTablets(const std::vector<ProducerTabletInfo>& producer_entries_modified, |
304 | | bool erase_cdc_states) |
305 | 0 | NO_THREAD_SAFETY_ANALYSIS { |
306 | 0 | for (const auto& entry : producer_entries_modified) { |
307 | 0 | tablet_checkpoints_.get<TabletTag>().erase(entry.tablet_id); |
308 | 0 | if (erase_cdc_states) { |
309 | 0 | cdc_state_metadata_.get<TabletTag>().erase(entry.tablet_id); |
310 | 0 | } |
311 | 0 | } |
312 | 0 | } |
313 | | |
314 | 619 | boost::optional<OpId> GetLastCheckpoint(const ProducerTabletInfo& producer_tablet) { |
315 | 619 | SharedLock<rw_spinlock> lock(mutex_); |
316 | 619 | auto it = tablet_checkpoints_.find(producer_tablet); |
317 | 619 | if (it != tablet_checkpoints_.end()) { |
318 | | // Use checkpoint from cache only if it is current. |
319 | 619 | if (it->cdc_state_checkpoint.op_id.index > 0 && |
320 | 619 | !it->cdc_state_checkpoint.ExpiredAt( |
321 | 4 | FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms, CoarseMonoClock::Now())) { |
322 | 4 | return it->cdc_state_checkpoint.op_id; |
323 | 4 | } |
324 | 619 | } |
325 | 615 | return boost::none; |
326 | 619 | } |
327 | | |
328 | | bool UpdateCheckpoint(const ProducerTabletInfo& producer_tablet, |
329 | | const OpId& sent_op_id, |
330 | 631 | const OpId& commit_op_id) { |
331 | 631 | auto now = CoarseMonoClock::Now(); |
332 | | |
333 | 631 | TabletCheckpoint sent_checkpoint = { |
334 | 631 | .op_id = sent_op_id, |
335 | 631 | .last_update_time = now, |
336 | 631 | }; |
337 | 631 | TabletCheckpoint commit_checkpoint = { |
338 | 631 | .op_id = commit_op_id, |
339 | 631 | .last_update_time = now, |
340 | 631 | }; |
341 | | |
342 | 631 | std::lock_guard<decltype(mutex_)> l(mutex_); |
343 | 631 | auto it = tablet_checkpoints_.find(producer_tablet); |
344 | 631 | if (it != tablet_checkpoints_.end()) { |
345 | 631 | it->sent_checkpoint = sent_checkpoint; |
346 | | |
347 | 631 | if (commit_op_id.index > 0) { |
348 | 28 | it->cdc_state_checkpoint.op_id = commit_op_id; |
349 | 28 | } |
350 | | |
351 | | // Check if we need to update cdc_state table. |
352 | 631 | if (!it->cdc_state_checkpoint.ExpiredAt( |
353 | 631 | FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms, now)) { |
354 | 631 | return false; |
355 | 631 | } |
356 | | |
357 | 0 | it->cdc_state_checkpoint.last_update_time = now; |
358 | 0 | } else { |
359 | 0 | tablet_checkpoints_.emplace(TabletCheckpointInfo{ |
360 | 0 | .producer_tablet_info = producer_tablet, |
361 | 0 | .cdc_state_checkpoint = commit_checkpoint, |
362 | 0 | .sent_checkpoint = sent_checkpoint |
363 | 0 | }); |
364 | 0 | } |
365 | | |
366 | 0 | return true; |
367 | 631 | } |
368 | | |
369 | 637 | OpId GetMinSentCheckpointForTablet(const TabletId& tablet_id) { |
370 | 637 | OpId min_op_id = OpId::Max(); |
371 | | |
372 | 637 | SharedLock<rw_spinlock> l(mutex_); |
373 | 637 | auto it_range = tablet_checkpoints_.get<TabletTag>().equal_range(tablet_id); |
374 | 637 | if (it_range.first == it_range.second) { |
375 | 0 | LOG(WARNING) << "Tablet ID not found in stream_tablets map: " << tablet_id; |
376 | 0 | return min_op_id; |
377 | 0 | } |
378 | | |
379 | 637 | auto cdc_checkpoint_opid_interval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms; |
380 | 8.44k | for (auto it = it_range.first; it != it_range.second; ++it7.80k ) { |
381 | | // We don't want to include streams that are not being actively polled. |
382 | | // So, if the stream has not been polled in the last x seconds, |
383 | | // then we ignore that stream while calculating min op ID. |
384 | 7.80k | if (!it->sent_checkpoint.ExpiredAt(cdc_checkpoint_opid_interval, CoarseMonoClock::Now()) && |
385 | 7.80k | it->sent_checkpoint.op_id.index < min_op_id.index) { |
386 | 781 | min_op_id = it->sent_checkpoint.op_id; |
387 | 781 | } |
388 | 7.80k | } |
389 | 637 | return min_op_id; |
390 | 637 | } |
391 | | |
392 | | MemTrackerPtr GetMemTracker( |
393 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
394 | 641 | const ProducerTabletInfo& producer_info) { |
395 | 641 | { |
396 | 641 | SharedLock<rw_spinlock> l(mutex_); |
397 | 641 | auto it = tablet_checkpoints_.find(producer_info); |
398 | 641 | if (it == tablet_checkpoints_.end()) { |
399 | 0 | return nullptr; |
400 | 0 | } |
401 | 641 | if (it->mem_tracker) { |
402 | 320 | return it->mem_tracker; |
403 | 320 | } |
404 | 641 | } |
405 | 321 | std::lock_guard<rw_spinlock> l(mutex_); |
406 | 321 | auto it = tablet_checkpoints_.find(producer_info); |
407 | 321 | if (it == tablet_checkpoints_.end()) { |
408 | 0 | return nullptr; |
409 | 0 | } |
410 | 321 | if (it->mem_tracker) { |
411 | 0 | return it->mem_tracker; |
412 | 0 | } |
413 | 321 | auto cdc_mem_tracker = MemTracker::FindOrCreateTracker( |
414 | 321 | "CDC", tablet_peer->tablet()->mem_tracker()); |
415 | 321 | it->mem_tracker = MemTracker::FindOrCreateTracker(producer_info.stream_id, cdc_mem_tracker); |
416 | 321 | return it->mem_tracker; |
417 | 321 | } |
418 | | |
419 | 656 | Result<bool> PreCheckTabletValidForStream(const ProducerTabletInfo& info) { |
420 | 656 | SharedLock<rw_spinlock> l(mutex_); |
421 | 656 | if (tablet_checkpoints_.count(info) != 0) { |
422 | 655 | return true; |
423 | 655 | } |
424 | 1 | if (tablet_checkpoints_.get<StreamTag>().count(info.stream_id) != 0) { |
425 | | // Did not find matching tablet ID. |
426 | | // TODO: Add the split tablets in during tablet split? |
427 | 0 | LOG(INFO) << "Tablet ID " << info.tablet_id << " is not part of stream ID " << info.stream_id |
428 | 0 | << ". Repopulating tablet list for this stream."; |
429 | 0 | } |
430 | 1 | return false; |
431 | 656 | } |
432 | | |
433 | | CHECKED_STATUS CheckTabletValidForStream( |
434 | | const ProducerTabletInfo& info, |
435 | 0 | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets) { |
436 | 0 | bool found = false; |
437 | 0 | { |
438 | 0 | std::lock_guard<rw_spinlock> l(mutex_); |
439 | 0 | for (const auto &tablet : tablets) { |
440 | | // Add every tablet in the stream. |
441 | 0 | ProducerTabletInfo producer_info{info.universe_uuid, info.stream_id, tablet.tablet_id()}; |
442 | 0 | tablet_checkpoints_.emplace(TabletCheckpointInfo{ |
443 | 0 | .producer_tablet_info = producer_info |
444 | 0 | }); |
445 | 0 | cdc_state_metadata_.emplace(CDCStateMetadataInfo{ |
446 | 0 | .producer_tablet_info = producer_info, |
447 | 0 | .current_schema = std::make_shared<Schema>() |
448 | 0 | }); |
449 | | // If this is the tablet that the user requested. |
450 | 0 | if (tablet.tablet_id() == info.tablet_id) { |
451 | 0 | found = true; |
452 | 0 | } |
453 | 0 | } |
454 | 0 | } |
455 | 0 | return found ? Status::OK() |
456 | 0 | : STATUS_FORMAT(InvalidArgument, "Tablet ID $0 is not part of stream ID $1", |
457 | 0 | info.tablet_id, info.stream_id); |
458 | 0 | } |
459 | | |
460 | 0 | boost::optional<OpId> MinOpId(const TabletId& tablet_id) { |
461 | 0 | boost::optional<OpId> result; |
462 | 0 | SharedLock<rw_spinlock> l(mutex_); |
463 | 0 | // right => multimap where keys are tablet_ids and values are stream_ids. |
464 | 0 | // left => multimap where keys are stream_ids and values are tablet_ids. |
465 | 0 | auto it_range = tablet_checkpoints_.get<TabletTag>().equal_range(tablet_id); |
466 | 0 | if (it_range.first != it_range.second) { |
467 | 0 | // Iterate over all the streams for this tablet. |
468 | 0 | for (auto it = it_range.first; it != it_range.second; ++it) { |
469 | 0 | if (!result || it->cdc_state_checkpoint.op_id.index < result->index) { |
470 | 0 | result = it->cdc_state_checkpoint.op_id; |
471 | 0 | } |
472 | 0 | } |
473 | 0 | } else { |
474 | 0 | VLOG(2) << "Didn't find any streams for tablet " << tablet_id; |
475 | 0 | } |
476 | 0 |
|
477 | 0 | return result; |
478 | 0 | } |
479 | | |
480 | 94 | TabletCheckpoints TabletCheckpointsCopy() { |
481 | 94 | SharedLock<rw_spinlock> lock(mutex_); |
482 | 94 | return tablet_checkpoints_; |
483 | 94 | } |
484 | | |
485 | | boost::optional<client::AsyncClientInitialiser> async_client_init_; |
486 | | |
487 | | // this will be used for the std::call_once call while caching the client |
488 | | std::once_flag is_client_cached_; |
489 | | private: |
490 | | rw_spinlock& mutex_; |
491 | | |
492 | | TabletCheckpoints tablet_checkpoints_ GUARDED_BY(mutex_); |
493 | | |
494 | | CDCStateMetadata cdc_state_metadata_ GUARDED_BY(mutex_); |
495 | | }; |
496 | | |
497 | | CDCServiceImpl::CDCServiceImpl(TSTabletManager* tablet_manager, |
498 | | const scoped_refptr<MetricEntity>& metric_entity_server, |
499 | | MetricRegistry* metric_registry) |
500 | | : CDCServiceIf(metric_entity_server), |
501 | | tablet_manager_(tablet_manager), |
502 | | metric_registry_(metric_registry), |
503 | | server_metrics_(std::make_shared<CDCServerMetrics>(metric_entity_server)), |
504 | 8.74k | impl_(new Impl(tablet_manager, &mutex_)) { |
505 | | |
506 | 8.74k | update_peers_and_metrics_thread_.reset(new std::thread( |
507 | 8.74k | &CDCServiceImpl::UpdatePeersAndMetrics, this)); |
508 | 8.74k | } |
509 | | |
510 | 92 | CDCServiceImpl::~CDCServiceImpl() { |
511 | 92 | Shutdown(); |
512 | 92 | } |
513 | | |
514 | 19.0k | client::YBClient* CDCServiceImpl::client() { |
515 | 19.0k | return impl_->async_client_init_->client(); |
516 | 19.0k | } |
517 | | |
518 | | namespace { |
519 | | |
520 | 10.3k | bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) { |
521 | 23.4k | for (const auto& col : schema.columns()) { |
522 | 23.4k | if (col.order() == static_cast<int32_t>(PgSystemAttrNum::kYBRowId)) { |
523 | | // ybrowid column is added for tables that don't have user-specified primary key. |
524 | 2 | return false; |
525 | 2 | } |
526 | 23.4k | } |
527 | 10.3k | return true; |
528 | 10.3k | } |
529 | | |
530 | 233 | bool IsTabletPeerLeader(const std::shared_ptr<tablet::TabletPeer>& peer) { |
531 | 233 | return peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY; |
532 | 233 | } |
533 | | |
534 | | std::unordered_map<std::string, std::string> GetCreateCDCStreamOptions( |
535 | 306 | const CreateCDCStreamRequestPB* req) { |
536 | 306 | std::unordered_map<std::string, std::string> options; |
537 | 306 | if(req->has_namespace_name()) { |
538 | 306 | options.reserve(5); |
539 | 306 | } else { |
540 | 0 | options.reserve(4); |
541 | 0 | } |
542 | | |
543 | 306 | options.emplace(kRecordType, CDCRecordType_Name(req->record_type())); |
544 | 306 | options.emplace(kRecordFormat, CDCRecordFormat_Name(req->record_format())); |
545 | 306 | options.emplace(kSourceType, CDCRequestSource_Name(req->source_type())); |
546 | 306 | options.emplace(kCheckpointType, CDCCheckpointType_Name(req->checkpoint_type())); |
547 | 306 | if (req->has_namespace_name()) { |
548 | 306 | options.emplace(kIdType, kNamespaceId); |
549 | 306 | } |
550 | | |
551 | 306 | return options; |
552 | 306 | } |
553 | | |
554 | | Status DoUpdateCDCConsumerOpId(const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
555 | | const OpId& checkpoint, |
556 | 862 | const TabletId& tablet_id) { |
557 | 862 | std::shared_ptr<consensus::Consensus> shared_consensus = tablet_peer->shared_consensus(); |
558 | | |
559 | 862 | if (shared_consensus == nullptr) { |
560 | 0 | return STATUS_FORMAT(InternalError, |
561 | 0 | "Failed to get tablet $0 peer consensus", tablet_id); |
562 | 0 | } |
563 | | |
564 | 862 | shared_consensus->UpdateCDCConsumerOpId(checkpoint); |
565 | 862 | return Status::OK(); |
566 | 862 | } |
567 | | |
568 | | bool UpdateCheckpointRequired(const StreamMetadata& record, |
569 | 637 | const CDCSDKCheckpointPB& cdc_sdk_op_id) { |
570 | | |
571 | 637 | switch (record.source_type) { |
572 | 0 | case XCLUSTER: |
573 | 0 | return true; |
574 | | |
575 | 637 | case CDCSDK: |
576 | 637 | if (cdc_sdk_op_id.write_id() == 0) { |
577 | 613 | return true; |
578 | 613 | } |
579 | 24 | return cdc_sdk_op_id.write_id() == -1 && cdc_sdk_op_id.key().empty() && |
580 | 24 | cdc_sdk_op_id.snapshot_time() != 0; |
581 | | |
582 | 0 | default: |
583 | 0 | return false; |
584 | 637 | } |
585 | | |
586 | 637 | } |
587 | | |
588 | | bool GetFromOpId(const GetChangesRequestPB* req, |
589 | | OpId* op_id, |
590 | 641 | CDCSDKCheckpointPB* cdc_sdk_op_id) { |
591 | 641 | if (req->has_from_checkpoint()) { |
592 | 0 | *op_id = OpId::FromPB(req->from_checkpoint().op_id()); |
593 | 641 | } else if (req->has_from_cdc_sdk_checkpoint()) { |
594 | 36 | *cdc_sdk_op_id = req->from_cdc_sdk_checkpoint(); |
595 | 36 | *op_id = OpId::FromPB(*cdc_sdk_op_id); |
596 | 605 | } else { |
597 | 605 | return false; |
598 | 605 | } |
599 | 36 | return true; |
600 | 641 | } |
601 | | |
602 | | // Check for compatibility whether CDC can be setup on the table |
603 | | // This essentially checks that the table should not be a REDIS table since we do not support it |
604 | | // and if it's a YSQL or YCQL one, it should have a primary key |
605 | 5.18k | Status CheckCdcCompatibility(const std::shared_ptr<client::YBTable>& table) { |
606 | | // return if it is a CQL table because they always have a user specified primary key |
607 | 5.18k | if (table->table_type() == client::YBTableType::YQL_TABLE_TYPE) { |
608 | 0 | LOG(INFO) << "Returning while checking CDC compatibility, table is a YCQL table"; |
609 | 0 | return Status::OK(); |
610 | 0 | } |
611 | | |
612 | 5.18k | if (table->table_type() == client::YBTableType::REDIS_TABLE_TYPE) { |
613 | 0 | return STATUS(InvalidArgument, "Cannot setup CDC on YEDIS_TABLE");; |
614 | 0 | } |
615 | | |
616 | | // Check if YSQL table has a primary key. CQL tables always have a |
617 | | // user specified primary key. |
618 | 5.18k | if (!YsqlTableHasPrimaryKey(table->schema())) { |
619 | 0 | return STATUS(InvalidArgument, "Cannot setup CDC on table without primary key"); |
620 | 0 | } |
621 | | |
622 | 5.18k | return Status::OK(); |
623 | 5.18k | } |
624 | | |
625 | 963 | CoarseTimePoint GetDeadline(const RpcContext& context, client::YBClient* client) { |
626 | 963 | CoarseTimePoint deadline = context.GetClientDeadline(); |
627 | 963 | if (deadline == CoarseTimePoint::max()) { // Not specified by user. |
628 | 0 | deadline = CoarseMonoClock::now() + client->default_rpc_timeout(); |
629 | 0 | } |
630 | 963 | return deadline; |
631 | 963 | } |
632 | | |
633 | 6 | CHECKED_STATUS VerifyArg(const SetCDCCheckpointRequestPB& req) { |
634 | 6 | if (!req.has_checkpoint()) { |
635 | 0 | return STATUS(InvalidArgument, "OpId is required to set checkpoint"); |
636 | 0 | } |
637 | | |
638 | 6 | if (!req.has_tablet_id()) { |
639 | 0 | return STATUS(InvalidArgument, "Tablet ID is required to set checkpoint"); |
640 | 0 | } |
641 | | |
642 | 6 | if(!req.has_stream_id()) { |
643 | 0 | return STATUS(InvalidArgument, "Stream ID is required to set checkpoint"); |
644 | 0 | } |
645 | | |
646 | 6 | return Status::OK(); |
647 | 6 | } |
648 | | |
649 | | } // namespace |
650 | | |
651 | | template <class ReqType, class RespType> |
652 | 1.18k | bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) { |
653 | 1.18k | TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString()); |
654 | 1.18k | if (PREDICT_FALSE(!tablet_manager_)) { |
655 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
656 | 0 | STATUS(ServiceUnavailable, "Tablet Server is not running"), |
657 | 0 | CDCErrorPB::NOT_RUNNING, |
658 | 0 | rpc); |
659 | 0 | return false; |
660 | 0 | } |
661 | 1.18k | return true; |
662 | 1.18k | } bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::CreateCDCStreamRequestPB, yb::cdc::CreateCDCStreamResponsePB>(yb::cdc::CreateCDCStreamRequestPB const*, yb::cdc::CreateCDCStreamResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 652 | 307 | bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) { | 653 | 307 | TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString()); | 654 | 307 | if (PREDICT_FALSE(!tablet_manager_)) { | 655 | 0 | SetupErrorAndRespond(resp->mutable_error(), | 656 | 0 | STATUS(ServiceUnavailable, "Tablet Server is not running"), | 657 | 0 | CDCErrorPB::NOT_RUNNING, | 658 | 0 | rpc); | 659 | 0 | return false; | 660 | 0 | } | 661 | 307 | return true; | 662 | 307 | } |
Unexecuted instantiation: bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::DeleteCDCStreamRequestPB, yb::cdc::DeleteCDCStreamResponsePB>(yb::cdc::DeleteCDCStreamRequestPB const*, yb::cdc::DeleteCDCStreamResponsePB*, yb::rpc::RpcContext*) Unexecuted instantiation: bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::ListTabletsRequestPB, yb::cdc::ListTabletsResponsePB>(yb::cdc::ListTabletsRequestPB const*, yb::cdc::ListTabletsResponsePB*, yb::rpc::RpcContext*) bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::GetChangesRequestPB, yb::cdc::GetChangesResponsePB>(yb::cdc::GetChangesRequestPB const*, yb::cdc::GetChangesResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 652 | 642 | bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) { | 653 | 642 | TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString()); | 654 | 642 | if (PREDICT_FALSE(!tablet_manager_)) { | 655 | 0 | SetupErrorAndRespond(resp->mutable_error(), | 656 | 0 | STATUS(ServiceUnavailable, "Tablet Server is not running"), | 657 | 0 | CDCErrorPB::NOT_RUNNING, | 658 | 0 | rpc); | 659 | 0 | return false; | 660 | 0 | } | 661 | 642 | return true; | 662 | 642 | } |
bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::GetCheckpointRequestPB, yb::cdc::GetCheckpointResponsePB>(yb::cdc::GetCheckpointRequestPB const*, yb::cdc::GetCheckpointResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 652 | 14 | bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) { | 653 | 14 | TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString()); | 654 | 14 | if (PREDICT_FALSE(!tablet_manager_)) { | 655 | 0 | SetupErrorAndRespond(resp->mutable_error(), | 656 | 0 | STATUS(ServiceUnavailable, "Tablet Server is not running"), | 657 | 0 | CDCErrorPB::NOT_RUNNING, | 658 | 0 | rpc); | 659 | 0 | return false; | 660 | 0 | } | 661 | 14 | return true; | 662 | 14 | } |
bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::UpdateCdcReplicatedIndexRequestPB, yb::cdc::UpdateCdcReplicatedIndexResponsePB>(yb::cdc::UpdateCdcReplicatedIndexRequestPB const*, yb::cdc::UpdateCdcReplicatedIndexResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 652 | 219 | bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) { | 653 | 219 | TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString()); | 654 | 219 | if (PREDICT_FALSE(!tablet_manager_)) { | 655 | 0 | SetupErrorAndRespond(resp->mutable_error(), | 656 | 0 | STATUS(ServiceUnavailable, "Tablet Server is not running"), | 657 | 0 | CDCErrorPB::NOT_RUNNING, | 658 | 0 | rpc); | 659 | 0 | return false; | 660 | 0 | } | 661 | 219 | return true; | 662 | 219 | } |
Unexecuted instantiation: bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::GetCDCDBStreamInfoRequestPB, yb::cdc::GetCDCDBStreamInfoResponsePB>(yb::cdc::GetCDCDBStreamInfoRequestPB const*, yb::cdc::GetCDCDBStreamInfoResponsePB*, yb::rpc::RpcContext*) |
663 | | |
664 | | void CDCServiceImpl::CreateEntryInCdcStateTable( |
665 | | const std::shared_ptr<client::TableHandle>& cdc_state_table, |
666 | | std::vector<ProducerTabletInfo>* producer_entries_modified, |
667 | | std::vector<client::YBOperationPtr>* ops, |
668 | | const CDCStreamId& stream_id, |
669 | | const TableId& table_id, |
670 | 5.21k | const TabletId& tablet_id) { |
671 | 5.21k | OpId op_id; |
672 | | |
673 | 5.21k | const auto cdc_state_table_op = cdc_state_table->NewWriteOp( |
674 | 5.21k | QLWriteRequestPB::QL_STMT_INSERT); |
675 | 5.21k | auto *const cdc_state_table_write_req = cdc_state_table_op->mutable_request(); |
676 | | |
677 | 5.21k | QLAddStringHashValue(cdc_state_table_write_req, tablet_id); |
678 | 5.21k | QLAddStringRangeValue(cdc_state_table_write_req, stream_id); |
679 | 5.21k | cdc_state_table->AddStringColumnValue(cdc_state_table_write_req, |
680 | 5.21k | master::kCdcCheckpoint, op_id.ToString()); |
681 | 5.21k | ops->push_back(std::move(cdc_state_table_op)); |
682 | | |
683 | 5.21k | impl_->AddTabletCheckpoint(op_id, stream_id, tablet_id, producer_entries_modified); |
684 | 5.21k | } |
685 | | |
686 | 307 | Result<NamespaceId> CDCServiceImpl::GetNamespaceId(const std::string& ns_name) { |
687 | 307 | master::GetNamespaceInfoResponsePB namespace_info_resp; |
688 | 307 | RETURN_NOT_OK(client()->GetNamespaceInfo(std::string(), |
689 | 307 | ns_name, |
690 | 307 | YQL_DATABASE_PGSQL, |
691 | 307 | &namespace_info_resp)); |
692 | | |
693 | 306 | return namespace_info_resp.namespace_().id(); |
694 | 307 | } |
695 | | |
696 | | Status CDCServiceImpl::CreateCDCStreamForNamespace( |
697 | | const CreateCDCStreamRequestPB* req, |
698 | | CreateCDCStreamResponsePB* resp, |
699 | 307 | CoarseTimePoint deadline) { |
700 | 307 | auto session = client()->NewSession(); |
701 | | |
702 | | // Used to delete streams in case of failure. |
703 | 307 | CDCCreationState creation_state; |
704 | | |
705 | 307 | auto scope_exit = ScopeExit([this, &creation_state] { |
706 | 307 | RollbackPartialCreate(creation_state); |
707 | 307 | }); |
708 | | |
709 | 307 | auto ns_id = VERIFY_RESULT_OR_SET_CODE306 ( |
710 | 306 | GetNamespaceId(req->namespace_name()), CDCError(CDCErrorPB::INVALID_REQUEST)); |
711 | | |
712 | | // Generate a stream id by calling CreateCDCStream, and also setup the stream in the master. |
713 | 0 | std::unordered_map<std::string, std::string> options = GetCreateCDCStreamOptions(req); |
714 | | |
715 | 306 | CDCStreamId db_stream_id = VERIFY_RESULT_OR_SET_CODE( |
716 | 306 | client()->CreateCDCStream(ns_id, options), CDCError(CDCErrorPB::INTERNAL_ERROR)); |
717 | | |
718 | 306 | auto table_list = VERIFY_RESULT_OR_SET_CODE( |
719 | 306 | client()->ListUserTables(ns_id), CDCError(CDCErrorPB::INTERNAL_ERROR)); |
720 | | |
721 | 0 | options.erase(kIdType); |
722 | | |
723 | 306 | std::vector<client::YBOperationPtr> ops; |
724 | 306 | std::vector<TableId> table_ids; |
725 | 306 | std::vector<CDCStreamId> stream_ids; |
726 | | |
727 | 306 | auto cdc_state_table = |
728 | 306 | VERIFY_RESULT_OR_SET_CODE(GetCdcStateTable(), CDCError(CDCErrorPB::INTERNAL_ERROR)); |
729 | | |
730 | 5.19k | for (const auto& table_iter : table_list) { |
731 | 5.19k | std::shared_ptr<client::YBTable> table; |
732 | | |
733 | 5.19k | RETURN_NOT_OK_SET_CODE( |
734 | 5.19k | client()->OpenTable(table_iter.table_id(), &table), CDCError(CDCErrorPB::TABLE_NOT_FOUND)); |
735 | | |
736 | | // internally if any of the table doesn't have a primary key, then do not create |
737 | | // a CDC stream ID for that table |
738 | 5.19k | if (!YsqlTableHasPrimaryKey(table->schema())) { |
739 | 2 | LOG(WARNING) << "Skipping CDC stream creation on " << table->name().table_name() |
740 | 2 | << " because it does not have a primary key"; |
741 | 2 | continue; |
742 | 2 | } |
743 | | |
744 | | // We don't allow CDC on YEDIS and tables without a primary key. |
745 | 5.18k | if (req->record_format() != CDCRecordFormat::WAL) { |
746 | 5.18k | RETURN_NOT_OK_SET_CODE(CheckCdcCompatibility(table), CDCError(CDCErrorPB::INVALID_REQUEST)); |
747 | 5.18k | } |
748 | | |
749 | 5.18k | const CDCStreamId stream_id = VERIFY_RESULT_OR_SET_CODE( |
750 | 5.18k | client()->CreateCDCStream(table_iter.table_id(), options, true, db_stream_id), |
751 | 5.18k | CDCError(CDCErrorPB::INTERNAL_ERROR)); |
752 | | |
753 | 0 | creation_state.created_cdc_streams.push_back(stream_id); |
754 | | |
755 | 5.18k | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
756 | 5.18k | RETURN_NOT_OK_SET_CODE( |
757 | 5.18k | client()->GetTabletsFromTableId(table_iter.table_id(), 0, &tablets), |
758 | 5.18k | CDCError(CDCErrorPB::TABLE_NOT_FOUND)); |
759 | | |
760 | | // For each tablet, create a row in cdc_state table containing the generated stream id, and |
761 | | // the op id as max in the logs. |
762 | 5.21k | for (const auto& tablet : tablets)5.18k { |
763 | 5.21k | CreateEntryInCdcStateTable( |
764 | 5.21k | cdc_state_table, |
765 | 5.21k | &creation_state.producer_entries_modified, |
766 | 5.21k | &ops, |
767 | 5.21k | db_stream_id, |
768 | 5.21k | table_iter.table_id(), |
769 | 5.21k | tablet.tablet_id()); |
770 | 5.21k | } |
771 | 5.18k | stream_ids.push_back(std::move(stream_id)); |
772 | 5.18k | table_ids.push_back(table_iter.table_id()); |
773 | 5.18k | } |
774 | | |
775 | | // Add stream to cache. |
776 | 306 | AddStreamMetadataToCache( |
777 | 306 | db_stream_id, |
778 | 306 | std::make_shared<StreamMetadata>( |
779 | 306 | ns_id, table_ids, req->record_type(), req->record_format(), req->source_type(), |
780 | 306 | req->checkpoint_type())); |
781 | | |
782 | 306 | session->SetDeadline(deadline); |
783 | | |
784 | 306 | RETURN_NOT_OK_SET_CODE( |
785 | 306 | RefreshCacheOnFail(session->ApplyAndFlush(ops)), CDCError(CDCErrorPB::INTERNAL_ERROR)); |
786 | | |
787 | 306 | resp->set_db_stream_id(db_stream_id); |
788 | | |
789 | | // Clear creation_state so no changes are reversed by scope_exit since we succeeded. |
790 | 306 | creation_state.Clear(); |
791 | | |
792 | 306 | return Status::OK(); |
793 | 306 | } |
794 | | |
795 | | void CDCServiceImpl::CreateCDCStream(const CreateCDCStreamRequestPB* req, |
796 | | CreateCDCStreamResponsePB* resp, |
797 | 307 | RpcContext context) { |
798 | 307 | CDCStreamId streamId; |
799 | | |
800 | 307 | if (!CheckOnline(req, resp, &context)) { |
801 | 0 | return; |
802 | 0 | } |
803 | | |
804 | 307 | RPC_CHECK_AND_RETURN_ERROR(req->has_table_id() || req->has_namespace_name(), |
805 | 307 | STATUS(InvalidArgument, |
806 | 307 | "Table ID or Database name is required to create CDC stream"), |
807 | 307 | resp->mutable_error(), |
808 | 307 | CDCErrorPB::INVALID_REQUEST, |
809 | 307 | context); |
810 | | |
811 | 307 | bool is_xcluster = req->source_type() == XCLUSTER; |
812 | 307 | if (is_xcluster || req->has_table_id()) { |
813 | 0 | std::shared_ptr<client::YBTable> table; |
814 | 0 | Status s = client()->OpenTable(req->table_id(), &table); |
815 | 0 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context); |
816 | | |
817 | | // We don't allow CDC on YEDIS and tables without a primary key. |
818 | 0 | if (req->record_format() != CDCRecordFormat::WAL) { |
819 | 0 | s = CheckCdcCompatibility(table); |
820 | 0 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); |
821 | 0 | } |
822 | | |
823 | 0 | std::unordered_map<std::string, std::string> options = GetCreateCDCStreamOptions(req); |
824 | |
|
825 | 0 | auto result = client()->CreateCDCStream(req->table_id(), options); |
826 | 0 | RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(), |
827 | 0 | CDCErrorPB::INTERNAL_ERROR, context); |
828 | | |
829 | 0 | resp->set_stream_id(*result); |
830 | | |
831 | | // Add stream to cache. |
832 | 0 | AddStreamMetadataToCache( |
833 | 0 | *result, std::make_shared<StreamMetadata>( |
834 | 0 | "", |
835 | 0 | std::vector<TableId>{req->table_id()}, |
836 | 0 | req->record_type(), |
837 | 0 | req->record_format(), |
838 | 0 | req->source_type(), |
839 | 0 | req->checkpoint_type())); |
840 | 307 | } else if (req->has_namespace_name()) { |
841 | 307 | auto deadline = GetDeadline(context, client()); |
842 | 307 | Status status = CreateCDCStreamForNamespace(req, resp, deadline); |
843 | 307 | CDCError error(status); |
844 | | |
845 | 307 | if (!status.ok()) { |
846 | 1 | SetupErrorAndRespond(resp->mutable_error(), status, error.value(), &context); |
847 | 1 | return; |
848 | 1 | } |
849 | 307 | } |
850 | | |
851 | 306 | context.RespondSuccess(); |
852 | 306 | } |
853 | | |
854 | | Result<SetCDCCheckpointResponsePB> CDCServiceImpl::SetCDCCheckpoint( |
855 | 6 | const SetCDCCheckpointRequestPB& req, CoarseTimePoint deadline) { |
856 | 6 | VLOG(1) << "Received SetCDCCheckpoint request " << req.ShortDebugString()0 ; |
857 | | |
858 | 6 | RETURN_NOT_OK_SET_CODE(VerifyArg(req), CDCError(CDCErrorPB::INVALID_REQUEST)); |
859 | | |
860 | 6 | auto record = VERIFY_RESULT(GetStream(req.stream_id())); |
861 | 6 | if ((*record).checkpoint_type != EXPLICIT) { |
862 | 2 | LOG(WARNING) << "Setting the checkpoint explicitly even though the checkpoint type is implicit"; |
863 | 2 | } |
864 | | |
865 | 6 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
866 | 6 | auto s = tablet_manager_->GetTabletPeer(req.tablet_id(), &tablet_peer); |
867 | | |
868 | 6 | if (s.IsNotFound()) { |
869 | 0 | RETURN_NOT_OK_SET_CODE(s, CDCError(CDCErrorPB::TABLET_NOT_FOUND)); |
870 | 6 | } else if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) { |
871 | 0 | RETURN_NOT_OK_SET_CODE(s, CDCError(CDCErrorPB::NOT_LEADER)); |
872 | 6 | } else if (!s.ok()) { |
873 | 0 | RETURN_NOT_OK_SET_CODE(s, CDCError(CDCErrorPB::LEADER_NOT_READY)); |
874 | 0 | } |
875 | | |
876 | 6 | ProducerTabletInfo producer_tablet{"" /* UUID */, req.stream_id(), req.tablet_id()}; |
877 | 6 | OpId checkpoint = OpId::FromPB(req.checkpoint().op_id()); |
878 | | |
879 | 6 | auto session = client()->NewSession(); |
880 | 6 | session->SetDeadline(deadline); |
881 | 6 | RETURN_NOT_OK_SET_CODE( |
882 | 6 | UpdateCheckpoint(producer_tablet, checkpoint, checkpoint, session, GetCurrentTimeMicros()), |
883 | 6 | CDCError(CDCErrorPB::INTERNAL_ERROR)); |
884 | | |
885 | 6 | RETURN_NOT_OK_SET_CODE( |
886 | 6 | DoUpdateCDCConsumerOpId(tablet_peer, checkpoint, req.tablet_id()), |
887 | 6 | CDCError(CDCErrorPB::INTERNAL_ERROR)); |
888 | | |
889 | 6 | return SetCDCCheckpointResponsePB(); |
890 | 6 | } |
891 | | |
892 | | void CDCServiceImpl::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, |
893 | | DeleteCDCStreamResponsePB* resp, |
894 | 0 | RpcContext context) { |
895 | 0 | if (!CheckOnline(req, resp, &context)) { |
896 | 0 | return; |
897 | 0 | } |
898 | | |
899 | 0 | LOG(INFO) << "Received DeleteCDCStream request " << req->ShortDebugString(); |
900 | |
|
901 | 0 | RPC_CHECK_AND_RETURN_ERROR( |
902 | 0 | !req->stream_id().empty(), |
903 | 0 | STATUS(InvalidArgument, "Stream ID or Database stream ID is required to delete CDC stream"), |
904 | 0 | resp->mutable_error(), |
905 | 0 | CDCErrorPB::INVALID_REQUEST, |
906 | 0 | context); |
907 | | |
908 | 0 | vector<CDCStreamId> streams(req->stream_id().begin(), req->stream_id().end()); |
909 | 0 | Status s = client()->DeleteCDCStream( |
910 | 0 | streams, |
911 | 0 | (req->has_force_delete() && req->force_delete()), |
912 | 0 | (req->has_ignore_errors() && req->ignore_errors())); |
913 | 0 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
914 | | |
915 | 0 | context.RespondSuccess(); |
916 | 0 | } |
917 | | |
918 | | void CDCServiceImpl::ListTablets(const ListTabletsRequestPB* req, |
919 | | ListTabletsResponsePB* resp, |
920 | 0 | RpcContext context) { |
921 | 0 | if (!CheckOnline(req, resp, &context)) { |
922 | 0 | return; |
923 | 0 | } |
924 | | |
925 | 0 | RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id(), |
926 | 0 | STATUS(InvalidArgument, "Stream ID is required to list tablets"), |
927 | 0 | resp->mutable_error(), |
928 | 0 | CDCErrorPB::INVALID_REQUEST, |
929 | 0 | context); |
930 | | |
931 | 0 | auto tablets = GetTablets(req->stream_id()); |
932 | 0 | RPC_CHECK_AND_RETURN_ERROR(tablets.ok(), tablets.status(), resp->mutable_error(), |
933 | 0 | CDCErrorPB::INTERNAL_ERROR, context); |
934 | | |
935 | 0 | if (!req->local_only()) { |
936 | 0 | resp->mutable_tablets()->Reserve(tablets->size()); |
937 | 0 | } |
938 | |
|
939 | 0 | for (const auto& tablet : *tablets) { |
940 | | // Filter local tablets if needed. |
941 | 0 | if (req->local_only()) { |
942 | 0 | bool is_local = false; |
943 | 0 | for (const auto& replica : tablet.replicas()) { |
944 | 0 | if (replica.ts_info().permanent_uuid() == tablet_manager_->server()->permanent_uuid()) { |
945 | 0 | is_local = true; |
946 | 0 | break; |
947 | 0 | } |
948 | 0 | } |
949 | |
|
950 | 0 | if (!is_local) { |
951 | 0 | continue; |
952 | 0 | } |
953 | 0 | } |
954 | | |
955 | 0 | auto res = resp->add_tablets(); |
956 | 0 | res->set_tablet_id(tablet.tablet_id()); |
957 | 0 | res->mutable_tservers()->Reserve(tablet.replicas_size()); |
958 | 0 | for (const auto& replica : tablet.replicas()) { |
959 | 0 | auto tserver = res->add_tservers(); |
960 | 0 | tserver->mutable_broadcast_addresses()->CopyFrom(replica.ts_info().broadcast_addresses()); |
961 | 0 | if (tserver->broadcast_addresses_size() == 0) { |
962 | 0 | LOG(WARNING) << "No public broadcast addresses found for " |
963 | 0 | << replica.ts_info().permanent_uuid() << ". Using private addresses instead."; |
964 | 0 | tserver->mutable_broadcast_addresses()->CopyFrom(replica.ts_info().private_rpc_addresses()); |
965 | 0 | } |
966 | 0 | } |
967 | 0 | } |
968 | |
|
969 | 0 | context.RespondSuccess(); |
970 | 0 | } |
971 | | |
972 | | Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> CDCServiceImpl::GetTablets( |
973 | 1 | const CDCStreamId& stream_id) { |
974 | 1 | auto stream_metadata = VERIFY_RESULT0 (GetStream(stream_id));0 |
975 | 0 | client::YBTableName table_name; |
976 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> all_tablets; |
977 | |
|
978 | 0 | for (const auto& table_id : stream_metadata->table_ids) { |
979 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
980 | 0 | table_name.set_table_id(table_id); |
981 | 0 | RETURN_NOT_OK(client()->GetTablets( |
982 | 0 | table_name, 0, &tablets, /* partition_list_version =*/nullptr, |
983 | 0 | RequireTabletsRunning::kFalse, master::IncludeInactive::kTrue)); |
984 | | |
985 | 0 | all_tablets.MergeFrom(tablets); |
986 | 0 | } |
987 | | |
988 | 0 | return all_tablets; |
989 | 0 | } |
990 | | |
991 | | void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, |
992 | | GetChangesResponsePB* resp, |
993 | 642 | RpcContext context) { |
994 | 642 | if (!CheckOnline(req, resp, &context)) { |
995 | 0 | return; |
996 | 0 | } |
997 | 642 | YB_LOG_EVERY_N_SECS(INFO, 300) << "Received GetChanges request " << req->ShortDebugString()93 ; |
998 | | |
999 | 642 | RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(), |
1000 | 642 | STATUS(InvalidArgument, "Tablet ID is required to get CDC changes"), |
1001 | 642 | resp->mutable_error(), |
1002 | 642 | CDCErrorPB::INVALID_REQUEST, |
1003 | 642 | context); |
1004 | 642 | RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id() || req->has_db_stream_id(), |
1005 | 642 | STATUS(InvalidArgument, |
1006 | 642 | "Stream ID/DB Stream ID is required to get CDC changes"), |
1007 | 642 | resp->mutable_error(), |
1008 | 642 | CDCErrorPB::INVALID_REQUEST, |
1009 | 642 | context); |
1010 | | |
1011 | 642 | ProducerTabletInfo producer_tablet; |
1012 | 642 | CDCStreamId stream_id = req->has_db_stream_id() ? req->db_stream_id() : req->stream_id()0 ; |
1013 | | |
1014 | 642 | auto session = client()->NewSession(); |
1015 | 642 | CoarseTimePoint deadline = GetDeadline(context, client()); |
1016 | 642 | session->SetDeadline(deadline); |
1017 | | |
1018 | | // Check that requested tablet_id is part of the CDC stream. |
1019 | 642 | producer_tablet = {"" /* UUID */, stream_id, req->tablet_id()}; |
1020 | | |
1021 | 642 | Status s = CheckTabletValidForStream(producer_tablet); |
1022 | 642 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); |
1023 | | |
1024 | 641 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1025 | 641 | s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer); |
1026 | 641 | auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : OpId::kUnknownTerm0 ; |
1027 | | |
1028 | | // If we can't serve this tablet... |
1029 | 641 | if (s.IsNotFound() || tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY) { |
1030 | 0 | if (req->serve_as_proxy()) { |
1031 | | // Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups. |
1032 | 0 | auto context_ptr = std::make_shared<RpcContext>(std::move(context)); |
1033 | 0 | TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer); |
1034 | | // Otherwise, figure out the proper return code. |
1035 | 0 | } else if (s.IsNotFound()) { |
1036 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, CDCErrorPB::TABLET_NOT_FOUND, &context); |
1037 | 0 | } else if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) { |
1038 | | // TODO: we may be able to get some changes, even if we're not the leader. |
1039 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
1040 | 0 | STATUS(NotFound, Format("Not leader for $0", req->tablet_id())), |
1041 | 0 | CDCErrorPB::TABLET_NOT_FOUND, &context); |
1042 | 0 | } else { |
1043 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
1044 | 0 | STATUS(LeaderNotReadyToServe, "Not ready to serve"), |
1045 | 0 | CDCErrorPB::LEADER_NOT_READY, &context); |
1046 | 0 | } |
1047 | 0 | return; |
1048 | 0 | } |
1049 | | |
1050 | | // This is the leader tablet, so mark cdc as enabled. |
1051 | 641 | cdc_enabled_.store(true, std::memory_order_release); |
1052 | | |
1053 | 641 | auto res = GetStream(stream_id); |
1054 | 641 | RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(), |
1055 | 641 | CDCErrorPB::INTERNAL_ERROR, context); |
1056 | 641 | StreamMetadata record = **res; |
1057 | | |
1058 | 641 | OpId op_id; |
1059 | 641 | CDCSDKCheckpointPB cdc_sdk_op_id; |
1060 | | // Get opId from request. |
1061 | 641 | if (!GetFromOpId(req, &op_id, &cdc_sdk_op_id)) { |
1062 | 605 | auto result = GetLastCheckpoint(producer_tablet, session); |
1063 | 605 | RPC_CHECK_AND_RETURN_ERROR( |
1064 | 605 | result.ok(), result.status(), resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1065 | 605 | if (record.source_type == XCLUSTER) { |
1066 | 0 | op_id = *result; |
1067 | 605 | } else { |
1068 | 605 | result->ToPB(&cdc_sdk_op_id); |
1069 | 605 | op_id = OpId::FromPB(cdc_sdk_op_id); |
1070 | 605 | } |
1071 | 605 | } |
1072 | | |
1073 | 641 | int64_t last_readable_index; |
1074 | 641 | consensus::ReplicateMsgsHolder msgs_holder; |
1075 | 641 | MemTrackerPtr mem_tracker = impl_->GetMemTracker(tablet_peer, producer_tablet); |
1076 | | |
1077 | | // Calculate deadline to be passed to GetChanges. |
1078 | 641 | CoarseTimePoint get_changes_deadline = CoarseTimePoint::max(); |
1079 | 641 | if (deadline != CoarseTimePoint::max()) { |
1080 | | // Check if we are too close to calculate a safe deadline. |
1081 | 641 | RPC_CHECK_AND_RETURN_ERROR( |
1082 | 641 | deadline - CoarseMonoClock::Now() > 1ms, |
1083 | 641 | STATUS(TimedOut, "Too close to rpc timeout to call GetChanges."), |
1084 | 641 | resp->mutable_error(), |
1085 | 641 | CDCErrorPB::INTERNAL_ERROR, context); |
1086 | | |
1087 | | // Calculate a safe deadline so that CdcProducer::GetChanges times out |
1088 | | // 20% faster than CdcServiceImpl::GetChanges. This gives enough |
1089 | | // time (unless timeouts are unrealistically small) for CdcServiceImpl::GetChanges |
1090 | | // to finish post-processing and return the partial results without itself timing out. |
1091 | 641 | const auto safe_deadline = deadline - |
1092 | 641 | (FLAGS_cdc_read_rpc_timeout_ms * 1ms * FLAGS_cdc_read_safe_deadline_ratio); |
1093 | 641 | get_changes_deadline = ToCoarse(MonoTime::FromUint64(safe_deadline.time_since_epoch().count())); |
1094 | 641 | } |
1095 | | |
1096 | | // Read the latest changes from the Log. |
1097 | 641 | if (record.source_type == XCLUSTER) { |
1098 | 0 | s = cdc::GetChangesForXCluster( |
1099 | 0 | stream_id, req->tablet_id(), op_id, record, tablet_peer, mem_tracker, |
1100 | 0 | &msgs_holder, resp, &last_readable_index, get_changes_deadline); |
1101 | 641 | } else { |
1102 | 641 | std::string commit_timestamp; |
1103 | 641 | OpId last_streamed_op_id; |
1104 | | |
1105 | 641 | auto cached_schema = impl_->GetOrAddSchema(producer_tablet); |
1106 | 641 | s = cdc::GetChangesForCDCSDK( |
1107 | 641 | req->stream_id(), req->tablet_id(), cdc_sdk_op_id, record, tablet_peer, mem_tracker, |
1108 | 641 | &msgs_holder, resp, &commit_timestamp, &cached_schema, |
1109 | 641 | &last_streamed_op_id, &last_readable_index, get_changes_deadline); |
1110 | | |
1111 | 641 | impl_->UpdateCDCStateMetadata( |
1112 | 641 | producer_tablet, commit_timestamp, cached_schema, last_streamed_op_id); |
1113 | 641 | } |
1114 | | |
1115 | 641 | RPC_STATUS_RETURN_ERROR( |
1116 | 641 | s, |
1117 | 641 | resp->mutable_error(), |
1118 | 641 | s.IsNotFound() ? CDCErrorPB::CHECKPOINT_TOO_OLD : CDCErrorPB::UNKNOWN_ERROR, |
1119 | 641 | context); |
1120 | | |
1121 | | // Verify leadership was maintained for the duration of the GetChanges() read. |
1122 | 641 | s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer); |
1123 | 641 | if (s.IsNotFound() || tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY || |
1124 | 641 | tablet_peer->LeaderTerm() != original_leader_term) { |
1125 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
1126 | 0 | STATUS(NotFound, Format("Not leader for $0", req->tablet_id())), |
1127 | 0 | CDCErrorPB::TABLET_NOT_FOUND, &context); |
1128 | 0 | return; |
1129 | 0 | } |
1130 | | |
1131 | | // Store information about the last server read & remote client ACK. |
1132 | 641 | uint64_t last_record_hybrid_time = resp->records_size() > 0 ? |
1133 | 641 | resp->records(resp->records_size() - 1).time()0 : 0; |
1134 | | |
1135 | 641 | if (record.checkpoint_type == IMPLICIT) { |
1136 | 637 | if (UpdateCheckpointRequired(record, cdc_sdk_op_id)) { |
1137 | 625 | s = UpdateCheckpoint(producer_tablet, OpId::FromPB(resp->checkpoint().op_id()), |
1138 | 625 | op_id, session, last_record_hybrid_time); |
1139 | 625 | } |
1140 | | |
1141 | 637 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1142 | | |
1143 | 637 | s = DoUpdateCDCConsumerOpId(tablet_peer, |
1144 | 637 | impl_->GetMinSentCheckpointForTablet(req->tablet_id()), |
1145 | 637 | req->tablet_id()); |
1146 | | |
1147 | 637 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1148 | 637 | } |
1149 | | // Update relevant GetChanges metrics before handing off the Response. |
1150 | 641 | UpdateCDCTabletMetrics(resp, producer_tablet, tablet_peer, op_id, last_readable_index); |
1151 | 641 | context.RespondSuccess(); |
1152 | 641 | } |
1153 | | |
1154 | | Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(const TabletId& tablet_id, |
1155 | | int64_t min_index, |
1156 | 219 | int64_t min_term) { |
1157 | 219 | std::vector<client::internal::RemoteTabletServer *> servers; |
1158 | 219 | RETURN_NOT_OK(GetTServers(tablet_id, &servers)); |
1159 | 219 | for (const auto &server : servers) { |
1160 | 219 | if (server->IsLocal()) { |
1161 | | // We modify our log directly. Avoid calling itself through the proxy. |
1162 | 0 | continue; |
1163 | 0 | } |
1164 | 219 | LOG(INFO) << "Modifying remote peer " << server->ToString(); |
1165 | 219 | auto proxy = GetCDCServiceProxy(server); |
1166 | 219 | UpdateCdcReplicatedIndexRequestPB update_index_req; |
1167 | 219 | UpdateCdcReplicatedIndexResponsePB update_index_resp; |
1168 | 219 | update_index_req.set_tablet_id(tablet_id); |
1169 | 219 | update_index_req.set_replicated_index(min_index); |
1170 | 219 | update_index_req.set_replicated_term(min_term); |
1171 | 219 | rpc::RpcController rpc; |
1172 | 219 | rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms)); |
1173 | 219 | RETURN_NOT_OK(proxy->UpdateCdcReplicatedIndex(update_index_req, &update_index_resp, &rpc)); |
1174 | 219 | if (update_index_resp.has_error()) { |
1175 | 0 | return StatusFromPB(update_index_resp.error().status()); |
1176 | 0 | } |
1177 | 219 | } |
1178 | 219 | return Status::OK(); |
1179 | 219 | } |
1180 | | |
1181 | | void CDCServiceImpl::ComputeLagMetric(int64_t last_replicated_micros, |
1182 | | int64_t metric_last_timestamp_micros, |
1183 | | int64_t cdc_state_last_replication_time_micros, |
1184 | 440 | scoped_refptr<AtomicGauge<int64_t>> metric) { |
1185 | 440 | if (metric_last_timestamp_micros == 0) { |
1186 | | // The tablet metric timestamp is uninitialized, so try to use last replicated time in cdc |
1187 | | // state. |
1188 | 398 | if (cdc_state_last_replication_time_micros == 0) { |
1189 | | // Last replicated time in cdc state is uninitialized as well, so set the metric value to |
1190 | | // 0 and update later when we have a suitable lower bound. |
1191 | 398 | metric->set_value(0); |
1192 | 398 | } else { |
1193 | 0 | metric->set_value(last_replicated_micros - cdc_state_last_replication_time_micros); |
1194 | 0 | } |
1195 | 398 | } else { |
1196 | 42 | metric->set_value(last_replicated_micros - metric_last_timestamp_micros); |
1197 | 42 | } |
1198 | 440 | } |
1199 | | |
1200 | 94 | void CDCServiceImpl::UpdateLagMetrics() { |
1201 | 94 | auto tablet_checkpoints = impl_->TabletCheckpointsCopy(); |
1202 | | |
1203 | 94 | auto cdc_state_table_result = GetCdcStateTable(); |
1204 | 94 | if (!cdc_state_table_result.ok()) { |
1205 | | // It is possible that this runs before the cdc_state table is created. This is |
1206 | | // ok. It just means that this is the first time the cluster starts. |
1207 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 30) |
1208 | 0 | << "Unable to open table " << kCdcStateTableName.table_name() << " for metrics update."; |
1209 | 0 | return; |
1210 | 0 | } |
1211 | | |
1212 | 94 | std::unordered_set<ProducerTabletInfo, ProducerTabletInfo::Hash> tablets_in_cdc_state_table; |
1213 | 94 | client::TableIteratorOptions options; |
1214 | 94 | options.columns = std::vector<string>{ |
1215 | 94 | master::kCdcTabletId, master::kCdcStreamId, master::kCdcLastReplicationTime}; |
1216 | 94 | bool failed = false; |
1217 | 94 | options.error_handler = [&failed](const Status& status) { |
1218 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 30) << "Scan of table " << kCdcStateTableName.table_name() |
1219 | 0 | << " failed: " << status << ". Could not update metrics."; |
1220 | 0 | failed = true; |
1221 | 0 | }; |
1222 | | // First go through tablets in the cdc_state table and update metrics for each one. |
1223 | 245 | for (const auto& row : client::TableRange(**cdc_state_table_result, options)) { |
1224 | 245 | auto tablet_id = row.column(master::kCdcTabletIdIdx).string_value(); |
1225 | 245 | auto stream_id = row.column(master::kCdcStreamIdIdx).string_value(); |
1226 | 245 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1227 | 245 | Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer); |
1228 | 245 | if (s.IsNotFound()) { |
1229 | 25 | continue; |
1230 | 25 | } |
1231 | | |
1232 | 220 | ProducerTabletInfo tablet_info = {"" /* universe_uuid */, stream_id, tablet_id}; |
1233 | 220 | tablets_in_cdc_state_table.insert(tablet_info); |
1234 | 220 | auto tablet_metric = GetCDCTabletMetrics(tablet_info, tablet_peer); |
1235 | 220 | if (!tablet_metric) { |
1236 | 0 | continue; |
1237 | 0 | } |
1238 | 220 | if (tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY) { |
1239 | | // Set lag to 0 because we're not the leader for this tablet anymore, which means another peer |
1240 | | // is responsible for tracking this tablet's lag. |
1241 | 0 | tablet_metric->async_replication_sent_lag_micros->set_value(0); |
1242 | 0 | tablet_metric->async_replication_committed_lag_micros->set_value(0); |
1243 | 220 | } else { |
1244 | | // Get the physical time of the last committed record on producer. |
1245 | 220 | auto last_replicated_micros = GetLastReplicatedTime(tablet_peer); |
1246 | 220 | const auto& timestamp_ql_value = row.column(2); |
1247 | 220 | auto cdc_state_last_replication_time_micros = |
1248 | 220 | !timestamp_ql_value.IsNull() ? |
1249 | 220 | timestamp_ql_value.timestamp_value().ToInt64()0 : 0; |
1250 | 220 | auto last_sent_micros = tablet_metric->last_read_physicaltime->value(); |
1251 | 220 | ComputeLagMetric(last_replicated_micros, last_sent_micros, |
1252 | 220 | cdc_state_last_replication_time_micros, |
1253 | 220 | tablet_metric->async_replication_sent_lag_micros); |
1254 | 220 | auto last_committed_micros = tablet_metric->last_checkpoint_physicaltime->value(); |
1255 | 220 | ComputeLagMetric(last_replicated_micros, last_committed_micros, |
1256 | 220 | cdc_state_last_replication_time_micros, |
1257 | 220 | tablet_metric->async_replication_committed_lag_micros); |
1258 | 220 | } |
1259 | 220 | } |
1260 | 94 | if (failed) { |
1261 | 0 | RefreshCdcStateTable(); |
1262 | 0 | return; |
1263 | 0 | } |
1264 | | |
1265 | | // Now, go through tablets in tablet_checkpoints_ and set lag to 0 for all tablets we're no |
1266 | | // longer replicating. |
1267 | 265 | for (const auto& checkpoint : tablet_checkpoints)94 { |
1268 | 265 | const ProducerTabletInfo& tablet_info = checkpoint.producer_tablet_info; |
1269 | 265 | if (tablets_in_cdc_state_table.find(tablet_info) == tablets_in_cdc_state_table.end()) { |
1270 | | // We're no longer replicating this tablet, so set lag to 0. |
1271 | 45 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1272 | 45 | Status s = tablet_manager_->GetTabletPeer(checkpoint.tablet_id(), &tablet_peer); |
1273 | 45 | if (s.IsNotFound()) { |
1274 | 25 | continue; |
1275 | 25 | } |
1276 | 20 | auto tablet_metric = GetCDCTabletMetrics(checkpoint.producer_tablet_info, tablet_peer); |
1277 | 20 | if (!tablet_metric) { |
1278 | 0 | continue; |
1279 | 0 | } |
1280 | 20 | tablet_metric->async_replication_sent_lag_micros->set_value(0); |
1281 | 20 | tablet_metric->async_replication_committed_lag_micros->set_value(0); |
1282 | 20 | } |
1283 | 265 | } |
1284 | 94 | } |
1285 | | |
1286 | 1.22k | bool CDCServiceImpl::ShouldUpdateLagMetrics(MonoTime time_since_update_metrics) { |
1287 | | // Only update metrics if cdc is enabled, which means we have a valid replication stream. |
1288 | 1.22k | return GetAtomicFlag(&FLAGS_enable_collect_cdc_metrics) && |
1289 | 1.22k | (time_since_update_metrics == MonoTime::kUninitialized || |
1290 | 1.22k | MonoTime::Now() - time_since_update_metrics >= |
1291 | 1.13k | MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_update_metrics_interval_ms))); |
1292 | 1.22k | } |
1293 | | |
1294 | 0 | bool CDCServiceImpl::CDCEnabled() { |
1295 | 0 | return cdc_enabled_.load(std::memory_order_acquire); |
1296 | 0 | } |
1297 | | |
1298 | 1.10k | Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable() { |
1299 | 1.10k | bool use_cache = GetAtomicFlag(&FLAGS_enable_cdc_state_table_caching); |
1300 | 1.10k | { |
1301 | 1.10k | SharedLock<decltype(mutex_)> l(mutex_); |
1302 | 1.10k | if (cdc_state_table_ && use_cache1.01k ) { |
1303 | 1.01k | return cdc_state_table_; |
1304 | 1.01k | } |
1305 | 98 | if (cdc_service_stopped_) { |
1306 | 0 | return STATUS(ShutdownInProgress, ""); |
1307 | 0 | } |
1308 | 98 | } |
1309 | | |
1310 | 98 | auto cdc_state_table = std::make_shared<yb::client::TableHandle>(); |
1311 | 98 | auto s = cdc_state_table->Open(kCdcStateTableName, client()); |
1312 | | // It is possible that this runs before the cdc_state table is created. |
1313 | 98 | RETURN_NOT_OK(s); |
1314 | | |
1315 | 98 | { |
1316 | 98 | std::lock_guard<decltype(mutex_)> l(mutex_); |
1317 | 98 | if (cdc_state_table_ && use_cache0 ) { |
1318 | 0 | return cdc_state_table_; |
1319 | 0 | } |
1320 | 98 | if (cdc_service_stopped_) { |
1321 | 0 | return STATUS(ShutdownInProgress, ""); |
1322 | 0 | } |
1323 | 98 | cdc_state_table_ = cdc_state_table; |
1324 | 98 | return cdc_state_table_; |
1325 | 98 | } |
1326 | 98 | } |
1327 | | |
1328 | 0 | void CDCServiceImpl::RefreshCdcStateTable() { |
1329 | | // Set cached value to null so we regenerate it on the next call. |
1330 | 0 | std::lock_guard<decltype(mutex_)> l(mutex_); |
1331 | 0 | cdc_state_table_ = nullptr; |
1332 | 0 | } |
1333 | | |
1334 | 921 | Status CDCServiceImpl::RefreshCacheOnFail(const Status& s) { |
1335 | 921 | if (!s.ok()) { |
1336 | 0 | RefreshCdcStateTable(); |
1337 | 0 | } |
1338 | 921 | return s; |
1339 | 921 | } |
1340 | | |
1341 | | MicrosTime CDCServiceImpl::GetLastReplicatedTime( |
1342 | 861 | const std::shared_ptr<tablet::TabletPeer>& tablet_peer) { |
1343 | 861 | tablet::RemoveIntentsData data; |
1344 | 861 | auto status = tablet_peer->GetLastReplicatedData(&data); |
1345 | 861 | return status.ok() ? data.log_ht.GetPhysicalValueMicros() : 00 ; |
1346 | 861 | } |
1347 | | |
1348 | 8.74k | void CDCServiceImpl::UpdatePeersAndMetrics() { |
1349 | 8.74k | int64_t current_term = -1; |
1350 | 8.74k | MonoTime time_since_update_peers = MonoTime::kUninitialized; |
1351 | 8.74k | MonoTime time_since_update_metrics = MonoTime::kUninitialized; |
1352 | | |
1353 | | // Returns false if the CDC service has been stopped. |
1354 | 48.7M | auto sleep_while_not_stopped = [this]() { |
1355 | 48.7M | int min_sleep_ms = std::min(100, GetAtomicFlag(&FLAGS_update_metrics_interval_ms)); |
1356 | 48.7M | auto sleep_period = MonoDelta::FromMilliseconds(min_sleep_ms); |
1357 | 48.7M | SleepFor(sleep_period); |
1358 | | |
1359 | 48.7M | SharedLock<decltype(mutex_)> l(mutex_); |
1360 | 48.7M | return !cdc_service_stopped_; |
1361 | 48.7M | }; |
1362 | | |
1363 | 48.7M | do { |
1364 | 48.7M | if (!cdc_enabled_.load(std::memory_order_acquire)) { |
1365 | | // Have not yet received any GetChanges requests, so skip background thread work. |
1366 | 48.7M | continue; |
1367 | 48.7M | } |
1368 | | // Should we update lag metrics default every 1s. |
1369 | 1.22k | if (ShouldUpdateLagMetrics(time_since_update_metrics)) { |
1370 | 94 | UpdateLagMetrics(); |
1371 | 94 | time_since_update_metrics = MonoTime::Now(); |
1372 | 94 | } |
1373 | | |
1374 | | // If its not been 60s since the last peer update, continue. |
1375 | 1.22k | if (!FLAGS_enable_log_retention_by_op_idx || |
1376 | 1.22k | (time_since_update_peers != MonoTime::kUninitialized && |
1377 | 1.22k | MonoTime::Now() - time_since_update_peers < |
1378 | 1.13k | MonoDelta::FromSeconds(GetAtomicFlag(&FLAGS_update_min_cdc_indices_interval_secs)))) { |
1379 | 1.13k | continue; |
1380 | 1.13k | } |
1381 | | |
1382 | 93 | time_since_update_peers = MonoTime::Now(); |
1383 | 93 | LOG(INFO) << "Started to read minimum replicated indices for all tablets"; |
1384 | | |
1385 | 93 | auto cdc_state_table_result = GetCdcStateTable(); |
1386 | 93 | if (!cdc_state_table_result.ok()) { |
1387 | | // It is possible that this runs before the cdc_state table is created. This is |
1388 | | // ok. It just means that this is the first time the cluster starts. |
1389 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 3600) << "Unable to open table " |
1390 | 0 | << kCdcStateTableName.table_name() |
1391 | 0 | << ". CDC min replicated indices won't be updated"; |
1392 | 0 | continue; |
1393 | 0 | } |
1394 | | |
1395 | 93 | int count = 0; |
1396 | 93 | std::unordered_map<std::string, int64_t> tablet_min_checkpoint_index; |
1397 | 93 | client::TableIteratorOptions options; |
1398 | 93 | bool failed = false; |
1399 | 93 | options.error_handler = [&failed](const Status& status) { |
1400 | 0 | LOG(WARNING) << "Scan of table " << kCdcStateTableName.table_name() << " failed: " << status; |
1401 | 0 | failed = true; |
1402 | 0 | }; |
1403 | 93 | options.columns = std::vector<std::string>{master::kCdcTabletId, master::kCdcStreamId, |
1404 | 93 | master::kCdcCheckpoint, master::kCdcLastReplicationTime}; |
1405 | 219 | for (const auto& row : client::TableRange(**cdc_state_table_result, options)) { |
1406 | 219 | count++; |
1407 | 219 | auto tablet_id = row.column(master::kCdcTabletIdIdx).string_value(); |
1408 | 219 | auto stream_id = row.column(master::kCdcStreamIdIdx).string_value(); |
1409 | 219 | auto checkpoint = row.column(master::kCdcCheckpointIdx).string_value(); |
1410 | 219 | std::string last_replicated_time_str; |
1411 | 219 | const auto& timestamp_ql_value = row.column(3); |
1412 | 219 | if (!timestamp_ql_value.IsNull()) { |
1413 | 0 | last_replicated_time_str = timestamp_ql_value.timestamp_value().ToFormattedString(); |
1414 | 0 | } |
1415 | | |
1416 | 219 | VLOG(1) << "stream_id: " << stream_id << ", tablet_id: " << tablet_id |
1417 | 0 | << ", checkpoint: " << checkpoint << ", last replicated time: " |
1418 | 0 | << last_replicated_time_str; |
1419 | | |
1420 | 219 | auto result = OpId::FromString(checkpoint); |
1421 | 219 | if (!result.ok()) { |
1422 | 0 | LOG(WARNING) << "Read invalid op id " << row.column(1).string_value() |
1423 | 0 | << " for tablet " << tablet_id; |
1424 | 0 | continue; |
1425 | 0 | } |
1426 | | |
1427 | 219 | auto index = (*result).index; |
1428 | 219 | current_term = (*result).term; |
1429 | 219 | auto it = tablet_min_checkpoint_index.find(tablet_id); |
1430 | 219 | if (it == tablet_min_checkpoint_index.end()) { |
1431 | 219 | tablet_min_checkpoint_index[tablet_id] = index; |
1432 | 219 | } else { |
1433 | 0 | if (index < it->second) { |
1434 | 0 | it->second = index; |
1435 | 0 | } |
1436 | 0 | } |
1437 | 219 | } |
1438 | 93 | if (failed) { |
1439 | 0 | RefreshCdcStateTable(); |
1440 | 0 | continue; |
1441 | 0 | } |
1442 | 93 | LOG(INFO) << "Read " << count << " records from " << kCdcStateTableName.table_name(); |
1443 | | |
1444 | 93 | VLOG(3) << "tablet_min_checkpoint_index size " << tablet_min_checkpoint_index.size()0 ; |
1445 | 219 | for (const auto &elem : tablet_min_checkpoint_index) { |
1446 | 219 | auto tablet_id = elem.first; |
1447 | 219 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1448 | | |
1449 | 219 | Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer); |
1450 | 219 | if (s.IsNotFound()) { |
1451 | 0 | VLOG(2) << "Did not found tablet peer for tablet " << tablet_id; |
1452 | 0 | continue; |
1453 | 219 | } else if (!IsTabletPeerLeader(tablet_peer)) { |
1454 | 0 | VLOG(2) << "Tablet peer " << tablet_peer->permanent_uuid() |
1455 | 0 | << " is not the leader for tablet " << tablet_id; |
1456 | 0 | continue; |
1457 | 219 | } else if (!s.ok()) { |
1458 | 0 | LOG(WARNING) << "Error getting tablet_peer for tablet " << tablet_id << ": " << s; |
1459 | 0 | continue; |
1460 | 0 | } |
1461 | | |
1462 | 219 | auto min_index = elem.second; |
1463 | 219 | s = tablet_peer->set_cdc_min_replicated_index(min_index); |
1464 | 219 | if (!s.ok()) { |
1465 | 0 | LOG(WARNING) << "Unable to set cdc min index for tablet peer " |
1466 | 0 | << tablet_peer->permanent_uuid() |
1467 | 0 | << " and tablet " << tablet_peer->tablet_id() |
1468 | 0 | << ": " << s; |
1469 | 0 | } |
1470 | 219 | VLOG(1) << "Updating followers for tablet " << tablet_id << " with index " << min_index0 ; |
1471 | 219 | WARN_NOT_OK(UpdatePeersCdcMinReplicatedIndex(tablet_id, min_index, current_term), |
1472 | 219 | "UpdatePeersCdcMinReplicatedIndex failed"); |
1473 | 219 | } |
1474 | 93 | LOG(INFO) << "Done reading all the indices for all tablets and updating peers"; |
1475 | 48.7M | } while (sleep_while_not_stopped()); |
1476 | 8.74k | } |
1477 | | |
1478 | | Result<client::internal::RemoteTabletPtr> CDCServiceImpl::GetRemoteTablet( |
1479 | 219 | const TabletId& tablet_id) { |
1480 | 219 | std::promise<Result<client::internal::RemoteTabletPtr>> tablet_lookup_promise; |
1481 | 219 | auto future = tablet_lookup_promise.get_future(); |
1482 | 219 | auto callback = [&tablet_lookup_promise]( |
1483 | 219 | const Result<client::internal::RemoteTabletPtr>& result) { |
1484 | 219 | tablet_lookup_promise.set_value(result); |
1485 | 219 | }; |
1486 | | |
1487 | 219 | auto start = CoarseMonoClock::Now(); |
1488 | 219 | client()->LookupTabletById( |
1489 | 219 | tablet_id, |
1490 | 219 | /* table =*/ nullptr, |
1491 | | // In case this is a split parent tablet, it will be hidden so we need this flag to access it. |
1492 | 219 | master::IncludeInactive::kTrue, |
1493 | 219 | CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms), |
1494 | 219 | callback, client::UseCache::kFalse); |
1495 | 219 | future.wait(); |
1496 | | |
1497 | 219 | auto duration = CoarseMonoClock::Now() - start; |
1498 | 219 | if (duration > (kMaxDurationForTabletLookup * 1ms)) { |
1499 | 0 | LOG(WARNING) << "LookupTabletByKey took long time: " << duration << " ms"; |
1500 | 0 | } |
1501 | | |
1502 | 219 | auto remote_tablet = VERIFY_RESULT(future.get()); |
1503 | 0 | return remote_tablet; |
1504 | 219 | } |
1505 | | |
1506 | 0 | Result<RemoteTabletServer *> CDCServiceImpl::GetLeaderTServer(const TabletId& tablet_id) { |
1507 | 0 | auto result = VERIFY_RESULT(GetRemoteTablet(tablet_id)); |
1508 | | |
1509 | 0 | auto ts = result->LeaderTServer(); |
1510 | 0 | if (ts == nullptr) { |
1511 | 0 | return STATUS(NotFound, "Tablet leader not found for tablet", tablet_id); |
1512 | 0 | } |
1513 | 0 | return ts; |
1514 | 0 | } |
1515 | | |
1516 | | Status CDCServiceImpl::GetTServers(const TabletId& tablet_id, |
1517 | 219 | std::vector<client::internal::RemoteTabletServer*>* servers) { |
1518 | 219 | auto result = VERIFY_RESULT(GetRemoteTablet(tablet_id)); |
1519 | | |
1520 | 0 | result->GetRemoteTabletServers(servers); |
1521 | 219 | return Status::OK(); |
1522 | 219 | } |
1523 | | |
1524 | 219 | std::shared_ptr<CDCServiceProxy> CDCServiceImpl::GetCDCServiceProxy(RemoteTabletServer* ts) { |
1525 | 219 | auto hostport = HostPortFromPB(ts->DesiredHostPort(client()->cloud_info())); |
1526 | 219 | DCHECK(!hostport.host().empty()); |
1527 | | |
1528 | 219 | { |
1529 | 219 | SharedLock<decltype(mutex_)> l(mutex_); |
1530 | 219 | auto it = cdc_service_map_.find(hostport); |
1531 | 219 | if (it != cdc_service_map_.end()) { |
1532 | 126 | return it->second; |
1533 | 126 | } |
1534 | 219 | } |
1535 | | |
1536 | 93 | auto cdc_service = std::make_shared<CDCServiceProxy>(&client()->proxy_cache(), hostport); |
1537 | | |
1538 | 93 | { |
1539 | 93 | std::lock_guard<decltype(mutex_)> l(mutex_); |
1540 | 93 | auto it = cdc_service_map_.find(hostport); |
1541 | 93 | if (it != cdc_service_map_.end()) { |
1542 | 0 | return it->second; |
1543 | 0 | } |
1544 | 93 | cdc_service_map_.emplace(hostport, cdc_service); |
1545 | 93 | } |
1546 | 0 | return cdc_service; |
1547 | 93 | } |
1548 | | |
1549 | | void CDCServiceImpl::TabletLeaderGetChanges(const GetChangesRequestPB* req, |
1550 | | GetChangesResponsePB* resp, |
1551 | | std::shared_ptr<RpcContext> context, |
1552 | 0 | std::shared_ptr<tablet::TabletPeer> peer) { |
1553 | 0 | auto rpc_handle = rpcs_.Prepare(); |
1554 | 0 | RPC_CHECK_AND_RETURN_ERROR(rpc_handle != rpcs_.InvalidHandle(), |
1555 | 0 | STATUS(Aborted, |
1556 | 0 | Format("Could not create valid handle for GetChangesCDCRpc: tablet=$0, peer=$1", |
1557 | 0 | req->tablet_id(), |
1558 | 0 | peer->permanent_uuid())), |
1559 | 0 | resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, *context.get()); |
1560 | | |
1561 | | // Increment Proxy Metric. |
1562 | 0 | server_metrics_->cdc_rpc_proxy_count->Increment(); |
1563 | | |
1564 | | // Forward this Request Info to the proper TabletServer. |
1565 | 0 | GetChangesRequestPB new_req; |
1566 | 0 | new_req.CopyFrom(*req); |
1567 | 0 | new_req.set_serve_as_proxy(false); |
1568 | 0 | CoarseTimePoint deadline = GetDeadline(*context.get(), client()); |
1569 | |
|
1570 | 0 | *rpc_handle = CreateGetChangesCDCRpc( |
1571 | 0 | deadline, |
1572 | 0 | nullptr, /* RemoteTablet: will get this from 'new_req' */ |
1573 | 0 | client(), |
1574 | 0 | &new_req, |
1575 | 0 | [=] (Status status, GetChangesResponsePB&& new_resp) { |
1576 | 0 | auto retained = rpcs_.Unregister(rpc_handle); |
1577 | 0 | *resp = std::move(new_resp); |
1578 | 0 | RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), resp->error().code(), |
1579 | 0 | *context.get()); |
1580 | 0 | context->RespondSuccess(); |
1581 | 0 | }); |
1582 | 0 | (**rpc_handle).SendRpc(); |
1583 | 0 | } |
1584 | | |
1585 | | void CDCServiceImpl::TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req, |
1586 | | GetCheckpointResponsePB* resp, |
1587 | | RpcContext* context, |
1588 | 0 | const std::shared_ptr<tablet::TabletPeer>& peer) { |
1589 | 0 | auto result = GetLeaderTServer(req->tablet_id()); |
1590 | 0 | RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(), |
1591 | 0 | CDCErrorPB::TABLET_NOT_FOUND, *context); |
1592 | | |
1593 | 0 | auto ts_leader = *result; |
1594 | | // Check that tablet leader identified by master is not current tablet peer. |
1595 | | // This can happen during tablet rebalance if master and tserver have different views of |
1596 | | // leader. We need to avoid self-looping in this case. |
1597 | 0 | if (peer) { |
1598 | 0 | RPC_CHECK_NE_AND_RETURN_ERROR(ts_leader->permanent_uuid(), peer->permanent_uuid(), |
1599 | 0 | STATUS(IllegalState, |
1600 | 0 | Format("Tablet leader changed: leader=$0, peer=$1", |
1601 | 0 | ts_leader->permanent_uuid(), |
1602 | 0 | peer->permanent_uuid())), |
1603 | 0 | resp->mutable_error(), CDCErrorPB::NOT_LEADER, *context); |
1604 | 0 | } |
1605 | | |
1606 | 0 | auto cdc_proxy = GetCDCServiceProxy(ts_leader); |
1607 | 0 | rpc::RpcController rpc; |
1608 | 0 | rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms)); |
1609 | | // TODO(NIC): Change to GetCheckpointAsync like CDCPoller::DoPoll. |
1610 | 0 | auto status = cdc_proxy->GetCheckpoint(*req, resp, &rpc); |
1611 | 0 | RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, *context); |
1612 | 0 | context->RespondSuccess(); |
1613 | 0 | } |
1614 | | |
1615 | | void CDCServiceImpl::GetCheckpoint(const GetCheckpointRequestPB* req, |
1616 | | GetCheckpointResponsePB* resp, |
1617 | 14 | RpcContext context) { |
1618 | 14 | if (!CheckOnline(req, resp, &context)) { |
1619 | 0 | return; |
1620 | 0 | } |
1621 | | |
1622 | 14 | RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(), |
1623 | 14 | STATUS(InvalidArgument, "Tablet ID is required to get CDC checkpoint"), |
1624 | 14 | resp->mutable_error(), |
1625 | 14 | CDCErrorPB::INVALID_REQUEST, |
1626 | 14 | context); |
1627 | 14 | RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id(), |
1628 | 14 | STATUS(InvalidArgument, "Stream ID is required to get CDC checkpoint"), |
1629 | 14 | resp->mutable_error(), |
1630 | 14 | CDCErrorPB::INVALID_REQUEST, |
1631 | 14 | context); |
1632 | | |
1633 | 14 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1634 | 14 | Status s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer); |
1635 | | |
1636 | 14 | if (s.IsNotFound() || !IsTabletPeerLeader(tablet_peer)) { |
1637 | | // Forward GetChanges() to tablet leader. This happens often in Kubernetes setups. |
1638 | 0 | TabletLeaderGetCheckpoint(req, resp, &context, tablet_peer); |
1639 | 0 | return; |
1640 | 0 | } |
1641 | | |
1642 | | // Check that requested tablet_id is part of the CDC stream. |
1643 | 14 | ProducerTabletInfo producer_tablet = {"" /* UUID */, req->stream_id(), req->tablet_id()}; |
1644 | 14 | s = CheckTabletValidForStream(producer_tablet); |
1645 | 14 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); |
1646 | | |
1647 | 14 | auto session = client()->NewSession(); |
1648 | 14 | CoarseTimePoint deadline = GetDeadline(context, client()); |
1649 | | |
1650 | 14 | session->SetDeadline(deadline); |
1651 | | |
1652 | 14 | auto result = GetLastCheckpoint(producer_tablet, session); |
1653 | 14 | RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(), |
1654 | 14 | CDCErrorPB::INTERNAL_ERROR, context); |
1655 | | |
1656 | 14 | result->ToPB(resp->mutable_checkpoint()->mutable_op_id()); |
1657 | 14 | context.RespondSuccess(); |
1658 | 14 | } |
1659 | | |
1660 | | void CDCServiceImpl::UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req, |
1661 | | UpdateCdcReplicatedIndexResponsePB* resp, |
1662 | 219 | rpc::RpcContext context) { |
1663 | 219 | if (!CheckOnline(req, resp, &context)) { |
1664 | 0 | return; |
1665 | 0 | } |
1666 | | |
1667 | 219 | RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(), |
1668 | 219 | STATUS(InvalidArgument, |
1669 | 219 | "Tablet ID is required to set the log replicated index"), |
1670 | 219 | resp->mutable_error(), |
1671 | 219 | CDCErrorPB::INVALID_REQUEST, |
1672 | 219 | context); |
1673 | | |
1674 | 219 | RPC_CHECK_AND_RETURN_ERROR(req->has_replicated_index(), |
1675 | 219 | STATUS(InvalidArgument, |
1676 | 219 | "Replicated index is required to set the log replicated index"), |
1677 | 219 | resp->mutable_error(), |
1678 | 219 | CDCErrorPB::INVALID_REQUEST, |
1679 | 219 | context); |
1680 | | |
1681 | 219 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1682 | 219 | RPC_STATUS_RETURN_ERROR(tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer), |
1683 | 219 | resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1684 | | |
1685 | 219 | RPC_CHECK_AND_RETURN_ERROR(tablet_peer->log_available(), |
1686 | 219 | STATUS(TryAgain, "Tablet peer is not ready to set its log cdc index"), |
1687 | 219 | resp->mutable_error(), |
1688 | 219 | CDCErrorPB::INTERNAL_ERROR, |
1689 | 219 | context); |
1690 | | |
1691 | 219 | RPC_STATUS_RETURN_ERROR(tablet_peer->set_cdc_min_replicated_index(req->replicated_index()), |
1692 | 219 | resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1693 | | |
1694 | 219 | auto status = DoUpdateCDCConsumerOpId(tablet_peer, |
1695 | 219 | OpId(req->replicated_term(), req->replicated_index()), |
1696 | 219 | req->tablet_id()); |
1697 | | |
1698 | 219 | RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1699 | | |
1700 | 219 | { |
1701 | 219 | RequestScope request_scope; |
1702 | 219 | auto txn_participant = tablet_peer->tablet()->transaction_participant(); |
1703 | 219 | if (txn_participant) { |
1704 | 219 | VLOG(1) << "Registering and unregistering request so that transactions are " |
1705 | 0 | "cleaned up on followers."; |
1706 | 219 | request_scope = RequestScope(txn_participant); |
1707 | 219 | } |
1708 | 219 | } |
1709 | | |
1710 | 219 | context.RespondSuccess(); |
1711 | 219 | } |
1712 | | |
1713 | 0 | Result<OpId> CDCServiceImpl::TabletLeaderLatestEntryOpId(const TabletId& tablet_id) { |
1714 | 0 | auto ts_leader = VERIFY_RESULT(GetLeaderTServer(tablet_id)); |
1715 | | |
1716 | 0 | auto cdc_proxy = GetCDCServiceProxy(ts_leader); |
1717 | 0 | rpc::RpcController rpc; |
1718 | 0 | rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms)); |
1719 | 0 | GetLatestEntryOpIdRequestPB req; |
1720 | 0 | GetLatestEntryOpIdResponsePB resp; |
1721 | 0 | req.set_tablet_id(tablet_id); |
1722 | 0 | auto status = cdc_proxy->GetLatestEntryOpId(req, &resp, &rpc); |
1723 | 0 | if (!status.ok()) { |
1724 | | // If we failed to get the latest entry op id, we try other tservers. The leader is guaranteed |
1725 | | // to have the most up-to-date information, but for our purposes, it's ok to be slightly |
1726 | | // behind. |
1727 | 0 | std::vector<client::internal::RemoteTabletServer *> servers; |
1728 | 0 | auto s = GetTServers(tablet_id, &servers); |
1729 | 0 | for (const auto& server : servers) { |
1730 | | // We don't want to try the leader again. |
1731 | 0 | if (server->permanent_uuid() == ts_leader->permanent_uuid()) { |
1732 | 0 | continue; |
1733 | 0 | } |
1734 | 0 | auto follower_cdc_proxy = GetCDCServiceProxy(server); |
1735 | 0 | status = follower_cdc_proxy->GetLatestEntryOpId(req, &resp, &rpc); |
1736 | 0 | if (status.ok()) { |
1737 | 0 | return OpId::FromPB(resp.op_id()); |
1738 | 0 | } |
1739 | 0 | } |
1740 | 0 | DCHECK(!status.ok()); |
1741 | 0 | return status; |
1742 | 0 | } |
1743 | 0 | return OpId::FromPB(resp.op_id()); |
1744 | 0 | } |
1745 | | |
1746 | | void CDCServiceImpl::GetLatestEntryOpId(const GetLatestEntryOpIdRequestPB* req, |
1747 | | GetLatestEntryOpIdResponsePB* resp, |
1748 | 0 | rpc::RpcContext context) { |
1749 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1750 | 0 | Status s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer); |
1751 | 0 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1752 | | |
1753 | 0 | if (!tablet_peer->log_available()) { |
1754 | 0 | const string err_message = strings::Substitute("Unable to get the latest entry op id from " |
1755 | 0 | "peer $0 and tablet $1 because its log object hasn't been initialized", |
1756 | 0 | tablet_peer->permanent_uuid(), tablet_peer->tablet_id()); |
1757 | 0 | LOG(WARNING) << err_message; |
1758 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
1759 | 0 | STATUS(ServiceUnavailable, err_message), |
1760 | 0 | CDCErrorPB::INTERNAL_ERROR, |
1761 | 0 | &context); |
1762 | 0 | return; |
1763 | 0 | } |
1764 | 0 | OpId op_id = tablet_peer->log()->GetLatestEntryOpId(); |
1765 | 0 | op_id.ToPB(resp->mutable_op_id()); |
1766 | 0 | context.RespondSuccess(); |
1767 | 0 | } |
1768 | | |
1769 | | void CDCServiceImpl::GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req, |
1770 | | GetCDCDBStreamInfoResponsePB* resp, |
1771 | 0 | rpc::RpcContext context) { |
1772 | 0 | if (!CheckOnline(req, resp, &context)) { |
1773 | 0 | return; |
1774 | 0 | } |
1775 | | |
1776 | 0 | LOG(INFO) << "Received GetCDCDBStreamInfo request " << req->ShortDebugString(); |
1777 | |
|
1778 | 0 | RPC_CHECK_AND_RETURN_ERROR( |
1779 | 0 | req->has_db_stream_id(), |
1780 | 0 | STATUS(InvalidArgument, "Database Stream ID is required to get DB stream information"), |
1781 | 0 | resp->mutable_error(), |
1782 | 0 | CDCErrorPB::INVALID_REQUEST, |
1783 | 0 | context); |
1784 | | |
1785 | 0 | std::vector<pair<std::string, std::string>> db_stream_info; |
1786 | 0 | Status s = client()->GetCDCDBStreamInfo(req->db_stream_id(), &db_stream_info); |
1787 | 0 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1788 | | |
1789 | 0 | for (const auto& tabinfo : db_stream_info) { |
1790 | 0 | auto* const table_info = resp->add_table_info(); |
1791 | 0 | table_info->set_stream_id(tabinfo.first); |
1792 | 0 | table_info->set_table_id(tabinfo.second); |
1793 | 0 | } |
1794 | |
|
1795 | 0 | context.RespondSuccess(); |
1796 | 0 | } |
1797 | | |
1798 | 307 | void CDCServiceImpl::RollbackPartialCreate(const CDCCreationState& creation_state) { |
1799 | 307 | if (!creation_state.created_cdc_streams.empty()) { |
1800 | 0 | Status s = client()->DeleteCDCStream(creation_state.created_cdc_streams); |
1801 | 0 | if (!s.ok()) { |
1802 | 0 | LOG(WARNING) << "Unable to delete streams " << JoinCSVLine(creation_state.created_cdc_streams) |
1803 | 0 | << ": " << s; |
1804 | 0 | } |
1805 | 0 | } |
1806 | | |
1807 | | // For all tablets we modified state for, reverse those changes if the operation failed |
1808 | | // halfway through. |
1809 | 307 | if (creation_state.producer_entries_modified.empty()) { |
1810 | 307 | return; |
1811 | 307 | } |
1812 | 0 | std::lock_guard<decltype(mutex_)> l(mutex_); |
1813 | 0 | impl_->EraseTablets(creation_state.producer_entries_modified, false); |
1814 | 0 | for (const auto& entry : creation_state.producer_entries_modified) { |
1815 | 0 | WARN_NOT_OK( |
1816 | 0 | UpdatePeersCdcMinReplicatedIndex(entry.tablet_id, numeric_limits<uint64_t>::max()), |
1817 | 0 | "Unable to update tablet " + entry.tablet_id); |
1818 | 0 | } |
1819 | |
|
1820 | 0 | } |
1821 | | |
1822 | | void CDCServiceImpl::BootstrapProducer(const BootstrapProducerRequestPB* req, |
1823 | | BootstrapProducerResponsePB* resp, |
1824 | 0 | rpc::RpcContext context) { |
1825 | 0 | LOG(INFO) << "Received BootstrapProducer request " << req->ShortDebugString(); |
1826 | 0 | RPC_CHECK_AND_RETURN_ERROR(req->table_ids().size() > 0, |
1827 | 0 | STATUS(InvalidArgument, "Table ID is required to create CDC stream"), |
1828 | 0 | resp->mutable_error(), |
1829 | 0 | CDCErrorPB::INVALID_REQUEST, |
1830 | 0 | context); |
1831 | | |
1832 | 0 | std::shared_ptr<client::TableHandle> cdc_state_table; |
1833 | |
|
1834 | 0 | std::vector<client::YBOperationPtr> ops; |
1835 | 0 | auto session = client()->NewSession(); |
1836 | | |
1837 | | // Used to delete streams in case of failure. |
1838 | 0 | CDCCreationState creation_state; |
1839 | 0 | auto scope_exit = ScopeExit([this, &creation_state] { |
1840 | 0 | RollbackPartialCreate(creation_state); |
1841 | 0 | }); |
1842 | |
|
1843 | 0 | std::vector<CDCStreamId> bootstrap_ids; |
1844 | |
|
1845 | 0 | for (const auto& table_id : req->table_ids()) { |
1846 | 0 | std::shared_ptr<client::YBTable> table; |
1847 | 0 | Status s = client()->OpenTable(table_id, &table); |
1848 | 0 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context); |
1849 | | |
1850 | | // Generate a bootstrap id by calling CreateCDCStream, and also setup the stream in the master. |
1851 | | // If the consumer's master sends a CreateCDCStream with a bootstrap id, the producer's master |
1852 | | // will verify that the stream id exists and return success if it does since everything else |
1853 | | // has already been done by this call. |
1854 | 0 | std::unordered_map<std::string, std::string> options; |
1855 | 0 | options.reserve(4); |
1856 | 0 | options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); |
1857 | 0 | options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); |
1858 | 0 | options.emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); |
1859 | 0 | options.emplace(cdc::kCheckpointType, CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT)); |
1860 | | |
1861 | | // Mark this stream as being bootstrapped, to help in finding dangling streams. |
1862 | 0 | auto result = client()->CreateCDCStream(table_id, options, false); |
1863 | 0 | RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(), |
1864 | 0 | CDCErrorPB::INTERNAL_ERROR, context); |
1865 | 0 | const std::string& bootstrap_id = *result; |
1866 | 0 | creation_state.created_cdc_streams.push_back(bootstrap_id); |
1867 | |
|
1868 | 0 | if (cdc_state_table == nullptr) { |
1869 | 0 | auto res = GetCdcStateTable(); |
1870 | 0 | RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(), |
1871 | 0 | CDCErrorPB::INTERNAL_ERROR, context); |
1872 | 0 | cdc_state_table = *res; |
1873 | 0 | } |
1874 | | |
1875 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
1876 | 0 | s = client()->GetTabletsFromTableId(table_id, 0, &tablets); |
1877 | 0 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context); |
1878 | | |
1879 | | // For each tablet, create a row in cdc_state table containing the generated bootstrap id, and |
1880 | | // the latest op id in the logs. |
1881 | 0 | for (const auto &tablet : tablets) { |
1882 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1883 | 0 | OpId op_id; |
1884 | |
|
1885 | 0 | s = tablet_manager_->GetTabletPeer(tablet.tablet_id(), &tablet_peer); |
1886 | 0 | if (!s.ok()) { |
1887 | 0 | auto res = TabletLeaderLatestEntryOpId(tablet.tablet_id()); |
1888 | 0 | RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(), |
1889 | 0 | CDCErrorPB::INTERNAL_ERROR, context); |
1890 | 0 | op_id = *res; |
1891 | 0 | } else { |
1892 | 0 | if (!tablet_peer->log_available()) { |
1893 | 0 | const string err_message = strings::Substitute("Unable to get the latest entry op id " |
1894 | 0 | "from peer $0 and tablet $1 because its log object hasn't been initialized", |
1895 | 0 | tablet_peer->permanent_uuid(), tablet_peer->tablet_id()); |
1896 | 0 | LOG(WARNING) << err_message; |
1897 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
1898 | 0 | STATUS(ServiceUnavailable, err_message), |
1899 | 0 | CDCErrorPB::INTERNAL_ERROR, |
1900 | 0 | &context); |
1901 | 0 | return; |
1902 | 0 | } |
1903 | 0 | op_id = tablet_peer->log()->GetLatestEntryOpId(); |
1904 | 0 | RPC_STATUS_RETURN_ERROR(UpdatePeersCdcMinReplicatedIndex(tablet.tablet_id(), op_id.index), |
1905 | 0 | resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, |
1906 | 0 | context); |
1907 | 0 | } |
1908 | | |
1909 | 0 | const auto op = cdc_state_table->NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
1910 | 0 | auto *const write_req = op->mutable_request(); |
1911 | |
|
1912 | 0 | QLAddStringHashValue(write_req, tablet.tablet_id()); |
1913 | 0 | QLAddStringRangeValue(write_req, bootstrap_id); |
1914 | 0 | cdc_state_table->AddStringColumnValue(write_req, master::kCdcCheckpoint, op_id.ToString()); |
1915 | 0 | ops.push_back(std::move(op)); |
1916 | 0 | impl_->AddTabletCheckpoint( |
1917 | 0 | op_id, bootstrap_id, tablet.tablet_id(), &creation_state.producer_entries_modified); |
1918 | 0 | } |
1919 | 0 | bootstrap_ids.push_back(std::move(bootstrap_id)); |
1920 | 0 | } |
1921 | 0 | CoarseTimePoint deadline = GetDeadline(context, client()); |
1922 | |
|
1923 | 0 | session->SetDeadline(deadline); |
1924 | 0 | Status s = RefreshCacheOnFail(session->ApplyAndFlush(ops)); |
1925 | 0 | RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); |
1926 | | |
1927 | 0 | for (const auto& bootstrap_id : bootstrap_ids) { |
1928 | 0 | resp->add_cdc_bootstrap_ids(bootstrap_id); |
1929 | 0 | } |
1930 | | // Clear these vectors so no changes are reversed by scope_exit since we succeeded. |
1931 | 0 | creation_state.Clear(); |
1932 | 0 | context.RespondSuccess(); |
1933 | 0 | } |
1934 | | |
1935 | 188 | void CDCServiceImpl::Shutdown() { |
1936 | 188 | if (impl_->async_client_init_) { |
1937 | 96 | impl_->async_client_init_->Shutdown(); |
1938 | 96 | rpcs_.Shutdown(); |
1939 | 96 | { |
1940 | 96 | std::lock_guard<decltype(mutex_)> l(mutex_); |
1941 | 96 | cdc_service_stopped_ = true; |
1942 | 96 | cdc_state_table_ = nullptr; |
1943 | 96 | } |
1944 | 96 | if (update_peers_and_metrics_thread_) { |
1945 | 96 | update_peers_and_metrics_thread_->join(); |
1946 | 96 | } |
1947 | 96 | impl_->async_client_init_ = boost::none; |
1948 | 96 | } |
1949 | 188 | } |
1950 | | |
1951 | | Result<OpId> CDCServiceImpl::GetLastCheckpoint( |
1952 | | const ProducerTabletInfo& producer_tablet, |
1953 | 619 | const client::YBSessionPtr& session) { |
1954 | 619 | auto result = impl_->GetLastCheckpoint(producer_tablet); |
1955 | 619 | if (result) { |
1956 | 4 | return *result; |
1957 | 4 | } |
1958 | | |
1959 | 615 | auto cdc_state_table_result = GetCdcStateTable(); |
1960 | 615 | RETURN_NOT_OK(cdc_state_table_result); |
1961 | | |
1962 | 615 | const auto op = (*cdc_state_table_result)->NewReadOp(); |
1963 | 615 | auto* const req = op->mutable_request(); |
1964 | 615 | DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty()); |
1965 | 615 | QLAddStringHashValue(req, producer_tablet.tablet_id); |
1966 | | |
1967 | 615 | auto cond = req->mutable_where_expr()->mutable_condition(); |
1968 | 615 | cond->set_op(QLOperator::QL_OP_AND); |
1969 | 615 | QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, |
1970 | 615 | QL_OP_EQUAL, producer_tablet.stream_id); |
1971 | 615 | req->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcTabletIdIdx); |
1972 | 615 | req->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcStreamIdIdx); |
1973 | 615 | (*cdc_state_table_result)->AddColumns({master::kCdcCheckpoint}, req); |
1974 | | |
1975 | 615 | RETURN_NOT_OK(RefreshCacheOnFail(session->ReadSync(op))); |
1976 | 615 | auto row_block = ql::RowsResult(op.get()).GetRowBlock(); |
1977 | 615 | if (row_block->row_count() == 0) { |
1978 | 0 | return OpId(0, 0); |
1979 | 0 | } |
1980 | | |
1981 | 615 | DCHECK_EQ(row_block->row_count(), 1); |
1982 | 615 | DCHECK_EQ(row_block->row(0).column(0).type(), InternalType::kStringValue); |
1983 | | |
1984 | 615 | return OpId::FromString(row_block->row(0).column(0).string_value()); |
1985 | 615 | } |
1986 | | |
1987 | | void CDCServiceImpl::UpdateCDCTabletMetrics( |
1988 | | const GetChangesResponsePB* resp, |
1989 | | const ProducerTabletInfo& producer_tablet, |
1990 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
1991 | | const OpId& op_id, |
1992 | 641 | int64_t last_readable_index) { |
1993 | 641 | auto tablet_metric = GetCDCTabletMetrics(producer_tablet, tablet_peer); |
1994 | 641 | if (!tablet_metric) { |
1995 | 0 | return; |
1996 | 0 | } |
1997 | | |
1998 | 641 | auto lid = resp->checkpoint().op_id(); |
1999 | 641 | tablet_metric->last_read_opid_term->set_value(lid.term()); |
2000 | 641 | tablet_metric->last_read_opid_index->set_value(lid.index()); |
2001 | 641 | tablet_metric->last_readable_opid_index->set_value(last_readable_index); |
2002 | 641 | tablet_metric->last_checkpoint_opid_index->set_value(op_id.index); |
2003 | 641 | if (resp->records_size() > 0) { |
2004 | 0 | auto& last_record = resp->records(resp->records_size() - 1); |
2005 | 0 | tablet_metric->last_read_hybridtime->set_value(last_record.time()); |
2006 | 0 | auto last_record_micros = HybridTime(last_record.time()).GetPhysicalValueMicros(); |
2007 | 0 | tablet_metric->last_read_physicaltime->set_value(last_record_micros); |
2008 | | // Only count bytes responded if we are including a response payload. |
2009 | 0 | tablet_metric->rpc_payload_bytes_responded->Increment(resp->ByteSize()); |
2010 | | // Get the physical time of the last committed record on producer. |
2011 | 0 | auto last_replicated_micros = GetLastReplicatedTime(tablet_peer); |
2012 | 0 | tablet_metric->async_replication_sent_lag_micros->set_value( |
2013 | 0 | last_replicated_micros - last_record_micros); |
2014 | 0 | auto& first_record = resp->records(0); |
2015 | 0 | auto first_record_micros = HybridTime(first_record.time()).GetPhysicalValueMicros(); |
2016 | 0 | tablet_metric->last_checkpoint_physicaltime->set_value(first_record_micros); |
2017 | 0 | tablet_metric->async_replication_committed_lag_micros->set_value( |
2018 | 0 | last_replicated_micros - first_record_micros); |
2019 | 641 | } else { |
2020 | 641 | tablet_metric->rpc_heartbeats_responded->Increment(); |
2021 | | // If there are no more entries to be read, that means we're caught up. |
2022 | 641 | auto last_replicated_micros = GetLastReplicatedTime(tablet_peer); |
2023 | 641 | tablet_metric->last_read_physicaltime->set_value(last_replicated_micros); |
2024 | 641 | tablet_metric->last_checkpoint_physicaltime->set_value(last_replicated_micros); |
2025 | 641 | tablet_metric->async_replication_sent_lag_micros->set_value(0); |
2026 | 641 | tablet_metric->async_replication_committed_lag_micros->set_value(0); |
2027 | 641 | } |
2028 | 641 | } |
2029 | | |
2030 | | Status CDCServiceImpl::UpdateCheckpoint(const ProducerTabletInfo& producer_tablet, |
2031 | | const OpId& sent_op_id, |
2032 | | const OpId& commit_op_id, |
2033 | | const client::YBSessionPtr& session, |
2034 | 631 | uint64_t last_record_hybrid_time) { |
2035 | 631 | bool update_cdc_state = impl_->UpdateCheckpoint(producer_tablet, sent_op_id, commit_op_id); |
2036 | | |
2037 | 631 | if (update_cdc_state) { |
2038 | 0 | auto cdc_state = VERIFY_RESULT(GetCdcStateTable()); |
2039 | 0 | const auto op = cdc_state->NewUpdateOp(); |
2040 | 0 | auto* const req = op->mutable_request(); |
2041 | 0 | DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty()); |
2042 | 0 | QLAddStringHashValue(req, producer_tablet.tablet_id); |
2043 | 0 | QLAddStringRangeValue(req, producer_tablet.stream_id); |
2044 | |
|
2045 | 0 | cdc_state->AddStringColumnValue(req, master::kCdcCheckpoint, commit_op_id.ToString()); |
2046 | | // If we have a last record hybrid time, use that for physical time. If not, it means we're |
2047 | | // caught up, so the current time. |
2048 | 0 | uint64_t last_replication_time_micros = last_record_hybrid_time != 0 ? |
2049 | 0 | HybridTime(last_record_hybrid_time).GetPhysicalValueMicros() : GetCurrentTimeMicros(); |
2050 | 0 | cdc_state->AddTimestampColumnValue(req, master::kCdcLastReplicationTime, |
2051 | 0 | last_replication_time_micros); |
2052 | | // Only perform the update if we have a row in cdc_state to prevent a race condition where |
2053 | | // a stream is deleted and then this logic inserts entries in cdc_state from that deleted |
2054 | | // stream. |
2055 | 0 | auto* condition = req->mutable_if_expr()->mutable_condition(); |
2056 | 0 | condition->set_op(QL_OP_EXISTS); |
2057 | 0 | RETURN_NOT_OK(RefreshCacheOnFail(session->ApplyAndFlush(op))); |
2058 | 0 | } |
2059 | | |
2060 | 631 | return Status::OK(); |
2061 | 631 | } |
2062 | | |
2063 | | std::shared_ptr<CDCTabletMetrics> CDCServiceImpl::GetCDCTabletMetrics( |
2064 | | const ProducerTabletInfo& producer, |
2065 | 881 | std::shared_ptr<tablet::TabletPeer> tablet_peer) { |
2066 | | // 'nullptr' not recommended: using for tests. |
2067 | 881 | if (tablet_peer == nullptr) { |
2068 | 0 | auto status = tablet_manager_->GetTabletPeer(producer.tablet_id, &tablet_peer); |
2069 | 0 | if (!status.ok() || tablet_peer == nullptr) return nullptr; |
2070 | 0 | } |
2071 | | |
2072 | 881 | auto tablet = tablet_peer->shared_tablet(); |
2073 | 881 | if (tablet == nullptr) return nullptr0 ; |
2074 | | |
2075 | 881 | std::string key = "CDCMetrics::" + producer.stream_id; |
2076 | 881 | std::shared_ptr<void> metrics_raw = tablet->GetAdditionalMetadata(key); |
2077 | 881 | if (metrics_raw == nullptr) { |
2078 | | // Create a new METRIC_ENTITY_cdc here. |
2079 | 444 | MetricEntity::AttributeMap attrs; |
2080 | 444 | { |
2081 | 444 | SharedLock<rw_spinlock> l(mutex_); |
2082 | 444 | auto raft_group_metadata = tablet_peer->tablet()->metadata(); |
2083 | 444 | attrs["table_id"] = raft_group_metadata->table_id(); |
2084 | 444 | attrs["namespace_name"] = raft_group_metadata->namespace_name(); |
2085 | 444 | attrs["table_name"] = raft_group_metadata->table_name(); |
2086 | 444 | attrs["stream_id"] = producer.stream_id; |
2087 | 444 | } |
2088 | 444 | auto entity = METRIC_ENTITY_cdc.Instantiate(metric_registry_, producer.MetricsString(), attrs); |
2089 | 444 | metrics_raw = std::make_shared<CDCTabletMetrics>(entity); |
2090 | | // Adding the new metric to the tablet so it maintains the same lifetime scope. |
2091 | 444 | tablet->AddAdditionalMetadata(key, metrics_raw); |
2092 | 444 | } |
2093 | | |
2094 | 881 | return std::static_pointer_cast<CDCTabletMetrics>(metrics_raw); |
2095 | 881 | } |
2096 | | |
2097 | 648 | Result<std::shared_ptr<StreamMetadata>> CDCServiceImpl::GetStream(const std::string& stream_id) { |
2098 | 648 | auto stream = GetStreamMetadataFromCache(stream_id); |
2099 | 648 | if (stream != nullptr) { |
2100 | 647 | return stream; |
2101 | 647 | } |
2102 | | |
2103 | | // Look up stream in sys catalog. |
2104 | 1 | std::vector<ObjectId> object_ids; |
2105 | 1 | NamespaceId ns_id; |
2106 | 1 | std::unordered_map<std::string, std::string> options; |
2107 | 1 | RETURN_NOT_OK(client()->GetCDCStream(stream_id, &ns_id, &object_ids, &options)); |
2108 | | |
2109 | 0 | auto stream_metadata = std::make_shared<StreamMetadata>(); |
2110 | 0 | for (const auto& option : options) { |
2111 | 0 | if (option.first == kRecordType) { |
2112 | 0 | SCHECK(CDCRecordType_Parse(option.second, &stream_metadata->record_type), |
2113 | 0 | IllegalState, "CDC record type parsing error"); |
2114 | 0 | } else if (option.first == kRecordFormat) { |
2115 | 0 | SCHECK(CDCRecordFormat_Parse(option.second, &stream_metadata->record_format), |
2116 | 0 | IllegalState, "CDC record format parsing error"); |
2117 | 0 | } else if (option.first == kSourceType) { |
2118 | 0 | SCHECK(CDCRequestSource_Parse(option.second, &stream_metadata->source_type), IllegalState, |
2119 | 0 | "CDC record format parsing error"); |
2120 | 0 | } else if (option.first == kCheckpointType) { |
2121 | 0 | SCHECK(CDCCheckpointType_Parse(option.second, &stream_metadata->checkpoint_type), |
2122 | 0 | IllegalState, "CDC record format parsing error"); |
2123 | 0 | } else if (option.first == cdc::kIdType && option.second == cdc::kNamespaceId) { |
2124 | 0 | stream_metadata->ns_id = ns_id; |
2125 | 0 | stream_metadata->table_ids.insert( |
2126 | 0 | stream_metadata->table_ids.end(), object_ids.begin(), object_ids.end()); |
2127 | 0 | } else if (option.first == cdc::kIdType && option.second == cdc::kTableId) { |
2128 | 0 | stream_metadata->table_ids.insert( |
2129 | 0 | stream_metadata->table_ids.end(), object_ids.begin(), object_ids.end()); |
2130 | 0 | } else { |
2131 | 0 | LOG(WARNING) << "Unsupported CDC option: " << option.first; |
2132 | 0 | } |
2133 | 0 | } |
2134 | | |
2135 | 0 | AddStreamMetadataToCache(stream_id, stream_metadata); |
2136 | 0 | return stream_metadata; |
2137 | 0 | } |
2138 | | |
2139 | | void CDCServiceImpl::AddStreamMetadataToCache(const std::string& stream_id, |
2140 | 306 | const std::shared_ptr<StreamMetadata>& metadata) { |
2141 | 306 | std::lock_guard<decltype(mutex_)> l(mutex_); |
2142 | 306 | stream_metadata_.emplace(stream_id, metadata); |
2143 | 306 | } |
2144 | | |
2145 | | std::shared_ptr<StreamMetadata> CDCServiceImpl::GetStreamMetadataFromCache( |
2146 | 648 | const std::string& stream_id) { |
2147 | 648 | SharedLock<decltype(mutex_)> l(mutex_); |
2148 | 648 | auto it = stream_metadata_.find(stream_id); |
2149 | 648 | if (it != stream_metadata_.end()) { |
2150 | 647 | return it->second; |
2151 | 647 | } else { |
2152 | 1 | return nullptr; |
2153 | 1 | } |
2154 | 648 | } |
2155 | | |
2156 | 656 | Status CDCServiceImpl::CheckTabletValidForStream(const ProducerTabletInfo& info) { |
2157 | 656 | auto result = VERIFY_RESULT(impl_->PreCheckTabletValidForStream(info)); |
2158 | 656 | if (result) { |
2159 | 655 | return Status::OK(); |
2160 | 655 | } |
2161 | | // If we don't recognize the stream_id, populate our full tablet list for this stream. |
2162 | 1 | auto tablets = VERIFY_RESULT0 (GetTablets(info.stream_id));0 |
2163 | 0 | return impl_->CheckTabletValidForStream(info, tablets); |
2164 | 1 | } |
2165 | | |
2166 | | } // namespace cdc |
2167 | | } // namespace yb |