YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_service.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/cdc/cdc_service.h"
14
15
#include <chrono>
16
#include <memory>
17
18
#include <boost/multi_index/hashed_index.hpp>
19
#include <boost/multi_index/mem_fun.hpp>
20
#include <boost/multi_index/member.hpp>
21
#include <boost/multi_index_container.hpp>
22
23
#include "yb/cdc/cdc_producer.h"
24
#include "yb/cdc/cdc_rpc.h"
25
#include "yb/cdc/cdc_service.proxy.h"
26
27
#include "yb/client/client.h"
28
#include "yb/client/meta_cache.h"
29
#include "yb/client/schema.h"
30
#include "yb/client/session.h"
31
#include "yb/client/table.h"
32
#include "yb/client/table_alterer.h"
33
#include "yb/client/table_handle.h"
34
#include "yb/client/yb_op.h"
35
#include "yb/client/yb_table_name.h"
36
37
#include "yb/common/entity_ids.h"
38
#include "yb/common/pg_system_attr.h"
39
#include "yb/common/ql_expr.h"
40
#include "yb/common/ql_value.h"
41
#include "yb/common/schema.h"
42
#include "yb/common/wire_protocol.h"
43
44
#include "yb/consensus/log.h"
45
#include "yb/consensus/raft_consensus.h"
46
#include "yb/consensus/replicate_msgs_holder.h"
47
48
#include "yb/gutil/dynamic_annotations.h"
49
#include "yb/gutil/strings/join.h"
50
51
#include "yb/master/master_client.pb.h"
52
#include "yb/master/master_ddl.pb.h"
53
#include "yb/master/master_defaults.h"
54
55
#include "yb/rpc/rpc_context.h"
56
#include "yb/rpc/rpc_controller.h"
57
58
#include "yb/tablet/tablet_metadata.h"
59
#include "yb/tablet/tablet_peer.h"
60
#include "yb/tablet/transaction_participant.h"
61
62
#include "yb/tserver/tablet_server.h"
63
#include "yb/tserver/ts_tablet_manager.h"
64
65
#include "yb/util/debug/trace_event.h"
66
#include "yb/util/flag_tags.h"
67
#include "yb/util/format.h"
68
#include "yb/util/logging.h"
69
#include "yb/util/metrics.h"
70
#include "yb/util/monotime.h"
71
#include "yb/util/scope_exit.h"
72
#include "yb/util/shared_lock.h"
73
#include "yb/util/status_format.h"
74
#include "yb/util/status_log.h"
75
#include "yb/util/trace.h"
76
77
#include "yb/yql/cql/ql/util/statement_result.h"
78
79
constexpr uint32_t kUpdateIntervalMs = 15 * 1000;
80
81
DEFINE_int32(cdc_read_rpc_timeout_ms, 30 * 1000,
82
             "Timeout used for CDC read rpc calls.  Reads normally occur cross-cluster.");
83
TAG_FLAG(cdc_read_rpc_timeout_ms, advanced);
84
85
DEFINE_int32(cdc_write_rpc_timeout_ms, 30 * 1000,
86
             "Timeout used for CDC write rpc calls.  Writes normally occur intra-cluster.");
87
TAG_FLAG(cdc_write_rpc_timeout_ms, advanced);
88
89
DEFINE_int32(cdc_ybclient_reactor_threads, 50,
90
             "The number of reactor threads to be used for processing ybclient "
91
             "requests for CDC.");
92
TAG_FLAG(cdc_ybclient_reactor_threads, advanced);
93
94
DEFINE_int32(cdc_state_checkpoint_update_interval_ms, kUpdateIntervalMs,
95
             "Rate at which CDC state's checkpoint is updated.");
96
97
DEFINE_string(certs_for_cdc_dir, "",
98
              "The parent directory of where all certificates for xCluster producer universes will "
99
              "be stored, for when the producer and consumer clusters use different certificates. "
100
              "Place the certificates for each producer cluster in "
101
              "<certs_for_cdc_dir>/<producer_cluster_id>/*.");
102
103
DEFINE_int32(update_min_cdc_indices_interval_secs, 60,
104
             "How often to read cdc_state table to get the minimum applied index for each tablet "
105
             "across all streams. This information is used to correctly keep log files that "
106
             "contain unapplied entries. This is also the rate at which a tablet's minimum "
107
             "replicated index across all streams is sent to the other peers in the configuration. "
108
             "If flag enable_log_retention_by_op_idx is disabled, this flag has no effect.");
109
110
DEFINE_int32(update_metrics_interval_ms, kUpdateIntervalMs,
111
             "How often to update xDC cluster metrics.");
112
113
DEFINE_bool(enable_cdc_state_table_caching, true, "Enable caching the cdc_state table schema.");
114
115
DEFINE_bool(enable_collect_cdc_metrics, true, "Enable collecting cdc metrics.");
116
117
DEFINE_double(cdc_read_safe_deadline_ratio, .10,
118
              "When the heartbeat deadline has this percentage of time remaining, "
119
              "the master should halt tablet report processing so it can respond in time.");
