YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_service.cc
Line| Count|Source
   1|      |// Copyright (c) YugaByte, Inc.
   2|      |//
   3|      |// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
   4|      |// in compliance with the License.  You may obtain a copy of the License at
   5|      |//
   6|      |// http://www.apache.org/licenses/LICENSE-2.0
   7|      |//
   8|      |// Unless required by applicable law or agreed to in writing, software distributed under the License
   9|      |// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  10|      |// or implied.  See the License for the specific language governing permissions and limitations
  11|      |// under the License.
  12|      |
  13|      |#include "yb/cdc/cdc_service.h"
  14|      |
  15|      |#include <chrono>
  16|      |#include <memory>
  17|      |
  18|      |#include <boost/multi_index/hashed_index.hpp>
  19|      |#include <boost/multi_index/mem_fun.hpp>
  20|      |#include <boost/multi_index/member.hpp>
  21|      |#include <boost/multi_index_container.hpp>
  22|      |
  23|      |#include "yb/cdc/cdc_producer.h"
  24|      |#include "yb/cdc/cdc_rpc.h"
  25|      |#include "yb/cdc/cdc_service.proxy.h"
  26|      |
  27|      |#include "yb/client/client.h"
  28|      |#include "yb/client/meta_cache.h"
  29|      |#include "yb/client/schema.h"
  30|      |#include "yb/client/session.h"
  31|      |#include "yb/client/table.h"
  32|      |#include "yb/client/table_alterer.h"
  33|      |#include "yb/client/table_handle.h"
  34|      |#include "yb/client/yb_op.h"
  35|      |#include "yb/client/yb_table_name.h"
  36|      |
  37|      |#include "yb/common/entity_ids.h"
  38|      |#include "yb/common/pg_system_attr.h"
  39|      |#include "yb/common/ql_expr.h"
  40|      |#include "yb/common/ql_value.h"
  41|      |#include "yb/common/schema.h"
  42|      |#include "yb/common/wire_protocol.h"
  43|      |
  44|      |#include "yb/consensus/log.h"
  45|      |#include "yb/consensus/raft_consensus.h"
  46|      |#include "yb/consensus/replicate_msgs_holder.h"
  47|      |
  48|      |#include "yb/gutil/dynamic_annotations.h"
  49|      |#include "yb/gutil/strings/join.h"
  50|      |
  51|      |#include "yb/master/master_client.pb.h"
  52|      |#include "yb/master/master_ddl.pb.h"
  53|      |#include "yb/master/master_defaults.h"
  54|      |
  55|      |#include "yb/rpc/rpc_context.h"
  56|      |#include "yb/rpc/rpc_controller.h"
  57|      |
  58|      |#include "yb/tablet/tablet_metadata.h"
  59|      |#include "yb/tablet/tablet_peer.h"
  60|      |#include "yb/tablet/transaction_participant.h"
  61|      |
  62|      |#include "yb/tserver/tablet_server.h"
  63|      |#include "yb/tserver/ts_tablet_manager.h"
  64|      |
  65|      |#include "yb/util/debug/trace_event.h"
  66|      |#include "yb/util/flag_tags.h"
  67|      |#include "yb/util/format.h"
  68|      |#include "yb/util/logging.h"
  69|      |#include "yb/util/metrics.h"
  70|      |#include "yb/util/monotime.h"
  71|      |#include "yb/util/scope_exit.h"
  72|      |#include "yb/util/shared_lock.h"
  73|      |#include "yb/util/status_format.h"
  74|      |#include "yb/util/status_log.h"
  75|      |#include "yb/util/trace.h"
  76|      |
  77|      |#include "yb/yql/cql/ql/util/statement_result.h"
  78|      |
  79|      |constexpr uint32_t kUpdateIntervalMs = 15 * 1000;
  80|      |
  81|      |DEFINE_int32(cdc_read_rpc_timeout_ms, 30 * 1000,
  82|      |             "Timeout used for CDC read rpc calls.  Reads normally occur cross-cluster.");
  83|      |TAG_FLAG(cdc_read_rpc_timeout_ms, advanced);
  84|      |
  85|      |DEFINE_int32(cdc_write_rpc_timeout_ms, 30 * 1000,
  86|      |             "Timeout used for CDC write rpc calls.  Writes normally occur intra-cluster.");
  87|      |TAG_FLAG(cdc_write_rpc_timeout_ms, advanced);
  88|      |
  89|      |DEFINE_int32(cdc_ybclient_reactor_threads, 50,
  90|      |             "The number of reactor threads to be used for processing ybclient "
  91|      |             "requests for CDC.");
  92|      |TAG_FLAG(cdc_ybclient_reactor_threads, advanced);
  93|      |
  94|      |DEFINE_int32(cdc_state_checkpoint_update_interval_ms, kUpdateIntervalMs,
  95|      |             "Rate at which CDC state's checkpoint is updated.");
  96|      |
  97|      |DEFINE_string(certs_for_cdc_dir, "",
  98|      |              "The parent directory of where all certificates for xCluster producer universes will "
  99|      |              "be stored, for when the producer and consumer clusters use different certificates. "
 100|      |              "Place the certificates for each producer cluster in "
 101|      |              "<certs_for_cdc_dir>/<producer_cluster_id>/*.");
 102|      |
 103|      |DEFINE_int32(update_min_cdc_indices_interval_secs, 60,
 104|      |             "How often to read cdc_state table to get the minimum applied index for each tablet "
 105|      |             "across all streams. This information is used to correctly keep log files that "
 106|      |             "contain unapplied entries. This is also the rate at which a tablet's minimum "
 107|      |             "replicated index across all streams is sent to the other peers in the configuration. "
 108|      |             "If flag enable_log_retention_by_op_idx is disabled, this flag has no effect.");
 109|      |
 110|      |DEFINE_int32(update_metrics_interval_ms, kUpdateIntervalMs,
 111|      |             "How often to update xDC cluster metrics.");
 112|      |
 113|      |DEFINE_bool(enable_cdc_state_table_caching, true, "Enable caching the cdc_state table schema.");
 114|      |
 115|      |DEFINE_bool(enable_collect_cdc_metrics, true, "Enable collecting cdc metrics.");
 116|      |
 117|      |DEFINE_double(cdc_read_safe_deadline_ratio, .10,
 118|      |              "When the heartbeat deadline has this percentage of time remaining, "
 119|      |              "the master should halt tablet report processing so it can respond in time.");
 120|      |
 121|      |DECLARE_bool(enable_log_retention_by_op_idx);
 122|      |
 123|      |DECLARE_int32(cdc_checkpoint_opid_interval_ms);
 124|      |
 125|      |METRIC_DEFINE_entity(cdc);
 126|      |
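Editor's note: the DEFINE_*/DECLARE_* pairs above are plain gflags. DEFINE_int32 and friends create the flag and its FLAGS_ variable in this translation unit, DECLARE_* only references a flag defined elsewhere (here enable_log_retention_by_op_idx and cdc_checkpoint_opid_interval_ms), and TAG_FLAG is YugabyteDB's macro for attaching metadata such as "advanced". A minimal sketch of the define/declare pattern, using a hypothetical flag name (retry_limit) rather than any real YugabyteDB flag:

    #include <gflags/gflags.h>

    // Defining translation unit: creates FLAGS_retry_limit with a default and help text.
    DEFINE_int32(retry_limit, 3, "How many times to retry a failed call.");

    // Any other translation unit would reference it with: DECLARE_int32(retry_limit);
    int EffectiveRetries() {
      // Read like an ordinary global; overridable at process startup with --retry_limit=5.
      return FLAGS_retry_limit;
    }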
 127|      |namespace yb {
 128|      |namespace cdc {
 129|      |
 130|      |using namespace std::literals;
 131|      |
 132|      |using rpc::RpcContext;
 133|      |using tserver::TSTabletManager;
 134|      |using client::internal::RemoteTabletServer;
 135|      |
 136|      |constexpr int kMaxDurationForTabletLookup = 50;
 137|      |const client::YBTableName kCdcStateTableName(
 138|      |    YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
 139|      |
 140|      |namespace {
 141|      |
 142|      |// These are guarded by lock_.
 143|      |// Map of checkpoints that have been sent to CDC consumer and stored in cdc_state.
 144|      |struct TabletCheckpointInfo {
 145|      | public:
 146|      |  ProducerTabletInfo producer_tablet_info;
 147|      |
 148|      |  mutable TabletCheckpoint cdc_state_checkpoint;
 149|      |  mutable TabletCheckpoint sent_checkpoint;
 150|      |  mutable MemTrackerPtr mem_tracker;
 151|      |
 152| 8.45k|  const TabletId& tablet_id() const {
 153| 8.45k|    return producer_tablet_info.tablet_id;
 154| 8.45k|  }
 155|      |
 156| 7.58k|  const CDCStreamId& stream_id() const {
 157| 7.58k|    return producer_tablet_info.stream_id;
 158| 7.58k|  }
 159|      |};
 160|      |
 161|      |struct CDCStateMetadataInfo {
 162|      |  ProducerTabletInfo producer_tablet_info;
 163|      |
 164|      |  mutable std::string commit_timestamp;
 165|      |  mutable std::shared_ptr<Schema>  current_schema;
 166|      |  mutable OpId last_streamed_op_id;
 167|      |
 168|      |  std::shared_ptr<MemTracker> mem_tracker;
 169|      |
 170|   206|  const TableId& tablet_id() const {
 171|   206|    return producer_tablet_info.tablet_id;
 172|   206|  }
 173|      |
 174|   226|  const CDCStreamId& stream_id() const {
 175|   226|    return producer_tablet_info.stream_id;
 176|   226|  }
 177|      |
 178|      |};
 179|      |
 180|      |class TabletTag;
 181|      |class StreamTag;
 182|      |
 183|      |using TabletCheckpoints = boost::multi_index_container <
 184|      |    TabletCheckpointInfo,
 185|      |    boost::multi_index::indexed_by <
 186|      |        boost::multi_index::hashed_unique <
 187|      |            boost::multi_index::member <
 188|      |                TabletCheckpointInfo, ProducerTabletInfo,
 189|      |                &TabletCheckpointInfo::producer_tablet_info
 190|      |            >
 191|      |        >,
 192|      |        boost::multi_index::hashed_non_unique <
 193|      |            boost::multi_index::tag <TabletTag>,
 194|      |            boost::multi_index::const_mem_fun <
 195|      |                TabletCheckpointInfo, const TabletId&, &TabletCheckpointInfo::tablet_id
 196|      |            >
 197|      |        >,
 198|      |        boost::multi_index::hashed_non_unique <
 199|      |            boost::multi_index::tag <StreamTag>,
 200|      |            boost::multi_index::const_mem_fun <
 201|      |                TabletCheckpointInfo, const CDCStreamId&, &TabletCheckpointInfo::stream_id
 202|      |            >
 203|      |        >
 204|      |    >
 205|      |>;
 206|      |
 207|      |using CDCStateMetadata = boost::multi_index_container <
 208|      |    CDCStateMetadataInfo,
 209|      |    boost::multi_index::indexed_by <
 210|      |        boost::multi_index::hashed_unique <
 211|      |            boost::multi_index::member <
 212|      |                CDCStateMetadataInfo, ProducerTabletInfo,
 213|      |                &CDCStateMetadataInfo::producer_tablet_info>
 214|      |        >,
 215|      |        boost::multi_index::hashed_non_unique <
 216|      |            boost::multi_index::tag <TabletTag>,
 217|      |            boost::multi_index::const_mem_fun <
 218|      |                CDCStateMetadataInfo, const TabletId&, &CDCStateMetadataInfo::tablet_id
 219|      |            >
 220|      |        >,
 221|      |        boost::multi_index::hashed_non_unique <
 222|      |            boost::multi_index::tag <StreamTag>,
 223|      |            boost::multi_index::const_mem_fun <
 224|      |                CDCStateMetadataInfo, const CDCStreamId&, &CDCStateMetadataInfo::stream_id
 225|      |            >
 226|      |        >
 227|      |    >
 228|      |>;
 229|      |
 230|      |} // namespace
 231|      |
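Editor's note: TabletCheckpoints and CDCStateMetadata above are Boost.MultiIndex containers — a single set of entries with three hash indexes: a unique index on the full ProducerTabletInfo key plus non-unique indexes tagged TabletTag and StreamTag for lookups by tablet or by stream. That is what later calls like tablet_checkpoints_.get<TabletTag>().equal_range(tablet_id) rely on. A self-contained sketch of the same pattern, with a simplified illustrative Entry type (not the real types):

    #include <iostream>
    #include <string>
    #include <boost/multi_index_container.hpp>
    #include <boost/multi_index/hashed_index.hpp>
    #include <boost/multi_index/member.hpp>

    namespace bmi = boost::multi_index;

    struct Entry {
      std::string stream_id;
      std::string tablet_id;
    };

    struct ByTablet;  // Tag types, analogous to TabletTag/StreamTag above.
    struct ByStream;

    using EntrySet = bmi::multi_index_container<
        Entry,
        bmi::indexed_by<
            bmi::hashed_non_unique<bmi::tag<ByTablet>,
                bmi::member<Entry, std::string, &Entry::tablet_id>>,
            bmi::hashed_non_unique<bmi::tag<ByStream>,
                bmi::member<Entry, std::string, &Entry::stream_id>>>>;

    int main() {
      EntrySet entries;
      entries.insert({"stream-1", "tablet-a"});
      entries.insert({"stream-1", "tablet-b"});
      entries.insert({"stream-2", "tablet-a"});

      // Query through the tagged index, as in
      // tablet_checkpoints_.get<TabletTag>().equal_range(tablet_id).
      auto range = entries.get<ByTablet>().equal_range("tablet-a");
      for (auto it = range.first; it != range.second; ++it) {
        std::cout << it->stream_id << " -> " << it->tablet_id << "\n";
      }
    }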
 232|      |class CDCServiceImpl::Impl {
 233|      | public:
 234| 5.81k|  explicit Impl(TSTabletManager* tablet_manager, rw_spinlock* mutex) : mutex_(*mutex) {
 235| 5.81k|    const auto server = tablet_manager->server();
 236| 5.81k|    async_client_init_.emplace(
 237| 5.81k|        "cdc_client", FLAGS_cdc_ybclient_reactor_threads, FLAGS_cdc_read_rpc_timeout_ms / 1000,
 238| 5.81k|        server->permanent_uuid(), &server->options(), server->metric_entity(),
 239| 5.81k|        server->mem_tracker(), server->messenger());
 240| 5.81k|    async_client_init_->Start();
 241| 5.81k|  }
 242|      |
 243|      |  void UpdateCDCStateMetadata(
 244|      |      const ProducerTabletInfo& producer_tablet,
 245|      |      const std::string& timestamp,
 246|      |      const std::shared_ptr<Schema>& schema,
 247|   319|      const OpId& op_id) {
 248|   319|    std::lock_guard<decltype(mutex_)> l(mutex_);
 249|   319|    auto it = cdc_state_metadata_.find(producer_tablet);
 250|   319|    if (it == cdc_state_metadata_.end()) {
 251|     0|      LOG(DFATAL) << "Failed to update the cdc state metadata for tablet id: "
 252|     0|                  << producer_tablet.tablet_id;
 253|     0|      return;
 254|     0|    }
 255|   319|    it->commit_timestamp = timestamp;
 256|   319|    it->current_schema = schema;
 257|   319|    it->last_streamed_op_id = op_id;
 258|   319|  }
 259|      |
 260|   319|  std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo& producer_tablet) {
 261|   319|    std::lock_guard<decltype(mutex_)> l(mutex_);
 262|   319|    auto it = cdc_state_metadata_.find(producer_tablet);
 263|      |
 264|   319|    if (it != cdc_state_metadata_.end()) {
 265|   159|      return it->current_schema;
 266|   159|    }
 267|   160|    CDCStateMetadataInfo info = CDCStateMetadataInfo {
 268|   160|      .producer_tablet_info = producer_tablet,
 269|   160|      .current_schema = std::make_shared<Schema>()
 270|   160|    };
 271|   160|    cdc_state_metadata_.emplace(info);
 272|   160|    return info.current_schema;
 273|   160|  }
 274|      |
 275|      |  void AddTabletCheckpoint(
 276|      |      OpId op_id,
 277|      |      const CDCStreamId& stream_id,
 278|      |      const TabletId& tablet_id,
 279| 2.54k|      std::vector<ProducerTabletInfo>* producer_entries_modified) {
 280| 2.54k|    ProducerTabletInfo producer_tablet{
 281| 2.54k|      .universe_uuid = "",
 282| 2.54k|      .stream_id = stream_id,
 283| 2.54k|      .tablet_id = tablet_id
 284| 2.54k|    };
 285| 2.54k|    CoarseTimePoint time;
 286| 2.54k|    if (producer_entries_modified) {
 287| 2.54k|      producer_entries_modified->push_back(producer_tablet);
 288| 2.54k|      time = CoarseMonoClock::Now();
 289|     0|    } else {
 290|     0|      time = CoarseTimePoint::min();
 291|     0|    }
 292| 2.54k|    std::lock_guard<decltype(mutex_)> l(mutex_);
 293| 2.54k|    if (!producer_entries_modified && tablet_checkpoints_.count(producer_tablet)) {
 294|     0|      return;
 295|     0|    }
 296| 2.54k|    tablet_checkpoints_.emplace(TabletCheckpointInfo {
 297| 2.54k|      .producer_tablet_info = producer_tablet,
 298| 2.54k|      .cdc_state_checkpoint = {op_id, time},
 299| 2.54k|      .sent_checkpoint = {op_id, time},
 300| 2.54k|    });
 301| 2.54k|  }
 302|      |
 303|      |  void EraseTablets(const std::vector<ProducerTabletInfo>& producer_entries_modified,
 304|      |                    bool erase_cdc_states)
 305|     0|      NO_THREAD_SAFETY_ANALYSIS {
 306|     0|    for (const auto& entry : producer_entries_modified) {
 307|     0|      tablet_checkpoints_.get<TabletTag>().erase(entry.tablet_id);
 308|     0|      if (erase_cdc_states) {
 309|     0|        cdc_state_metadata_.get<TabletTag>().erase(entry.tablet_id);
 310|     0|      }
 311|     0|    }
 312|     0|  }
 313|      |
 314|   308|  boost::optional<OpId> GetLastCheckpoint(const ProducerTabletInfo& producer_tablet) {
 315|   308|    SharedLock<rw_spinlock> lock(mutex_);
 316|   308|    auto it = tablet_checkpoints_.find(producer_tablet);
 317|   308|    if (it != tablet_checkpoints_.end()) {
 318|      |      // Use checkpoint from cache only if it is current.
 319|   308|      if (it->cdc_state_checkpoint.op_id.index > 0 &&
 320|     2|          !it->cdc_state_checkpoint.ExpiredAt(
 321|     2|              FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms, CoarseMonoClock::Now())) {
 322|     2|        return it->cdc_state_checkpoint.op_id;
 323|     2|      }
 324|   306|    }
 325|   306|    return boost::none;
 326|   306|  }
 327|      |
 328|      |  bool UpdateCheckpoint(const ProducerTabletInfo& producer_tablet,
 329|      |                        const OpId& sent_op_id,
 330|   314|                        const OpId& commit_op_id) {
 331|   314|    auto now = CoarseMonoClock::Now();
 332|      |
 333|   314|    TabletCheckpoint sent_checkpoint = {
 334|   314|      .op_id = sent_op_id,
 335|   314|      .last_update_time = now,
 336|   314|    };
 337|   314|    TabletCheckpoint commit_checkpoint = {
 338|   314|      .op_id = commit_op_id,
 339|   314|      .last_update_time = now,
 340|   314|    };
 341|      |
 342|   314|    std::lock_guard<decltype(mutex_)> l(mutex_);
 343|   314|    auto it = tablet_checkpoints_.find(producer_tablet);
 344|   314|    if (it != tablet_checkpoints_.end()) {
 345|   314|      it->sent_checkpoint = sent_checkpoint;
 346|      |
 347|   314|      if (commit_op_id.index > 0) {
 348|    14|        it->cdc_state_checkpoint.op_id = commit_op_id;
 349|    14|      }
 350|      |
 351|      |      // Check if we need to update cdc_state table.
 352|   314|      if (!it->cdc_state_checkpoint.ExpiredAt(
 353|   314|              FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms, now)) {
 354|   314|        return false;
 355|   314|      }
 356|      |
 357|     0|      it->cdc_state_checkpoint.last_update_time = now;
 358|     0|    } else {
 359|     0|      tablet_checkpoints_.emplace(TabletCheckpointInfo{
 360|     0|        .producer_tablet_info = producer_tablet,
 361|     0|        .cdc_state_checkpoint = commit_checkpoint,
 362|     0|        .sent_checkpoint = sent_checkpoint
 363|     0|      });
 364|     0|    }
 365|      |
 366|     0|    return true;
 367|   314|  }
 368|      |
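Editor's note: GetLastCheckpoint and UpdateCheckpoint above both throttle on TabletCheckpoint::ExpiredAt with FLAGS_cdc_state_checkpoint_update_interval_ms (15 s by default via kUpdateIntervalMs): a cached checkpoint is trusted, and a cdc_state write is skipped, until that interval has elapsed. TabletCheckpoint itself is declared in cdc_service.h and not shown in this report; a hypothetical reconstruction consistent with how it is called here, using std::chrono stand-ins for CoarseTimePoint/CoarseDuration, would be:

    #include <chrono>

    using CoarseTimePoint = std::chrono::steady_clock::time_point;
    using CoarseDuration = std::chrono::steady_clock::duration;

    // Assumed contract of ExpiredAt, inferred from the call sites above:
    // true once `interval` has passed since the checkpoint was last refreshed.
    struct TabletCheckpointSketch {
      CoarseTimePoint last_update_time;
      bool ExpiredAt(CoarseDuration interval, CoarseTimePoint now) const {
        return (now - last_update_time) >= interval;
      }
    };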
 369|   317|  OpId GetMinSentCheckpointForTablet(const TabletId& tablet_id) {
 370|   317|    OpId min_op_id = OpId::Max();
 371|      |
 372|   317|    SharedLock<rw_spinlock> l(mutex_);
 373|   317|    auto it_range = tablet_checkpoints_.get<TabletTag>().equal_range(tablet_id);
 374|   317|    if (it_range.first == it_range.second) {
 375|     0|      LOG(WARNING) << "Tablet ID not found in stream_tablets map: " << tablet_id;
 376|     0|      return min_op_id;
 377|     0|    }
 378|      |
 379|   317|    auto cdc_checkpoint_opid_interval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms;
 380| 4.21k|    for (auto it = it_range.first; it != it_range.second; ++it) {
 381|      |      // We don't want to include streams that are not being actively polled.
 382|      |      // So, if the stream has not been polled in the last x seconds,
 383|      |      // then we ignore that stream while calculating min op ID.
 384| 3.90k|      if (!it->sent_checkpoint.ExpiredAt(cdc_checkpoint_opid_interval, CoarseMonoClock::Now()) &&
 385| 3.90k|          it->sent_checkpoint.op_id.index < min_op_id.index) {
 386|   389|        min_op_id = it->sent_checkpoint.op_id;
 387|   389|      }
 388| 3.90k|    }
 389|   317|    return min_op_id;
 390|   317|  }
 391|      |
 392|      |  MemTrackerPtr GetMemTracker(
 393|      |      const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
 394|   319|      const ProducerTabletInfo& producer_info) {
 395|   319|    {
 396|   319|      SharedLock<rw_spinlock> l(mutex_);
 397|   319|      auto it = tablet_checkpoints_.find(producer_info);
 398|   319|      if (it == tablet_checkpoints_.end()) {
 399|     0|        return nullptr;
 400|     0|      }
 401|   319|      if (it->mem_tracker) {
 402|   159|        return it->mem_tracker;
 403|   159|      }
 404|   160|    }
 405|   160|    std::lock_guard<rw_spinlock> l(mutex_);
 406|   160|    auto it = tablet_checkpoints_.find(producer_info);
 407|   160|    if (it == tablet_checkpoints_.end()) {
 408|     0|      return nullptr;
 409|     0|    }
 410|   160|    if (it->mem_tracker) {
 411|     0|      return it->mem_tracker;
 412|     0|    }
 413|   160|    auto cdc_mem_tracker = MemTracker::FindOrCreateTracker(
 414|   160|        "CDC", tablet_peer->tablet()->mem_tracker());
 415|   160|    it->mem_tracker = MemTracker::FindOrCreateTracker(producer_info.stream_id, cdc_mem_tracker);
 416|   160|    return it->mem_tracker;
 417|   160|  }
 418|      |
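Editor's note: GetMemTracker above is a read-mostly double-checked pattern, and the counts show it working: all 319 calls take the shared (reader) lock first, 159 return an existing tracker on that fast path, and the remaining 160 upgrade to the exclusive lock, re-check, and create. The re-check after re-acquiring the lock is essential, since another thread can win the race between the two lock scopes. The same shape, sketched with standard-library types only:

    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <string>
    #include <unordered_map>

    class TrackerCache {
     public:
      std::shared_ptr<std::string> GetOrCreate(const std::string& key) {
        {
          std::shared_lock<std::shared_mutex> read_lock(mutex_);  // cheap, concurrent
          auto it = cache_.find(key);
          if (it != cache_.end()) return it->second;              // fast path
        }
        std::unique_lock<std::shared_mutex> write_lock(mutex_);   // exclusive
        auto it = cache_.find(key);                               // re-check: another
        if (it != cache_.end()) return it->second;                // thread may have won
        auto value = std::make_shared<std::string>(key);
        cache_.emplace(key, value);
        return value;
      }

     private:
      std::shared_mutex mutex_;
      std::unordered_map<std::string, std::shared_ptr<std::string>> cache_;
    };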
 419|   327|  Result<bool> PreCheckTabletValidForStream(const ProducerTabletInfo& info) {
 420|   327|    SharedLock<rw_spinlock> l(mutex_);
 421|   327|    if (tablet_checkpoints_.count(info) != 0) {
 422|   326|      return true;
 423|   326|    }
 424|     1|    if (tablet_checkpoints_.get<StreamTag>().count(info.stream_id) != 0) {
 425|      |      // Did not find matching tablet ID.
 426|      |      // TODO: Add the split tablets in during tablet split?
 427|     0|      LOG(INFO) << "Tablet ID " << info.tablet_id << " is not part of stream ID " << info.stream_id
 428|     0|                << ". Repopulating tablet list for this stream.";
 429|     0|    }
 430|     1|    return false;
 431|     1|  }
 432|      |
 433|      |  CHECKED_STATUS CheckTabletValidForStream(
 434|      |      const ProducerTabletInfo& info,
 435|     0|      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets) {
 436|     0|    bool found = false;
 437|     0|    {
 438|     0|      std::lock_guard<rw_spinlock> l(mutex_);
 439|     0|      for (const auto &tablet : tablets) {
 440|      |        // Add every tablet in the stream.
 441|     0|        ProducerTabletInfo producer_info{info.universe_uuid, info.stream_id, tablet.tablet_id()};
 442|     0|        tablet_checkpoints_.emplace(TabletCheckpointInfo{
 443|     0|            .producer_tablet_info = producer_info
 444|     0|        });
 445|     0|        cdc_state_metadata_.emplace(CDCStateMetadataInfo{
 446|     0|            .producer_tablet_info = producer_info,
 447|     0|            .current_schema = std::make_shared<Schema>()
 448|     0|        });
 449|      |        // If this is the tablet that the user requested.
 450|     0|        if (tablet.tablet_id() == info.tablet_id) {
 451|     0|          found = true;
 452|     0|        }
 453|     0|      }
 454|     0|    }
 455|     0|    return found ? Status::OK()
 456|     0|                 : STATUS_FORMAT(InvalidArgument, "Tablet ID $0 is not part of stream ID $1",
 457|     0|                                 info.tablet_id, info.stream_id);
 458|     0|  }
 459|      |
 460|     0|  boost::optional<OpId> MinOpId(const TabletId& tablet_id) {
 461|     0|    boost::optional<OpId> result;
 462|     0|    SharedLock<rw_spinlock> l(mutex_);
 463|     0|    // right => multimap where keys are tablet_ids and values are stream_ids.
 464|     0|    // left => multimap where keys are stream_ids and values are tablet_ids.
 465|     0|    auto it_range = tablet_checkpoints_.get<TabletTag>().equal_range(tablet_id);
 466|     0|    if (it_range.first != it_range.second) {
 467|     0|      // Iterate over all the streams for this tablet.
 468|     0|      for (auto it = it_range.first; it != it_range.second; ++it) {
 469|     0|        if (!result || it->cdc_state_checkpoint.op_id.index < result->index) {
 470|     0|          result = it->cdc_state_checkpoint.op_id;
 471|     0|        }
 472|     0|      }
 473|     0|    } else {
 474|     0|      VLOG(2) << "Didn't find any streams for tablet " << tablet_id;
 475|     0|    }
 476|     0|
 477|     0|    return result;
 478|     0|  }
 479|      |
 480|    80|  TabletCheckpoints TabletCheckpointsCopy() {
 481|    80|    SharedLock<rw_spinlock> lock(mutex_);
 482|    80|    return tablet_checkpoints_;
 483|    80|  }
 484|      |
 485|      |  boost::optional<client::AsyncClientInitialiser> async_client_init_;
 486|      |
 487|      |  // this will be used for the std::call_once call while caching the client
 488|      |  std::once_flag is_client_cached_;
 489|      | private:
 490|      |  rw_spinlock& mutex_;
 491|      |
 492|      |  TabletCheckpoints tablet_checkpoints_ GUARDED_BY(mutex_);
 493|      |
 494|      |  CDCStateMetadata cdc_state_metadata_ GUARDED_BY(mutex_);
 495|      |};
 496|      |
 497|      |CDCServiceImpl::CDCServiceImpl(TSTabletManager* tablet_manager,
 498|      |                               const scoped_refptr<MetricEntity>& metric_entity_server,
 499|      |                               MetricRegistry* metric_registry)
 500|      |    : CDCServiceIf(metric_entity_server),
 501|      |      tablet_manager_(tablet_manager),
 502|      |      metric_registry_(metric_registry),
 503|      |      server_metrics_(std::make_shared<CDCServerMetrics>(metric_entity_server)),
 504| 5.81k|      impl_(new Impl(tablet_manager, &mutex_)) {
 505|      |
 506| 5.81k|  update_peers_and_metrics_thread_.reset(new std::thread(
 507| 5.81k|      &CDCServiceImpl::UpdatePeersAndMetrics, this));
 508| 5.81k|}
 509|      |
 510|    73|CDCServiceImpl::~CDCServiceImpl() {
 511|    73|  Shutdown();
 512|    73|}
 513|      |
 514| 9.52k|client::YBClient* CDCServiceImpl::client() {
 515| 9.52k|  return impl_->async_client_init_->client();
 516| 9.52k|}
 517|      |
 518|      |namespace {
 519|      |
 520| 5.07k|bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) {
 521| 10.4k|  for (const auto& col : schema.columns()) {
 522| 10.4k|    if (col.order() == static_cast<int32_t>(PgSystemAttrNum::kYBRowId)) {
 523|      |      // ybrowid column is added for tables that don't have user-specified primary key.
 524|     1|      return false;
 525|     1|    }
 526| 10.4k|  }
 527| 5.07k|  return true;
 528| 5.07k|}
 529|      |
 530|   166|bool IsTabletPeerLeader(const std::shared_ptr<tablet::TabletPeer>& peer) {
 531|   166|  return peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
 532|   166|}
 533|      |
 534|      |std::unordered_map<std::string, std::string> GetCreateCDCStreamOptions(
 535|   153|    const CreateCDCStreamRequestPB* req) {
 536|   153|  std::unordered_map<std::string, std::string> options;
 537|   153|  if(req->has_namespace_name()) {
 538|   153|    options.reserve(5);
 539|     0|  } else {
 540|     0|    options.reserve(4);
 541|     0|  }
 542|      |
 543|   153|  options.emplace(kRecordType, CDCRecordType_Name(req->record_type()));
 544|   153|  options.emplace(kRecordFormat, CDCRecordFormat_Name(req->record_format()));
 545|   153|  options.emplace(kSourceType, CDCRequestSource_Name(req->source_type()));
 546|   153|  options.emplace(kCheckpointType, CDCCheckpointType_Name(req->checkpoint_type()));
 547|   153|  if (req->has_namespace_name()) {
 548|   153|    options.emplace(kIdType, kNamespaceId);
 549|   153|  }
 550|      |
 551|   153|  return options;
 552|   153|}
 553|      |
 554|      |Status DoUpdateCDCConsumerOpId(const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
 555|      |                               const OpId& checkpoint,
 556|   479|                               const TabletId& tablet_id) {
 557|   479|  std::shared_ptr<consensus::Consensus> shared_consensus = tablet_peer->shared_consensus();
 558|      |
 559|   479|  if (shared_consensus == nullptr) {
 560|     0|    return STATUS_FORMAT(InternalError,
 561|     0|                         "Failed to get tablet $0 peer consensus", tablet_id);
 562|     0|  }
 563|      |
 564|   479|  shared_consensus->UpdateCDCConsumerOpId(checkpoint);
 565|   479|  return Status::OK();
 566|   479|}
 567|      |
 568|      |bool UpdateCheckpointRequired(const StreamMetadata& record,
 569|   317|                              const CDCSDKCheckpointPB& cdc_sdk_op_id) {
 570|      |
 571|   317|  switch (record.source_type) {
 572|     0|    case XCLUSTER:
 573|     0|      return true;
 574|      |
 575|   317|    case CDCSDK:
 576|   317|      if (cdc_sdk_op_id.write_id() == 0) {
 577|   305|        return true;
 578|   305|      }
 579|    12|      return cdc_sdk_op_id.write_id() == -1 && cdc_sdk_op_id.key().empty() &&
 580|    12|             cdc_sdk_op_id.snapshot_time() != 0;
 581|      |
 582|     0|    default:
 583|     0|      return false;
 584|   317|  }
 585|      |
 586|   317|}
 587|      |
 588|      |bool GetFromOpId(const GetChangesRequestPB* req,
 589|      |                 OpId* op_id,
 590|   319|                 CDCSDKCheckpointPB* cdc_sdk_op_id) {
 591|   319|  if (req->has_from_checkpoint()) {
 592|     0|    *op_id = OpId::FromPB(req->from_checkpoint().op_id());
 593|   319|  } else if (req->has_from_cdc_sdk_checkpoint()) {
 594|    18|    *cdc_sdk_op_id = req->from_cdc_sdk_checkpoint();
 595|    18|    *op_id = OpId::FromPB(*cdc_sdk_op_id);
 596|   301|  } else {
 597|   301|    return false;
 598|   301|  }
 599|    18|  return true;
 600|    18|}
 601|      |
 602|      |// Check for compatibility whether CDC can be setup on the table
 603|      |// This essentially checks that the table should not be a REDIS table since we do not support it
 604|      |// and if it's a YSQL or YCQL one, it should have a primary key
 605| 2.53k|Status CheckCdcCompatibility(const std::shared_ptr<client::YBTable>& table) {
 606|      |  // return if it is a CQL table because they always have a user specified primary key
 607| 2.53k|  if (table->table_type() == client::YBTableType::YQL_TABLE_TYPE) {
 608|     0|    LOG(INFO) << "Returning while checking CDC compatibility, table is a YCQL table";
 609|     0|    return Status::OK();
 610|     0|  }
 611|      |
 612| 2.53k|  if (table->table_type() == client::YBTableType::REDIS_TABLE_TYPE) {
 613|     0|    return STATUS(InvalidArgument, "Cannot setup CDC on YEDIS_TABLE");;
 614|     0|  }
 615|      |
 616|      |  // Check if YSQL table has a primary key. CQL tables always have a
 617|      |  // user specified primary key.
 618| 2.53k|  if (!YsqlTableHasPrimaryKey(table->schema())) {
 619|     0|    return STATUS(InvalidArgument, "Cannot setup CDC on table without primary key");
 620|     0|  }
 621|      |
 622| 2.53k|  return Status::OK();
 623| 2.53k|}
 624|      |
 625|   481|CoarseTimePoint GetDeadline(const RpcContext& context, client::YBClient* client) {
 626|   481|  CoarseTimePoint deadline = context.GetClientDeadline();
 627|   481|  if (deadline == CoarseTimePoint::max()) {  // Not specified by user.
 628|     0|    deadline = CoarseMonoClock::now() + client->default_rpc_timeout();
 629|     0|  }
 630|   481|  return deadline;
 631|   481|}
 632|      |
 633|     3|CHECKED_STATUS VerifyArg(const SetCDCCheckpointRequestPB& req) {
 634|     3|  if (!req.has_checkpoint()) {
 635|     0|    return STATUS(InvalidArgument, "OpId is required to set checkpoint");
 636|     0|  }
 637|      |
 638|     3|  if (!req.has_tablet_id()) {
 639|     0|    return STATUS(InvalidArgument, "Tablet ID is required to set checkpoint");
 640|     0|  }
 641|      |
 642|     3|  if(!req.has_stream_id()) {
 643|     0|    return STATUS(InvalidArgument, "Stream ID is required to set checkpoint");
 644|     0|  }
 645|      |
 646|     3|  return Status::OK();
 647|     3|}
 648|      |
 649|      |} // namespace
 650|      |
 651|      |template <class ReqType, class RespType>
 652|   640|bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc) {
 653|   640|  TRACE("Received RPC $0: $1", rpc->ToString(), req->DebugString());
 654|   640|  if (PREDICT_FALSE(!tablet_manager_)) {
 655|     0|    SetupErrorAndRespond(resp->mutable_error(),
 656|     0|                         STATUS(ServiceUnavailable, "Tablet Server is not running"),
 657|     0|                         CDCErrorPB::NOT_RUNNING,
 658|     0|                         rpc);
 659|     0|    return false;
 660|     0|  }
 661|   640|  return true;
 662|   640|}

Instantiations of CheckOnline (lines 652-662). Each instantiation repeats the body above with a uniform count on the executed lines; the error path (lines 655-660) is unexecuted in every instantiation:
   154  _ZN2yb3cdc14CDCServiceImpl11CheckOnlineINS0_24CreateCDCStreamRequestPBENS0_25CreateCDCStreamResponsePBEEEbPKT_PT0_PNS_3rpc10RpcContextE
   320  _ZN2yb3cdc14CDCServiceImpl11CheckOnlineINS0_19GetChangesRequestPBENS0_20GetChangesResponsePBEEEbPKT_PT0_PNS_3rpc10RpcContextE
     7  _ZN2yb3cdc14CDCServiceImpl11CheckOnlineINS0_22GetCheckpointRequestPBENS0_23GetCheckpointResponsePBEEEbPKT_PT0_PNS_3rpc10RpcContextE
   159  _ZN2yb3cdc14CDCServiceImpl11CheckOnlineINS0_33UpdateCdcReplicatedIndexRequestPBENS0_34UpdateCdcReplicatedIndexResponsePBEEEbPKT_PT0_PNS_3rpc10RpcContextE
  Unexecuted instantiation: _ZN2yb3cdc14CDCServiceImpl11CheckOnlineINS0_24DeleteCDCStreamRequestPBENS0_25DeleteCDCStreamResponsePBEEEbPKT_PT0_PNS_3rpc10RpcContextE
  Unexecuted instantiation: _ZN2yb3cdc14CDCServiceImpl11CheckOnlineINS0_20ListTabletsRequestPBENS0_21ListTabletsResponsePBEEEbPKT_PT0_PNS_3rpc10RpcContextE
  Unexecuted instantiation: _ZN2yb3cdc14CDCServiceImpl11CheckOnlineINS0_27GetCDCDBStreamInfoRequestPBENS0_28GetCDCDBStreamInfoResponsePBEEEbPKT_PT0_PNS_3rpc10RpcContextE

 663|      |
 664|      |void CDCServiceImpl::CreateEntryInCdcStateTable(
 665|      |    const std::shared_ptr<client::TableHandle>& cdc_state_table,
 666|      |    std::vector<ProducerTabletInfo>* producer_entries_modified,
 667|      |    std::vector<client::YBOperationPtr>* ops,
 668|      |    const CDCStreamId& stream_id,
 669|      |    const TableId& table_id,
 670| 2.54k|    const TabletId& tablet_id) {
 671| 2.54k|  OpId op_id;
 672|      |
 673| 2.54k|  const auto cdc_state_table_op = cdc_state_table->NewWriteOp(
 674| 2.54k|      QLWriteRequestPB::QL_STMT_INSERT);
 675| 2.54k|  auto *const cdc_state_table_write_req = cdc_state_table_op->mutable_request();
 676|      |
 677| 2.54k|  QLAddStringHashValue(cdc_state_table_write_req, tablet_id);
 678| 2.54k|  QLAddStringRangeValue(cdc_state_table_write_req, stream_id);
 679| 2.54k|  cdc_state_table->AddStringColumnValue(cdc_state_table_write_req,
 680| 2.54k|                                        master::kCdcCheckpoint, op_id.ToString());
 681| 2.54k|  ops->push_back(std::move(cdc_state_table_op));
 682|      |
 683| 2.54k|  impl_->AddTabletCheckpoint(op_id, stream_id, tablet_id, producer_entries_modified);
 684| 2.54k|}
 685|      |
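Editor's note: CreateEntryInCdcStateTable stages one cdc_state row per (tablet, stream) pair: the tablet id as the hash key, the stream id as the range key, and the checkpoint column holding op_id.ToString(), a default-constructed OpId here. A sketched row layout, with column roles inferred from the QLAddString{Hash,Range}Value calls and the master::kCdc* constants used in this file:

    system.cdc_state
      tablet_id   (hash key)   -- tablet being streamed
      stream_id   (range key)  -- CDC stream covering that tablet
      checkpoint  (string)     -- OpId serialized as text, e.g. "0.0" for the default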
 686|   154|Result<NamespaceId> CDCServiceImpl::GetNamespaceId(const std::string& ns_name) {
 687|   154|  master::GetNamespaceInfoResponsePB namespace_info_resp;
 688|   154|  RETURN_NOT_OK(client()->GetNamespaceInfo(std::string(),
 689|   154|                                           ns_name,
 690|   154|                                           YQL_DATABASE_PGSQL,
 691|   154|                                           &namespace_info_resp));
 692|      |
 693|   153|  return namespace_info_resp.namespace_().id();
 694|   154|}
 695|      |
 696|      |Status CDCServiceImpl::CreateCDCStreamForNamespace(
 697|      |    const CreateCDCStreamRequestPB* req,
 698|      |    CreateCDCStreamResponsePB* resp,
 699|   154|    CoarseTimePoint deadline) {
 700|   154|  auto session = client()->NewSession();
 701|      |
 702|      |  // Used to delete streams in case of failure.
 703|   154|  CDCCreationState creation_state;
 704|      |
 705|   154|  auto scope_exit = ScopeExit([this, &creation_state] {
 706|   154|    RollbackPartialCreate(creation_state);
 707|   154|  });
 708|      |
 709|   153|  auto ns_id = VERIFY_RESULT_OR_SET_CODE(
 710|   153|      GetNamespaceId(req->namespace_name()), CDCError(CDCErrorPB::INVALID_REQUEST));
 711|      |
 712|      |  // Generate a stream id by calling CreateCDCStream, and also setup the stream in the master.
 713|   153|  std::unordered_map<std::string, std::string> options = GetCreateCDCStreamOptions(req);
 714|      |
 715|   153|  CDCStreamId db_stream_id = VERIFY_RESULT_OR_SET_CODE(
 716|   153|      client()->CreateCDCStream(ns_id, options), CDCError(CDCErrorPB::INTERNAL_ERROR));
 717|      |
 718|   153|  auto table_list = VERIFY_RESULT_OR_SET_CODE(
 719|   153|      client()->ListUserTables(ns_id), CDCError(CDCErrorPB::INTERNAL_ERROR));
 720|      |
 721|   153|  options.erase(kIdType);
 722|      |
 723|   153|  std::vector<client::YBOperationPtr> ops;
 724|   153|  std::vector<TableId> table_ids;
 725|   153|  std::vector<CDCStreamId> stream_ids;
 726|      |
 727|   153|  auto cdc_state_table =
 728|   153|      VERIFY_RESULT_OR_SET_CODE(GetCdcStateTable(), CDCError(CDCErrorPB::INTERNAL_ERROR));
 729|      |
 730| 2.53k|  for (const auto& table_iter : table_list) {
 731| 2.53k|    std::shared_ptr<client::YBTable> table;
 732|      |
 733| 2.53k|    RETURN_NOT_OK_SET_CODE(
 734| 2.53k|        client()->OpenTable(table_iter.table_id(), &table), CDCError(CDCErrorPB::TABLE_NOT_FOUND));
 735|      |
 736|      |    // internally if any of the table doesn't have a primary key, then do not create
 737|      |    // a CDC stream ID for that table
 738| 2.53k|    if (!YsqlTableHasPrimaryKey(table->schema())) {
 739|     1|      LOG(WARNING) << "Skipping CDC stream creation on " << table->name().table_name()
 740|     1|                   << " because it does not have a primary key";
 741|     1|      continue;
 742|     1|    }
 743|      |
 744|      |    // We don't allow CDC on YEDIS and tables without a primary key.
 745| 2.53k|    if (req->record_format() != CDCRecordFormat::WAL) {
 746| 2.53k|      RETURN_NOT_OK_SET_CODE(CheckCdcCompatibility(table), CDCError(CDCErrorPB::INVALID_REQUEST));
 747| 2.53k|    }
 748|      |
 749| 2.53k|    const CDCStreamId stream_id = VERIFY_RESULT_OR_SET_CODE(
 750| 2.53k|        client()->CreateCDCStream(table_iter.table_id(), options, true, db_stream_id),
 751| 2.53k|        CDCError(CDCErrorPB::INTERNAL_ERROR));
 752|      |
 753| 2.53k|    creation_state.created_cdc_streams.push_back(stream_id);
 754|      |
 755| 2.53k|    google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
 756| 2.53k|    RETURN_NOT_OK_SET_CODE(
 757| 2.53k|        client()->GetTabletsFromTableId(table_iter.table_id(), 0, &tablets),
 758| 2.53k|        CDCError(CDCErrorPB::TABLE_NOT_FOUND));
 759|      |
 760|      |    // For each tablet, create a row in cdc_state table containing the generated stream id, and
 761|      |    // the op id as max in the logs.
 762| 2.54k|    for (const auto& tablet : tablets) {
 763| 2.54k|      CreateEntryInCdcStateTable(
 764| 2.54k|          cdc_state_table,
 765| 2.54k|          &creation_state.producer_entries_modified,
 766| 2.54k|          &ops,
 767| 2.54k|          db_stream_id,
 768| 2.54k|          table_iter.table_id(),
 769| 2.54k|          tablet.tablet_id());
 770| 2.54k|    }
 771| 2.53k|    stream_ids.push_back(std::move(stream_id));
 772| 2.53k|    table_ids.push_back(table_iter.table_id());
 773| 2.53k|  }
 774|      |
 775|      |  // Add stream to cache.
 776|   153|  AddStreamMetadataToCache(
 777|   153|      db_stream_id,
 778|   153|      std::make_shared<StreamMetadata>(
 779|   153|          ns_id, table_ids, req->record_type(), req->record_format(), req->source_type(),
 780|   153|          req->checkpoint_type()));
 781|      |
 782|   153|  session->SetDeadline(deadline);
 783|      |
 784|   153|  RETURN_NOT_OK_SET_CODE(
 785|   153|      RefreshCacheOnFail(session->ApplyAndFlush(ops)), CDCError(CDCErrorPB::INTERNAL_ERROR));
 786|      |
 787|   153|  resp->set_db_stream_id(db_stream_id);
 788|      |
 789|      |  // Clear creation_state so no changes are reversed by scope_exit since we succeeded.
 790|   153|  creation_state.Clear();
 791|      |
 792|   153|  return Status::OK();
 793|   153|}
 794|      |
 795|      |void CDCServiceImpl::CreateCDCStream(const CreateCDCStreamRequestPB* req,
 796|      |                                     CreateCDCStreamResponsePB* resp,
 797|   154|                                     RpcContext context) {
 798|   154|  CDCStreamId streamId;
 799|      |
 800|   154|  if (!CheckOnline(req, resp, &context)) {
 801|     0|    return;
 802|     0|  }
 803|      |
 804|   154|  RPC_CHECK_AND_RETURN_ERROR(req->has_table_id() || req->has_namespace_name(),
 805|   154|                             STATUS(InvalidArgument,
 806|   154|                                    "Table ID or Database name is required to create CDC stream"),
 807|   154|                             resp->mutable_error(),
 808|   154|                             CDCErrorPB::INVALID_REQUEST,
 809|   154|                             context);
 810|      |
 811|   154|  bool is_xcluster = req->source_type() == XCLUSTER;
 812|   154|  if (is_xcluster || req->has_table_id()) {
 813|     0|    std::shared_ptr<client::YBTable> table;
 814|     0|    Status s = client()->OpenTable(req->table_id(), &table);
 815|     0|    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context);
 816|      |
 817|      |    // We don't allow CDC on YEDIS and tables without a primary key.
 818|     0|    if (req->record_format() != CDCRecordFormat::WAL) {
 819|     0|      s = CheckCdcCompatibility(table);
 820|     0|      RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
 821|     0|    }
 822|      |
 823|     0|    std::unordered_map<std::string, std::string> options = GetCreateCDCStreamOptions(req);
 824|      |
 825|     0|    auto result = client()->CreateCDCStream(req->table_id(), options);
 826|     0|    RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
 827|     0|                               CDCErrorPB::INTERNAL_ERROR, context);
 828|      |
 829|     0|    resp->set_stream_id(*result);
 830|      |
 831|      |    // Add stream to cache.
 832|     0|    AddStreamMetadataToCache(
 833|     0|        *result, std::make_shared<StreamMetadata>(
 834|     0|                     "",
 835|     0|                     std::vector<TableId>{req->table_id()},
 836|     0|                     req->record_type(),
 837|     0|                     req->record_format(),
 838|     0|                     req->source_type(),
 839|     0|                     req->checkpoint_type()));
 840|   154|  } else if (req->has_namespace_name()) {
 841|   154|    auto deadline = GetDeadline(context, client());
 842|   154|    Status status = CreateCDCStreamForNamespace(req, resp, deadline);
 843|   154|    CDCError error(status);
 844|      |
 845|   154|    if (!status.ok()) {
 846|     1|      SetupErrorAndRespond(resp->mutable_error(), status, error.value(), &context);
 847|     1|      return;
 848|     1|    }
 849|   153|  }
 850|      |
 851|   153|  context.RespondSuccess();
 852|   153|}
 853|      |
 854|      |Result<SetCDCCheckpointResponsePB> CDCServiceImpl::SetCDCCheckpoint(
 855|     3|    const SetCDCCheckpointRequestPB& req, CoarseTimePoint deadline) {
 856|     0|  VLOG(1) << "Received SetCDCCheckpoint request " << req.ShortDebugString();
 857|      |
 858|     3|  RETURN_NOT_OK_SET_CODE(VerifyArg(req), CDCError(CDCErrorPB::INVALID_REQUEST));
 859|      |
 860|     3|  auto record = VERIFY_RESULT(GetStream(req.stream_id()));
 861|     3|  if ((*record).checkpoint_type != EXPLICIT) {
 862|     1|    LOG(WARNING) << "Setting the checkpoint explicitly even though the checkpoint type is implicit";
 863|     1|  }
 864|      |
 865|     3|  std::shared_ptr<tablet::TabletPeer> tablet_peer;
 866|     3|  auto s = tablet_manager_->GetTabletPeer(req.tablet_id(), &tablet_peer);
 867|      |
 868|     3|  if (s.IsNotFound()) {
 869|     0|    RETURN_NOT_OK_SET_CODE(s, CDCError(CDCErrorPB::TABLET_NOT_FOUND));
 870|     3|  } else if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) {
 871|     0|    RETURN_NOT_OK_SET_CODE(s, CDCError(CDCErrorPB::NOT_LEADER));
 872|     3|  } else if (!s.ok()) {
 873|     0|    RETURN_NOT_OK_SET_CODE(s, CDCError(CDCErrorPB::LEADER_NOT_READY));
 874|     0|  }
 875|      |
 876|     3|  ProducerTabletInfo producer_tablet{"" /* UUID */, req.stream_id(), req.tablet_id()};
 877|     3|  OpId checkpoint = OpId::FromPB(req.checkpoint().op_id());
 878|      |
 879|     3|  auto session = client()->NewSession();
 880|     3|  session->SetDeadline(deadline);
 881|     3|  RETURN_NOT_OK_SET_CODE(
 882|     3|      UpdateCheckpoint(producer_tablet, checkpoint, checkpoint, session, GetCurrentTimeMicros()),
 883|     3|      CDCError(CDCErrorPB::INTERNAL_ERROR));
 884|      |
 885|     3|  RETURN_NOT_OK_SET_CODE(
 886|     3|      DoUpdateCDCConsumerOpId(tablet_peer, checkpoint, req.tablet_id()),
 887|     3|      CDCError(CDCErrorPB::INTERNAL_ERROR));
 888|      |
 889|     3|  return SetCDCCheckpointResponsePB();
 890|     3|}
 891|      |
 892|      |void CDCServiceImpl::DeleteCDCStream(const DeleteCDCStreamRequestPB* req,
 893|      |                                     DeleteCDCStreamResponsePB* resp,
 894|     0|                                     RpcContext context) {
 895|     0|  if (!CheckOnline(req, resp, &context)) {
 896|     0|    return;
 897|     0|  }
 898|      |
 899|     0|  LOG(INFO) << "Received DeleteCDCStream request " << req->ShortDebugString();
 900|      |
 901|     0|  RPC_CHECK_AND_RETURN_ERROR(
 902|     0|      !req->stream_id().empty(),
 903|     0|      STATUS(InvalidArgument, "Stream ID or Database stream ID is required to delete CDC stream"),
 904|     0|      resp->mutable_error(),
 905|     0|      CDCErrorPB::INVALID_REQUEST,
 906|     0|      context);
 907|      |
 908|     0|  vector<CDCStreamId> streams(req->stream_id().begin(), req->stream_id().end());
 909|     0|  Status s = client()->DeleteCDCStream(
 910|     0|        streams,
 911|     0|        (req->has_force_delete() && req->force_delete()),
 912|     0|        (req->has_ignore_errors() && req->ignore_errors()));
 913|     0|  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
 914|      |
 915|     0|  context.RespondSuccess();
 916|     0|}
 917|      |
 918|      |void CDCServiceImpl::ListTablets(const ListTabletsRequestPB* req,
 919|      |                                 ListTabletsResponsePB* resp,
 920|     0|                                 RpcContext context) {
 921|     0|  if (!CheckOnline(req, resp, &context)) {
 922|     0|    return;
 923|     0|  }
 924|      |
 925|     0|  RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id(),
 926|     0|                             STATUS(InvalidArgument, "Stream ID is required to list tablets"),
 927|     0|                             resp->mutable_error(),
 928|     0|                             CDCErrorPB::INVALID_REQUEST,
 929|     0|                             context);
 930|      |
 931|     0|  auto tablets = GetTablets(req->stream_id());
 932|     0|  RPC_CHECK_AND_RETURN_ERROR(tablets.ok(), tablets.status(), resp->mutable_error(),
 933|     0|                             CDCErrorPB::INTERNAL_ERROR, context);
 934|      |
 935|     0|  if (!req->local_only()) {
 936|     0|    resp->mutable_tablets()->Reserve(tablets->size());
 937|     0|  }
 938|      |
 939|     0|  for (const auto& tablet : *tablets) {
 940|      |    // Filter local tablets if needed.
 941|     0|    if (req->local_only()) {
 942|     0|      bool is_local = false;
 943|     0|      for (const auto& replica : tablet.replicas()) {
 944|     0|        if (replica.ts_info().permanent_uuid() == tablet_manager_->server()->permanent_uuid()) {
 945|     0|          is_local = true;
 946|     0|          break;
 947|     0|        }
 948|     0|      }
 949|      |
 950|     0|      if (!is_local) {
 951|     0|        continue;
 952|     0|      }
 953|     0|    }
 954|      |
 955|     0|    auto res = resp->add_tablets();
 956|     0|    res->set_tablet_id(tablet.tablet_id());
 957|     0|    res->mutable_tservers()->Reserve(tablet.replicas_size());
 958|     0|    for (const auto& replica : tablet.replicas()) {
 959|     0|      auto tserver =  res->add_tservers();
 960|     0|      tserver->mutable_broadcast_addresses()->CopyFrom(replica.ts_info().broadcast_addresses());
 961|     0|      if (tserver->broadcast_addresses_size() == 0) {
 962|     0|        LOG(WARNING) << "No public broadcast addresses found for "
 963|     0|                     << replica.ts_info().permanent_uuid() << ".  Using private addresses instead.";
 964|     0|        tserver->mutable_broadcast_addresses()->CopyFrom(replica.ts_info().private_rpc_addresses());
 965|     0|      }
 966|     0|    }
 967|     0|  }
 968|      |
 969|     0|  context.RespondSuccess();
 970|     0|}
 971|      |
 972|      |Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> CDCServiceImpl::GetTablets(
 973|     1|    const CDCStreamId& stream_id) {
 974|     0|  auto stream_metadata = VERIFY_RESULT(GetStream(stream_id));
 975|     0|  client::YBTableName table_name;
 976|     0|  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> all_tablets;
 977|      |
 978|     0|  for (const auto& table_id : stream_metadata->table_ids) {
 979|     0|    google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
 980|     0|    table_name.set_table_id(table_id);
 981|     0|    RETURN_NOT_OK(client()->GetTablets(
 982|     0|        table_name, 0, &tablets, /* partition_list_version =*/nullptr,
 983|     0|        RequireTabletsRunning::kFalse, master::IncludeInactive::kTrue));
 984|      |
 985|     0|    all_tablets.MergeFrom(tablets);
 986|     0|  }
 987|      |
 988|     0|  return all_tablets;
 989|     0|}
 990|      |
 991|      |void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
 992|      |                                GetChangesResponsePB* resp,
 993|   320|                                RpcContext context) {
 994|   320|  if (!CheckOnline(req, resp, &context)) {
 995|     0|    return;
 996|     0|  }
 997|   320|  YB_LOG_EVERY_N_SECS(INFO, 300) << "Received GetChanges request " << req->ShortDebugString();
 998|      |
 999|   320|  RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(),
1000|   320|                             STATUS(InvalidArgument, "Tablet ID is required to get CDC changes"),
1001|   320|                             resp->mutable_error(),
1002|   320|                             CDCErrorPB::INVALID_REQUEST,
1003|   320|                             context);
1004|   320|  RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id() || req->has_db_stream_id(),
1005|   320|                             STATUS(InvalidArgument,
1006|   320|                             "Stream ID/DB Stream ID is required to get CDC changes"),
1007|   320|                             resp->mutable_error(),
1008|   320|                             CDCErrorPB::INVALID_REQUEST,
1009|   320|                             context);
1010|      |
1011|   320|  ProducerTabletInfo producer_tablet;
1012|   320|  CDCStreamId stream_id = req->has_db_stream_id() ? req->db_stream_id() : req->stream_id();
1013|      |
1014|   320|  auto session = client()->NewSession();
1015|   320|  CoarseTimePoint deadline = GetDeadline(context, client());
1016|   320|  session->SetDeadline(deadline);
1017|      |
1018|      |  // Check that requested tablet_id is part of the CDC stream.
1019|   320|  producer_tablet = {"" /* UUID */, stream_id, req->tablet_id()};
1020|      |
1021|   320|  Status s = CheckTabletValidForStream(producer_tablet);
1022|   320|  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
1023|      |
1024|   319|  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1025|   319|  s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
1026|   319|  auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : OpId::kUnknownTerm;
1027|      |
1028|      |  // If we can't serve this tablet...
1029|   319|  if (s.IsNotFound() || tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY) {
1030|     0|    if (req->serve_as_proxy()) {
1031|      |      // Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups.
1032|     0|      auto context_ptr = std::make_shared<RpcContext>(std::move(context));
1033|     0|      TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer);
1034|      |    // Otherwise, figure out the proper return code.
1035|     0|    } else if (s.IsNotFound()) {
1036|     0|      SetupErrorAndRespond(resp->mutable_error(), s, CDCErrorPB::TABLET_NOT_FOUND, &context);
1037|     0|    } else if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) {
1038|      |      // TODO: we may be able to get some changes, even if we're not the leader.
1039|     0|      SetupErrorAndRespond(resp->mutable_error(),
1040|     0|          STATUS(NotFound, Format("Not leader for $0", req->tablet_id())),
1041|     0|          CDCErrorPB::TABLET_NOT_FOUND, &context);
1042|     0|    } else {
1043|     0|      SetupErrorAndRespond(resp->mutable_error(),
1044|     0|          STATUS(LeaderNotReadyToServe, "Not ready to serve"),
1045|     0|          CDCErrorPB::LEADER_NOT_READY, &context);
1046|     0|    }
1047|     0|    return;
1048|     0|  }
1049|      |
1050|      |  // This is the leader tablet, so mark cdc as enabled.
1051|   319|  cdc_enabled_.store(true, std::memory_order_release);
1052|      |
1053|   319|  auto res = GetStream(stream_id);
1054|   319|  RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(),
1055|   319|                             CDCErrorPB::INTERNAL_ERROR, context);
1056|   319|  StreamMetadata record = **res;
1057|      |
1058|   319|  OpId op_id;
1059|   319|  CDCSDKCheckpointPB cdc_sdk_op_id;
1060|      |  // Get opId from request.
1061|   319|  if (!GetFromOpId(req, &op_id, &cdc_sdk_op_id)) {
1062|   301|    auto result = GetLastCheckpoint(producer_tablet, session);
1063|   301|    RPC_CHECK_AND_RETURN_ERROR(
1064|   301|        result.ok(), result.status(), resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1065|   301|    if (record.source_type == XCLUSTER) {
1066|     0|      op_id = *result;
1067|   301|    } else {
1068|   301|      result->ToPB(&cdc_sdk_op_id);
1069|   301|      op_id = OpId::FromPB(cdc_sdk_op_id);
1070|   301|    }
1071|   301|  }
1072|      |
1073|   319|  int64_t last_readable_index;
1074|   319|  consensus::ReplicateMsgsHolder msgs_holder;
1075|   319|  MemTrackerPtr mem_tracker = impl_->GetMemTracker(tablet_peer, producer_tablet);
1076|      |
1077|      |  // Calculate deadline to be passed to GetChanges.
1078|   319|  CoarseTimePoint get_changes_deadline = CoarseTimePoint::max();
1079|   319|  if (deadline != CoarseTimePoint::max()) {
1080|      |    // Check if we are too close to calculate a safe deadline.
1081|   319|    RPC_CHECK_AND_RETURN_ERROR(
1082|   319|      deadline - CoarseMonoClock::Now() > 1ms,
1083|   319|      STATUS(TimedOut, "Too close to rpc timeout to call GetChanges."),
1084|   319|      resp->mutable_error(),
1085|   319|      CDCErrorPB::INTERNAL_ERROR, context);
1086|      |
1087|      |    // Calculate a safe deadline so that CdcProducer::GetChanges times out
1088|      |    // 20% faster than CdcServiceImpl::GetChanges. This gives enough
1089|      |    // time (unless timeouts are unrealistically small) for CdcServiceImpl::GetChanges
1090|      |    // to finish post-processing and return the partial results without itself timing out.
1091|   319|    const auto safe_deadline = deadline -
1092|   319|      (FLAGS_cdc_read_rpc_timeout_ms * 1ms * FLAGS_cdc_read_safe_deadline_ratio);
1093|   319|    get_changes_deadline = ToCoarse(MonoTime::FromUint64(safe_deadline.time_since_epoch().count()));
1094|   319|  }
1095|      |
1096|      |  // Read the latest changes from the Log.
1097|   319|  if (record.source_type == XCLUSTER) {
1098|     0|    s = cdc::GetChangesForXCluster(
1099|     0|        stream_id, req->tablet_id(), op_id, record, tablet_peer, mem_tracker,
1100|     0|        &msgs_holder, resp, &last_readable_index, get_changes_deadline);
1101|   319|  } else {
1102|   319|    std::string commit_timestamp;
1103|   319|    OpId last_streamed_op_id;
1104|      |
1105|   319|    auto cached_schema = impl_->GetOrAddSchema(producer_tablet);
1106|   319|    s = cdc::GetChangesForCDCSDK(
1107|   319|        req->stream_id(), req->tablet_id(), cdc_sdk_op_id, record, tablet_peer, mem_tracker,
1108|   319|        &msgs_holder, resp, &commit_timestamp, &cached_schema,
1109|   319|        &last_streamed_op_id, &last_readable_index, get_changes_deadline);
1110|      |
1111|   319|    impl_->UpdateCDCStateMetadata(
1112|   319|        producer_tablet, commit_timestamp, cached_schema, last_streamed_op_id);
1113|   319|  }
1114|      |
1115|   319|  RPC_STATUS_RETURN_ERROR(
1116|   319|      s,
1117|   319|      resp->mutable_error(),
1118|   319|      s.IsNotFound() ? CDCErrorPB::CHECKPOINT_TOO_OLD : CDCErrorPB::UNKNOWN_ERROR,
1119|   319|      context);
1120|      |
1121|      |  // Verify leadership was maintained for the duration of the GetChanges() read.
1122|   319|  s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
1123|   319|  if (s.IsNotFound() || tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY ||
1124|   319|      tablet_peer->LeaderTerm() != original_leader_term) {
1125|     0|    SetupErrorAndRespond(resp->mutable_error(),
1126|     0|        STATUS(NotFound, Format("Not leader for $0", req->tablet_id())),
1127|     0|        CDCErrorPB::TABLET_NOT_FOUND, &context);
1128|     0|    return;
1129|     0|  }
1130|      |
1131|      |  // Store information about the last server read & remote client ACK.
1132|   319|  uint64_t last_record_hybrid_time = resp->records_size() > 0 ?
1133|   319|      resp->records(resp->records_size() - 1).time() : 0;
1134|      |
1135|   319|  if (record.checkpoint_type == IMPLICIT) {
1136|   317|    if (UpdateCheckpointRequired(record, cdc_sdk_op_id)) {
1137|   311|      s = UpdateCheckpoint(producer_tablet, OpId::FromPB(resp->checkpoint().op_id()),
1138|   311|                           op_id, session, last_record_hybrid_time);
1139|   311|    }
1140|      |
1141|   317|    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1142|      |
1143|   317|    s = DoUpdateCDCConsumerOpId(tablet_peer,
1144|   317|                                impl_->GetMinSentCheckpointForTablet(req->tablet_id()),
1145|   317|                                req->tablet_id());
1146|      |
1147|   317|    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1148|   317|  }
1149|      |  // Update relevant GetChanges metrics before handing off the Response.
1150|   319|  UpdateCDCTabletMetrics(resp, producer_tablet, tablet_peer, op_id, last_readable_index);
1151|   319|  context.RespondSuccess();
1152|   319|}
1153|      |
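Editor's note: one step in GetChanges deserves a worked number — the safe deadline at lines 1091-1093. With the defaults defined at the top of this file (FLAGS_cdc_read_rpc_timeout_ms = 30000, FLAGS_cdc_read_safe_deadline_ratio = 0.10):

    safe_deadline = deadline - (30000 ms * 0.10) = deadline - 3000 ms

so the inner GetChangesForXCluster/GetChangesForCDCSDK call gets 3 s less than the client's RPC deadline, leaving that margin for post-processing and serialization. The comment at lines 1087-1090 says "20% faster", which would correspond to a ratio of 0.20 rather than the shipped default of 0.10.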
1154|      |Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(const TabletId& tablet_id,
1155|      |                                                        int64_t min_index,
1156|   159|                                                        int64_t min_term) {
1157|   159|  std::vector<client::internal::RemoteTabletServer *> servers;
1158|   159|  RETURN_NOT_OK(GetTServers(tablet_id, &servers));
1159|   159|  for (const auto &server : servers) {
1160|   159|    if (server->IsLocal()) {
1161|      |      // We modify our log directly. Avoid calling itself through the proxy.
1162|     0|      continue;
1163|     0|    }
1164|   159|    LOG(INFO) << "Modifying remote peer " << server->ToString();
1165|   159|    auto proxy = GetCDCServiceProxy(server);
1166|   159|    UpdateCdcReplicatedIndexRequestPB update_index_req;
1167|   159|    UpdateCdcReplicatedIndexResponsePB update_index_resp;
1168|   159|    update_index_req.set_tablet_id(tablet_id);
1169|   159|    update_index_req.set_replicated_index(min_index);
1170|   159|    update_index_req.set_replicated_term(min_term);
1171|   159|    rpc::RpcController rpc;
1172|   159|    rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
1173|   159|    RETURN_NOT_OK(proxy->UpdateCdcReplicatedIndex(update_index_req, &update_index_resp, &rpc));
1174|   159|    if (update_index_resp.has_error()) {
1175|     0|      return StatusFromPB(update_index_resp.error().status());
1176|     0|    }
1177|   159|  }
1178|   159|  return Status::OK();
1179|   159|}
1180|      |
1181|      |void CDCServiceImpl::ComputeLagMetric(int64_t last_replicated_micros,
1182|      |                                      int64_t metric_last_timestamp_micros,
1183|      |                                      int64_t cdc_state_last_replication_time_micros,
1184|   318|                                      scoped_refptr<AtomicGauge<int64_t>> metric) {
1185|   318|  if (metric_last_timestamp_micros == 0) {
1186|      |    // The tablet metric timestamp is uninitialized, so try to use last replicated time in cdc
1187|      |    // state.
1188|   292|    if (cdc_state_last_replication_time_micros == 0) {
1189|      |      // Last replicated time in cdc state is uninitialized as well, so set the metric value to
1190|      |      // 0 and update later when we have a suitable lower bound.
1191|   292|      metric->set_value(0);
1192|     0|    } else {
1193|     0|      metric->set_value(last_replicated_micros - cdc_state_last_replication_time_micros);
1194|     0|    }
1195|    26|  } else {
1196|    26|    metric->set_value(last_replicated_micros - metric_last_timestamp_micros);
1197|    26|  }
1198|   318|}
1199|      |
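Editor's note: ComputeLagMetric's fallback order is: use the tablet metric's own timestamp when set; otherwise fall back to cdc_state's last replication time; only when both are uninitialized does it report 0 as a placeholder. A worked example with last_replicated_micros = 5,000,000:

    metric timestamp = 4,200,000, cdc_state = anything   ->  lag = 800,000 us
    metric timestamp = 0,         cdc_state = 3,000,000  ->  lag = 2,000,000 us
    metric timestamp = 0,         cdc_state = 0          ->  lag = 0 (placeholder)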
1200|    80|void CDCServiceImpl::UpdateLagMetrics() {
1201|    80|  auto tablet_checkpoints = impl_->TabletCheckpointsCopy();
1202|      |
1203|    80|  auto cdc_state_table_result = GetCdcStateTable();
1204|    80|  if (!cdc_state_table_result.ok()) {
1205|      |    // It is possible that this runs before the cdc_state table is created. This is
1206|      |    // ok. It just means that this is the first time the cluster starts.
1207|     0|    YB_LOG_EVERY_N_SECS(WARNING, 30)
1208|     0|        << "Unable to open table " << kCdcStateTableName.table_name() << " for metrics update.";
1209|     0|    return;
1210|     0|  }
1211|      |
1212|    80|  std::unordered_set<ProducerTabletInfo, ProducerTabletInfo::Hash> tablets_in_cdc_state_table;
1213|    80|  client::TableIteratorOptions options;
1214|    80|  options.columns = std::vector<string>{
1215|    80|      master::kCdcTabletId, master::kCdcStreamId, master::kCdcLastReplicationTime};
1216|    80|  bool failed = false;
1217|     0|  options.error_handler = [&failed](const Status& status) {
1218|     0|    YB_LOG_EVERY_N_SECS(WARNING, 30) << "Scan of table " << kCdcStateTableName.table_name()
1219|     0|                                     << " failed: " << status << ". Could not update metrics.";
1220|     0|    failed = true;
1221|     0|  };
1222|      |  // First go through tablets in the cdc_state table and update metrics for each one.
1223|   159|  for (const auto& row : client::TableRange(**cdc_state_table_result, options)) {
1224|   159|    auto tablet_id = row.column(master::kCdcTabletIdIdx).string_value();
1225|   159|    auto stream_id = row.column(master::kCdcStreamIdIdx).string_value();
1226|   159|    std::shared_ptr<tablet::TabletPeer> tablet_peer;
1227|   159|    Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
1228|   159|    if (s.IsNotFound()) {
1229|     0|      continue;
1230|     0|    }
1231|      |
1232|   159|    ProducerTabletInfo tablet_info = {"" /* universe_uuid */, stream_id, tablet_id};
1233|   159|    tablets_in_cdc_state_table.insert(tablet_info);
1234|   159|    auto tablet_metric = GetCDCTabletMetrics(tablet_info, tablet_peer);
1235|   159|    if (!tablet_metric) {
1236|     0|      continue;
1237|     0|    }
1238|   159|    if (tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY) {
1239|      |      // Set lag to 0 because we're not the leader for this tablet anymore, which means another peer
1240|      |      // is responsible for tracking this tablet's lag.
1241|     0|      tablet_metric->async_replication_sent_lag_micros->set_value(0);
1242|     0|      tablet_metric->async_replication_committed_lag_micros->set_value(0);
1243|   159|    } else {
1244|      |      // Get the physical time of the last committed record on producer.
1245|   159|      auto last_replicated_micros = GetLastReplicatedTime(tablet_peer);
1246|   159|      const auto& timestamp_ql_value = row.column(2);
1247|   159|      auto cdc_state_last_replication_time_micros =
1248|   159|          !timestamp_ql_value.IsNull() ?
1249|   159|          timestamp_ql_value.timestamp_value().ToInt64() : 0;
1250|   159|      auto last_sent_micros = tablet_metric->last_read_physicaltime->value();
1251|   159|      ComputeLagMetric(last_replicated_micros, last_sent_micros,
1252|   159|                       cdc_state_last_replication_time_micros,
1253|   159|                       tablet_metric->async_replication_sent_lag_micros);
1254|   159|      auto last_committed_micros = tablet_metric->last_checkpoint_physicaltime->value();
1255|   159|      ComputeLagMetric(last_replicated_micros, last_committed_micros,
1256|   159|                       cdc_state_last_replication_time_micros,
1257|   159|                       tablet_metric->async_replication_committed_lag_micros);
1258|   159|    }
1259|   159|  }
1260|    80|  if (failed) {
1261|     0|    RefreshCdcStateTable();
1262|     0|    return;
1263|     0|  }
1264|      |
1265|      |  // Now, go through tablets in tablet_checkpoints_ and set lag to 0 for all tablets we're no
1266|      |  // longer replicating.
1267|   159|  for (const auto& checkpoint : tablet_checkpoints) {
1268|   159|    const ProducerTabletInfo& tablet_info = checkpoint.producer_tablet_info;
1269|   159|    if (tablets_in_cdc_state_table.find(tablet_info) == tablets_in_cdc_state_table.end()) {
1270|      |      // We're no longer replicating this tablet, so set lag to 0.
1271|     0|      std::shared_ptr<tablet::TabletPeer> tablet_peer;
1272|     0|      Status s = tablet_manager_->GetTabletPeer(checkpoint.tablet_id(), &tablet_peer);
1273|     0|      if (s.IsNotFound()) {
1274|     0|        continue;
1275|     0|      }
1276|     0|      auto tablet_metric = GetCDCTabletMetrics(checkpoint.producer_tablet_info, tablet_peer);
1277|     0|      if (!tablet_metric) {
1278|     0|        continue;
1279|     0|      }
1280|     0|      tablet_metric->async_replication_sent_lag_micros->set_value(0);
1281|     0|      tablet_metric->async_replication_committed_lag_micros->set_value(0);
1282|     0|    }
1283|   159|  }
1284|    80|}
1285|      |
1286|   333|bool CDCServiceImpl::ShouldUpdateLagMetrics(MonoTime time_since_update_metrics) {
1287|      |  // Only update metrics if cdc is enabled, which means we have a valid replication stream.
1288|   333|  return GetAtomicFlag(&FLAGS_enable_collect_cdc_metrics) &&
1289|   333|         (time_since_update_metrics == MonoTime::kUninitialized ||
1290|   253|         MonoTime::Now() - time_since_update_metrics >=
1291
253
             MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_update_metrics_interval_ms)));
1292
333
}
1293
1294
0
bool CDCServiceImpl::CDCEnabled() {
1295
0
  return cdc_enabled_.load(std::memory_order_acquire);
1296
0
}
1297
1298
619
Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable() {
1299
619
  bool use_cache = GetAtomicFlag(&FLAGS_enable_cdc_state_table_caching);
1300
619
  {
1301
619
    SharedLock<decltype(mutex_)> l(mutex_);
1302
619
    if (cdc_state_table_ && use_cache) {
1303
535
      return cdc_state_table_;
1304
535
    }
1305
84
    if (cdc_service_stopped_) {
1306
0
      return STATUS(ShutdownInProgress, "");
1307
0
    }
1308
84
  }
1309
1310
84
  auto cdc_state_table = std::make_shared<yb::client::TableHandle>();
1311
84
  auto s = cdc_state_table->Open(kCdcStateTableName, client());
1312
  // It is possible that this runs before the cdc_state table is created.
1313
84
  RETURN_NOT_OK(s);
1314
1315
84
  {
1316
84
    std::lock_guard<decltype(mutex_)> l(mutex_);
1317
84
    if (cdc_state_table_ && use_cache) {
1318
0
      return cdc_state_table_;
1319
0
    }
1320
84
    if (cdc_service_stopped_) {
1321
0
      return STATUS(ShutdownInProgress, "");
1322
0
    }
1323
84
    cdc_state_table_ = cdc_state_table;
1324
84
    return cdc_state_table_;
1325
84
  }
1326
84
}
1327
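GetCdcStateTable is a double-checked cache: a shared lock for the fast path, the expensive Open() performed outside any lock, then a re-check under an exclusive lock before publishing, so a concurrent winner is kept. A minimal sketch of the same shape with std::shared_mutex and a hypothetical Open():

#include <memory>
#include <mutex>
#include <shared_mutex>

struct Table {};
std::shared_ptr<Table> Open() { return std::make_shared<Table>(); }  // stand-in for TableHandle::Open

class TableCache {
 public:
  std::shared_ptr<Table> Get() {
    {
      std::shared_lock<std::shared_mutex> l(mutex_);  // fast path: readers only
      if (cached_) return cached_;
    }
    auto table = Open();  // slow work happens outside any lock
    std::lock_guard<std::shared_mutex> l(mutex_);
    if (!cached_) cached_ = table;  // re-check: another thread may have published first
    return cached_;
  }

 private:
  std::shared_mutex mutex_;
  std::shared_ptr<Table> cached_;
};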
1328
0
void CDCServiceImpl::RefreshCdcStateTable() {
1329
  // Set cached value to null so we regenerate it on the next call.
1330
0
  std::lock_guard<decltype(mutex_)> l(mutex_);
1331
0
  cdc_state_table_ = nullptr;
1332
0
}
1333
1334
459
Status CDCServiceImpl::RefreshCacheOnFail(const Status& s) {
1335
459
  if (!s.ok()) {
1336
0
    RefreshCdcStateTable();
1337
0
  }
1338
459
  return s;
1339
459
}
1340
1341
MicrosTime CDCServiceImpl::GetLastReplicatedTime(
1342
478
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer) {
1343
478
  tablet::RemoveIntentsData data;
1344
478
  tablet_peer->GetLastReplicatedData(&data);
1345
478
  return data.log_ht.GetPhysicalValueMicros();
1346
478
}
1347
1348
4.65k
void CDCServiceImpl::UpdatePeersAndMetrics() {
1349
4.65k
  int64_t current_term = -1;
1350
4.65k
  MonoTime time_since_update_peers = MonoTime::kUninitialized;
1351
4.65k
  MonoTime time_since_update_metrics = MonoTime::kUninitialized;
1352
1353
  // Returns false if the CDC service has been stopped.
1354
1.60M
  auto sleep_while_not_stopped = [this]() {
1355
1.60M
    int min_sleep_ms = std::min(100, GetAtomicFlag(&FLAGS_update_metrics_interval_ms));
1356
1.60M
    auto sleep_period = MonoDelta::FromMilliseconds(min_sleep_ms);
1357
1.60M
    SleepFor(sleep_period);
1358
1359
1.60M
    SharedLock<decltype(mutex_)> l(mutex_);
1360
1.60M
    return !cdc_service_stopped_;
1361
1.60M
  };
1362
1363
773k
  do {
1364
773k
    if (!cdc_enabled_.load(std::memory_order_acquire)) {
1365
      // Have not yet received any GetChanges requests, so skip background thread work.
1366
773k
      continue;
1367
773k
    }
1368
    // Update lag metrics if they are due (default interval: 1s).
1369
333
    if (ShouldUpdateLagMetrics(time_since_update_metrics)) {
1370
80
      UpdateLagMetrics();
1371
80
      time_since_update_metrics = MonoTime::Now();
1372
80
    }
1373
1374
    // If it has not been 60s since the last peer update, continue.
1375
333
    if (!FLAGS_enable_log_retention_by_op_idx ||
1376
333
        (time_since_update_peers != MonoTime::kUninitialized &&
1377
253
         MonoTime::Now() - time_since_update_peers <
1378
253
             MonoDelta::FromSeconds(GetAtomicFlag(&FLAGS_update_min_cdc_indices_interval_secs)))) {
1379
253
      continue;
1380
253
    }
1381
1382
80
    time_since_update_peers = MonoTime::Now();
1383
80
    LOG(INFO) << "Started to read minimum replicated indices for all tablets";
1384
1385
80
    auto cdc_state_table_result = GetCdcStateTable();
1386
80
    if (!cdc_state_table_result.ok()) {
1387
      // It is possible that this runs before the cdc_state table is created. This is
1388
    // ok. It just means the cluster is starting up for the first time.
1389
0
      YB_LOG_EVERY_N_SECS(WARNING, 3600) << "Unable to open table "
1390
0
                                         << kCdcStateTableName.table_name()
1391
0
                                         << ". CDC min replicated indices won't be updated";
1392
0
      continue;
1393
0
    }
1394
1395
80
    int count = 0;
1396
80
    std::unordered_map<std::string, int64_t> tablet_min_checkpoint_index;
1397
80
    client::TableIteratorOptions options;
1398
80
    bool failed = false;
1399
0
    options.error_handler = [&failed](const Status& status) {
1400
0
      LOG(WARNING) << "Scan of table " << kCdcStateTableName.table_name() << " failed: " << status;
1401
0
      failed = true;
1402
0
    };
1403
80
    options.columns = std::vector<std::string>{master::kCdcTabletId, master::kCdcStreamId,
1404
80
        master::kCdcCheckpoint, master::kCdcLastReplicationTime};
1405
159
    for (const auto& row : client::TableRange(**cdc_state_table_result, options)) {
1406
159
      count++;
1407
159
      auto tablet_id = row.column(master::kCdcTabletIdIdx).string_value();
1408
159
      auto stream_id = row.column(master::kCdcStreamIdIdx).string_value();
1409
159
      auto checkpoint = row.column(master::kCdcCheckpointIdx).string_value();
1410
159
      std::string last_replicated_time_str;
1411
159
      const auto& timestamp_ql_value = row.column(3);
1412
159
      if (!timestamp_ql_value.IsNull()) {
1413
0
        last_replicated_time_str = timestamp_ql_value.timestamp_value().ToFormattedString();
1414
0
      }
1415
1416
0
      VLOG(1) << "stream_id: " << stream_id << ", tablet_id: " << tablet_id
1417
0
              << ", checkpoint: " << checkpoint << ", last replicated time: "
1418
0
              << last_replicated_time_str;
1419
1420
159
      auto result = OpId::FromString(checkpoint);
1421
159
      if (!result.ok()) {
1422
0
        LOG(WARNING) << "Read invalid op id " << row.column(1).string_value()
1423
0
                     << " for tablet " << tablet_id;
1424
0
        continue;
1425
0
      }
1426
1427
159
      auto index = (*result).index;
1428
159
      current_term = (*result).term;
1429
159
      auto it = tablet_min_checkpoint_index.find(tablet_id);
1430
159
      if (it == tablet_min_checkpoint_index.end()) {
1431
159
        tablet_min_checkpoint_index[tablet_id] = index;
1432
0
      } else {
1433
0
        if (index < it->second) {
1434
0
          it->second = index;
1435
0
        }
1436
0
      }
1437
159
    }
1438
80
    if (failed) {
1439
0
      RefreshCdcStateTable();
1440
0
      continue;
1441
0
    }
1442
80
    LOG(INFO) << "Read " << count << " records from " << kCdcStateTableName.table_name();
1443
1444
0
    VLOG(3) << "tablet_min_checkpoint_index size " << tablet_min_checkpoint_index.size();
1445
159
    for (const auto &elem : tablet_min_checkpoint_index) {
1446
159
      auto tablet_id = elem.first;
1447
159
      std::shared_ptr<tablet::TabletPeer> tablet_peer;
1448
1449
159
      Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
1450
159
      if (s.IsNotFound()) {
1451
0
        VLOG(2) << "Did not found tablet peer for tablet " << tablet_id;
1452
0
        continue;
1453
159
      } else if (!IsTabletPeerLeader(tablet_peer)) {
1454
0
        VLOG(2) << "Tablet peer " << tablet_peer->permanent_uuid()
1455
0
                << " is not the leader for tablet " << tablet_id;
1456
0
        continue;
1457
159
      } else if (!s.ok()) {
1458
0
        LOG(WARNING) << "Error getting tablet_peer for tablet " << tablet_id << ": " << s;
1459
0
        continue;
1460
0
      }
1461
1462
159
      auto min_index = elem.second;
1463
159
      s = tablet_peer->set_cdc_min_replicated_index(min_index);
1464
159
      if (!s.ok()) {
1465
0
        LOG(WARNING) << "Unable to set cdc min index for tablet peer "
1466
0
                     << tablet_peer->permanent_uuid()
1467
0
                     << " and tablet " << tablet_peer->tablet_id()
1468
0
                     << ": " << s;
1469
0
      }
1470
0
      VLOG(1) << "Updating followers for tablet " << tablet_id << " with index " << min_index;
1471
159
      WARN_NOT_OK(UpdatePeersCdcMinReplicatedIndex(tablet_id, min_index, current_term),
1472
159
                  "UpdatePeersCdcMinReplicatedIndex failed");
1473
159
    }
1474
80
    LOG(INFO) << "Done reading all the indices for all tablets and updating peers";
1475
773k
  } while (sleep_while_not_stopped());
1476
4.65k
}
1477
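UpdatePeersAndMetrics sleeps in short increments so shutdown stays responsive, while each kind of work is throttled on its own longer interval. A minimal sketch of the same loop shape (the intervals here are illustrative, not the service's flags):

#include <atomic>
#include <chrono>
#include <thread>

void BackgroundLoop(std::atomic<bool>& stopped) {
  using Clock = std::chrono::steady_clock;
  Clock::time_point last_metrics{};  // default value acts as "uninitialized"
  while (!stopped.load(std::memory_order_acquire)) {
    // Short sleep: a stop request is observed within ~100 ms.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    auto now = Clock::now();
    if (last_metrics == Clock::time_point{} ||
        now - last_metrics >= std::chrono::seconds(1)) {
      // UpdateLagMetrics() runs here in the real service.
      last_metrics = now;
    }
  }
}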
1478
Result<client::internal::RemoteTabletPtr> CDCServiceImpl::GetRemoteTablet(
1479
159
    const TabletId& tablet_id) {
1480
159
  std::promise<Result<client::internal::RemoteTabletPtr>> tablet_lookup_promise;
1481
159
  auto future = tablet_lookup_promise.get_future();
1482
159
  auto callback = [&tablet_lookup_promise](
1483
159
      const Result<client::internal::RemoteTabletPtr>& result) {
1484
159
    tablet_lookup_promise.set_value(result);
1485
159
  };
1486
1487
159
  auto start = CoarseMonoClock::Now();
1488
159
  client()->LookupTabletById(
1489
159
      tablet_id,
1490
159
      /* table =*/ nullptr,
1491
      // In case this is a split parent tablet, it will be hidden so we need this flag to access it.
1492
159
      master::IncludeInactive::kTrue,
1493
159
      CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms),
1494
159
      callback, client::UseCache::kFalse);
1495
159
  future.wait();
1496
1497
159
  auto duration = CoarseMonoClock::Now() - start;
1498
159
  if (duration > (kMaxDurationForTabletLookup * 1ms)) {
1499
0
    LOG(WARNING) << "LookupTabletByKey took long time: " << duration << " ms";
1500
0
  }
1501
1502
159
  auto remote_tablet = VERIFY_RESULT(future.get());
1503
159
  return remote_tablet;
1504
159
}
1505
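GetRemoteTablet turns the callback-based tablet lookup into a blocking call with a std::promise/std::future pair. A generic sketch of that bridge, where async_lookup stands in for client()->LookupTabletById:

#include <functional>
#include <future>
#include <string>

using Callback = std::function<void(const std::string&)>;

// Blocks the caller until the async API invokes the callback with a result.
std::string LookupSync(const std::function<void(Callback)>& async_lookup) {
  std::promise<std::string> done;
  auto future = done.get_future();
  async_lookup([&done](const std::string& result) { done.set_value(result); });
  return future.get();  // waits for the callback to fire exactly once
}

// Usage: LookupSync([](Callback cb) { cb("tablet-1"); });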
1506
0
Result<RemoteTabletServer *> CDCServiceImpl::GetLeaderTServer(const TabletId& tablet_id) {
1507
0
  auto result = VERIFY_RESULT(GetRemoteTablet(tablet_id));
1508
1509
0
  auto ts = result->LeaderTServer();
1510
0
  if (ts == nullptr) {
1511
0
    return STATUS(NotFound, "Tablet leader not found for tablet", tablet_id);
1512
0
  }
1513
0
  return ts;
1514
0
}
1515
1516
Status CDCServiceImpl::GetTServers(const TabletId& tablet_id,
1517
159
                                   std::vector<client::internal::RemoteTabletServer*>* servers) {
1518
159
  auto result = VERIFY_RESULT(GetRemoteTablet(tablet_id));
1519
1520
159
  result->GetRemoteTabletServers(servers);
1521
159
  return Status::OK();
1522
159
}
1523
1524
159
std::shared_ptr<CDCServiceProxy> CDCServiceImpl::GetCDCServiceProxy(RemoteTabletServer* ts) {
1525
159
  auto hostport = HostPortFromPB(DesiredHostPort(
1526
159
      ts->public_rpc_hostports(), ts->private_rpc_hostports(), ts->cloud_info(),
1527
159
      client()->cloud_info()));
1528
159
  DCHECK(!hostport.host().empty());
1529
1530
159
  {
1531
159
    SharedLock<decltype(mutex_)> l(mutex_);
1532
159
    auto it = cdc_service_map_.find(hostport);
1533
159
    if (it != cdc_service_map_.end()) {
1534
79
      return it->second;
1535
79
    }
1536
80
  }
1537
1538
80
  auto cdc_service = std::make_shared<CDCServiceProxy>(&client()->proxy_cache(), hostport);
1539
1540
80
  {
1541
80
    std::lock_guard<decltype(mutex_)> l(mutex_);
1542
80
    auto it = cdc_service_map_.find(hostport);
1543
80
    if (it != cdc_service_map_.end()) {
1544
0
      return it->second;
1545
0
    }
1546
80
    cdc_service_map_.emplace(hostport, cdc_service);
1547
80
  }
1548
80
  return cdc_service;
1549
80
}
1550
1551
void CDCServiceImpl::TabletLeaderGetChanges(const GetChangesRequestPB* req,
1552
                                            GetChangesResponsePB* resp,
1553
                                            std::shared_ptr<RpcContext> context,
1554
0
                                            std::shared_ptr<tablet::TabletPeer> peer) {
1555
0
  auto rpc_handle = rpcs_.Prepare();
1556
0
  RPC_CHECK_AND_RETURN_ERROR(rpc_handle != rpcs_.InvalidHandle(),
1557
0
      STATUS(Aborted,
1558
0
          Format("Could not create valid handle for GetChangesCDCRpc: tablet=$0, peer=$1",
1559
0
                 req->tablet_id(),
1560
0
                 peer->permanent_uuid())),
1561
0
      resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, *context.get());
1562
1563
  // Increment Proxy Metric.
1564
0
  server_metrics_->cdc_rpc_proxy_count->Increment();
1565
1566
  // Forward this Request Info to the proper TabletServer.
1567
0
  GetChangesRequestPB new_req;
1568
0
  new_req.CopyFrom(*req);
1569
0
  new_req.set_serve_as_proxy(false);
1570
0
  CoarseTimePoint deadline = GetDeadline(*context.get(), client());
1571
1572
0
  *rpc_handle = CreateGetChangesCDCRpc(
1573
0
      deadline,
1574
0
      nullptr, /* RemoteTablet: will get this from 'new_req' */
1575
0
      client(),
1576
0
      &new_req,
1577
0
      [=] (Status status, GetChangesResponsePB&& new_resp) {
1578
0
        auto retained = rpcs_.Unregister(rpc_handle);
1579
0
        *resp = std::move(new_resp);
1580
0
        RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), resp->error().code(),
1581
0
                                *context.get());
1582
0
        context->RespondSuccess();
1583
0
      });
1584
0
  (**rpc_handle).SendRpc();
1585
0
}
1586
1587
void CDCServiceImpl::TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req,
1588
                                               GetCheckpointResponsePB* resp,
1589
                                               RpcContext* context,
1590
0
                                               const std::shared_ptr<tablet::TabletPeer>& peer) {
1591
0
  auto result = GetLeaderTServer(req->tablet_id());
1592
0
  RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
1593
0
                             CDCErrorPB::TABLET_NOT_FOUND, *context);
1594
1595
0
  auto ts_leader = *result;
1596
  // Check that the tablet leader identified by the master is not the current tablet peer.
1597
  // This can happen during tablet rebalance if master and tserver have different views of
1598
  // leader. We need to avoid self-looping in this case.
1599
0
  if (peer) {
1600
0
    RPC_CHECK_NE_AND_RETURN_ERROR(ts_leader->permanent_uuid(), peer->permanent_uuid(),
1601
0
                                  STATUS(IllegalState,
1602
0
                                         Format("Tablet leader changed: leader=$0, peer=$1",
1603
0
                                                ts_leader->permanent_uuid(),
1604
0
                                                peer->permanent_uuid())),
1605
0
                                  resp->mutable_error(), CDCErrorPB::NOT_LEADER, *context);
1606
0
  }
1607
1608
0
  auto cdc_proxy = GetCDCServiceProxy(ts_leader);
1609
0
  rpc::RpcController rpc;
1610
0
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms));
1611
  // TODO(NIC): Change to GetCheckpointAsync like CDCPoller::DoPoll.
1612
0
  auto status = cdc_proxy->GetCheckpoint(*req, resp, &rpc);
1613
0
  RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, *context);
1614
0
  context->RespondSuccess();
1615
0
}
1616
1617
void CDCServiceImpl::GetCheckpoint(const GetCheckpointRequestPB* req,
1618
                                   GetCheckpointResponsePB* resp,
1619
7
                                   RpcContext context) {
1620
7
  if (!CheckOnline(req, resp, &context)) {
1621
0
    return;
1622
0
  }
1623
1624
7
  RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(),
1625
7
                             STATUS(InvalidArgument, "Tablet ID is required to get CDC checkpoint"),
1626
7
                             resp->mutable_error(),
1627
7
                             CDCErrorPB::INVALID_REQUEST,
1628
7
                             context);
1629
7
  RPC_CHECK_AND_RETURN_ERROR(req->has_stream_id(),
1630
7
                             STATUS(InvalidArgument, "Stream ID is required to get CDC checkpoint"),
1631
7
                             resp->mutable_error(),
1632
7
                             CDCErrorPB::INVALID_REQUEST,
1633
7
                             context);
1634
1635
7
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1636
7
  Status s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
1637
1638
7
  if (s.IsNotFound() || !IsTabletPeerLeader(tablet_peer)) {
1639
    // Forward GetCheckpoint() to the tablet leader. This happens often in Kubernetes setups.
1640
0
    TabletLeaderGetCheckpoint(req, resp, &context, tablet_peer);
1641
0
    return;
1642
0
  }
1643
1644
  // Check that requested tablet_id is part of the CDC stream.
1645
7
  ProducerTabletInfo producer_tablet = {"" /* UUID */, req->stream_id(), req->tablet_id()};
1646
7
  s = CheckTabletValidForStream(producer_tablet);
1647
7
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
1648
1649
7
  auto session = client()->NewSession();
1650
7
  CoarseTimePoint deadline = GetDeadline(context, client());
1651
1652
7
  session->SetDeadline(deadline);
1653
1654
7
  auto result = GetLastCheckpoint(producer_tablet, session);
1655
7
  RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
1656
7
                             CDCErrorPB::INTERNAL_ERROR, context);
1657
1658
7
  result->ToPB(resp->mutable_checkpoint()->mutable_op_id());
1659
7
  context.RespondSuccess();
1660
7
}
1661
1662
void CDCServiceImpl::UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req,
1663
                                              UpdateCdcReplicatedIndexResponsePB* resp,
1664
159
                                              rpc::RpcContext context) {
1665
159
  if (!CheckOnline(req, resp, &context)) {
1666
0
    return;
1667
0
  }
1668
1669
159
  RPC_CHECK_AND_RETURN_ERROR(req->has_tablet_id(),
1670
159
                             STATUS(InvalidArgument,
1671
159
                                    "Tablet ID is required to set the log replicated index"),
1672
159
                             resp->mutable_error(),
1673
159
                             CDCErrorPB::INVALID_REQUEST,
1674
159
                             context);
1675
1676
159
  RPC_CHECK_AND_RETURN_ERROR(req->has_replicated_index(),
1677
159
                             STATUS(InvalidArgument,
1678
159
                                    "Replicated index is required to set the log replicated index"),
1679
159
                             resp->mutable_error(),
1680
159
                             CDCErrorPB::INVALID_REQUEST,
1681
159
                             context);
1682
1683
159
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1684
159
  RPC_STATUS_RETURN_ERROR(tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer),
1685
159
                          resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1686
1687
159
  RPC_CHECK_AND_RETURN_ERROR(tablet_peer->log_available(),
1688
159
                             STATUS(TryAgain, "Tablet peer is not ready to set its log cdc index"),
1689
159
                             resp->mutable_error(),
1690
159
                             CDCErrorPB::INTERNAL_ERROR,
1691
159
                             context);
1692
1693
159
  RPC_STATUS_RETURN_ERROR(tablet_peer->set_cdc_min_replicated_index(req->replicated_index()),
1694
159
                          resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1695
1696
159
  auto status = DoUpdateCDCConsumerOpId(tablet_peer,
1697
159
                                        OpId(req->replicated_term(), req->replicated_index()),
1698
159
                                        req->tablet_id());
1699
1700
159
  RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1701
1702
159
  {
1703
159
    RequestScope request_scope;
1704
159
    auto txn_participant = tablet_peer->tablet()->transaction_participant();
1705
159
    if (txn_participant) {
1706
0
      VLOG(1) << "Registering and unregistering request so that transactions are "
1707
0
                   "cleaned up on followers.";
1708
159
      request_scope = RequestScope(txn_participant);
1709
159
    }
1710
159
  }
1711
1712
159
  context.RespondSuccess();
1713
159
}
1714
1715
0
Result<OpId> CDCServiceImpl::TabletLeaderLatestEntryOpId(const TabletId& tablet_id) {
1716
0
    auto ts_leader = VERIFY_RESULT(GetLeaderTServer(tablet_id));
1717
1718
0
    auto cdc_proxy = GetCDCServiceProxy(ts_leader);
1719
0
    rpc::RpcController rpc;
1720
0
    rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms));
1721
0
    GetLatestEntryOpIdRequestPB req;
1722
0
    GetLatestEntryOpIdResponsePB resp;
1723
0
    req.set_tablet_id(tablet_id);
1724
0
    auto status = cdc_proxy->GetLatestEntryOpId(req, &resp, &rpc);
1725
0
    if (!status.ok()) {
1726
      // If we failed to get the latest entry op id, we try other tservers. The leader is guaranteed
1727
      // to have the most up-to-date information, but for our purposes, it's ok to be slightly
1728
      // behind.
1729
0
      std::vector<client::internal::RemoteTabletServer *> servers;
1730
0
      auto s = GetTServers(tablet_id, &servers);
1731
0
      for (const auto& server : servers) {
1732
        // We don't want to try the leader again.
1733
0
        if (server->permanent_uuid() == ts_leader->permanent_uuid()) {
1734
0
          continue;
1735
0
        }
1736
0
        auto follower_cdc_proxy = GetCDCServiceProxy(server);
1737
0
        rpc.Reset();  // Reset the controller before reusing it for the retry.
        status = follower_cdc_proxy->GetLatestEntryOpId(req, &resp, &rpc);
1738
0
        if (status.ok()) {
1739
0
          return OpId::FromPB(resp.op_id());
1740
0
        }
1741
0
      }
1742
0
      DCHECK(!status.ok());
1743
0
      return status;
1744
0
    }
1745
0
    return OpId::FromPB(resp.op_id());
1746
0
  }
1747
1748
void CDCServiceImpl::GetLatestEntryOpId(const GetLatestEntryOpIdRequestPB* req,
1749
                                        GetLatestEntryOpIdResponsePB* resp,
1750
0
                                        rpc::RpcContext context) {
1751
0
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1752
0
  Status s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
1753
0
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1754
1755
0
  if (!tablet_peer->log_available()) {
1756
0
    const string err_message = strings::Substitute("Unable to get the latest entry op id from "
1757
0
        "peer $0 and tablet $1 because its log object hasn't been initialized",
1758
0
        tablet_peer->permanent_uuid(), tablet_peer->tablet_id());
1759
0
    LOG(WARNING) << err_message;
1760
0
    SetupErrorAndRespond(resp->mutable_error(),
1761
0
                         STATUS(ServiceUnavailable, err_message),
1762
0
                         CDCErrorPB::INTERNAL_ERROR,
1763
0
                         &context);
1764
0
    return;
1765
0
  }
1766
0
  OpId op_id = tablet_peer->log()->GetLatestEntryOpId();
1767
0
  op_id.ToPB(resp->mutable_op_id());
1768
0
  context.RespondSuccess();
1769
0
}
1770
1771
void CDCServiceImpl::GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req,
1772
                                        GetCDCDBStreamInfoResponsePB* resp,
1773
0
                                        rpc::RpcContext context) {
1774
0
  if (!CheckOnline(req, resp, &context)) {
1775
0
    return;
1776
0
  }
1777
1778
0
  LOG(INFO) << "Received GetCDCDBStreamInfo request " << req->ShortDebugString();
1779
1780
0
  RPC_CHECK_AND_RETURN_ERROR(
1781
0
    req->has_db_stream_id(),
1782
0
    STATUS(InvalidArgument, "Database Stream ID is required to get DB stream information"),
1783
0
    resp->mutable_error(),
1784
0
    CDCErrorPB::INVALID_REQUEST,
1785
0
    context);
1786
1787
0
  std::vector<pair<std::string, std::string>> db_stream_info;
1788
0
  Status s = client()->GetCDCDBStreamInfo(req->db_stream_id(), &db_stream_info);
1789
0
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1790
1791
0
  for (const auto& tabinfo : db_stream_info) {
1792
0
    auto* const table_info = resp->add_table_info();
1793
0
    table_info->set_stream_id(tabinfo.first);
1794
0
    table_info->set_table_id(tabinfo.second);
1795
0
  }
1796
1797
0
  context.RespondSuccess();
1798
0
}
1799
1800
154
void CDCServiceImpl::RollbackPartialCreate(const CDCCreationState& creation_state) {
1801
154
  if (!creation_state.created_cdc_streams.empty()) {
1802
0
    Status s = client()->DeleteCDCStream(creation_state.created_cdc_streams);
1803
0
    if (!s.ok()) {
1804
0
      LOG(WARNING) << "Unable to delete streams " << JoinCSVLine(creation_state.created_cdc_streams)
1805
0
                   << ": " << s;
1806
0
    }
1807
0
  }
1808
1809
  // For all tablets we modified state for, reverse those changes if the operation failed
1810
  // halfway through.
1811
154
  if (creation_state.producer_entries_modified.empty()) {
1812
154
    return;
1813
154
  }
1814
0
  std::lock_guard<decltype(mutex_)> l(mutex_);
1815
0
  impl_->EraseTablets(creation_state.producer_entries_modified, false);
1816
0
  for (const auto& entry : creation_state.producer_entries_modified) {
1817
0
    WARN_NOT_OK(
1818
0
        UpdatePeersCdcMinReplicatedIndex(entry.tablet_id, numeric_limits<uint64_t>::max()),
1819
0
        "Unable to update tablet " + entry.tablet_id);
1820
0
  }
1821
1822
0
}
1823
1824
void CDCServiceImpl::BootstrapProducer(const BootstrapProducerRequestPB* req,
1825
                                       BootstrapProducerResponsePB* resp,
1826
0
                                       rpc::RpcContext context) {
1827
0
  LOG(INFO) << "Received BootstrapProducer request " << req->ShortDebugString();
1828
0
  RPC_CHECK_AND_RETURN_ERROR(req->table_ids().size() > 0,
1829
0
                             STATUS(InvalidArgument, "Table ID is required to create CDC stream"),
1830
0
                             resp->mutable_error(),
1831
0
                             CDCErrorPB::INVALID_REQUEST,
1832
0
                             context);
1833
1834
0
  std::shared_ptr<client::TableHandle> cdc_state_table;
1835
1836
0
  std::vector<client::YBOperationPtr> ops;
1837
0
  auto session = client()->NewSession();
1838
1839
  // Used to delete streams in case of failure.
1840
0
  CDCCreationState creation_state;
1841
0
  auto scope_exit = ScopeExit([this, &creation_state] {
1842
0
    RollbackPartialCreate(creation_state);
1843
0
  });
1844
1845
0
  std::vector<CDCStreamId> bootstrap_ids;
1846
1847
0
  for (const auto& table_id : req->table_ids()) {
1848
0
    std::shared_ptr<client::YBTable> table;
1849
0
    Status s = client()->OpenTable(table_id, &table);
1850
0
    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context);
1851
1852
    // Generate a bootstrap id by calling CreateCDCStream, and also setup the stream in the master.
1853
    // If the consumer's master sends a CreateCDCStream with a bootstrap id, the producer's master
1854
    // will verify that the stream id exists and return success if it does since everything else
1855
    // has already been done by this call.
1856
0
    std::unordered_map<std::string, std::string> options;
1857
0
    options.reserve(4);
1858
0
    options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE));
1859
0
    options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL));
1860
0
    options.emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));
1861
0
    options.emplace(cdc::kCheckpointType, CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT));
1862
1863
    // Mark this stream as being bootstrapped, to help in finding dangling streams.
1864
0
    auto result = client()->CreateCDCStream(table_id, options, false);
1865
0
    RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
1866
0
                               CDCErrorPB::INTERNAL_ERROR, context);
1867
0
    const std::string& bootstrap_id = *result;
1868
0
    creation_state.created_cdc_streams.push_back(bootstrap_id);
1869
1870
0
    if (cdc_state_table == nullptr) {
1871
0
      auto res = GetCdcStateTable();
1872
0
      RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(),
1873
0
          CDCErrorPB::INTERNAL_ERROR, context);
1874
0
      cdc_state_table = *res;
1875
0
    }
1876
1877
0
    google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
1878
0
    s = client()->GetTabletsFromTableId(table_id, 0, &tablets);
1879
0
    RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context);
1880
1881
    // For each tablet, create a row in cdc_state table containing the generated bootstrap id, and
1882
    // the latest op id in the logs.
1883
0
    for (const auto &tablet : tablets) {
1884
0
      std::shared_ptr<tablet::TabletPeer> tablet_peer;
1885
0
      OpId op_id;
1886
1887
0
      s = tablet_manager_->GetTabletPeer(tablet.tablet_id(), &tablet_peer);
1888
0
      if (!s.ok()) {
1889
0
        auto res = TabletLeaderLatestEntryOpId(tablet.tablet_id());
1890
0
        RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(),
1891
0
            CDCErrorPB::INTERNAL_ERROR, context);
1892
0
        op_id = *res;
1893
0
      } else {
1894
0
        if (!tablet_peer->log_available()) {
1895
0
          const string err_message = strings::Substitute("Unable to get the latest entry op id "
1896
0
              "from peer $0 and tablet $1 because its log object hasn't been initialized",
1897
0
              tablet_peer->permanent_uuid(), tablet_peer->tablet_id());
1898
0
          LOG(WARNING) << err_message;
1899
0
          SetupErrorAndRespond(resp->mutable_error(),
1900
0
                               STATUS(ServiceUnavailable, err_message),
1901
0
                               CDCErrorPB::INTERNAL_ERROR,
1902
0
                               &context);
1903
0
          return;
1904
0
        }
1905
0
        op_id = tablet_peer->log()->GetLatestEntryOpId();
1906
0
        RPC_STATUS_RETURN_ERROR(UpdatePeersCdcMinReplicatedIndex(tablet.tablet_id(), op_id.index),
1907
0
                                resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR,
1908
0
                                context);
1909
0
      }
1910
1911
0
      const auto op = cdc_state_table->NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
1912
0
      auto *const write_req = op->mutable_request();
1913
1914
0
      QLAddStringHashValue(write_req, tablet.tablet_id());
1915
0
      QLAddStringRangeValue(write_req, bootstrap_id);
1916
0
      cdc_state_table->AddStringColumnValue(write_req, master::kCdcCheckpoint, op_id.ToString());
1917
0
      ops.push_back(std::move(op));
1918
0
      impl_->AddTabletCheckpoint(
1919
0
          op_id, bootstrap_id, tablet.tablet_id(), &creation_state.producer_entries_modified);
1920
0
    }
1921
0
    bootstrap_ids.push_back(std::move(bootstrap_id));
1922
0
  }
1923
0
  CoarseTimePoint deadline = GetDeadline(context, client());
1924
1925
0
  session->SetDeadline(deadline);
1926
0
  Status s = RefreshCacheOnFail(session->ApplyAndFlush(ops));
1927
0
  RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1928
1929
0
  for (const auto& bootstrap_id : bootstrap_ids) {
1930
0
    resp->add_cdc_bootstrap_ids(bootstrap_id);
1931
0
  }
1932
  // Clear these vectors so no changes are reversed by scope_exit since we succeeded.
1933
0
  creation_state.Clear();
1934
0
  context.RespondSuccess();
1935
0
}
1936
1937
146
void CDCServiceImpl::Shutdown() {
1938
146
  if (impl_->async_client_init_) {
1939
73
    impl_->async_client_init_->Shutdown();
1940
73
    rpcs_.Shutdown();
1941
73
    {
1942
73
      std::lock_guard<decltype(mutex_)> l(mutex_);
1943
73
      cdc_service_stopped_ = true;
1944
73
      cdc_state_table_ = nullptr;
1945
73
    }
1946
73
    if (update_peers_and_metrics_thread_) {
1947
73
      update_peers_and_metrics_thread_->join();
1948
73
    }
1949
73
    impl_->async_client_init_ = boost::none;
1950
73
  }
1951
146
}
1952
1953
Result<OpId> CDCServiceImpl::GetLastCheckpoint(
1954
    const ProducerTabletInfo& producer_tablet,
1955
308
    const client::YBSessionPtr& session) {
1956
308
  auto result = impl_->GetLastCheckpoint(producer_tablet);
1957
308
  if (result) {
1958
2
    return *result;
1959
2
  }
1960
1961
306
  auto cdc_state_table_result = GetCdcStateTable();
1962
306
  RETURN_NOT_OK(cdc_state_table_result);
1963
1964
306
  const auto op = (*cdc_state_table_result)->NewReadOp();
1965
306
  auto* const req = op->mutable_request();
1966
306
  DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty());
1967
306
  QLAddStringHashValue(req, producer_tablet.tablet_id);
1968
1969
306
  auto cond = req->mutable_where_expr()->mutable_condition();
1970
306
  cond->set_op(QLOperator::QL_OP_AND);
1971
306
  QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx,
1972
306
      QL_OP_EQUAL, producer_tablet.stream_id);
1973
306
  req->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcTabletIdIdx);
1974
306
  req->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcStreamIdIdx);
1975
306
  (*cdc_state_table_result)->AddColumns({master::kCdcCheckpoint}, req);
1976
1977
306
  RETURN_NOT_OK(RefreshCacheOnFail(session->ReadSync(op)));
1978
306
  auto row_block = ql::RowsResult(op.get()).GetRowBlock();
1979
306
  if (row_block->row_count() == 0) {
1980
0
    return OpId(0, 0);
1981
0
  }
1982
1983
306
  DCHECK_EQ(row_block->row_count(), 1);
1984
306
  DCHECK_EQ(row_block->row(0).column(0).type(), InternalType::kStringValue);
1985
1986
306
  return OpId::FromString(row_block->row(0).column(0).string_value());
1987
306
}
1988
1989
void CDCServiceImpl::UpdateCDCTabletMetrics(
1990
    const GetChangesResponsePB* resp,
1991
    const ProducerTabletInfo& producer_tablet,
1992
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
1993
    const OpId& op_id,
1994
319
    int64_t last_readable_index) {
1995
319
  auto tablet_metric = GetCDCTabletMetrics(producer_tablet, tablet_peer);
1996
319
  if (!tablet_metric) {
1997
0
    return;
1998
0
  }
1999
2000
319
  auto lid = resp->checkpoint().op_id();
2001
319
  tablet_metric->last_read_opid_term->set_value(lid.term());
2002
319
  tablet_metric->last_read_opid_index->set_value(lid.index());
2003
319
  tablet_metric->last_readable_opid_index->set_value(last_readable_index);
2004
319
  tablet_metric->last_checkpoint_opid_index->set_value(op_id.index);
2005
319
  if (resp->records_size() > 0) {
2006
0
    auto& last_record = resp->records(resp->records_size() - 1);
2007
0
    tablet_metric->last_read_hybridtime->set_value(last_record.time());
2008
0
    auto last_record_micros = HybridTime(last_record.time()).GetPhysicalValueMicros();
2009
0
    tablet_metric->last_read_physicaltime->set_value(last_record_micros);
2010
    // Only count bytes responded if we are including a response payload.
2011
0
    tablet_metric->rpc_payload_bytes_responded->Increment(resp->ByteSize());
2012
    // Get the physical time of the last committed record on producer.
2013
0
    auto last_replicated_micros = GetLastReplicatedTime(tablet_peer);
2014
0
    tablet_metric->async_replication_sent_lag_micros->set_value(
2015
0
        last_replicated_micros - last_record_micros);
2016
0
    auto& first_record = resp->records(0);
2017
0
    auto first_record_micros = HybridTime(first_record.time()).GetPhysicalValueMicros();
2018
0
    tablet_metric->last_checkpoint_physicaltime->set_value(first_record_micros);
2019
0
    tablet_metric->async_replication_committed_lag_micros->set_value(
2020
0
        last_replicated_micros - first_record_micros);
2021
319
  } else {
2022
319
    tablet_metric->rpc_heartbeats_responded->Increment();
2023
    // If there are no more entries to be read, that means we're caught up.
2024
319
    auto last_replicated_micros = GetLastReplicatedTime(tablet_peer);
2025
319
    tablet_metric->last_read_physicaltime->set_value(last_replicated_micros);
2026
319
    tablet_metric->last_checkpoint_physicaltime->set_value(last_replicated_micros);
2027
319
    tablet_metric->async_replication_sent_lag_micros->set_value(0);
2028
319
    tablet_metric->async_replication_committed_lag_micros->set_value(0);
2029
319
  }
2030
319
}
2031
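The lag arithmetic above uses the batch boundaries: the newest record in the response bounds the "sent" lag, the oldest bounds the "committed" lag, and an empty batch means the consumer is caught up. A compact restatement:

#include <cstdint>

struct Lag { int64_t sent_us; int64_t committed_us; };

Lag BatchLag(int64_t last_replicated_us, int64_t first_record_us,
             int64_t last_record_us, bool empty_batch) {
  if (empty_batch) {
    return {0, 0};  // nothing pending: fully caught up
  }
  return {last_replicated_us - last_record_us,     // newest record bounds "sent" lag
          last_replicated_us - first_record_us};   // oldest record bounds "committed" lag
}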
2032
Status CDCServiceImpl::UpdateCheckpoint(const ProducerTabletInfo& producer_tablet,
2033
                                        const OpId& sent_op_id,
2034
                                        const OpId& commit_op_id,
2035
                                        const client::YBSessionPtr& session,
2036
314
                                        uint64_t last_record_hybrid_time) {
2037
314
  bool update_cdc_state = impl_->UpdateCheckpoint(producer_tablet, sent_op_id, commit_op_id);
2038
2039
314
  if (update_cdc_state) {
2040
0
    auto cdc_state = VERIFY_RESULT(GetCdcStateTable());
2041
0
    const auto op = cdc_state->NewUpdateOp();
2042
0
    auto* const req = op->mutable_request();
2043
0
    DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty());
2044
0
    QLAddStringHashValue(req, producer_tablet.tablet_id);
2045
0
    QLAddStringRangeValue(req, producer_tablet.stream_id);
2046
2047
0
    cdc_state->AddStringColumnValue(req, master::kCdcCheckpoint, commit_op_id.ToString());
2048
    // If we have a last record hybrid time, use that for physical time. If not, it means we're
2049
    // caught up, so use the current time.
2050
0
    uint64_t last_replication_time_micros = last_record_hybrid_time != 0 ?
2051
0
        HybridTime(last_record_hybrid_time).GetPhysicalValueMicros() : GetCurrentTimeMicros();
2052
0
    cdc_state->AddTimestampColumnValue(req, master::kCdcLastReplicationTime,
2053
0
                                       last_replication_time_micros);
2054
    // Only perform the update if we have a row in cdc_state to prevent a race condition where
2055
    // a stream is deleted and then this logic inserts entries in cdc_state from that deleted
2056
    // stream.
2057
0
    auto* condition = req->mutable_if_expr()->mutable_condition();
2058
0
    condition->set_op(QL_OP_EXISTS);
2059
0
    RETURN_NOT_OK(RefreshCacheOnFail(session->ApplyAndFlush(op)));
2060
0
  }
2061
2062
314
  return Status::OK();
2063
314
}
2064
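The QL_OP_EXISTS guard above makes the checkpoint write conditional, so a racing stream deletion cannot be resurrected by a late update. The same idea expressed against an in-memory map (a sketch, not the QL client API):

#include <map>
#include <string>

// Update only if the (tablet, stream) row still exists; never insert.
bool UpdateCheckpointIfExists(std::map<std::string, std::string>& cdc_state,
                              const std::string& key, const std::string& checkpoint) {
  auto it = cdc_state.find(key);
  if (it == cdc_state.end()) {
    return false;  // row was deleted concurrently: drop the update
  }
  it->second = checkpoint;
  return true;
}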
2065
std::shared_ptr<CDCTabletMetrics> CDCServiceImpl::GetCDCTabletMetrics(
2066
    const ProducerTabletInfo& producer,
2067
478
    std::shared_ptr<tablet::TabletPeer> tablet_peer) {
2068
  // Passing 'nullptr' is not recommended; it is only used in tests.
2069
478
  if (tablet_peer == nullptr) {
2070
0
    auto status = tablet_manager_->GetTabletPeer(producer.tablet_id, &tablet_peer);
2071
0
    if (!status.ok() || tablet_peer == nullptr) return nullptr;
2072
478
  }
2073
2074
478
  auto tablet = tablet_peer->shared_tablet();
2075
478
  if (tablet == nullptr) return nullptr;
2076
2077
478
  std::string key = "CDCMetrics::" + producer.stream_id;
2078
478
  std::shared_ptr<void> metrics_raw = tablet->GetAdditionalMetadata(key);
2079
478
  if (metrics_raw == nullptr) {
2080
    // Create a new METRIC_ENTITY_cdc here.
2081
228
    MetricEntity::AttributeMap attrs;
2082
228
    {
2083
228
      SharedLock<rw_spinlock> l(mutex_);
2084
228
      auto raft_group_metadata = tablet_peer->tablet()->metadata();
2085
228
      attrs["table_id"] = raft_group_metadata->table_id();
2086
228
      attrs["namespace_name"] = raft_group_metadata->namespace_name();
2087
228
      attrs["table_name"] = raft_group_metadata->table_name();
2088
228
      attrs["stream_id"] = producer.stream_id;
2089
228
    }
2090
228
    auto entity = METRIC_ENTITY_cdc.Instantiate(metric_registry_, producer.MetricsString(), attrs);
2091
228
    metrics_raw = std::make_shared<CDCTabletMetrics>(entity);
2092
    // Attach the new metrics to the tablet so they share its lifetime.
2093
228
    tablet->AddAdditionalMetadata(key, metrics_raw);
2094
228
  }
2095
2096
478
  return std::static_pointer_cast<CDCTabletMetrics>(metrics_raw);
2097
478
}
2098
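GetCDCTabletMetrics ties the metrics object's lifetime to the tablet by stashing it in the tablet's additional metadata as a shared_ptr<void> and recovering it with static_pointer_cast. A minimal sketch of that ownership trick, with hypothetical Owner and Metrics types:

#include <memory>
#include <string>
#include <unordered_map>

struct Metrics { long lag_us = 0; };

class Owner {
 public:
  std::shared_ptr<Metrics> GetMetrics(const std::string& key) {
    auto& slot = extra_[key];
    if (!slot) {
      slot = std::make_shared<Metrics>();  // created once, then shares Owner's lifetime
    }
    return std::static_pointer_cast<Metrics>(slot);
  }

 private:
  std::unordered_map<std::string, std::shared_ptr<void>> extra_;
};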
2099
323
Result<std::shared_ptr<StreamMetadata>> CDCServiceImpl::GetStream(const std::string& stream_id) {
2100
323
  auto stream = GetStreamMetadataFromCache(stream_id);
2101
323
  if (stream != nullptr) {
2102
322
    return stream;
2103
322
  }
2104
2105
  // Look up stream in sys catalog.
2106
1
  std::vector<ObjectId> object_ids;
2107
1
  NamespaceId ns_id;
2108
1
  std::unordered_map<std::string, std::string> options;
2109
1
  RETURN_NOT_OK(client()->GetCDCStream(stream_id, &ns_id, &object_ids, &options));
2110
2111
0
  auto stream_metadata = std::make_shared<StreamMetadata>();
2112
0
  for (const auto& option : options) {
2113
0
    if (option.first == kRecordType) {
2114
0
      SCHECK(CDCRecordType_Parse(option.second, &stream_metadata->record_type),
2115
0
             IllegalState, "CDC record type parsing error");
2116
0
    } else if (option.first == kRecordFormat) {
2117
0
      SCHECK(CDCRecordFormat_Parse(option.second, &stream_metadata->record_format),
2118
0
             IllegalState, "CDC record format parsing error");
2119
0
    } else if (option.first == kSourceType) {
2120
0
      SCHECK(CDCRequestSource_Parse(option.second, &stream_metadata->source_type), IllegalState,
2121
0
             "CDC record format parsing error");
2122
0
    } else if (option.first == kCheckpointType) {
2123
0
      SCHECK(CDCCheckpointType_Parse(option.second, &stream_metadata->checkpoint_type),
2124
0
             IllegalState, "CDC record format parsing error");
2125
0
    } else if (option.first == cdc::kIdType && option.second == cdc::kNamespaceId) {
2126
0
      stream_metadata->ns_id = ns_id;
2127
0
      stream_metadata->table_ids.insert(
2128
0
          stream_metadata->table_ids.end(), object_ids.begin(), object_ids.end());
2129
0
    } else if (option.first == cdc::kIdType && option.second == cdc::kTableId) {
2130
0
      stream_metadata->table_ids.insert(
2131
0
          stream_metadata->table_ids.end(), object_ids.begin(), object_ids.end());
2132
0
    } else {
2133
0
      LOG(WARNING) << "Unsupported CDC option: " << option.first;
2134
0
    }
2135
0
  }
2136
2137
0
  AddStreamMetadataToCache(stream_id, stream_metadata);
2138
0
  return stream_metadata;
2139
0
}
2140
2141
void CDCServiceImpl::AddStreamMetadataToCache(const std::string& stream_id,
2142
153
                                              const std::shared_ptr<StreamMetadata>& metadata) {
2143
153
  std::lock_guard<decltype(mutex_)> l(mutex_);
2144
153
  stream_metadata_.emplace(stream_id, metadata);
2145
153
}
2146
2147
std::shared_ptr<StreamMetadata> CDCServiceImpl::GetStreamMetadataFromCache(
2148
323
    const std::string& stream_id) {
2149
323
  SharedLock<decltype(mutex_)> l(mutex_);
2150
323
  auto it = stream_metadata_.find(stream_id);
2151
323
  if (it != stream_metadata_.end()) {
2152
322
    return it->second;
2153
1
  } else {
2154
1
    return nullptr;
2155
1
  }
2156
323
}
2157
2158
327
Status CDCServiceImpl::CheckTabletValidForStream(const ProducerTabletInfo& info) {
2159
327
  auto result = VERIFY_RESULT(impl_->PreCheckTabletValidForStream(info));
2160
327
  if (result) {
2161
326
    return Status::OK();
2162
326
  }
2163
  // If we don't recognize the stream_id, populate our full tablet list for this stream.
2164
1
  auto tablets = VERIFY_RESULT(GetTablets(info.stream_id));
2165
0
  return impl_->CheckTabletValidForStream(info, tablets);
2166
1
}
2167
2168
}  // namespace cdc
2169
}  // namespace yb