120
121
DECLARE_bool(enable_log_retention_by_op_idx);
122
123
DECLARE_int32(cdc_checkpoint_opid_interval_ms);
124
125
METRIC_DEFINE_entity(cdc);
126
127
namespace yb {
128
namespace cdc {
129
130
using namespace std::literals;
131
132
using rpc::RpcContext;
133
using tserver::TSTabletManager;
134
using client::internal::RemoteTabletServer;
135
136
constexpr int kMaxDurationForTabletLookup = 50;
137
const client::YBTableName kCdcStateTableName(
138
    YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
139
140
namespace {
141
142
// These are guarded by lock_.
143
// Map of checkpoints that have been sent to CDC consumer and stored in cdc_state.
144
struct TabletCheckpointInfo {
145
 public:
146
  ProducerTabletInfo producer_tablet_info;
147
148
  mutable TabletCheckpoint cdc_state_checkpoint;
149
  mutable TabletCheckpoint sent_checkpoint;
150
  mutable MemTrackerPtr mem_tracker;
151
152
17.4k
  const TabletId& tablet_id() const {
153
17.4k
    return producer_tablet_info.tablet_id;
154
17.4k
  }
155
156
15.5k
  const CDCStreamId& stream_id() const {
157
15.5k
    return producer_tablet_info.stream_id;
158
15.5k
  }
159
};
160
161
struct CDCStateMetadataInfo {
162
  ProducerTabletInfo producer_tablet_info;
163
164
  mutable std::string commit_timestamp;
165
  mutable std::shared_ptr<Schema>  current_schema;
166
  mutable OpId last_streamed_op_id;
167
168
  std::shared_ptr<MemTracker> mem_tracker;
169
170
463
  const TableId& tablet_id() const {
171
463
    return producer_tablet_info.tablet_id;
172
463
  }
173
174
499
  const CDCStreamId& stream_id() const {
175
499
    return producer_tablet_info.stream_id;
176
499
  }
177
178
};
179
180
class TabletTag;
181
class StreamTag;
182
183
using TabletCheckpoints = boost::multi_index_container <
184
    TabletCheckpointInfo,
185
    boost::multi_index::indexed_by <
186
        boost::multi_index::hashed_unique <
187
            boost::multi_index::member <
188
                TabletCheckpointInfo, ProducerTabletInfo,
189
                &TabletCheckpointInfo::producer_tablet_info
190
            >
191
        >,
192
        boost::multi_index::hashed_non_unique <
193
            boost::multi_index::tag <TabletTag>,
194
            boost::multi_index::const_mem_fun <
195
                TabletCheckpointInfo, const TabletId&, &TabletCheckpointInfo::tablet_id
196
            >
197
        >,
198
        boost::multi_index::hashed_non_unique <
199
            boost::multi_index::tag <StreamTag>,
200
            boost::multi_index::const_mem_fun <
201
                TabletCheckpointInfo, const CDCStreamId&, &TabletCheckpointInfo::stream_id
202
            >
203
        >
204
    >
205
>;
206
207
using CDCStateMetadata = boost::multi_index_container <
208
    CDCStateMetadataInfo,
209
    boost::multi_index::indexed_by <
210
        boost::multi_index::hashed_unique <
211
            boost::multi_index::member <
212
                CDCStateMetadataInfo, ProducerTabletInfo,
213
                &CDCStateMetadataInfo::producer_tablet_info>
214
        >,
215
        boost::multi_index::hashed_non_unique <
216
            boost::multi_index::tag <TabletTag>,
217
            boost::multi_index::const_mem_fun <
218
                CDCStateMetadataInfo, const TabletId&, &CDCStateMetadataInfo::tablet_id
219
            >
220
        >,
221
        boost::multi_index::hashed_non_unique <
222
            boost::multi_index::tag <StreamTag>,
223
            boost::multi_index::const_mem_fun <
224
                CDCStateMetadataInfo, const CDCStreamId&, &CDCStateMetadataInfo::stream_id
225
            >
226
        >
227
    >
228
>;
229
230
} // namespace
231
232
class CDCServiceImpl::Impl {
 public:
  // Starts asynchronous creation of the "cdc_client" YBClient using the tablet server's uuid,
  // options, metric entity, memory tracker and messenger. `mutex` is owned by the caller and
  // guards tablet_checkpoints_ and cdc_state_metadata_.
  explicit Impl(TSTabletManager* tablet_manager, rw_spinlock* mutex) : mutex_(*mutex) {
    const auto server = tablet_manager->server();
    async_client_init_.emplace(
        "cdc_client", FLAGS_cdc_ybclient_reactor_threads, FLAGS_cdc_read_rpc_timeout_ms / 1000,
        server->permanent_uuid(), &server->options(), server->metric_entity(),
        server->mem_tracker(), server->messenger());
    async_client_init_->Start();
  }

  // Stores the commit timestamp, schema and last streamed op id on the existing metadata entry
  // for `producer_tablet`. A missing entry is logged with LOG(DFATAL) and otherwise ignored.
  void UpdateCDCStateMetadata(
      const ProducerTabletInfo& producer_tablet,
      const std::string& timestamp,
      const std::shared_ptr<Schema>& schema,
      const OpId& op_id) {
    std::lock_guard<decltype(mutex_)> l(mutex_);
    auto it = cdc_state_metadata_.find(producer_tablet);
    if (it == cdc_state_metadata_.end()) {
      LOG(DFATAL) << "Failed to update the cdc state metadata for tablet id: "
                  << producer_tablet.tablet_id;
      return;
    }
    it->commit_timestamp = timestamp;
    it->current_schema = schema;
    it->last_streamed_op_id = op_id;
  }

  // Returns the cached schema for `producer_tablet`. If no metadata entry exists yet, inserts one
  // holding a newly allocated empty Schema and returns that schema.
  std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo& producer_tablet) {
    std::lock_guard<decltype(mutex_)> l(mutex_);
    auto it = cdc_state_metadata_.find(producer_tablet);

    if (it != cdc_state_metadata_.end()) {
      return it->current_schema;
    }
    CDCStateMetadataInfo info = CDCStateMetadataInfo {
      .producer_tablet_info = producer_tablet,
      .current_schema = std::make_shared<Schema>()
    };
    cdc_state_metadata_.emplace(info);
    return info.current_schema;
  }

  // Inserts a checkpoint entry for <stream_id, tablet_id> with both the cdc_state and sent
  // checkpoints set to `op_id`.
  // - If `producer_entries_modified` is non-null, the tablet is appended to it and the entry is
  //   timestamped now.
  // - Otherwise the entry is timestamped CoarseTimePoint::min() and nothing happens if an entry
  //   already exists.
  // emplace() never overwrites an existing entry in the unique index.
  void AddTabletCheckpoint(
      OpId op_id,
      const CDCStreamId& stream_id,
      const TabletId& tablet_id,
      std::vector<ProducerTabletInfo>* producer_entries_modified) {
    ProducerTabletInfo producer_tablet{
      .universe_uuid = "",
      .stream_id = stream_id,
      .tablet_id = tablet_id
    };
    CoarseTimePoint time;
    if (producer_entries_modified) {
      producer_entries_modified->push_back(producer_tablet);
      time = CoarseMonoClock::Now();
    } else {
      time = CoarseTimePoint::min();
    }
    std::lock_guard<decltype(mutex_)> l(mutex_);
    if (!producer_entries_modified && tablet_checkpoints_.count(producer_tablet)) {
      return;
    }
    tablet_checkpoints_.emplace(TabletCheckpointInfo {
      .producer_tablet_info = producer_tablet,
      .cdc_state_checkpoint = {op_id, time},
      .sent_checkpoint = {op_id, time},
    });
  }

  // Removes every checkpoint entry, and optionally every metadata entry, for the tablet ids in
  // `producer_entries_modified`. All streams of each listed tablet are removed, because the
  // TabletTag index is used.
  // NOTE(review): does not acquire mutex_ (NO_THREAD_SAFETY_ANALYSIS); presumably callers already
  // hold it exclusively — TODO confirm at call sites.
  void EraseTablets(const std::vector<ProducerTabletInfo>& producer_entries_modified,
                    bool erase_cdc_states)
      NO_THREAD_SAFETY_ANALYSIS {
    for (const auto& entry : producer_entries_modified) {
      tablet_checkpoints_.get<TabletTag>().erase(entry.tablet_id);
      if (erase_cdc_states) {
        cdc_state_metadata_.get<TabletTag>().erase(entry.tablet_id);
      }
    }
  }

  // Returns the cached cdc_state checkpoint for `producer_tablet` when its index is positive and
  // it has not expired under FLAGS_cdc_state_checkpoint_update_interval_ms; otherwise none.
  boost::optional<OpId> GetLastCheckpoint(const ProducerTabletInfo& producer_tablet) {
    SharedLock<rw_spinlock> lock(mutex_);
    auto it = tablet_checkpoints_.find(producer_tablet);
    if (it != tablet_checkpoints_.end()) {
      // Use checkpoint from cache only if it is current.
      if (it->cdc_state_checkpoint.op_id.index > 0 &&
          !it->cdc_state_checkpoint.ExpiredAt(
              FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms, CoarseMonoClock::Now())) {
        return it->cdc_state_checkpoint.op_id;
      }
    }
    return boost::none;
  }

  // Records the sent and commit checkpoints for `producer_tablet`.
  // Returns true when the caller should persist the checkpoint to the cdc_state table:
  // - the entry is new, or
  // - the cached cdc_state checkpoint has expired under
  //   FLAGS_cdc_state_checkpoint_update_interval_ms (its timestamp is then refreshed).
  bool UpdateCheckpoint(const ProducerTabletInfo& producer_tablet,
                        const OpId& sent_op_id,
                        const OpId& commit_op_id) {
    auto now = CoarseMonoClock::Now();

    TabletCheckpoint sent_checkpoint = {
      .op_id = sent_op_id,
      .last_update_time = now,
    };
    TabletCheckpoint commit_checkpoint = {
      .op_id = commit_op_id,
      .last_update_time = now,
    };

    std::lock_guard<decltype(mutex_)> l(mutex_);
    auto it = tablet_checkpoints_.find(producer_tablet);
    if (it != tablet_checkpoints_.end()) {
      it->sent_checkpoint = sent_checkpoint;

      if (commit_op_id.index > 0) {
        it->cdc_state_checkpoint.op_id = commit_op_id;
      }

      // Check if we need to update cdc_state table.
      if (!it->cdc_state_checkpoint.ExpiredAt(
              FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms, now)) {
        return false;
      }

      it->cdc_state_checkpoint.last_update_time = now;
    } else {
      tablet_checkpoints_.emplace(TabletCheckpointInfo{
        .producer_tablet_info = producer_tablet,
        .cdc_state_checkpoint = commit_checkpoint,
        .sent_checkpoint = sent_checkpoint
      });
    }

    return true;
  }

  // Returns the smallest sent checkpoint across the streams of `tablet_id` that were polled
  // within FLAGS_cdc_checkpoint_opid_interval_ms. Returns OpId::Max() if the tablet has no
  // entries or no stream is active.
  OpId GetMinSentCheckpointForTablet(const TabletId& tablet_id) {
    OpId min_op_id = OpId::Max();

    SharedLock<rw_spinlock> l(mutex_);
    auto it_range = tablet_checkpoints_.get<TabletTag>().equal_range(tablet_id);
    if (it_range.first == it_range.second) {
      LOG(WARNING) << "Tablet ID not found in stream_tablets map: " << tablet_id;
      return min_op_id;
    }

    const auto cdc_checkpoint_opid_interval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms;
    // Read the clock once, outside the loop, so every stream is judged against the same instant.
    const auto now = CoarseMonoClock::Now();
    for (auto it = it_range.first; it != it_range.second; ++it) {
      // We don't want to include streams that are not being actively polled.
      // So, if the stream has not been polled in the last x seconds,
      // then we ignore that stream while calculating min op ID.
      if (!it->sent_checkpoint.ExpiredAt(cdc_checkpoint_opid_interval, now) &&
          it->sent_checkpoint.op_id.index < min_op_id.index) {
        min_op_id = it->sent_checkpoint.op_id;
      }
    }
    return min_op_id;
  }

  // Returns the per-stream memory tracker for `producer_info`. The tracker is a child of a "CDC"
  // tracker under the tablet's tracker, and is created lazily. Returns nullptr if there is no
  // checkpoint entry. A shared-lock fast path is tried first; the exclusive lock is taken only to
  // create the tracker, and the entry is re-checked under it.
  MemTrackerPtr GetMemTracker(
      const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
      const ProducerTabletInfo& producer_info) {
    {
      SharedLock<rw_spinlock> l(mutex_);
      auto it = tablet_checkpoints_.find(producer_info);
      if (it == tablet_checkpoints_.end()) {
        return nullptr;
      }
      if (it->mem_tracker) {
        return it->mem_tracker;
      }
    }
    std::lock_guard<rw_spinlock> l(mutex_);
    auto it = tablet_checkpoints_.find(producer_info);
    if (it == tablet_checkpoints_.end()) {
      return nullptr;
    }
    if (it->mem_tracker) {
      return it->mem_tracker;
    }
    auto cdc_mem_tracker = MemTracker::FindOrCreateTracker(
        "CDC", tablet_peer->tablet()->mem_tracker());
    it->mem_tracker = MemTracker::FindOrCreateTracker(producer_info.stream_id, cdc_mem_tracker);
    return it->mem_tracker;
  }

  // Returns true if a checkpoint entry exists for `info`. Otherwise returns false, after logging
  // when the stream is known but this tablet is not part of it. Never returns an error status.
  Result<bool> PreCheckTabletValidForStream(const ProducerTabletInfo& info) {
    SharedLock<rw_spinlock> l(mutex_);
    if (tablet_checkpoints_.count(info) != 0) {
      return true;
    }
    if (tablet_checkpoints_.get<StreamTag>().count(info.stream_id) != 0) {
      // Did not find matching tablet ID.
      // TODO: Add the split tablets in during tablet split?
      LOG(INFO) << "Tablet ID " << info.tablet_id << " is not part of stream ID " << info.stream_id
                << ". Repopulating tablet list for this stream.";
    }
    return false;
  }

  // Adds checkpoint and metadata entries for every tablet in `tablets` under info's universe and
  // stream. Existing entries are left unchanged. Returns InvalidArgument if info.tablet_id is not
  // among `tablets`.
  CHECKED_STATUS CheckTabletValidForStream(
      const ProducerTabletInfo& info,
      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets) {
    bool found = false;
    {
      std::lock_guard<rw_spinlock> l(mutex_);
      for (const auto &tablet : tablets) {
        // Add every tablet in the stream.
        ProducerTabletInfo producer_info{info.universe_uuid, info.stream_id, tablet.tablet_id()};
        tablet_checkpoints_.emplace(TabletCheckpointInfo{
            .producer_tablet_info = producer_info
        });
        cdc_state_metadata_.emplace(CDCStateMetadataInfo{
            .producer_tablet_info = producer_info,
            .current_schema = std::make_shared<Schema>()
        });
        // If this is the tablet that the user requested.
        if (tablet.tablet_id() == info.tablet_id) {
          found = true;
        }
      }
    }
    return found ? Status::OK()
                 : STATUS_FORMAT(InvalidArgument, "Tablet ID $0 is not part of stream ID $1",
                                 info.tablet_id, info.stream_id);
  }

  // Returns the smallest cdc_state checkpoint (by index) across all streams of `tablet_id`, or
  // none if the tablet has no entries.
  boost::optional<OpId> MinOpId(const TabletId& tablet_id) {
    boost::optional<OpId> result;
    SharedLock<rw_spinlock> l(mutex_);
    // The TabletTag index yields one entry per stream that includes this tablet.
    auto it_range = tablet_checkpoints_.get<TabletTag>().equal_range(tablet_id);
    if (it_range.first != it_range.second) {
      // Iterate over all the streams for this tablet.
      for (auto it = it_range.first; it != it_range.second; ++it) {
        if (!result || it->cdc_state_checkpoint.op_id.index < result->index) {
          result = it->cdc_state_checkpoint.op_id;
        }
      }
    } else {
      VLOG(2) << "Didn't find any streams for tablet " << tablet_id;
    }

    return result;
  }

  // Returns a copy of the checkpoint cache taken under the shared lock.
  TabletCheckpoints TabletCheckpointsCopy() {
    SharedLock<rw_spinlock> lock(mutex_);
    return tablet_checkpoints_;
  }

  boost::optional<client::AsyncClientInitialiser> async_client_init_;

  // this will be used for the std::call_once call while caching the client
  std::once_flag is_client_cached_;
 private:
  rw_spinlock& mutex_;

  TabletCheckpoints tablet_checkpoints_ GUARDED_BY(mutex_);

  CDCStateMetadata cdc_state_metadata_ GUARDED_BY(mutex_);
};
496
497
// Builds the service: wires the base class and server metrics to `metric_entity_server`, creates
// the Impl (which shares this object's mutex_), then starts a background thread that runs
// UpdatePeersAndMetrics().
CDCServiceImpl::CDCServiceImpl(
    TSTabletManager* tablet_manager,
    const scoped_refptr<MetricEntity>& metric_entity_server,
    MetricRegistry* metric_registry)
    : CDCServiceIf(metric_entity_server),
      tablet_manager_(tablet_manager),
      metric_registry_(metric_registry),
      server_metrics_(std::make_shared<CDCServerMetrics>(metric_entity_server)),
      impl_(new Impl(tablet_manager, &mutex_)) {
  auto* const worker = new std::thread([this] { UpdatePeersAndMetrics(); });
  update_peers_and_metrics_thread_.reset(worker);
}
509
510
92
// Delegates all teardown to Shutdown().
CDCServiceImpl::~CDCServiceImpl() { this->Shutdown(); }
513
514
19.0k
// Returns the YBClient produced by the "cdc_client" initialiser owned by impl_.
client::YBClient* CDCServiceImpl::client() {
  auto& client_initialiser = impl_->async_client_init_;
  return client_initialiser->client();
}
517
518
namespace {
519
520
10.3k
bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) {
521
23.4k
  for (const auto& col : schema.columns()) {
522
23.4k
    if (col.order() == static_cast<int32_t>(PgSystemAttrNum::kYBRowId)) {
523
      // ybrowid column is added for tables that don't have user-specified primary key.
524
2
      return false;
525
2
    }
526
23.4k
  }
527
10.3k
  return true;
528
10.3k
}
529
530
233
bool IsTabletPeerLeader(const std::shared_ptr<tablet::TabletPeer>& peer) {
531
233
  return peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
532
233
}
533
534
std::unordered_map<std::string, std::string> GetCreateCDCStreamOptions(
535
306
    const CreateCDCStreamRequestPB* req) {
536
306
  std::unordered_map<std::string, std::string> options;
537
306
  if(req->has_namespace_name()) {
538
306
    options.reserve(5);
539
306
  } else {
540
0
    options.reserve(4);
541
0
  }
542
543
306
  options.emplace(kRecordType, CDCRecordType_Name(req->record_type()));
544
306
  options.emplace(kRecordFormat, CDCRecordFormat_Name(req->record_format()));
545
306
  options.emplace(kSourceType, CDCRequestSource_Name(req->source_type()));
546
306
  options.emplace(kCheckpointType, CDCCheckpointType_Name(req->checkpoint_type()));
547
306
  if (req->has_namespace_name()) {
548
306
    options.emplace(kIdType, kNamespaceId);
549
306
  }
550
551
306
  return options;
552
306
}
553
554
Status DoUpdateCDCConsumerOpId(const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
555
                               const OpId& checkpoint,
556
862
                               const TabletId& tablet_id) {
557
862
  std::shared_ptr<consensus::Consensus> shared_consensus = tablet_peer->shared_consensus();
558
559
862
  if (shared_consensus == nullptr) {
560
0
    return STATUS_FORMAT(InternalError,
561
0
                         "Failed to get tablet $0 peer consensus", tablet_id);
562
0
  }
563
564
862
  shared_consensus->UpdateCDCConsumerOpId(checkpoint);
565
862
  return Status::OK();
566
862
}
567
568
bool UpdateCheckpointRequired(const StreamMetadata& record,
569
637
                              const CDCSDKCheckpointPB& cdc_sdk_op_id) {
570
571
637
  switch (record.source_type) {
572
0
    case XCLUSTER:
573
0
      return true;
574
575
637
    case CDCSDK:
576
637
      if (cdc_sdk_op_id.write_id() == 0) {
577
613
        return true;
578
613
      }
579
24
      return cdc_sdk_op_id.write_id() == -1 && cdc_sdk_op_id.key().empty() &&
580
24
             cdc_sdk_op_id.snapshot_time() != 0;
581
582
0
    default:
583
0
      return false;
584
637
  }
585
586
637
}
587
588
bool GetFromOpId(const GetChangesRequestPB* req,
589
                 OpId* op_id,
590
641
                 CDCSDKCheckpointPB* cdc_sdk_op_id) {
591
641
  if (req->has_from_checkpoint()) {
592
0
    *op_id = OpId::FromPB(req->from_checkpoint().op_id());
593
641
  } else if (req->has_from_cdc_sdk_checkpoint()) {
594
36
    *cdc_sdk_op_id = req->from_cdc_sdk_checkpoint();
595
36
    *op_id = OpId::FromPB(*cdc_sdk_op_id);
596
605
  } else {
597
605
    return false;
598
605
  }
599
36
  return true;
600
641
}
601
602
// Check for compatibility whether CDC can be setup on the table
603
// This essentially checks that the table should not be a REDIS table since we do not support it
604
// and if it's a YSQL or YCQL one, it should have a primary key
605
5.18k
Status CheckCdcCompatibility(const std::shared_ptr<client::YBTable>& table) {
606
  // return if it is a CQL table because they always have a user specified primary key
607
5.18k
  if (table->table_type() == client::YBTableType::YQL_TABLE_TYPE) {
608
0
    LOG(INFO) << "Returning while checking CDC compatibility, table is a YCQL table";
609
0
    return Status::OK();
610
0
  }
611
612
5.18k
  if (table->table_type() == client::YBTableType::REDIS_TABLE_TYPE) {
613
0
    return STATUS(InvalidArgument, "Cannot setup CDC on YEDIS_TABLE");;
614
0
  }
615
616
  // Check if YSQL table has a primary key. CQL tables always have a
617
  // user specified primary key.
618
5.18k
  if (!YsqlTableHasPrimaryKey(table->schema())) {
619
0
    return STATUS(InvalidArgument, "Cannot setup CDC on table without primary key");
620
0
  }
621
622
5.18k
  return Status::OK();
623
5.18k
}
624
625
963
CoarseTimePoint GetDeadline(const RpcContext& context, client::YBClient* client) {
626
963
  CoarseTimePoint deadline = context.GetClientDeadline();
627
963
  if (deadline == CoarseTimePoint::max()) {  // Not specified by user.
628
0
    deadline = CoarseMonoClock::now() + client->default_rpc_timeout();
629
0
  }
630
963
  return deadline;
631
963
}
632
633
6
CHECKED_STATUS VerifyArg(const SetCDCCheckpointRequestPB& req) {
634
6
  if (!req.has_checkpoint()) {
635
0
    return STATUS(InvalidArgument, "OpId is required to set checkpoint");
636
0
  }
637
638
6
  if (!req.has_tablet_id()) {
639
0
    return STATUS(InvalidArgument, "Tablet ID is required to set checkpoint");
640
0
  }
641
642
6
  if(!req.has_stream_id()) {
643
0
    return STATUS(InvalidArgument, "Stream ID is required to set checkpoint");
644
0
  }
645
646
6
  return Status::OK();
647
6
}
648
649
} // namespace
650
651
template <class ReqType, class RespType>
652
1.18k
bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) {
653
1.18k
  TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString());
654
1.18k
  if (PREDICT_FALSE(!tablet_manager_)) {
655
0
    SetupErrorAndRespond(resp->mutable_error(),
656
0
                         STATUS(ServiceUnavailable, "Tablet Server is not running"),
657
0
                         CDCErrorPB::NOT_RUNNING,
658
0
                         rpc);
659
0
    return false;
660
0
  }
661
1.18k
  return true;
662
1.18k
}
bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::CreateCDCStreamRequestPB, yb::cdc::CreateCDCStreamResponsePB>(yb::cdc::CreateCDCStreamRequestPB const*, yb::cdc::CreateCDCStreamResponsePB*, yb::rpc::RpcContext*)
Line
Count
Source
652
307
bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) {
653
307
  TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString());
654
307
  if (PREDICT_FALSE(!tablet_manager_)) {
655
0
    SetupErrorAndRespond(resp->mutable_error(),
656
0
                         STATUS(ServiceUnavailable, "Tablet Server is not running"),
657
0
                         CDCErrorPB::NOT_RUNNING,
658
0
                         rpc);
659
0
    return false;
660
0
  }
661
307
  return true;
662
307
}
Unexecuted instantiation: bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::DeleteCDCStreamRequestPB, yb::cdc::DeleteCDCStreamResponsePB>(yb::cdc::DeleteCDCStreamRequestPB const*, yb::cdc::DeleteCDCStreamResponsePB*, yb::rpc::RpcContext*)
Unexecuted instantiation: bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::ListTabletsRequestPB, yb::cdc::ListTabletsResponsePB>(yb::cdc::ListTabletsRequestPB const*, yb::cdc::ListTabletsResponsePB*, yb::rpc::RpcContext*)
bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::GetChangesRequestPB, yb::cdc::GetChangesResponsePB>(yb::cdc::GetChangesRequestPB const*, yb::cdc::GetChangesResponsePB*, yb::rpc::RpcContext*)
Line
Count
Source
652
642
bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) {
653
642
  TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString());
654
642
  if (PREDICT_FALSE(!tablet_manager_)) {
655
0
    SetupErrorAndRespond(resp->mutable_error(),
656
0
                         STATUS(ServiceUnavailable, "Tablet Server is not running"),
657
0
                         CDCErrorPB::NOT_RUNNING,
658
0
                         rpc);
659
0
    return false;
660
0
  }
661
642
  return true;
662
642
}
bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::GetCheckpointRequestPB, yb::cdc::GetCheckpointResponsePB>(yb::cdc::GetCheckpointRequestPB const*, yb::cdc::GetCheckpointResponsePB*, yb::rpc::RpcContext*)
Line
Count
Source
652
14
bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) {
653
14
  TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString());
654
14
  if (PREDICT_FALSE(!tablet_manager_)) {
655
0
    SetupErrorAndRespond(resp->mutable_error(),
656
0
                         STATUS(ServiceUnavailable, "Tablet Server is not running"),
657
0
                         CDCErrorPB::NOT_RUNNING,
658
0
                         rpc);
659
0
    return false;
660
0
  }
661
14
  return true;
662
14
}
bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::UpdateCdcReplicatedIndexRequestPB, yb::cdc::UpdateCdcReplicatedIndexResponsePB>(yb::cdc::UpdateCdcReplicatedIndexRequestPB const*, yb::cdc::UpdateCdcReplicatedIndexResponsePB*, yb::rpc::RpcContext*)
Line
Count
Source
652
219
bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) {
653
219
  TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString());
654
219
  if (PREDICT_FALSE(!tablet_manager_)) {
655
0
    SetupErrorAndRespond(resp->mutable_error(),
656
0
                         STATUS(ServiceUnavailable, "Tablet Server is not running"),
657
0
                         CDCErrorPB::NOT_RUNNING,
658
0
                         rpc);
659
0
    return false;
660
0
  }
661
219
  return true;
662
219
}
Unexecuted instantiation: bool yb::cdc::CDCServiceImpl::CheckOnline<yb::cdc::GetCDCDBStreamInfoRequestPB, yb::cdc::GetCDCDBStreamInfoResponsePB>(yb::cdc::GetCDCDBStreamInfoRequestPB const*, yb::cdc::GetCDCDBStreamInfoResponsePB*, yb::rpc::RpcContext*)
663
664
void CDCServiceImpl::CreateEntryInCdcStateTable(
665
    const std::shared_ptr<client::TableHandle>& cdc_state_table,
666
    std::vector<ProducerTabletInfo>* producer_entries_modified,
667
    std::vector<client::YBOperationPtr>* ops,
668
    const CDCStreamId& stream_id,
669
    const TableId& table_id,
670
5.21k
    const TabletId& tablet_id) {
671
5.21k
  OpId op_id;
672
673
5.21k
  const auto cdc_state_table_op = cdc_state_table->NewWriteOp(
674
5.21k
      QLWriteRequestPB::QL_STMT_INSERT);
675
5.21k
  auto *const cdc_state_table_write_req = cdc_state_table_op->mutable_request();
676
677
5.21k
  QLAddStringHashValue(cdc_state_table_write_req, tablet_id);
678
5.21k
  QLAddStringRangeValue(cdc_state_table_write_req, stream_id);
679
5.21k
  cdc_state_table->AddStringColumnValue(cdc_state_table_write_req,
680
5.21k
                                        master::kCdcCheckpoint, op_id.ToString());
681
5.21k
  ops->push_back(std::move(cdc_state_table_op));
682
683
5.21k
  impl_->AddTabletCheckpoint(op_id, stream_id, tablet_id, producer_entries_modified);
684
5.21k
}
685
686
307
// Resolves a YSQL database name to its namespace id via the master.
// Returns the client error if the lookup fails.
Result<NamespaceId> CDCServiceImpl::GetNamespaceId(const std::string& ns_name) {
  master::GetNamespaceInfoResponsePB info;
  // The first argument is passed empty. Presumably this means "look up by name"; confirm
  // against YBClient::GetNamespaceInfo.
  RETURN_NOT_OK(client()->GetNamespaceInfo(
      std::string(), ns_name, YQL_DATABASE_PGSQL, &info));
  const auto& ns = info.namespace_();
  return ns.id();
}
695
696
// Creates a database-level CDC stream for every primary-keyed user table in the namespace
// named by `req->namespace_name()`.
//
// Steps:
//   1. Create a DB stream id in the master.
//   2. Create a per-table stream tied to it for each eligible table.
//   3. Queue one cdc_state row per tablet.
//   4. Flush those rows with `deadline`.
//   5. Only after a successful flush, cache the stream metadata and set
//      `resp->db_stream_id`.
//
// Any early return triggers RollbackPartialCreate() through scope_exit. Returned statuses
// carry a CDCError code.
Status CDCServiceImpl::CreateCDCStreamForNamespace(
    const CreateCDCStreamRequestPB* req,
    CreateCDCStreamResponsePB* resp,
    CoarseTimePoint deadline) {
  auto session = client()->NewSession();

  // Used to delete streams in case of failure.
  CDCCreationState creation_state;

  auto scope_exit = ScopeExit([this, &creation_state] {
    RollbackPartialCreate(creation_state);
  });

  auto ns_id = VERIFY_RESULT_OR_SET_CODE(
      GetNamespaceId(req->namespace_name()), CDCError(CDCErrorPB::INVALID_REQUEST));

  // Generate a stream id by calling CreateCDCStream, and also setup the stream in the master.
  std::unordered_map<std::string, std::string> options = GetCreateCDCStreamOptions(req);

  CDCStreamId db_stream_id = VERIFY_RESULT_OR_SET_CODE(
      client()->CreateCDCStream(ns_id, options), CDCError(CDCErrorPB::INTERNAL_ERROR));

  auto table_list = VERIFY_RESULT_OR_SET_CODE(
      client()->ListUserTables(ns_id), CDCError(CDCErrorPB::INTERNAL_ERROR));

  // The per-table streams below are created without the id-type option.
  options.erase(kIdType);

  std::vector<client::YBOperationPtr> ops;
  std::vector<TableId> table_ids;

  auto cdc_state_table =
      VERIFY_RESULT_OR_SET_CODE(GetCdcStateTable(), CDCError(CDCErrorPB::INTERNAL_ERROR));

  for (const auto& table_iter : table_list) {
    std::shared_ptr<client::YBTable> table;

    RETURN_NOT_OK_SET_CODE(
        client()->OpenTable(table_iter.table_id(), &table), CDCError(CDCErrorPB::TABLE_NOT_FOUND));

    // Tables without a primary key are skipped (with a warning) rather than failing the
    // whole request.
    if (!YsqlTableHasPrimaryKey(table->schema())) {
      LOG(WARNING) << "Skipping CDC stream creation on " << table->name().table_name()
                   << " because it does not have a primary key";
      continue;
    }

    // We don't allow CDC on YEDIS and tables without a primary key.
    if (req->record_format() != CDCRecordFormat::WAL) {
      RETURN_NOT_OK_SET_CODE(CheckCdcCompatibility(table), CDCError(CDCErrorPB::INVALID_REQUEST));
    }

    CDCStreamId stream_id = VERIFY_RESULT_OR_SET_CODE(
        client()->CreateCDCStream(table_iter.table_id(), options, true, db_stream_id),
        CDCError(CDCErrorPB::INTERNAL_ERROR));

    // Remember the stream so scope_exit can roll it back if a later step fails.
    creation_state.created_cdc_streams.push_back(std::move(stream_id));

    google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
    RETURN_NOT_OK_SET_CODE(
        client()->GetTabletsFromTableId(table_iter.table_id(), 0, &tablets),
        CDCError(CDCErrorPB::TABLE_NOT_FOUND));

    // For each tablet, queue a cdc_state row keyed on the DB stream id. The row starts from a
    // default-constructed OpId checkpoint (see CreateEntryInCdcStateTable).
    for (const auto& tablet : tablets) {
      CreateEntryInCdcStateTable(
          cdc_state_table,
          &creation_state.producer_entries_modified,
          &ops,
          db_stream_id,
          table_iter.table_id(),
          tablet.tablet_id());
    }
    table_ids.push_back(table_iter.table_id());
  }

  session->SetDeadline(deadline);

  RETURN_NOT_OK_SET_CODE(
      RefreshCacheOnFail(session->ApplyAndFlush(ops)), CDCError(CDCErrorPB::INTERNAL_ERROR));

  // Add stream to cache only once the cdc_state rows are durable. On failure the streams are
  // rolled back by scope_exit, and a cached entry would then be stale.
  AddStreamMetadataToCache(
      db_stream_id,
      std::make_shared<StreamMetadata>(
          ns_id, table_ids, req->record_type(), req->record_format(), req->source_type(),
          req->checkpoint_type()));

  resp->set_db_stream_id(db_stream_id);

  // Clear creation_state so no changes are reversed by scope_exit since we succeeded.
  creation_state.Clear();

  return Status::OK();
}
794
795
void CDCServiceImpl::CreateCDCStream(const CreateCDCStreamRequestPB* req,
796
                                     CreateCDCStreamResponsePB* resp,
797
307
                                     RpcContext context) {
798
307
  CDCStreamId streamId;
799
800
307
  if (!CheckOnline(req, resp, &context)) {
801
0
    return;
802
0
  }
803
804
307
  RPC_CHECK_AND_RETURN_ERROR(req->has_table_id() || req->has_namespace_name(),
805
307
                             STATUS(InvalidArgument,
806
307
                                    "Table ID or Database name is required to create CDC stream"),
807
307
                             resp->mutable_error(),
808
307
                             CDCErrorPB::INVALID_REQUEST,
809
307
                             context);
810
811
307
  bool is_xcluster = req->source_type() == XCLUSTER;
812
307
  if (is_xcluster || req->has_table_id()) {
813
0
    std::shared_ptr<client::YBTable> table;
814
0
    Status s = client()->OpenTable(req->table_id(), &table);
815
0
    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context);
816
817
    // We don't allow CDC on YEDIS and tables without a primary key.
818
0
    if (req->record_format() != CDCRecordFormat::WAL) {
819
0
      s = CheckCdcCompatibility(table);
820
0
      RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
821
0
    }
822
823
0
    std::unordered_map<std::string, std::string> options = GetCreateCDCStreamOptions(req);
824
825
0
    auto result = client()->CreateCDCStream(req->table_id(), options);
826
0
    RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
827
0
                               CDCErrorPB::INTERNAL_ERROR, context);
828
829
0
    resp->set_stream_id(*result);
830
831
    // Add stream to cache.
832
0
    AddStreamMetadataToCache(
833
0
        *result, std::make_shared<StreamMetadata>(
834
0
                     "",
835
0
                     std::vector<TableId>{req->table_id()},
836
0
                     req->record_type(),
837
0
                     req->record_format(),
838
0
                     req->source_type(),
839
0
                     req->checkpoint_type()));
840
307
  } else if (req->has_namespace_name()) {
841
307
    auto deadline = GetDeadline(context, client());
842
307
    Status status = CreateCDCStreamForNamespace(req, resp, deadline);
843
307
    CDCError error(status);
844
845
307
    if (!status.ok()) {
846
1
      SetupErrorAndRespond(resp->mutable_error(), status, error.value(), &context);
847
1
      return;
848
1
    }
849
307
  }
850
851
306
  context.RespondSuccess();
852
306
}
853
854
// Explicitly sets the checkpoint of (stream, tablet) to `req.checkpoint()`.
// Returns an error carrying a CDCError code on failure.
//
// The local peer must be the tablet leader. Error codes:
//   - TABLET_NOT_FOUND: the tablet is not hosted here.
//   - NOT_LEADER: the peer is a follower. This now actually returns; previously the code
//     checked an OK status and fell through.
//   - LEADER_NOT_READY: any other GetTabletPeer error.
//
// On success the checkpoint is written to cdc_state (deadline `deadline`) and the consumer op
// id is updated.
Result<SetCDCCheckpointResponsePB> CDCServiceImpl::SetCDCCheckpoint(
    const SetCDCCheckpointRequestPB& req, CoarseTimePoint deadline) {
  VLOG(1) << "Received SetCDCCheckpoint request " << req.ShortDebugString();

  RETURN_NOT_OK_SET_CODE(VerifyArg(req), CDCError(CDCErrorPB::INVALID_REQUEST));

  auto record = VERIFY_RESULT(GetStream(req.stream_id()));
  if (record->checkpoint_type != EXPLICIT) {
    LOG(WARNING) << "Setting the checkpoint explicitly even though the checkpoint type is implicit";
  }

  std::shared_ptr<tablet::TabletPeer> tablet_peer;
  auto s = tablet_manager_->GetTabletPeer(req.tablet_id(), &tablet_peer);

  if (s.IsNotFound()) {
    RETURN_NOT_OK_SET_CODE(s, CDCError(CDCErrorPB::TABLET_NOT_FOUND));
  } else if (tablet_peer &&
             tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) {
    // GetTabletPeer usually succeeds for a follower, so synthesize a non-OK status. Otherwise
    // RETURN_NOT_OK_SET_CODE would not return and the follower would write the checkpoint.
    RETURN_NOT_OK_SET_CODE(
        s.ok() ? STATUS(NotFound, Format("Not leader for $0", req.tablet_id())) : s,
        CDCError(CDCErrorPB::NOT_LEADER));
  } else if (!s.ok()) {
    // Checked before any further use of tablet_peer, which may be null here.
    RETURN_NOT_OK_SET_CODE(s, CDCError(CDCErrorPB::LEADER_NOT_READY));
  }

  ProducerTabletInfo producer_tablet{"" /* UUID */, req.stream_id(), req.tablet_id()};
  OpId checkpoint = OpId::FromPB(req.checkpoint().op_id());

  auto session = client()->NewSession();
  session->SetDeadline(deadline);
  RETURN_NOT_OK_SET_CODE(
      UpdateCheckpoint(producer_tablet, checkpoint, checkpoint, session, GetCurrentTimeMicros()),
      CDCError(CDCErrorPB::INTERNAL_ERROR));

  RETURN_NOT_OK_SET_CODE(
      DoUpdateCDCConsumerOpId(tablet_peer, checkpoint, req.tablet_id()),
      CDCError(CDCErrorPB::INTERNAL_ERROR));

  return SetCDCCheckpointResponsePB();
}
891
892
void CDCServiceImpl::DeleteCDCStream(const DeleteCDCStreamRequestPB* req,
893
                                     DeleteCDCStreamResponsePB* resp,
894
0
                                     RpcContext context) {
895
0
  if (!CheckOnline(req, resp, &context)) {
896
0
    return;
897
0
  }
898
899
0
  LOG(INFO) << "Received DeleteCDCStream request " << req->ShortDebugString();
900
901
0
  RPC_CHECK_AND_RETURN_ERROR(
902
0
      !req->stream_id().empty(),
903
0
      STATUS(InvalidArgument, "Stream ID or Database stream ID is required to delete CDC stream"),
904
0
      resp->mutable_error(),
905
0
      CDCErrorPB::INVALID_REQUEST,
906
0
      context);
907
908
0
  vector<CDCStreamId> streams(req->stream_id().begin(), req->stream_id().end());
909
0
  Status s = client()->DeleteCDCStream(
910
0
        streams,
911
0
        (req->has_force_delete() && req->force_delete()),
912
0
        (req->has_ignore_errors() && req->ignore_errors()));
913
0
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
914
915
0
  context.RespondSuccess();
916
0
}
917
918
void CDCServiceImpl::ListTablets(const ListTabletsRequestPB* req,
919
                                 ListTabletsResponsePB* resp,
920
0
                                 RpcContext context) {
921
0
  if (!CheckOnline(req, resp, &context)) {
922
0
    return;
923
0
  }
924
925
0
  RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id(),
926
0
                             STATUS(InvalidArgument, "Stream ID is required to list tablets"),
927
0
                             resp->mutable_error(),
928
0
                             CDCErrorPB::INVALID_REQUEST,
929
0
                             context);
930
931
0
  auto tablets = GetTablets(req->stream_id());
932
0
  RPC_CHECK_AND_RETURN_ERROR(tablets.ok(), tablets.status(), resp->mutable_error(),
933
0
                             CDCErrorPB::INTERNAL_ERROR, context);
934
935
0
  if (!req->local_only()) {
936
0
    resp->mutable_tablets()->Reserve(tablets->size());
937
0
  }
938
939
0
  for (const auto& tablet : *tablets) {
940
    // Filter local tablets if needed.
941
0
    if (req->local_only()) {
942
0
      bool is_local = false;
943
0
      for (const auto& replica : tablet.replicas()) {
944
0
        if (replica.ts_info().permanent_uuid() == tablet_manager_->server()->permanent_uuid()) {
945
0
          is_local = true;
946
0
          break;
947
0
        }
948
0
      }
949
950
0
      if (!is_local) {
951
0
        continue;
952
0
      }
953
0
    }
954
955
0
    auto res = resp->add_tablets();
956
0
    res->set_tablet_id(tablet.tablet_id());
957
0
    res->mutable_tservers()->Reserve(tablet.replicas_size());
958
0
    for (const auto& replica : tablet.replicas()) {
959
0
      auto tserver =  res->add_tservers();
960
0
      tserver->mutable_broadcast_addresses()->CopyFrom(replica.ts_info().broadcast_addresses());
961
0
      if (tserver->broadcast_addresses_size() == 0) {
962
0
        LOG(WARNING) << "No public broadcast addresses found for "
963
0
                     << replica.ts_info().permanent_uuid() << ".  Using private addresses instead.";
964
0
        tserver->mutable_broadcast_addresses()->CopyFrom(replica.ts_info().private_rpc_addresses());
965
0
      }
966
0
    }
967
0
  }
968
969
0
  context.RespondSuccess();
970
0
}
971
972
// Collects the tablet locations of every table in the stream `stream_id`.
// Inactive tablets are included, and tablets need not be running. Returns the first client
// error encountered.
Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> CDCServiceImpl::GetTablets(
    const CDCStreamId& stream_id) {
  using TabletLocations = google::protobuf::RepeatedPtrField<master::TabletLocationsPB>;

  auto stream_metadata = VERIFY_RESULT(GetStream(stream_id));
  client::YBTableName table_name;
  TabletLocations all_tablets;

  for (const auto& table_id : stream_metadata->table_ids) {
    // Reuse one table-name object; only its table id changes between iterations.
    table_name.set_table_id(table_id);

    TabletLocations table_tablets;
    RETURN_NOT_OK(client()->GetTablets(
        table_name, 0, &table_tablets, /* partition_list_version =*/nullptr,
        RequireTabletsRunning::kFalse, master::IncludeInactive::kTrue));
    all_tablets.MergeFrom(table_tablets);
  }

  return all_tablets;
}
990
991
void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
992
                                GetChangesResponsePB* resp,
993
642
                                RpcContext context) {
994
642
  if (!CheckOnline(req, resp, &context)) {
995
0
    return;
996
0
  }
997
642
  YB_LOG_EVERY_N_SECS
(INFO, 300) << "Received GetChanges request " << req->ShortDebugString()93
;
998
999
642
  RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(),
1000
642
                             STATUS(InvalidArgument, "Tablet ID is required to get CDC changes"),
1001
642
                             resp->mutable_error(),
1002
642
                             CDCErrorPB::INVALID_REQUEST,
1003
642
                             context);
1004
642
  RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id() || req->has_db_stream_id(),
1005
642
                             STATUS(InvalidArgument,
1006
642
                             "Stream ID/DB Stream ID is required to get CDC changes"),
1007
642
                             resp->mutable_error(),
1008
642
                             CDCErrorPB::INVALID_REQUEST,
1009
642
                             context);
1010
1011
642
  ProducerTabletInfo producer_tablet;
1012
642
  CDCStreamId stream_id = req->has_db_stream_id() ? req->db_stream_id() : 
req->stream_id()0
;
1013
1014
642
  auto session = client()->NewSession();
1015
642
  CoarseTimePoint deadline = GetDeadline(context, client());
1016
642
  session->SetDeadline(deadline);
1017
1018
  // Check that requested tablet_id is part of the CDC stream.
1019
642
  producer_tablet = {"" /* UUID */, stream_id, req->tablet_id()};
1020
1021
642
  Status s = CheckTabletValidForStream(producer_tablet);
1022
642
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
1023
1024
641
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1025
641
  s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
1026
641
  auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : 
OpId::kUnknownTerm0
;
1027
1028
  // If we can't serve this tablet...
1029
641
  if (s.IsNotFound() || tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY) {
1030
0
    if (req->serve_as_proxy()) {
1031
      // Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups.
1032
0
      auto context_ptr = std::make_shared<RpcContext>(std::move(context));
1033
0
      TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer);
1034
    // Otherwise, figure out the proper return code.
1035
0
    } else if (s.IsNotFound()) {
1036
0
      SetupErrorAndRespond(resp->mutable_error(), s, CDCErrorPB::TABLET_NOT_FOUND, &context);
1037
0
    } else if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) {
1038
      // TODO: we may be able to get some changes, even if we're not the leader.
1039
0
      SetupErrorAndRespond(resp->mutable_error(),
1040
0
          STATUS(NotFound, Format("Not leader for $0", req->tablet_id())),
1041
0
          CDCErrorPB::TABLET_NOT_FOUND, &context);
1042
0
    } else {
1043
0
      SetupErrorAndRespond(resp->mutable_error(),
1044
0
          STATUS(LeaderNotReadyToServe, "Not ready to serve"),
1045
0
          CDCErrorPB::LEADER_NOT_READY, &context);
1046
0
    }
1047
0
    return;
1048
0
  }
1049
1050
  // This is the leader tablet, so mark cdc as enabled.
1051
641
  cdc_enabled_.store(true, std::memory_order_release);
1052
1053
641
  auto res = GetStream(stream_id);
1054
641
  RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(),
1055
641
                             CDCErrorPB::INTERNAL_ERROR, context);
1056
641
  StreamMetadata record = **res;
1057
1058
641
  OpId op_id;
1059
641
  CDCSDKCheckpointPB cdc_sdk_op_id;
1060
  // Get opId from request.
1061
641
  if (!GetFromOpId(req, &op_id, &cdc_sdk_op_id)) {
1062
605
    auto result = GetLastCheckpoint(producer_tablet, session);
1063
605
    RPC_CHECK_AND_RETURN_ERROR(
1064
605
        result.ok(), result.status(), resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1065
605
    if (record.source_type == XCLUSTER) {
1066
0
      op_id = *result;
1067
605
    } else {
1068
605
      result->ToPB(&cdc_sdk_op_id);
1069
605
      op_id = OpId::FromPB(cdc_sdk_op_id);
1070
605
    }
1071
605
  }
1072
1073
641
  int64_t last_readable_index;
1074
641
  consensus::ReplicateMsgsHolder msgs_holder;
1075
641
  MemTrackerPtr mem_tracker = impl_->GetMemTracker(tablet_peer, producer_tablet);
1076
1077
  // Calculate deadline to be passed to GetChanges.
1078
641
  CoarseTimePoint get_changes_deadline = CoarseTimePoint::max();
1079
641
  if (deadline != CoarseTimePoint::max()) {
1080
    // Check if we are too close to calculate a safe deadline.
1081
641
    RPC_CHECK_AND_RETURN_ERROR(
1082
641
      deadline - CoarseMonoClock::Now() > 1ms,
1083
641
      STATUS(TimedOut, "Too close to rpc timeout to call GetChanges."),
1084
641
      resp->mutable_error(),
1085
641
      CDCErrorPB::INTERNAL_ERROR, context);
1086
1087
    // Calculate a safe deadline so that CdcProducer::GetChanges times out
1088
    // 20% faster than CdcServiceImpl::GetChanges. This gives enough
1089
    // time (unless timeouts are unrealistically small) for CdcServiceImpl::GetChanges
1090
    // to finish post-processing and return the partial results without itself timing out.
1091
641
    const auto safe_deadline = deadline -
1092
641
      (FLAGS_cdc_read_rpc_timeout_ms * 1ms * FLAGS_cdc_read_safe_deadline_ratio);
1093
641
    get_changes_deadline = ToCoarse(MonoTime::FromUint64(safe_deadline.time_since_epoch().count()));
1094
641
  }
1095
1096
  // Read the latest changes from the Log.
1097
641
  if (record.source_type == XCLUSTER) {
1098
0
    s = cdc::GetChangesForXCluster(
1099
0
        stream_id, req->tablet_id(), op_id, record, tablet_peer, mem_tracker,
1100
0
        &msgs_holder, resp, &last_readable_index, get_changes_deadline);
1101
641
  } else {
1102
641
    std::string commit_timestamp;
1103
641
    OpId last_streamed_op_id;
1104
1105
641
    auto cached_schema = impl_->GetOrAddSchema(producer_tablet);
1106
641
    s = cdc::GetChangesForCDCSDK(
1107
641
        req->stream_id(), req->tablet_id(), cdc_sdk_op_id, record, tablet_peer, mem_tracker,
1108
641
        &msgs_holder, resp, &commit_timestamp, &cached_schema,
1109
641
        &last_streamed_op_id, &last_readable_index, get_changes_deadline);
1110
1111
641
    impl_->UpdateCDCStateMetadata(
1112
641
        producer_tablet, commit_timestamp, cached_schema, last_streamed_op_id);
1113
641
  }
1114
1115
641
  RPC_STATUS_RETURN_ERROR(
1116
641
      s,
1117
641
      resp->mutable_error(),
1118
641
      s.IsNotFound() ? CDCErrorPB::CHECKPOINT_TOO_OLD : CDCErrorPB::UNKNOWN_ERROR,
1119
641
      context);
1120
1121
  // Verify leadership was maintained for the duration of the GetChanges() read.
1122
641
  s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
1123
641
  if (s.IsNotFound() || tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY ||
1124
641
      tablet_peer->LeaderTerm() != original_leader_term) {
1125
0
    SetupErrorAndRespond(resp->mutable_error(),
1126
0
        STATUS(NotFound, Format("Not leader for $0", req->tablet_id())),
1127
0
        CDCErrorPB::TABLET_NOT_FOUND, &context);
1128
0
    return;
1129
0
  }
1130
1131
  // Store information about the last server read & remote client ACK.
1132
641
  uint64_t last_record_hybrid_time = resp->records_size() > 0 ?
1133
641
      
resp->records(resp->records_size() - 1).time()0
: 0;
1134
1135
641
  if (record.checkpoint_type == IMPLICIT) {
1136
637
    if (UpdateCheckpointRequired(record, cdc_sdk_op_id)) {
1137
625
      s = UpdateCheckpoint(producer_tablet, OpId::FromPB(resp->checkpoint().op_id()),
1138
625
                           op_id, session, last_record_hybrid_time);
1139
625
    }
1140
1141
637
    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1142
1143
637
    s = DoUpdateCDCConsumerOpId(tablet_peer,
1144
637
                                impl_->GetMinSentCheckpointForTablet(req->tablet_id()),
1145
637
                                req->tablet_id());
1146
1147
637
    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1148
637
  }
1149
  // Update relevant GetChanges metrics before handing off the Response.
1150
641
  UpdateCDCTabletMetrics(resp, producer_tablet, tablet_peer, op_id, last_readable_index);
1151
641
  context.RespondSuccess();
1152
641
}
1153
1154
// Pushes the minimum replicated (index, term) for `tablet_id` to every remote tserver hosting
// the tablet, via UpdateCdcReplicatedIndex RPCs.
//
// The local server is skipped. Stops at and returns the first RPC failure, or the first
// error reported in a response.
Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(const TabletId& tablet_id,
                                                        int64_t min_index,
                                                        int64_t min_term) {
  std::vector<client::internal::RemoteTabletServer *> servers;
  RETURN_NOT_OK(GetTServers(tablet_id, &servers));

  for (auto* server : servers) {
    // Our own log is modified directly; do not call ourselves through the proxy.
    if (server->IsLocal()) {
      continue;
    }

    LOG(INFO) << "Modifying remote peer " << server->ToString();
    auto proxy = GetCDCServiceProxy(server);

    UpdateCdcReplicatedIndexRequestPB request;
    request.set_tablet_id(tablet_id);
    request.set_replicated_index(min_index);
    request.set_replicated_term(min_term);
    UpdateCdcReplicatedIndexResponsePB response;

    rpc::RpcController controller;
    controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
    RETURN_NOT_OK(proxy->UpdateCdcReplicatedIndex(request, &response, &controller));

    if (response.has_error()) {
      return StatusFromPB(response.error().status());
    }
  }
  return Status::OK();
}
1180
1181
void CDCServiceImpl::ComputeLagMetric(int64_t last_replicated_micros,
1182
                                      int64_t metric_last_timestamp_micros,
1183
                                      int64_t cdc_state_last_replication_time_micros,
1184
440
                                      scoped_refptr<AtomicGauge<int64_t>> metric) {
1185
440
  if (metric_last_timestamp_micros == 0) {
1186
    // The tablet metric timestamp is uninitialized, so try to use last replicated time in cdc
1187
    // state.
1188
398
    if (cdc_state_last_replication_time_micros == 0) {
1189
      // Last replicated time in cdc state is uninitialized as well, so set the metric value to
1190
      // 0 and update later when we have a suitable lower bound.
1191
398
      metric->set_value(0);
1192
398
    } else {
1193
0
      metric->set_value(last_replicated_micros - cdc_state_last_replication_time_micros);
1194
0
    }
1195
398
  } else {
1196
42
    metric->set_value(last_replicated_micros - metric_last_timestamp_micros);
1197
42
  }
1198
440
}
1199
1200
94
void CDCServiceImpl::UpdateLagMetrics() {
1201
94
  auto tablet_checkpoints = impl_->TabletCheckpointsCopy();
1202
1203
94
  auto cdc_state_table_result = GetCdcStateTable();
1204
94
  if (!cdc_state_table_result.ok()) {
1205
    // It is possible that this runs before the cdc_state table is created. This is
1206
    // ok. It just means that this is the first time the cluster starts.
1207
0
    YB_LOG_EVERY_N_SECS(WARNING, 30)
1208
0
        << "Unable to open table " << kCdcStateTableName.table_name() << " for metrics update.";
1209
0
    return;
1210
0
  }
1211
1212
94
  std::unordered_set<ProducerTabletInfo, ProducerTabletInfo::Hash> tablets_in_cdc_state_table;
1213
94
  client::TableIteratorOptions options;
1214
94
  options.columns = std::vector<string>{
1215
94
      master::kCdcTabletId, master::kCdcStreamId, master::kCdcLastReplicationTime};
1216
94
  bool failed = false;
1217
94
  options.error_handler = [&failed](const Status& status) {
1218
0
    YB_LOG_EVERY_N_SECS(WARNING, 30) << "Scan of table " << kCdcStateTableName.table_name()
1219
0
                                     << " failed: " << status << ". Could not update metrics.";
1220
0
    failed = true;
1221
0
  };
1222
  // First go through tablets in the cdc_state table and update metrics for each one.
1223
245
  for (const auto& row : client::TableRange(**cdc_state_table_result, options)) {
1224
245
    auto tablet_id = row.column(master::kCdcTabletIdIdx).string_value();
1225
245
    auto stream_id = row.column(master::kCdcStreamIdIdx).string_value();
1226
245
    std::shared_ptr<tablet::TabletPeer> tablet_peer;
1227
245
    Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
1228
245
    if (s.IsNotFound()) {
1229
25
      continue;
1230
25
    }
1231
1232
220
    ProducerTabletInfo tablet_info = {"" /* universe_uuid */, stream_id, tablet_id};
1233
220
    tablets_in_cdc_state_table.insert(tablet_info);
1234
220
    auto tablet_metric = GetCDCTabletMetrics(tablet_info, tablet_peer);
1235
220
    if (!tablet_metric) {
1236
0
      continue;
1237
0
    }
1238
220
    if (tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY) {
1239
      // Set lag to 0 because we're not the leader for this tablet anymore, which means another peer
1240
      // is responsible for tracking this tablet's lag.
1241
0
      tablet_metric->async_replication_sent_lag_micros->set_value(0);
1242
0
      tablet_metric->async_replication_committed_lag_micros->set_value(0);
1243
220
    } else {
1244
      // Get the physical time of the last committed record on producer.
1245
220
      auto last_replicated_micros = GetLastReplicatedTime(tablet_peer);
1246
220
      const auto& timestamp_ql_value = row.column(2);
1247
220
      auto cdc_state_last_replication_time_micros =
1248
220
          !timestamp_ql_value.IsNull() ?
1249
220
          
timestamp_ql_value.timestamp_value().ToInt64()0
: 0;
1250
220
      auto last_sent_micros = tablet_metric->last_read_physicaltime->value();
1251
220
      ComputeLagMetric(last_replicated_micros, last_sent_micros,
1252
220
                       cdc_state_last_replication_time_micros,
1253
220
                       tablet_metric->async_replication_sent_lag_micros);
1254
220
      auto last_committed_micros = tablet_metric->last_checkpoint_physicaltime->value();
1255
220
      ComputeLagMetric(last_replicated_micros, last_committed_micros,
1256
220
                       cdc_state_last_replication_time_micros,
1257
220
                       tablet_metric->async_replication_committed_lag_micros);
1258
220
    }
1259
220
  }
1260
94
  if (failed) {
1261
0
    RefreshCdcStateTable();
1262
0
    return;
1263
0
  }
1264
1265
  // Now, go through tablets in tablet_checkpoints_ and set lag to 0 for all tablets we're no
1266
  // longer replicating.
1267
265
  
for (const auto& checkpoint : tablet_checkpoints)94
{
1268
265
    const ProducerTabletInfo& tablet_info = checkpoint.producer_tablet_info;
1269
265
    if (tablets_in_cdc_state_table.find(tablet_info) == tablets_in_cdc_state_table.end()) {
1270
      // We're no longer replicating this tablet, so set lag to 0.
1271
45
      std::shared_ptr<tablet::TabletPeer> tablet_peer;
1272
45
      Status s = tablet_manager_->GetTabletPeer(checkpoint.tablet_id(), &tablet_peer);
1273
45
      if (s.IsNotFound()) {
1274
25
        continue;
1275
25
      }
1276
20
      auto tablet_metric = GetCDCTabletMetrics(checkpoint.producer_tablet_info, tablet_peer);
1277
20
      if (!tablet_metric) {
1278
0
        continue;
1279
0
      }
1280
20
      tablet_metric->async_replication_sent_lag_micros->set_value(0);
1281
20
      tablet_metric->async_replication_committed_lag_micros->set_value(0);
1282
20
    }
1283
265
  }
1284
94
}
1285
1286
1.22k
// Decides whether lag metrics are due for a refresh.
//
// Returns false when FLAGS_enable_collect_cdc_metrics is off. Otherwise returns true if
// metrics were never updated (`time_since_update_metrics` is uninitialized), or if at least
// FLAGS_update_metrics_interval_ms has elapsed since the last update.
bool CDCServiceImpl::ShouldUpdateLagMetrics(MonoTime time_since_update_metrics) {
  if (!GetAtomicFlag(&FLAGS_enable_collect_cdc_metrics)) {
    return false;
  }
  if (time_since_update_metrics == MonoTime::kUninitialized) {
    return true;
  }
  const auto refresh_interval =
      MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_update_metrics_interval_ms));
  return MonoTime::Now() - time_since_update_metrics >= refresh_interval;
}
1293
1294
0
bool CDCServiceImpl::CDCEnabled() {
1295
0
  return cdc_enabled_.load(std::memory_order_acquire);
1296
0
}
1297
1298
1.10k
// Returns a handle to the cdc_state table.
//
// - When FLAGS_enable_cdc_state_table_caching is on, a cached handle is reused.
// - Otherwise a new handle is opened without holding the lock and then published as the
//   cached one. If another caller published first and caching is on, that handle wins.
// - Returns ShutdownInProgress once the service is stopped.
// - Returns the Open() error when the table cannot be opened, e.g. it does not exist yet.
Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable() {
  const bool use_cache = GetAtomicFlag(&FLAGS_enable_cdc_state_table_caching);

  // Fast path: shared lock, reuse the cached handle if allowed.
  {
    SharedLock<decltype(mutex_)> read_lock(mutex_);
    if (use_cache && cdc_state_table_) {
      return cdc_state_table_;
    }
    if (cdc_service_stopped_) {
      return STATUS(ShutdownInProgress, "");
    }
  }

  // Slow path: open a fresh handle outside the lock.
  auto fresh_table = std::make_shared<yb::client::TableHandle>();
  // It is possible that this runs before the cdc_state table is created.
  RETURN_NOT_OK(fresh_table->Open(kCdcStateTableName, client()));

  std::lock_guard<decltype(mutex_)> write_lock(mutex_);
  // Re-check: another caller may have cached a handle while we were opening ours.
  if (use_cache && cdc_state_table_) {
    return cdc_state_table_;
  }
  if (cdc_service_stopped_) {
    return STATUS(ShutdownInProgress, "");
  }
  cdc_state_table_ = std::move(fresh_table);
  return cdc_state_table_;
}
1327
1328
0
void CDCServiceImpl::RefreshCdcStateTable() {
1329
  // Set cached value to null so we regenerate it on the next call.
1330
0
  std::lock_guard<decltype(mutex_)> l(mutex_);
1331
0
  cdc_state_table_ = nullptr;
1332
0
}
1333
1334
921
// Returns `s` unchanged. As a side effect, a non-OK status invalidates the cached cdc_state
// table handle so that it is reopened on the next access.
Status CDCServiceImpl::RefreshCacheOnFail(const Status& s) {
  if (s.ok()) {
    return s;
  }
  RefreshCdcStateTable();
  return s;
}
1340
1341
// Returns the physical part, in microseconds, of `log_ht` from the peer's last replicated data.
// Returns 0 if GetLastReplicatedData() fails.
MicrosTime CDCServiceImpl::GetLastReplicatedTime(
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer) {
  tablet::RemoveIntentsData data;
  const auto status = tablet_peer->GetLastReplicatedData(&data);
  if (!status.ok()) {
    return 0;
  }
  return data.log_ht.GetPhysicalValueMicros();
}
1347
1348
8.74k
void CDCServiceImpl::UpdatePeersAndMetrics() {
1349
8.74k
  int64_t current_term = -1;
1350
8.74k
  MonoTime time_since_update_peers = MonoTime::kUninitialized;
1351
8.74k
  MonoTime time_since_update_metrics = MonoTime::kUninitialized;
1352
1353
  // Returns false if the CDC service has been stopped.
1354
48.7M
  auto sleep_while_not_stopped = [this]() {
1355
48.7M
    int min_sleep_ms = std::min(100, GetAtomicFlag(&FLAGS_update_metrics_interval_ms));
1356
48.7M
    auto sleep_period = MonoDelta::FromMilliseconds(min_sleep_ms);
1357
48.7M
    SleepFor(sleep_period);
1358
1359
48.7M
    SharedLock<decltype(mutex_)> l(mutex_);
1360
48.7M
    return !cdc_service_stopped_;
1361
48.7M
  };
1362
1363
48.7M
  do {
1364
48.7M
    if (!cdc_enabled_.load(std::memory_order_acquire)) {
1365
      // Have not yet received any GetChanges requests, so skip background thread work.
1366
48.7M
      continue;
1367
48.7M
    }
1368
    // Should we update lag metrics default every 1s.
1369
1.22k
    if (ShouldUpdateLagMetrics(time_since_update_metrics)) {
1370
94
      UpdateLagMetrics();
1371
94
      time_since_update_metrics = MonoTime::Now();
1372
94
    }
1373
1374
    // If its not been 60s since the last peer update, continue.
1375
1.22k
    if (!FLAGS_enable_log_retention_by_op_idx ||
1376
1.22k
        (time_since_update_peers != MonoTime::kUninitialized &&
1377
1.22k
         MonoTime::Now() - time_since_update_peers <
1378
1.13k
             MonoDelta::FromSeconds(GetAtomicFlag(&FLAGS_update_min_cdc_indices_interval_secs)))) {
1379
1.13k
      continue;
1380
1.13k
    }
1381
1382
93
    time_since_update_peers = MonoTime::Now();
1383
93
    LOG(INFO) << "Started to read minimum replicated indices for all tablets";
1384
1385
93
    auto cdc_state_table_result = GetCdcStateTable();
1386
93
    if (!cdc_state_table_result.ok()) {
1387
      // It is possible that this runs before the cdc_state table is created. This is
1388
      // ok. It just means that this is the first time the cluster starts.
1389
0
      YB_LOG_EVERY_N_SECS(WARNING, 3600) << "Unable to open table "
1390
0
                                         << kCdcStateTableName.table_name()
1391
0
                                         << ". CDC min replicated indices won't be updated";
1392
0
      continue;
1393
0
    }
1394
1395
93
    int count = 0;
1396
93
    std::unordered_map<std::string, int64_t> tablet_min_checkpoint_index;
1397
93
    client::TableIteratorOptions options;
1398
93
    bool failed = false;
1399
93
    options.error_handler = [&failed](const Status& status) {
1400
0
      LOG(WARNING) << "Scan of table " << kCdcStateTableName.table_name() << " failed: " << status;
1401
0
      failed = true;
1402
0
    };
1403
93
    options.columns = std::vector<std::string>{master::kCdcTabletId, master::kCdcStreamId,
1404
93
        master::kCdcCheckpoint, master::kCdcLastReplicationTime};
1405
219
    for (const auto& row : client::TableRange(**cdc_state_table_result, options)) {
1406
219
      count++;
1407
219
      auto tablet_id = row.column(master::kCdcTabletIdIdx).string_value();
1408
219
      auto stream_id = row.column(master::kCdcStreamIdIdx).string_value();
1409
219
      auto checkpoint = row.column(master::kCdcCheckpointIdx).string_value();
1410
219
      std::string last_replicated_time_str;
1411
219
      const auto& timestamp_ql_value = row.column(3);
1412
219
      if (!timestamp_ql_value.IsNull()) {
1413
0
        last_replicated_time_str = timestamp_ql_value.timestamp_value().ToFormattedString();
1414
0
      }
1415
1416
219
      VLOG(1) << "stream_id: " << stream_id << ", tablet_id: " << tablet_id
1417
0
              << ", checkpoint: " << checkpoint << ", last replicated time: "
1418
0
              << last_replicated_time_str;
1419
1420
219
      auto result = OpId::FromString(checkpoint);
1421
219
      if (!result.ok()) {
1422
0
        LOG(WARNING) << "Read invalid op id " << row.column(1).string_value()
1423
0
                     << " for tablet " << tablet_id;
1424
0
        continue;
1425
0
      }
1426
1427
219
      auto index = (*result).index;
1428
219
      current_term = (*result).term;
1429
219
      auto it = tablet_min_checkpoint_index.find(tablet_id);
1430
219
      if (it == tablet_min_checkpoint_index.end()) {
1431
219
        tablet_min_checkpoint_index[tablet_id] = index;
1432
219
      } else {
1433
0
        if (index < it->second) {
1434
0
          it->second = index;
1435
0
        }
1436
0
      }
1437
219
    }
1438
93
    if (failed) {
1439
0
      RefreshCdcStateTable();
1440
0
      continue;
1441
0
    }
1442
93
    LOG(INFO) << "Read " << count << " records from " << kCdcStateTableName.table_name();
1443
1444
93
    VLOG
(3) << "tablet_min_checkpoint_index size " << tablet_min_checkpoint_index.size()0
;
1445
219
    for (const auto &elem : tablet_min_checkpoint_index) {
1446
219
      auto tablet_id = elem.first;
1447
219
      std::shared_ptr<tablet::TabletPeer> tablet_peer;
1448
1449
219
      Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
1450
219
      if (s.IsNotFound()) {
1451
0
        VLOG(2) << "Did not found tablet peer for tablet " << tablet_id;
1452
0
        continue;
1453
219
      } else if (!IsTabletPeerLeader(tablet_peer)) {
1454
0
        VLOG(2) << "Tablet peer " << tablet_peer->permanent_uuid()
1455
0
                << " is not the leader for tablet " << tablet_id;
1456
0
        continue;
1457
219
      } else if (!s.ok()) {
1458
0
        LOG(WARNING) << "Error getting tablet_peer for tablet " << tablet_id << ": " << s;
1459
0
        continue;
1460
0
      }
1461
1462
219
      auto min_index = elem.second;
1463
219
      s = tablet_peer->set_cdc_min_replicated_index(min_index);
1464
219
      if (!s.ok()) {
1465
0
        LOG(WARNING) << "Unable to set cdc min index for tablet peer "
1466
0
                     << tablet_peer->permanent_uuid()
1467
0
                     << " and tablet " << tablet_peer->tablet_id()
1468
0
                     << ": " << s;
1469
0
      }
1470
219
      VLOG
(1) << "Updating followers for tablet " << tablet_id << " with index " << min_index0
;
1471
219
      WARN_NOT_OK(UpdatePeersCdcMinReplicatedIndex(tablet_id, min_index, current_term),
1472
219
                  "UpdatePeersCdcMinReplicatedIndex failed");
1473
219
    }
1474
93
    LOG(INFO) << "Done reading all the indices for all tablets and updating peers";
1475
48.7M
  } while (sleep_while_not_stopped());
1476
8.74k
}
1477
1478
// Looks up `tablet_id` through the client with UseCache::kFalse and IncludeInactive::kTrue, so
// hidden split-parent tablets can also be found. Waits for the lookup callback. The lookup
// deadline is FLAGS_cdc_read_rpc_timeout_ms from now. Logs a warning when the lookup takes
// longer than kMaxDurationForTabletLookup milliseconds.
Result<client::internal::RemoteTabletPtr> CDCServiceImpl::GetRemoteTablet(
    const TabletId& tablet_id) {
  std::promise<Result<client::internal::RemoteTabletPtr>> tablet_lookup_promise;
  auto future = tablet_lookup_promise.get_future();
  // Capturing the promise by reference relies on this function waiting on `future` below until
  // the callback has run.
  auto callback = [&tablet_lookup_promise](
      const Result<client::internal::RemoteTabletPtr>& result) {
    tablet_lookup_promise.set_value(result);
  };

  auto start = CoarseMonoClock::Now();
  client()->LookupTabletById(
      tablet_id,
      /* table =*/ nullptr,
      // In case this is a split parent tablet, it will be hidden so we need this flag to access it.
      master::IncludeInactive::kTrue,
      CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms),
      callback, client::UseCache::kFalse);
  future.wait();

  auto duration = CoarseMonoClock::Now() - start;
  if (duration > (kMaxDurationForTabletLookup * 1ms)) {
    // Report the call actually made, with an explicit millisecond count.
    LOG(WARNING) << "LookupTabletById took long time: "
                 << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()
                 << " ms";
  }

  return future.get();
}
1505
1506
0
// Resolves the tablet server that the looked-up RemoteTablet reports as leader of `tablet_id`.
// Returns NotFound when no leader is known.
Result<RemoteTabletServer *> CDCServiceImpl::GetLeaderTServer(const TabletId& tablet_id) {
  auto remote_tablet = VERIFY_RESULT(GetRemoteTablet(tablet_id));

  auto leader = remote_tablet->LeaderTServer();
  if (!leader) {
    return STATUS(NotFound, "Tablet leader not found for tablet", tablet_id);
  }
  return leader;
}
1515
1516
// Fills `servers` with the tablet servers that the looked-up RemoteTablet for `tablet_id`
// reports. Propagates any lookup error.
Status CDCServiceImpl::GetTServers(const TabletId& tablet_id,
                                   std::vector<client::internal::RemoteTabletServer*>* servers) {
  const auto remote_tablet = VERIFY_RESULT(GetRemoteTablet(tablet_id));
  remote_tablet->GetRemoteTabletServers(servers);
  return Status::OK();
}
1523
1524
219
// Returns the CDCServiceProxy for the host/port that `ts` advertises for this client's cloud
// info, creating it and caching it in cdc_service_map_ on first use. If two callers race to
// create a proxy for the same endpoint, the entry inserted first is returned to both.
std::shared_ptr<CDCServiceProxy> CDCServiceImpl::GetCDCServiceProxy(RemoteTabletServer* ts) {
  auto hostport = HostPortFromPB(ts->DesiredHostPort(client()->cloud_info()));
  DCHECK(!hostport.host().empty());

  {
    SharedLock<decltype(mutex_)> read_lock(mutex_);
    const auto cached = cdc_service_map_.find(hostport);
    if (cached != cdc_service_map_.end()) {
      return cached->second;
    }
  }

  // Build the proxy outside the lock, then publish it unless another caller already did.
  auto new_proxy = std::make_shared<CDCServiceProxy>(&client()->proxy_cache(), hostport);

  std::lock_guard<decltype(mutex_)> write_lock(mutex_);
  const auto existing = cdc_service_map_.find(hostport);
  if (existing != cdc_service_map_.end()) {
    return existing->second;
  }
  cdc_service_map_.emplace(hostport, new_proxy);
  return new_proxy;
}
1548
1549
void CDCServiceImpl::TabletLeaderGetChanges(const GetChangesRequestPB* req,
1550
                                            GetChangesResponsePB* resp,
1551
                                            std::shared_ptr<RpcContext> context,
1552
0
                                            std::shared_ptr<tablet::TabletPeer> peer) {
1553
0
  auto rpc_handle = rpcs_.Prepare();
1554
0
  RPC_CHECK_AND_RETURN_ERROR(rpc_handle != rpcs_.InvalidHandle(),
1555
0
      STATUS(Aborted,
1556
0
          Format("Could not create valid handle for GetChangesCDCRpc: tablet=$0, peer=$1",
1557
0
                 req->tablet_id(),
1558
0
                 peer->permanent_uuid())),
1559
0
      resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, *context.get());
1560
1561
  // Increment Proxy Metric.
1562
0
  server_metrics_->cdc_rpc_proxy_count->Increment();
1563
1564
  // Forward this Request Info to the proper TabletServer.
1565
0
  GetChangesRequestPB new_req;
1566
0
  new_req.CopyFrom(*req);
1567
0
  new_req.set_serve_as_proxy(false);
1568
0
  CoarseTimePoint deadline = GetDeadline(*context.get(), client());
1569
1570
0
  *rpc_handle = CreateGetChangesCDCRpc(
1571
0
      deadline,
1572
0
      nullptr, /* RemoteTablet: will get this from 'new_req' */
1573
0
      client(),
1574
0
      &new_req,
1575
0
      [=] (Status status, GetChangesResponsePB&& new_resp) {
1576
0
        auto retained = rpcs_.Unregister(rpc_handle);
1577
0
        *resp = std::move(new_resp);
1578
0
        RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), resp->error().code(),
1579
0
                                *context.get());
1580
0
        context->RespondSuccess();
1581
0
      });
1582
0
  (**rpc_handle).SendRpc();
1583
0
}
1584
1585
void CDCServiceImpl::TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req,
1586
                                               GetCheckpointResponsePB* resp,
1587
                                               RpcContext* context,
1588
0
                                               const std::shared_ptr<tablet::TabletPeer>& peer) {
1589
0
  auto result = GetLeaderTServer(req->tablet_id());
1590
0
  RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
1591
0
                             CDCErrorPB::TABLET_NOT_FOUND, *context);
1592
1593
0
  auto ts_leader = *result;
1594
  // Check that tablet leader identified by master is not current tablet peer.
1595
  // This can happen during tablet rebalance if master and tserver have different views of
1596
  // leader. We need to avoid self-looping in this case.
1597
0
  if (peer) {
1598
0
    RPC_CHECK_NE_AND_RETURN_ERROR(ts_leader->permanent_uuid(), peer->permanent_uuid(),
1599
0
                                  STATUS(IllegalState,
1600
0
                                         Format("Tablet leader changed: leader=$0, peer=$1",
1601
0
                                                ts_leader->permanent_uuid(),
1602
0
                                                peer->permanent_uuid())),
1603
0
                                  resp->mutable_error(), CDCErrorPB::NOT_LEADER, *context);
1604
0
  }
1605
1606
0
  auto cdc_proxy = GetCDCServiceProxy(ts_leader);
1607
0
  rpc::RpcController rpc;
1608
0
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms));
1609
  // TODO(NIC): Change to GetCheckpointAsync like CDCPoller::DoPoll.
1610
0
  auto status = cdc_proxy->GetCheckpoint(*req, resp, &rpc);
1611
0
  RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, *context);
1612
0
  context->RespondSuccess();
1613
0
}
1614
1615
void CDCServiceImpl::GetCheckpoint(const GetCheckpointRequestPB* req,
1616
                                   GetCheckpointResponsePB* resp,
1617
14
                                   RpcContext context) {
1618
14
  if (!CheckOnline(req, resp, &context)) {
1619
0
    return;
1620
0
  }
1621
1622
14
  RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(),
1623
14
                             STATUS(InvalidArgument, "Tablet ID is required to get CDC checkpoint"),
1624
14
                             resp->mutable_error(),
1625
14
                             CDCErrorPB::INVALID_REQUEST,
1626
14
                             context);
1627
14
  RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id(),
1628
14
                             STATUS(InvalidArgument, "Stream ID is required to get CDC checkpoint"),
1629
14
                             resp->mutable_error(),
1630
14
                             CDCErrorPB::INVALID_REQUEST,
1631
14
                             context);
1632
1633
14
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1634
14
  Status s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
1635
1636
14
  if (s.IsNotFound() || !IsTabletPeerLeader(tablet_peer)) {
1637
    // Forward GetChanges() to tablet leader. This happens often in Kubernetes setups.
1638
0
    TabletLeaderGetCheckpoint(req, resp, &context, tablet_peer);
1639
0
    return;
1640
0
  }
1641
1642
  // Check that requested tablet_id is part of the CDC stream.
1643
14
  ProducerTabletInfo producer_tablet = {"" /* UUID */, req->stream_id(), req->tablet_id()};
1644
14
  s = CheckTabletValidForStream(producer_tablet);
1645
14
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
1646
1647
14
  auto session = client()->NewSession();
1648
14
  CoarseTimePoint deadline = GetDeadline(context, client());
1649
1650
14
  session->SetDeadline(deadline);
1651
1652
14
  auto result = GetLastCheckpoint(producer_tablet, session);
1653
14
  RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
1654
14
                             CDCErrorPB::INTERNAL_ERROR, context);
1655
1656
14
  result->ToPB(resp->mutable_checkpoint()->mutable_op_id());
1657
14
  context.RespondSuccess();
1658
14
}
1659
1660
void CDCServiceImpl::UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req,
1661
                                              UpdateCdcReplicatedIndexResponsePB* resp,
1662
219
                                              rpc::RpcContext context) {
1663
219
  if (!CheckOnline(req, resp, &context)) {
1664
0
    return;
1665
0
  }
1666
1667
219
  RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(),
1668
219
                             STATUS(InvalidArgument,
1669
219
                                    "Tablet ID is required to set the log replicated index"),
1670
219
                             resp->mutable_error(),
1671
219
                             CDCErrorPB::INVALID_REQUEST,
1672
219
                             context);
1673
1674
219
  RPC_CHECK_AND_RETURN_ERROR(req->has_replicated_index(),
1675
219
                             STATUS(InvalidArgument,
1676
219
                                    "Replicated index is required to set the log replicated index"),
1677
219
                             resp->mutable_error(),
1678
219
                             CDCErrorPB::INVALID_REQUEST,
1679
219
                             context);
1680
1681
219
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1682
219
  RPC_STATUS_RETURN_ERROR(tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer),
1683
219
                          resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1684
1685
219
  RPC_CHECK_AND_RETURN_ERROR(tablet_peer->log_available(),
1686
219
                             STATUS(TryAgain, "Tablet peer is not ready to set its log cdc index"),
1687
219
                             resp->mutable_error(),
1688
219
                             CDCErrorPB::INTERNAL_ERROR,
1689
219
                             context);
1690
1691
219
  RPC_STATUS_RETURN_ERROR(tablet_peer->set_cdc_min_replicated_index(req->replicated_index()),
1692
219
                          resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1693
1694
219
  auto status = DoUpdateCDCConsumerOpId(tablet_peer,
1695
219
                                        OpId(req->replicated_term(), req->replicated_index()),
1696
219
                                        req->tablet_id());
1697
1698
219
  RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1699
1700
219
  {
1701
219
    RequestScope request_scope;
1702
219
    auto txn_participant = tablet_peer->tablet()->transaction_participant();
1703
219
    if (txn_participant) {
1704
219
      VLOG(1) << "Registering and unregistering request so that transactions are "
1705
0
                   "cleaned up on followers.";
1706
219
      request_scope = RequestScope(txn_participant);
1707
219
    }
1708
219
  }
1709
1710
219
  context.RespondSuccess();
1711
219
}
1712
1713
0
// Fetches the latest log entry OpId of `tablet_id` from the tablet leader's CDC service.
// If the leader call fails, the tablet's other replicas are tried in turn. A follower may be
// slightly behind the leader, which is acceptable for this purpose. If every attempt fails, the
// last error is returned.
Result<OpId> CDCServiceImpl::TabletLeaderLatestEntryOpId(const TabletId& tablet_id) {
  auto ts_leader = VERIFY_RESULT(GetLeaderTServer(tablet_id));

  GetLatestEntryOpIdRequestPB req;
  req.set_tablet_id(tablet_id);
  GetLatestEntryOpIdResponsePB resp;

  // Each attempt uses its own RpcController. A controller that has already been used for a call
  // must not be reused for another call without Reset().
  auto call_latest_entry_op_id = [this, &req, &resp](RemoteTabletServer* server) {
    rpc::RpcController rpc;
    rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms));
    resp.Clear();
    return GetCDCServiceProxy(server)->GetLatestEntryOpId(req, &resp, &rpc);
  };

  auto status = call_latest_entry_op_id(ts_leader);
  if (status.ok()) {
    return OpId::FromPB(resp.op_id());
  }

  // If we failed to get the latest entry op id, we try other tservers. The leader is guaranteed
  // to have the most up-to-date information, but for our purposes, it's ok to be slightly
  // behind.
  std::vector<client::internal::RemoteTabletServer *> servers;
  auto servers_status = GetTServers(tablet_id, &servers);
  if (!servers_status.ok()) {
    LOG(WARNING) << "Unable to look up tservers for tablet " << tablet_id << ": "
                 << servers_status;
  }
  for (auto* server : servers) {
    // We don't want to try the leader again.
    if (server->permanent_uuid() == ts_leader->permanent_uuid()) {
      continue;
    }
    status = call_latest_entry_op_id(server);
    if (status.ok()) {
      return OpId::FromPB(resp.op_id());
    }
  }
  DCHECK(!status.ok());
  return status;
}
1745
1746
void CDCServiceImpl::GetLatestEntryOpId(const GetLatestEntryOpIdRequestPB* req,
1747
                                        GetLatestEntryOpIdResponsePB* resp,
1748
0
                                        rpc::RpcContext context) {
1749
0
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1750
0
  Status s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
1751
0
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1752
1753
0
  if (!tablet_peer->log_available()) {
1754
0
    const string err_message = strings::Substitute("Unable to get the latest entry op id from "
1755
0
        "peer $0 and tablet $1 because its log object hasn't been initialized",
1756
0
        tablet_peer->permanent_uuid(), tablet_peer->tablet_id());
1757
0
    LOG(WARNING) << err_message;
1758
0
    SetupErrorAndRespond(resp->mutable_error(),
1759
0
                         STATUS(ServiceUnavailable, err_message),
1760
0
                         CDCErrorPB::INTERNAL_ERROR,
1761
0
                         &context);
1762
0
    return;
1763
0
  }
1764
0
  OpId op_id = tablet_peer->log()->GetLatestEntryOpId();
1765
0
  op_id.ToPB(resp->mutable_op_id());
1766
0
  context.RespondSuccess();
1767
0
}
1768
1769
void CDCServiceImpl::GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req,
1770
                                        GetCDCDBStreamInfoResponsePB* resp,
1771
0
                                        rpc::RpcContext context) {
1772
0
  if (!CheckOnline(req, resp, &context)) {
1773
0
    return;
1774
0
  }
1775
1776
0
  LOG(INFO) << "Received GetCDCDBStreamInfo request " << req->ShortDebugString();
1777
1778
0
  RPC_CHECK_AND_RETURN_ERROR(
1779
0
    req->has_db_stream_id(),
1780
0
    STATUS(InvalidArgument, "Database Stream ID is required to get DB stream information"),
1781
0
    resp->mutable_error(),
1782
0
    CDCErrorPB::INVALID_REQUEST,
1783
0
    context);
1784
1785
0
  std::vector<pair<std::string, std::string>> db_stream_info;
1786
0
  Status s = client()->GetCDCDBStreamInfo(req->db_stream_id(), &db_stream_info);
1787
0
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1788
1789
0
  for (const auto& tabinfo : db_stream_info) {
1790
0
    auto* const table_info = resp->add_table_info();
1791
0
    table_info->set_stream_id(tabinfo.first);
1792
0
    table_info->set_table_id(tabinfo.second);
1793
0
  }
1794
1795
0
  context.RespondSuccess();
1796
0
}
1797
1798
307
void CDCServiceImpl::RollbackPartialCreate(const CDCCreationState& creation_state) {
1799
307
  if (!creation_state.created_cdc_streams.empty()) {
1800
0
    Status s = client()->DeleteCDCStream(creation_state.created_cdc_streams);
1801
0
    if (!s.ok()) {
1802
0
      LOG(WARNING) << "Unable to delete streams " << JoinCSVLine(creation_state.created_cdc_streams)
1803
0
                   << ": " << s;
1804
0
    }
1805
0
  }
1806
1807
  // For all tablets we modified state for, reverse those changes if the operation failed
1808
  // halfway through.
1809
307
  if (creation_state.producer_entries_modified.empty()) {
1810
307
    return;
1811
307
  }
1812
0
  std::lock_guard<decltype(mutex_)> l(mutex_);
1813
0
  impl_->EraseTablets(creation_state.producer_entries_modified, false);
1814
0
  for (const auto& entry : creation_state.producer_entries_modified) {
1815
0
    WARN_NOT_OK(
1816
0
        UpdatePeersCdcMinReplicatedIndex(entry.tablet_id, numeric_limits<uint64_t>::max()),
1817
0
        "Unable to update tablet " + entry.tablet_id);
1818
0
  }
1819
1820
0
}
1821
1822
void CDCServiceImpl::BootstrapProducer(const BootstrapProducerRequestPB* req,
1823
                                       BootstrapProducerResponsePB* resp,
1824
0
                                       rpc::RpcContext context) {
1825
0
  LOG(INFO) << "Received BootstrapProducer request " << req->ShortDebugString();
1826
0
  RPC_CHECK_AND_RETURN_ERROR(req->table_ids().size() > 0,
1827
0
                             STATUS(InvalidArgument, "Table ID is required to create CDC stream"),
1828
0
                             resp->mutable_error(),
1829
0
                             CDCErrorPB::INVALID_REQUEST,
1830
0
                             context);
1831
1832
0
  std::shared_ptr<client::TableHandle> cdc_state_table;
1833
1834
0
  std::vector<client::YBOperationPtr> ops;
1835
0
  auto session = client()->NewSession();
1836
1837
  // Used to delete streams in case of failure.
1838
0
  CDCCreationState creation_state;
1839
0
  auto scope_exit = ScopeExit([this, &creation_state] {
1840
0
    RollbackPartialCreate(creation_state);
1841
0
  });
1842
1843
0
  std::vector<CDCStreamId> bootstrap_ids;
1844
1845
0
  for (const auto& table_id : req->table_ids()) {
1846
0
    std::shared_ptr<client::YBTable> table;
1847
0
    Status s = client()->OpenTable(table_id, &table);
1848
0
    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context);
1849
1850
    // Generate a bootstrap id by calling CreateCDCStream, and also setup the stream in the master.
1851
    // If the consumer's master sends a CreateCDCStream with a bootstrap id, the producer's master
1852
    // will verify that the stream id exists and return success if it does since everything else
1853
    // has already been done by this call.
1854
0
    std::unordered_map<std::string, std::string> options;
1855
0
    options.reserve(4);
1856
0
    options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE));
1857
0
    options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL));
1858
0
    options.emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));
1859
0
    options.emplace(cdc::kCheckpointType, CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT));
1860
1861
    // Mark this stream as being bootstrapped, to help in finding dangling streams.
1862
0
    auto result = client()->CreateCDCStream(table_id, options, false);
1863
0
    RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
1864
0
                               CDCErrorPB::INTERNAL_ERROR, context);
1865
0
    const std::string& bootstrap_id = *result;
1866
0
    creation_state.created_cdc_streams.push_back(bootstrap_id);
1867
1868
0
    if (cdc_state_table == nullptr) {
1869
0
      auto res = GetCdcStateTable();
1870
0
      RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(),
1871
0
          CDCErrorPB::INTERNAL_ERROR, context);
1872
0
      cdc_state_table = *res;
1873
0
    }
1874
1875
0
    google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
1876
0
    s = client()->GetTabletsFromTableId(table_id, 0, &tablets);
1877
0
    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context);
1878
1879
    // For each tablet, create a row in cdc_state table containing the generated bootstrap id, and
1880
    // the latest op id in the logs.
1881
0
    for (const auto &tablet : tablets) {
1882
0
      std::shared_ptr<tablet::TabletPeer> tablet_peer;
1883
0
      OpId op_id;
1884
1885
0
      s = tablet_manager_->GetTabletPeer(tablet.tablet_id(), &tablet_peer);
1886
0
      if (!s.ok()) {
1887
0
        auto res = TabletLeaderLatestEntryOpId(tablet.tablet_id());
1888
0
        RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(),
1889
0
            CDCErrorPB::INTERNAL_ERROR, context);
1890
0
        op_id = *res;
1891
0
      } else {
1892
0
        if (!tablet_peer->log_available()) {
1893
0
          const string err_message = strings::Substitute("Unable to get the latest entry op id "
1894
0
              "from peer $0 and tablet $1 because its log object hasn't been initialized",
1895
0
              tablet_peer->permanent_uuid(), tablet_peer->tablet_id());
1896
0
          LOG(WARNING) << err_message;
1897
0
          SetupErrorAndRespond(resp->mutable_error(),
1898
0
                               STATUS(ServiceUnavailable, err_message),
1899
0
                               CDCErrorPB::INTERNAL_ERROR,
1900
0
                               &context);
1901
0
          return;
1902
0
        }
1903
0
        op_id = tablet_peer->log()->GetLatestEntryOpId();
1904
0
        RPC_STATUS_RETURN_ERROR(UpdatePeersCdcMinReplicatedIndex(tablet.tablet_id(), op_id.index),
1905
0
                                resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR,
1906
0
                                context);
1907
0
      }
1908
1909
0
      const auto op = cdc_state_table->NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
1910
0
      auto *const write_req = op->mutable_request();
1911
1912
0
      QLAddStringHashValue(write_req, tablet.tablet_id());
1913
0
      QLAddStringRangeValue(write_req, bootstrap_id);
1914
0
      cdc_state_table->AddStringColumnValue(write_req, master::kCdcCheckpoint, op_id.ToString());
1915
0
      ops.push_back(std::move(op));
1916
0
      impl_->AddTabletCheckpoint(
1917
0
          op_id, bootstrap_id, tablet.tablet_id(), &creation_state.producer_entries_modified);
1918
0
    }
1919
0
    bootstrap_ids.push_back(std::move(bootstrap_id));
1920
0
  }
1921
0
  CoarseTimePoint deadline = GetDeadline(context, client());
1922
1923
0
  session->SetDeadline(deadline);
1924
0
  Status s = RefreshCacheOnFail(session->ApplyAndFlush(ops));
1925
0
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1926
1927
0
  for (const auto& bootstrap_id : bootstrap_ids) {
1928
0
    resp->add_cdc_bootstrap_ids(bootstrap_id);
1929
0
  }
1930
  // Clear these vectors so no changes are reversed by scope_exit since we succeeded.
1931
0
  creation_state.Clear();
1932
0
  context.RespondSuccess();
1933
0
}
1934
1935
188
void CDCServiceImpl::Shutdown() {
1936
188
  if (impl_->async_client_init_) {
1937
96
    impl_->async_client_init_->Shutdown();
1938
96
    rpcs_.Shutdown();
1939
96
    {
1940
96
      std::lock_guard<decltype(mutex_)> l(mutex_);
1941
96
      cdc_service_stopped_ = true;
1942
96
      cdc_state_table_ = nullptr;
1943
96
    }
1944
96
    if (update_peers_and_metrics_thread_) {
1945
96
      update_peers_and_metrics_thread_->join();
1946
96
    }
1947
96
    impl_->async_client_init_ = boost::none;
1948
96
  }
1949
188
}
1950
1951
// Returns the last checkpoint recorded for `producer_tablet`.
// A checkpoint already held by impl_ is returned directly. Otherwise the checkpoint column of the
// cdc_state row for (tablet_id, stream_id) is read through `session`. Returns OpId(0, 0) when no
// such row exists. A failed read invalidates the cached cdc_state table handle.
Result<OpId> CDCServiceImpl::GetLastCheckpoint(
    const ProducerTabletInfo& producer_tablet,
    const client::YBSessionPtr& session) {
  if (auto cached_checkpoint = impl_->GetLastCheckpoint(producer_tablet)) {
    return *cached_checkpoint;
  }

  auto state_table = VERIFY_RESULT(GetCdcStateTable());

  const auto read_op = state_table->NewReadOp();
  auto* const read_req = read_op->mutable_request();
  DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty());
  QLAddStringHashValue(read_req, producer_tablet.tablet_id);

  // Restrict the tablet's rows to the requested stream.
  auto* const condition = read_req->mutable_where_expr()->mutable_condition();
  condition->set_op(QLOperator::QL_OP_AND);
  QLAddStringCondition(condition, Schema::first_column_id() + master::kCdcStreamIdIdx,
      QL_OP_EQUAL, producer_tablet.stream_id);
  read_req->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcTabletIdIdx);
  read_req->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcStreamIdIdx);
  state_table->AddColumns({master::kCdcCheckpoint}, read_req);

  RETURN_NOT_OK(RefreshCacheOnFail(session->ReadSync(read_op)));
  auto row_block = ql::RowsResult(read_op.get()).GetRowBlock();
  if (row_block->row_count() == 0) {
    return OpId(0, 0);
  }

  DCHECK_EQ(row_block->row_count(), 1);
  DCHECK_EQ(row_block->row(0).column(0).type(), InternalType::kStringValue);

  return OpId::FromString(row_block->row(0).column(0).string_value());
}
1986
1987
void CDCServiceImpl::UpdateCDCTabletMetrics(
1988
    const GetChangesResponsePB* resp,
1989
    const ProducerTabletInfo& producer_tablet,
1990
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
1991
    const OpId& op_id,
1992
641
    int64_t last_readable_index) {
1993
641
  auto tablet_metric = GetCDCTabletMetrics(producer_tablet, tablet_peer);
1994
641
  if (!tablet_metric) {
1995
0
    return;
1996
0
  }
1997
1998
641
  auto lid = resp->checkpoint().op_id();
1999
641
  tablet_metric->last_read_opid_term->set_value(lid.term());
2000
641
  tablet_metric->last_read_opid_index->set_value(lid.index());
2001
641
  tablet_metric->last_readable_opid_index->set_value(last_readable_index);
2002
641
  tablet_metric->last_checkpoint_opid_index->set_value(op_id.index);
2003
641
  if (resp->records_size() > 0) {
2004
0
    auto& last_record = resp->records(resp->records_size() - 1);
2005
0
    tablet_metric->last_read_hybridtime->set_value(last_record.time());
2006
0
    auto last_record_micros = HybridTime(last_record.time()).GetPhysicalValueMicros();
2007
0
    tablet_metric->last_read_physicaltime->set_value(last_record_micros);
2008
    // Only count bytes responded if we are including a response payload.
2009
0
    tablet_metric->rpc_payload_bytes_responded->Increment(resp->ByteSize());
2010
    // Get the physical time of the last committed record on producer.
2011
0
    auto last_replicated_micros = GetLastReplicatedTime(tablet_peer);
2012
0
    tablet_metric->async_replication_sent_lag_micros->set_value(
2013
0
        last_replicated_micros - last_record_micros);
2014
0
    auto& first_record = resp->records(0);
2015
0
    auto first_record_micros = HybridTime(first_record.time()).GetPhysicalValueMicros();
2016
0
    tablet_metric->last_checkpoint_physicaltime->set_value(first_record_micros);
2017
0
    tablet_metric->async_replication_committed_lag_micros->set_value(
2018
0
        last_replicated_micros - first_record_micros);
2019
641
  } else {
2020
641
    tablet_metric->rpc_heartbeats_responded->Increment();
2021
    // If there are no more entries to be read, that means we're caught up.
2022
641
    auto last_replicated_micros = GetLastReplicatedTime(tablet_peer);
2023
641
    tablet_metric->last_read_physicaltime->set_value(last_replicated_micros);
2024
641
    tablet_metric->last_checkpoint_physicaltime->set_value(last_replicated_micros);
2025
641
    tablet_metric->async_replication_sent_lag_micros->set_value(0);
2026
641
    tablet_metric->async_replication_committed_lag_micros->set_value(0);
2027
641
  }
2028
641
}
2029
2030
// Records `sent_op_id` / `commit_op_id` for `producer_tablet` through impl_. If impl_ reports that
// cdc_state must be updated, writes `commit_op_id` as the checkpoint, and the last replication time,
// to the (tablet_id, stream_id) row of cdc_state using `session`.
//
// The last replication time is the physical part of `last_record_hybrid_time` when it is non-zero,
// and the current time otherwise. Errors from opening cdc_state or from the write are propagated.
Status CDCServiceImpl::UpdateCheckpoint(const ProducerTabletInfo& producer_tablet,
                                        const OpId& sent_op_id,
                                        const OpId& commit_op_id,
                                        const client::YBSessionPtr& session,
                                        uint64_t last_record_hybrid_time) {
  const bool must_persist = impl_->UpdateCheckpoint(producer_tablet, sent_op_id, commit_op_id);
  if (!must_persist) {
    return Status::OK();
  }

  auto cdc_state = VERIFY_RESULT(GetCdcStateTable());
  const auto update_op = cdc_state->NewUpdateOp();
  auto* const update_req = update_op->mutable_request();
  DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty());
  QLAddStringHashValue(update_req, producer_tablet.tablet_id);
  QLAddStringRangeValue(update_req, producer_tablet.stream_id);

  cdc_state->AddStringColumnValue(update_req, master::kCdcCheckpoint, commit_op_id.ToString());

  // A non-zero hybrid time means we have a last record, so use its physical time. Otherwise we
  // are caught up, and the current time is used.
  uint64_t last_replication_time_micros = 0;
  if (last_record_hybrid_time != 0) {
    last_replication_time_micros = HybridTime(last_record_hybrid_time).GetPhysicalValueMicros();
  } else {
    last_replication_time_micros = GetCurrentTimeMicros();
  }
  cdc_state->AddTimestampColumnValue(update_req, master::kCdcLastReplicationTime,
                                     last_replication_time_micros);

  // Only perform the update if we have a row in cdc_state. This prevents a race condition where
  // a stream is deleted and this logic then inserts entries in cdc_state for the deleted stream.
  auto* const if_condition = update_req->mutable_if_expr()->mutable_condition();
  if_condition->set_op(QL_OP_EXISTS);
  RETURN_NOT_OK(RefreshCacheOnFail(session->ApplyAndFlush(update_op)));

  return Status::OK();
}
2062
2063
std::shared_ptr<CDCTabletMetrics> CDCServiceImpl::GetCDCTabletMetrics(
2064
    const ProducerTabletInfo& producer,
2065
881
    std::shared_ptr<tablet::TabletPeer> tablet_peer) {
2066
  // 'nullptr' not recommended: using for tests.
2067
881
  if (tablet_peer == nullptr) {
2068
0
    auto status = tablet_manager_->GetTabletPeer(producer.tablet_id, &tablet_peer);
2069
0
    if (!status.ok() || tablet_peer == nullptr) return nullptr;
2070
0
  }
2071
2072
881
  auto tablet = tablet_peer->shared_tablet();
2073
881
  if (tablet == nullptr) 
return nullptr0
;
2074
2075
881
  std::string key = "CDCMetrics::" + producer.stream_id;
2076
881
  std::shared_ptr<void> metrics_raw = tablet->GetAdditionalMetadata(key);
2077
881
  if (metrics_raw == nullptr) {
2078
    //  Create a new METRIC_ENTITY_cdc here.
2079
444
    MetricEntity::AttributeMap attrs;
2080
444
    {
2081
444
      SharedLock<rw_spinlock> l(mutex_);
2082
444
      auto raft_group_metadata = tablet_peer->tablet()->metadata();
2083
444
      attrs["table_id"] = raft_group_metadata->table_id();
2084
444
      attrs["namespace_name"] = raft_group_metadata->namespace_name();
2085
444
      attrs["table_name"] = raft_group_metadata->table_name();
2086
444
      attrs["stream_id"] = producer.stream_id;
2087
444
    }
2088
444
    auto entity = METRIC_ENTITY_cdc.Instantiate(metric_registry_, producer.MetricsString(), attrs);
2089
444
    metrics_raw = std::make_shared<CDCTabletMetrics>(entity);
2090
    // Adding the new metric to the tablet so it maintains the same lifetime scope.
2091
444
    tablet->AddAdditionalMetadata(key, metrics_raw);
2092
444
  }
2093
2094
881
  return std::static_pointer_cast<CDCTabletMetrics>(metrics_raw);
2095
881
}
2096
2097
648
// Returns the metadata for `stream_id`.
//
// The in-process cache is consulted first. On a miss, the stream is looked up in the sys catalog
// through client()->GetCDCStream(), its options are parsed into a new StreamMetadata, and the
// result is added to the cache before being returned.
//
// Errors:
//   - Errors from the catalog lookup are propagated.
//   - IllegalState is returned if a record type, record format, source type or checkpoint type
//     option cannot be parsed; the message names the option that failed.
//   - Unrecognized options are logged and ignored.
Result<std::shared_ptr<StreamMetadata>> CDCServiceImpl::GetStream(const std::string& stream_id) {
  auto stream = GetStreamMetadataFromCache(stream_id);
  if (stream != nullptr) {
    return stream;
  }

  // Look up stream in sys catalog.
  std::vector<ObjectId> object_ids;
  NamespaceId ns_id;
  std::unordered_map<std::string, std::string> options;
  RETURN_NOT_OK(client()->GetCDCStream(stream_id, &ns_id, &object_ids, &options));

  auto stream_metadata = std::make_shared<StreamMetadata>();
  for (const auto& option : options) {
    if (option.first == kRecordType) {
      SCHECK(CDCRecordType_Parse(option.second, &stream_metadata->record_type),
             IllegalState, "CDC record type parsing error");
    } else if (option.first == kRecordFormat) {
      SCHECK(CDCRecordFormat_Parse(option.second, &stream_metadata->record_format),
             IllegalState, "CDC record format parsing error");
    } else if (option.first == kSourceType) {
      SCHECK(CDCRequestSource_Parse(option.second, &stream_metadata->source_type),
             IllegalState, "CDC source type parsing error");
    } else if (option.first == kCheckpointType) {
      SCHECK(CDCCheckpointType_Parse(option.second, &stream_metadata->checkpoint_type),
             IllegalState, "CDC checkpoint type parsing error");
    } else if (option.first == cdc::kIdType && option.second == cdc::kNamespaceId) {
      // Namespace-id stream: record the namespace id along with the returned object ids.
      stream_metadata->ns_id = ns_id;
      stream_metadata->table_ids.insert(
          stream_metadata->table_ids.end(), object_ids.begin(), object_ids.end());
    } else if (option.first == cdc::kIdType && option.second == cdc::kTableId) {
      stream_metadata->table_ids.insert(
          stream_metadata->table_ids.end(), object_ids.begin(), object_ids.end());
    } else {
      LOG(WARNING) << "Unsupported CDC option: " << option.first;
    }
  }

  AddStreamMetadataToCache(stream_id, stream_metadata);
  return stream_metadata;
}
2138
2139
void CDCServiceImpl::AddStreamMetadataToCache(const std::string& stream_id,
2140
306
                                              const std::shared_ptr<StreamMetadata>& metadata) {
2141
306
  std::lock_guard<decltype(mutex_)> l(mutex_);
2142
306
  stream_metadata_.emplace(stream_id, metadata);
2143
306
}
2144
2145
std::shared_ptr<StreamMetadata> CDCServiceImpl::GetStreamMetadataFromCache(
2146
648
    const std::string& stream_id) {
2147
648
  SharedLock<decltype(mutex_)> l(mutex_);
2148
648
  auto it = stream_metadata_.find(stream_id);
2149
648
  if (it != stream_metadata_.end()) {
2150
647
    return it->second;
2151
647
  } else {
2152
1
    return nullptr;
2153
1
  }
2154
648
}
2155
2156
656
// Verifies that `info.tablet_id` is valid for `info.stream_id`.
//
// impl_->PreCheckTabletValidForStream() is consulted first. If it yields a true result, the tablet
// is accepted immediately. Otherwise the stream's full tablet list is fetched with GetTablets(),
// and impl_->CheckTabletValidForStream() decides. Errors from either lookup are propagated.
Status CDCServiceImpl::CheckTabletValidForStream(const ProducerTabletInfo& info) {
  const auto precheck_passed = VERIFY_RESULT(impl_->PreCheckTabletValidForStream(info));
  if (!precheck_passed) {
    // If we don't recognize the stream_id, populate our full tablet list for this stream.
    auto stream_tablets = VERIFY_RESULT(GetTablets(info.stream_id));
    return impl_->CheckTabletValidForStream(info, stream_tablets);
  }
  return Status::OK();
}
2165
2166
}  // namespace cdc
2167
}  // namespace yb