YugabyteDB 2.13.1.0-b60 (commit 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

File: /Users/deen/code/yugabyte-db/ent/src/yb/master/catalog_manager_ent.cc

Columns: Line | Count | Source
   1|      |// Copyright (c) YugaByte, Inc.
   2|      |//
   3|      |// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
   4|      |// in compliance with the License.  You may obtain a copy of the License at
   5|      |//
   6|      |// http://www.apache.org/licenses/LICENSE-2.0
   7|      |//
   8|      |// Unless required by applicable law or agreed to in writing, software distributed under the License
   9|      |// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  10|      |// or implied.  See the License for the specific language governing permissions and limitations
  11|      |// under the License.
  12|      |
  13|      |#include <memory>
  14|      |#include <regex>
  15|      |#include <set>
  16|      |#include <unordered_set>
  17|      |#include <google/protobuf/util/message_differencer.h>
  18|      |
  19|      |#include "yb/common/common_fwd.h"
  20|      |#include "yb/master/catalog_entity_info.h"
  21|      |#include "yb/master/catalog_manager-internal.h"
  22|      |#include "yb/master/cdc_consumer_registry_service.h"
  23|      |#include "yb/master/cdc_rpc_tasks.h"
  24|      |#include "yb/master/cluster_balance.h"
  25|      |#include "yb/master/master.h"
  26|      |#include "yb/master/master_backup.pb.h"
  27|      |#include "yb/master/master_error.h"
  28|      |
  29|      |#include "yb/cdc/cdc_consumer.pb.h"
  30|      |#include "yb/cdc/cdc_service.h"
  31|      |
  32|      |#include "yb/client/client-internal.h"
  33|      |#include "yb/client/schema.h"
  34|      |#include "yb/client/session.h"
  35|      |#include "yb/client/table.h"
  36|      |#include "yb/client/table_alterer.h"
  37|      |#include "yb/client/table_handle.h"
  38|      |#include "yb/client/table_info.h"
  39|      |#include "yb/client/yb_op.h"
  40|      |#include "yb/client/yb_table_name.h"
  41|      |
  42|      |#include "yb/common/common.pb.h"
  43|      |#include "yb/common/entity_ids.h"
  44|      |#include "yb/common/ql_name.h"
  45|      |#include "yb/common/ql_type.h"
  46|      |#include "yb/common/schema.h"
  47|      |#include "yb/common/wire_protocol.h"
  48|      |#include "yb/consensus/consensus.h"
  49|      |
  50|      |#include "yb/docdb/consensus_frontier.h"
  51|      |#include "yb/docdb/doc_write_batch.h"
  52|      |
  53|      |#include "yb/gutil/bind.h"
  54|      |#include "yb/gutil/casts.h"
  55|      |#include "yb/gutil/strings/join.h"
  56|      |#include "yb/gutil/strings/substitute.h"
  57|      |#include "yb/master/master_client.pb.h"
  58|      |#include "yb/master/master_ddl.pb.h"
  59|      |#include "yb/master/master_defaults.h"
  60|      |#include "yb/master/master_heartbeat.pb.h"
  61|      |#include "yb/master/master_replication.pb.h"
  62|      |#include "yb/master/master_util.h"
  63|      |#include "yb/master/sys_catalog.h"
  64|      |#include "yb/master/sys_catalog-internal.h"
  65|      |#include "yb/master/async_snapshot_tasks.h"
  66|      |#include "yb/master/async_rpc_tasks.h"
  67|      |#include "yb/master/encryption_manager.h"
  68|      |#include "yb/master/restore_sys_catalog_state.h"
  69|      |#include "yb/master/scoped_leader_shared_lock.h"
  70|      |#include "yb/master/scoped_leader_shared_lock-internal.h"
  71|      |
  72|      |#include "yb/rpc/messenger.h"
  73|      |
  74|      |#include "yb/tablet/operations/snapshot_operation.h"
  75|      |#include "yb/tablet/tablet_metadata.h"
  76|      |#include "yb/tablet/tablet_snapshots.h"
  77|      |
  78|      |#include "yb/tserver/backup.proxy.h"
  79|      |#include "yb/tserver/service_util.h"
  80|      |
  81|      |#include "yb/util/cast.h"
  82|      |#include "yb/util/date_time.h"
  83|      |#include "yb/util/flag_tags.h"
  84|      |#include "yb/util/format.h"
  85|      |#include "yb/util/logging.h"
  86|      |#include "yb/util/random_util.h"
  87|      |#include "yb/util/scope_exit.h"
  88|      |#include "yb/util/service_util.h"
  89|      |#include "yb/util/status.h"
  90|      |#include "yb/util/status_format.h"
  91|      |#include "yb/util/status_log.h"
  92|      |#include "yb/util/tostring.h"
  93|      |#include "yb/util/string_util.h"
  94|      |#include "yb/util/trace.h"
  95|      |
  96|      |#include "yb/yql/cql/ql/util/statement_result.h"
  97|      |
  98|      |using namespace std::literals;
  99|      |using namespace std::placeholders;
 100|      |
 101|      |using std::string;
 102|      |using std::unique_ptr;
 103|      |using std::vector;
 104|      |
 105|      |using google::protobuf::RepeatedPtrField;
 106|      |using google::protobuf::util::MessageDifferencer;
 107|      |using strings::Substitute;
 108|      |
 109|      |DEFINE_int32(cdc_state_table_num_tablets, 0,
 110|      |             "Number of tablets to use when creating the CDC state table. "
 111|      |             "0 to use the same default num tablets as for regular tables.");
 112|      |
 113|      |DEFINE_int32(cdc_wal_retention_time_secs, 4 * 3600,
 114|      |             "WAL retention time in seconds to be used for tables for which a CDC stream was "
 115|      |             "created.");
 116|      |DECLARE_int32(master_rpc_timeout_ms);
 117|      |
 118|      |DEFINE_bool(enable_transaction_snapshots, true,
 119|      |            "The flag enables usage of transaction aware snapshots.");
 120|      |TAG_FLAG(enable_transaction_snapshots, hidden);
 121|      |TAG_FLAG(enable_transaction_snapshots, advanced);
 122|      |TAG_FLAG(enable_transaction_snapshots, runtime);
 123|      |
 124|      |DEFINE_test_flag(bool, disable_cdc_state_insert_on_setup, false,
 125|      |                 "Disable inserting new entries into cdc state as part of the setup flow.");
 126|      |
 127|      |DEFINE_bool(allow_consecutive_restore, true,
 128|      |            "Is it allowed to restore to a time before the last restoration was done.");
 129|      |TAG_FLAG(allow_consecutive_restore, runtime);
 130|      |
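Note: the DEFINE_*/TAG_FLAG macros above come from gflags plus YugabyteDB's flag-tag layer; a flag tagged `runtime` may be flipped on a live process, so callers cannot cache its value across reads. A minimal stand-alone sketch of that pattern, using a std::atomic as a hypothetical stand-in for the generated FLAGS_ variable (not the actual gflags machinery):

    #include <atomic>
    #include <iostream>

    // Hypothetical stand-in for the FLAGS_enable_transaction_snapshots global.
    std::atomic<bool> FLAGS_enable_transaction_snapshots{true};

    int main() {
      // A "runtime" flag can change between two reads; each read is one atomic load.
      if (FLAGS_enable_transaction_snapshots.load(std::memory_order_relaxed)) {
        std::cout << "transaction-aware snapshots enabled\n";
      }
      FLAGS_enable_transaction_snapshots.store(false);  // e.g. flipped by an admin RPC
      return 0;
    }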
 131|      |namespace yb {
 132|      |
 133|      |using rpc::RpcContext;
 134|      |using pb_util::ParseFromSlice;
 135|      |
 136|      |namespace master {
 137|      |namespace enterprise {
 138|      |
 139|      |namespace {
 140|      |
 141|   456|CHECKED_STATUS CheckStatus(const Status& status, const char* action) {
 142|   456|  if (status.ok()) {
 143|   456|    return status;
 144|   456|  }
 145|      |
 146|     0|  const Status s = status.CloneAndPrepend(std::string("An error occurred while ") + action);
 147|     0|  LOG(WARNING) << s;
 148|     0|  return s;
 149|   456|}
 150|      |
 151|     0|CHECKED_STATUS CheckLeaderStatus(const Status& status, const char* action) {
 152|     0|  return CheckIfNoLongerLeader(CheckStatus(status, action));
 153|     0|}
 154|      |
 155|      |template<class RespClass>
 156|      |CHECKED_STATUS CheckLeaderStatusAndSetupError(
 157|   312|    const Status& status, const char* action, RespClass* resp) {
 158|   312|  return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp);
 159|   312|}

catalog_manager_ent.cc:yb::Status yb::master::enterprise::(anonymous namespace)::CheckLeaderStatusAndSetupError<yb::master::CreateCDCStreamResponsePB>(yb::Status const&, char const*, yb::master::CreateCDCStreamResponsePB*)
 157|   310|    const Status& status, const char* action, RespClass* resp) {
 158|   310|  return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp);
 159|   310|}

catalog_manager_ent.cc:yb::Status yb::master::enterprise::(anonymous namespace)::CheckLeaderStatusAndSetupError<yb::master::SetupUniverseReplicationResponsePB>(yb::Status const&, char const*, yb::master::SetupUniverseReplicationResponsePB*)
 157|     2|    const Status& status, const char* action, RespClass* resp) {
 158|     2|  return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp);
 159|     2|}

 160|      |
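Note: CheckStatus() above is a pass-through on success (the 456-count path) and prepends context before logging on failure (the 0-count path). A self-contained sketch of that status-prepending idiom, with an illustrative Status type rather than yb::Status:

    #include <iostream>
    #include <string>

    struct Status {
      std::string msg;  // empty means OK
      bool ok() const { return msg.empty(); }
    };

    Status CheckStatus(const Status& status, const char* action) {
      if (status.ok()) return status;
      Status s{std::string("An error occurred while ") + action + ": " + status.msg};
      std::cerr << s.msg << '\n';
      return s;
    }

    int main() {
      CheckStatus(Status{}, "inserting snapshot");             // OK path
      CheckStatus(Status{"disk full"}, "inserting snapshot");  // error path
    }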
 161|      |} // namespace
 162|      |
 163|      |////////////////////////////////////////////////////////////
 164|      |// Snapshot Loader
 165|      |////////////////////////////////////////////////////////////
 166|      |
 167|      |class SnapshotLoader : public Visitor<PersistentSnapshotInfo> {
 168|      | public:
 169| 3.75k|  explicit SnapshotLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {}
 170|      |
 171|     8|  CHECKED_STATUS Visit(const SnapshotId& snapshot_id, const SysSnapshotEntryPB& metadata) override {
 172|     8|    if (TryFullyDecodeTxnSnapshotId(snapshot_id)) {
 173|      |      // Transaction aware snapshots should be already loaded.
 174|     8|      return Status::OK();
 175|     8|    }
 176|     0|    return VisitNonTransactionAwareSnapshot(snapshot_id, metadata);
 177|     8|  }
 178|      |
 179|      |  CHECKED_STATUS VisitNonTransactionAwareSnapshot(
 180|     0|      const SnapshotId& snapshot_id, const SysSnapshotEntryPB& metadata) {
 181|      |
 182|      |    // Setup the snapshot info.
 183|     0|    auto snapshot_info = make_scoped_refptr<SnapshotInfo>(snapshot_id);
 184|     0|    auto l = snapshot_info->LockForWrite();
 185|     0|    l.mutable_data()->pb.CopyFrom(metadata);
 186|      |
 187|      |    // Add the snapshot to the IDs map (if the snapshot is not deleted).
 188|     0|    auto emplace_result = catalog_manager_->non_txn_snapshot_ids_map_.emplace(
 189|     0|        snapshot_id, std::move(snapshot_info));
 190|     0|    CHECK(emplace_result.second) << "Snapshot already exists: " << snapshot_id;
 191|      |
 192|     0|    LOG(INFO) << "Loaded metadata for snapshot (id=" << snapshot_id << "): "
 193|     0|              << emplace_result.first->second->ToString() << ": " << metadata.ShortDebugString();
 194|     0|    l.Commit();
 195|     0|    return Status::OK();
 196|     0|  }
 197|      |
 198|      | private:
 199|      |  CatalogManager *catalog_manager_;
 200|      |
 201|      |  DISALLOW_COPY_AND_ASSIGN(SnapshotLoader);
 202|      |};
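Note: the three loaders in this file share one shape: a Visitor subclass that sys_catalog_->Visit() calls once per persisted entry of the matching type. A minimal stand-alone sketch of that shape, with illustrative types in place of the YB classes:

    #include <iostream>
    #include <map>
    #include <string>

    template <class Metadata>
    struct Visitor {
      virtual ~Visitor() = default;
      virtual void Visit(const std::string& id, const Metadata& meta) = 0;
    };

    struct SnapshotMeta { std::string state; };

    struct DemoSnapshotLoader : Visitor<SnapshotMeta> {
      void Visit(const std::string& id, const SnapshotMeta& meta) override {
        std::cout << "Loaded snapshot " << id << " state=" << meta.state << '\n';
      }
    };

    int main() {
      // Stand-in for the sys catalog contents; Visit() is invoked per entry.
      std::map<std::string, SnapshotMeta> sys_catalog{{"snap-1", {"COMPLETE"}}};
      DemoSnapshotLoader loader;
      for (const auto& [id, meta] : sys_catalog) loader.Visit(id, meta);
    }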
 203|      |
 204|      |
 205|      |////////////////////////////////////////////////////////////
 206|      |// CDC Stream Loader
 207|      |////////////////////////////////////////////////////////////
 208|      |
 209|      |class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
 210|      | public:
 211| 3.75k|  explicit CDCStreamLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {}
 212|      |
 213|      |  Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata)
 214|     0|      REQUIRES(catalog_manager_->mutex_) {
 215|     0|    DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id))
 216|     0|        << "CDC stream already exists: " << stream_id;
 217|      |
 218|     0|    scoped_refptr<NamespaceInfo> ns;
 219|     0|    scoped_refptr<TableInfo> table;
 220|      |
 221|     0|    if (metadata.has_namespace_id()) {
 222|     0|      ns = FindPtrOrNull(catalog_manager_->namespace_ids_map_, metadata.namespace_id());
 223|      |
 224|     0|      if (!ns) {
 225|     0|        LOG(DFATAL) << "Invalid namespace ID " << metadata.namespace_id() << " for stream "
 226|     0|                   << stream_id;
 227|      |        // TODO (#2059): Potentially signals a race condition that the namespace got deleted
 228|      |        // while the stream was being created.
 229|      |        // Log error and continue without loading the stream.
 230|     0|        return Status::OK();
 231|     0|      }
 232|     0|    } else {
 233|     0|      table = FindPtrOrNull(*catalog_manager_->table_ids_map_, metadata.table_id(0));
 234|     0|      if (!table) {
 235|     0|        LOG(ERROR) << "Invalid table ID " << metadata.table_id(0) << " for stream " << stream_id;
 236|      |        // TODO (#2059): Potentially signals a race condition that the table got deleted while
 237|      |        //  the stream was being created.
 238|      |        // Log error and continue without loading the stream.
 239|     0|        return Status::OK();
 240|     0|      }
 241|     0|    }
 242|      |
 243|      |    // Setup the CDC stream info.
 244|     0|    auto stream = make_scoped_refptr<CDCStreamInfo>(stream_id);
 245|     0|    auto l = stream->LockForWrite();
 246|     0|    l.mutable_data()->pb.CopyFrom(metadata);
 247|      |
 248|      |    // If the table has been deleted, then mark this stream as DELETING so it can be deleted by the
 249|      |    // catalog manager background thread. Otherwise if this stream is missing an entry
 250|      |    // for state, then mark its state as Active.
 251|      |
 252|     0|    if (((table && table->LockForRead()->is_deleting()) ||
 253|     0|         (ns && ns->state() == SysNamespaceEntryPB::DELETING)) &&
 254|     0|        !l.data().is_deleting()) {
 255|     0|      l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING);
 256|     0|    } else if (!l.mutable_data()->pb.has_state()) {
 257|     0|      l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::ACTIVE);
 258|     0|    }
 259|      |
 260|      |    // Add the CDC stream to the CDC stream map.
 261|     0|    catalog_manager_->cdc_stream_map_[stream->id()] = stream;
 262|     0|    if (table) {
 263|     0|      catalog_manager_->cdc_stream_tables_count_map_[metadata.table_id(0)]++;
 264|     0|    }
 265|      |
 266|     0|    l.Commit();
 267|      |
 268|     0|    LOG(INFO) << "Loaded metadata for CDC stream " << stream->ToString() << ": "
 269|     0|              << metadata.ShortDebugString();
 270|      |
 271|     0|    return Status::OK();
 272|     0|  }
 273|      |
 274|      | private:
 275|      |  CatalogManager *catalog_manager_;
 276|      |
 277|      |  DISALLOW_COPY_AND_ASSIGN(CDCStreamLoader);
 278|      |};
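Note: CDCStreamLoader::Visit() edits metadata through the LockForWrite() / mutable_data() / Commit() idiom seen throughout this file: mutations accumulate on a dirty copy and become visible only on Commit(); an early return discards them. A minimal sketch of such a copy-on-write lock, illustrative rather than the actual CowObject implementation:

    #include <cassert>
    #include <string>

    struct PB { std::string state; };

    class CowLock {
     public:
      explicit CowLock(PB* live) : live_(live), dirty_(*live) {}
      PB* mutable_data() { return &dirty_; }     // edits go to the copy
      const PB& data() const { return *live_; }  // readers still see the old state
      void Commit() { *live_ = dirty_; committed_ = true; }
      ~CowLock() { if (!committed_) { /* dirty copy dropped: edit aborted */ } }
     private:
      PB* live_;
      PB dirty_;
      bool committed_ = false;
    };

    int main() {
      PB stream{"CREATING"};
      {
        CowLock l(&stream);
        l.mutable_data()->state = "ACTIVE";
        assert(l.data().state == "CREATING");  // not visible before Commit()
        l.Commit();
      }
      assert(stream.state == "ACTIVE");
    }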
 279|      |
 280|      |////////////////////////////////////////////////////////////
 281|      |// Universe Replication Loader
 282|      |////////////////////////////////////////////////////////////
 283|      |
 284|      |class UniverseReplicationLoader : public Visitor<PersistentUniverseReplicationInfo> {
 285|      | public:
 286|      |  explicit UniverseReplicationLoader(CatalogManager* catalog_manager)
 287| 3.75k|      : catalog_manager_(catalog_manager) {}
 288|      |
 289|      |  Status Visit(const std::string& producer_id, const SysUniverseReplicationEntryPB& metadata)
 290|     0|      REQUIRES(catalog_manager_->mutex_) {
 291|     0|    DCHECK(!ContainsKey(catalog_manager_->universe_replication_map_, producer_id))
 292|     0|        << "Producer universe already exists: " << producer_id;
 293|      |
 294|      |    // Setup the universe replication info.
 295|     0|    UniverseReplicationInfo* const ri = new UniverseReplicationInfo(producer_id);
 296|     0|    {
 297|     0|      auto l = ri->LockForWrite();
 298|     0|      l.mutable_data()->pb.CopyFrom(metadata);
 299|      |
 300|     0|      if (!l->is_active() && !l->is_deleted_or_failed()) {
 301|      |        // Replication was not fully setup.
 302|     0|        LOG(WARNING) << "Universe replication in transient state: " << producer_id;
 303|      |
 304|      |        // TODO: Should we delete all failed universe replication items?
 305|     0|      }
 306|      |
 307|      |      // Add universe replication info to the universe replication map.
 308|     0|      catalog_manager_->universe_replication_map_[ri->id()] = ri;
 309|      |
 310|     0|      l.Commit();
 311|     0|    }
 312|      |
 313|      |    // Also keep track of consumer tables.
 314|     0|    for (const auto& table : metadata.validated_tables()) {
 315|     0|      CDCStreamId stream_id = FindWithDefault(metadata.table_streams(), table.first, "");
 316|     0|      if (stream_id.empty()) {
 317|     0|        LOG(WARNING) << "Unable to find stream id for table: " << table.first;
 318|     0|        continue;
 319|     0|      }
 320|     0|      catalog_manager_->xcluster_consumer_tables_to_stream_map_[table.second].emplace(
 321|     0|          metadata.producer_id(), stream_id);
 322|     0|    }
 323|      |
 324|     0|    LOG(INFO) << "Loaded metadata for universe replication " << ri->ToString();
 325|     0|    VLOG(1) << "Metadata for universe replication " << ri->ToString() << ": "
 326|     0|            << metadata.ShortDebugString();
 327|      |
 328|     0|    return Status::OK();
 329|     0|  }
 330|      |
 331|      | private:
 332|      |  CatalogManager *catalog_manager_;
 333|      |
 334|      |  DISALLOW_COPY_AND_ASSIGN(UniverseReplicationLoader);
 335|      |};
 336|      |
 337|      |////////////////////////////////////////////////////////////
 338|      |// CatalogManager
 339|      |////////////////////////////////////////////////////////////
 340|      |
 341|    92|CatalogManager::~CatalogManager() {
 342|    92|  if (StartShutdown()) {
 343|     6|    CompleteShutdown();
 344|     6|  }
 345|    92|}
 346|      |
 347|    94|void CatalogManager::CompleteShutdown() {
 348|    94|  snapshot_coordinator_.Shutdown();
 349|      |  // Call shutdown on base class before exiting derived class destructor
 350|      |  // because BgTasks is part of base & uses this derived class on Shutdown.
 351|    94|  super::CompleteShutdown();
 352|    94|}
 353|      |
 354| 3.75k|Status CatalogManager::RunLoaders(int64_t term) {
 355| 3.75k|  RETURN_NOT_OK(super::RunLoaders(term));
 356|      |
 357|      |  // Clear the snapshots.
 358| 3.75k|  non_txn_snapshot_ids_map_.clear();
 359|      |
 360|      |  // Clear CDC stream map.
 361| 3.75k|  cdc_stream_map_.clear();
 362| 3.75k|  cdc_stream_tables_count_map_.clear();
 363|      |
 364|      |  // Clear universe replication map.
 365| 3.75k|  universe_replication_map_.clear();
 366| 3.75k|  xcluster_consumer_tables_to_stream_map_.clear();
 367|      |
 368| 3.75k|  LOG_WITH_FUNC(INFO) << "Loading snapshots into memory.";
 369| 3.75k|  unique_ptr<SnapshotLoader> snapshot_loader(new SnapshotLoader(this));
 370| 3.75k|  RETURN_NOT_OK_PREPEND(
 371| 3.75k|      sys_catalog_->Visit(snapshot_loader.get()),
 372| 3.75k|      "Failed while visiting snapshots in sys catalog");
 373|      |
 374| 3.75k|  LOG_WITH_FUNC(INFO) << "Loading CDC streams into memory.";
 375| 3.75k|  auto cdc_stream_loader = std::make_unique<CDCStreamLoader>(this);
 376| 3.75k|  RETURN_NOT_OK_PREPEND(
 377| 3.75k|      sys_catalog_->Visit(cdc_stream_loader.get()),
 378| 3.75k|      "Failed while visiting CDC streams in sys catalog");
 379|      |
 380| 3.75k|  LOG_WITH_FUNC(INFO) << "Loading universe replication info into memory.";
 381| 3.75k|  auto universe_replication_loader = std::make_unique<UniverseReplicationLoader>(this);
 382| 3.75k|  RETURN_NOT_OK_PREPEND(
 383| 3.75k|      sys_catalog_->Visit(universe_replication_loader.get()),
 384| 3.75k|      "Failed while visiting universe replication info in sys catalog");
 385|      |
 386| 3.75k|  return Status::OK();
 387| 3.75k|}
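Note: RunLoaders() clears every in-memory map before revisiting the sys catalog, so a master that loses and regains leadership rebuilds its state from scratch, and the RETURN_NOT_OK_PREPEND calls bail out on the first loader that fails. A sketch of that early-return macro pattern, with an illustrative Status type rather than yb's macros:

    #include <iostream>
    #include <string>

    struct Status {
      std::string msg;
      bool ok() const { return msg.empty(); }
      static Status OK() { return {}; }
    };

    #define RETURN_NOT_OK_PREPEND(expr, context)                             \
      do {                                                                   \
        Status _s = (expr);                                                  \
        if (!_s.ok()) return Status{std::string(context) + ": " + _s.msg};   \
      } while (false)

    Status VisitSnapshots() { return Status::OK(); }

    Status RunLoaders() {
      RETURN_NOT_OK_PREPEND(VisitSnapshots(), "Failed while visiting snapshots in sys catalog");
      return Status::OK();
    }

    int main() { std::cout << (RunLoaders().ok() ? "loaded\n" : "failed\n"); }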
 388|      |
 389|      |Status CatalogManager::CreateSnapshot(const CreateSnapshotRequestPB* req,
 390|      |                                      CreateSnapshotResponsePB* resp,
 391|     3|                                      RpcContext* rpc) {
 392|     3|  LOG(INFO) << "Servicing CreateSnapshot request: " << req->ShortDebugString();
 393|      |
 394|     3|  if (FLAGS_enable_transaction_snapshots && req->transaction_aware()) {
 395|     2|    return CreateTransactionAwareSnapshot(*req, resp, rpc);
 396|     2|  }
 397|      |
 398|     1|  if (req->has_schedule_id()) {
 399|     1|    auto schedule_id = VERIFY_RESULT(FullyDecodeSnapshotScheduleId(req->schedule_id()));
 400|     0|    auto snapshot_id = snapshot_coordinator_.CreateForSchedule(
 401|     1|        schedule_id, leader_ready_term(), rpc->GetClientDeadline());
 402|     1|    if (!snapshot_id.ok()) {
 403|     0|      LOG(INFO) << "Create snapshot failed: " << snapshot_id.status();
 404|     0|      return snapshot_id.status();
 405|     0|    }
 406|     1|    resp->set_snapshot_id(snapshot_id->data(), snapshot_id->size());
 407|     1|    return Status::OK();
 408|     1|  }
 409|      |
 410|     0|  return CreateNonTransactionAwareSnapshot(req, resp, rpc);
 411|     1|}
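Note: CreateSnapshot() above dispatches between three code paths: transaction-aware snapshots (gated by the runtime flag), schedule-driven snapshots, and the legacy non-transaction-aware path, which the counts show was never taken in this run. A condensed sketch of just the dispatch logic, using a hypothetical request type rather than the real protobufs:

    #include <iostream>

    struct Request {
      bool transaction_aware = false;
      bool has_schedule_id = false;
    };

    enum class SnapshotPath { kTransactionAware, kForSchedule, kNonTransactionAware };

    SnapshotPath ChoosePath(const Request& req, bool flag_enable_txn_snapshots) {
      if (flag_enable_txn_snapshots && req.transaction_aware) {
        return SnapshotPath::kTransactionAware;   // count 2 in the report
      }
      if (req.has_schedule_id) {
        return SnapshotPath::kForSchedule;        // count 1 in the report
      }
      return SnapshotPath::kNonTransactionAware;  // count 0: never hit here
    }

    int main() {
      std::cout << static_cast<int>(ChoosePath({true, false}, true)) << '\n';
    }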
 412|      |
 413|      |Status CatalogManager::CreateNonTransactionAwareSnapshot(
 414|      |    const CreateSnapshotRequestPB* req,
 415|      |    CreateSnapshotResponsePB* resp,
 416|     0|    RpcContext* rpc) {
 417|     0|  SnapshotId snapshot_id;
 418|     0|  {
 419|     0|    LockGuard lock(mutex_);
 420|     0|    TRACE("Acquired catalog manager lock");
 421|      |
 422|      |    // Verify that the system is not in snapshot creating/restoring state.
 423|     0|    if (!current_snapshot_id_.empty()) {
 424|     0|      return STATUS(IllegalState,
 425|     0|                    Format(
 426|     0|                        "Current snapshot id: $0. Parallel snapshot operations are not supported"
 427|     0|                        ": $1", current_snapshot_id_, req),
 428|     0|                    MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION));
 429|     0|    }
 430|      |
 431|      |    // Create a new snapshot UUID.
 432|     0|    snapshot_id = GenerateIdUnlocked(SysRowEntryType::SNAPSHOT);
 433|     0|  }
 434|      |
 435|     0|  vector<scoped_refptr<TabletInfo>> all_tablets;
 436|      |
 437|      |  // Create in memory snapshot data descriptor.
 438|     0|  scoped_refptr<SnapshotInfo> snapshot(new SnapshotInfo(snapshot_id));
 439|     0|  snapshot->mutable_metadata()->StartMutation();
 440|     0|  snapshot->mutable_metadata()->mutable_dirty()->pb.set_state(SysSnapshotEntryPB::CREATING);
 441|      |
 442|     0|  auto tables = VERIFY_RESULT(CollectTables(req->tables(),
 443|     0|                                            req->add_indexes(),
 444|     0|                                            true /* include_parent_colocated_table */));
 445|     0|  std::unordered_set<NamespaceId> added_namespaces;
 446|     0|  for (const auto& table : tables) {
 447|     0|    snapshot->AddEntries(table, &added_namespaces);
 448|     0|    all_tablets.insert(all_tablets.end(), table.tablet_infos.begin(), table.tablet_infos.end());
 449|     0|  }
 450|      |
 451|     0|  VLOG(1) << "Snapshot " << snapshot->ToString()
 452|     0|          << ": PB=" << snapshot->mutable_metadata()->mutable_dirty()->pb.DebugString();
 453|      |
 454|      |  // Write the snapshot data descriptor to the system catalog (in "creating" state).
 455|     0|  RETURN_NOT_OK(CheckLeaderStatus(
 456|     0|      sys_catalog_->Upsert(leader_ready_term(), snapshot),
 457|     0|      "inserting snapshot into sys-catalog"));
 458|     0|  TRACE("Wrote snapshot to system catalog");
 459|      |
 460|      |  // Commit in memory snapshot data descriptor.
 461|     0|  snapshot->mutable_metadata()->CommitMutation();
 462|      |
 463|      |  // Put the snapshot data descriptor to the catalog manager.
 464|     0|  {
 465|     0|    LockGuard lock(mutex_);
 466|     0|    TRACE("Acquired catalog manager lock");
 467|      |
 468|      |    // Verify that the snapshot does not exist.
 469|     0|    auto inserted = non_txn_snapshot_ids_map_.emplace(snapshot_id, snapshot).second;
 470|     0|    RSTATUS_DCHECK(inserted, IllegalState, Format("Snapshot already exists: $0", snapshot_id));
 471|     0|    current_snapshot_id_ = snapshot_id;
 472|     0|  }
 473|      |
 474|      |  // Send CreateSnapshot requests to all TServers (one tablet - one request).
 475|     0|  for (const scoped_refptr<TabletInfo>& tablet : all_tablets) {
 476|     0|    TRACE("Locking tablet");
 477|     0|    auto l = tablet->LockForRead();
 478|      |
 479|     0|    LOG(INFO) << "Sending CreateTabletSnapshot to tablet: " << tablet->ToString();
 480|      |
 481|      |    // Send Create Tablet Snapshot request to each tablet leader.
 482|     0|    auto call = CreateAsyncTabletSnapshotOp(
 483|     0|        tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::CREATE_ON_TABLET,
 484|     0|        TabletSnapshotOperationCallback());
 485|     0|    ScheduleTabletSnapshotOp(call);
 486|     0|  }
 487|      |
 488|     0|  resp->set_snapshot_id(snapshot_id);
 489|     0|  LOG(INFO) << "Successfully started snapshot " << snapshot_id << " creation";
 490|     0|  return Status::OK();
 491|     0|}
 492|      |
 493|    52|void CatalogManager::Submit(std::unique_ptr<tablet::Operation> operation, int64_t leader_term) {
 494|    52|  operation->SetTablet(tablet_peer()->tablet());
 495|    52|  tablet_peer()->Submit(std::move(operation), leader_term);
 496|    52|}
 497|      |
 498|      |Result<SysRowEntries> CatalogManager::CollectEntries(
 499|      |    const google::protobuf::RepeatedPtrField<TableIdentifierPB>& table_identifiers,
 500|    41|    CollectFlags flags) {
 501|    41|  RETURN_NOT_OK(CheckIsLeaderAndReady());
 502|    41|  SysRowEntries entries;
 503|    41|  std::unordered_set<NamespaceId> namespaces;
 504|    41|  auto tables = VERIFY_RESULT(CollectTables(table_identifiers, flags, &namespaces));
 505|    41|  if (!namespaces.empty()) {
 506|    39|    SharedLock lock(mutex_);
 507|    39|    for (const auto& ns_id : namespaces) {
 508|    39|      auto ns_info = VERIFY_RESULT(FindNamespaceByIdUnlocked(ns_id));
 509|     0|      AddInfoEntry(ns_info.get(), entries.mutable_entries());
 510|    39|    }
 511|    39|  }
 512|    41|  for (const auto& table : tables) {
 513|      |    // TODO(txn_snapshot) use single lock to resolve all tables to tablets
 514|    30|    SnapshotInfo::AddEntries(table, entries.mutable_entries(), /* tablet_infos= */ nullptr,
 515|    30|                             &namespaces);
 516|    30|  }
 517|      |
 518|    41|  return entries;
 519|    41|}
 520|      |
 521|      |Result<SysRowEntries> CatalogManager::CollectEntriesForSnapshot(
 522|    39|    const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables) {
 523|    39|  return CollectEntries(
 524|    39|      tables,
 525|    39|      CollectFlags{CollectFlag::kAddIndexes, CollectFlag::kIncludeParentColocatedTable,
 526|    39|                   CollectFlag::kSucceedIfCreateInProgress});
 527|    39|}
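Note: CollectFlags above behaves like a small set of enum values with brace initialization plus a conditional SetIf(). A self-contained sketch of such an enum-set; this is illustrative, since YB has its own enum-bitset utility:

    #include <bitset>
    #include <initializer_list>
    #include <iostream>

    enum class CollectFlag { kAddIndexes, kIncludeParentColocatedTable, kSucceedIfCreateInProgress };

    class CollectFlags {
     public:
      CollectFlags(std::initializer_list<CollectFlag> flags) {
        for (auto f : flags) bits_.set(static_cast<size_t>(f));
      }
      void SetIf(CollectFlag f, bool condition) {
        if (condition) bits_.set(static_cast<size_t>(f));
      }
      bool Test(CollectFlag f) const { return bits_.test(static_cast<size_t>(f)); }
     private:
      std::bitset<3> bits_;
    };

    int main() {
      CollectFlags flags{CollectFlag::kIncludeParentColocatedTable};
      flags.SetIf(CollectFlag::kAddIndexes, /* req.add_indexes() */ true);
      std::cout << flags.Test(CollectFlag::kAddIndexes) << '\n';  // prints 1
    }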
 528|      |
 529|  318k|server::Clock* CatalogManager::Clock() {
 530|  318k|  return master_->clock();
 531|  318k|}
 532|      |
 533|      |Status CatalogManager::CreateTransactionAwareSnapshot(
 534|     2|    const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) {
 535|     2|  CollectFlags flags{CollectFlag::kIncludeParentColocatedTable};
 536|     2|  flags.SetIf(CollectFlag::kAddIndexes, req.add_indexes());
 537|     2|  SysRowEntries entries = VERIFY_RESULT(CollectEntries(req.tables(), flags));
 538|      |
 539|     2|  auto snapshot_id = VERIFY_RESULT(snapshot_coordinator_.Create(
 540|     2|      entries, req.imported(), leader_ready_term(), rpc->GetClientDeadline()));
 541|     0|  resp->set_snapshot_id(snapshot_id.data(), snapshot_id.size());
 542|     2|  return Status::OK();
 543|     2|}
 544|      |
 545|      |Status CatalogManager::ListSnapshots(const ListSnapshotsRequestPB* req,
 546|     9|                                     ListSnapshotsResponsePB* resp) {
 547|     9|  auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id());
 548|     9|  {
 549|     9|    SharedLock lock(mutex_);
 550|     9|    TRACE("Acquired catalog manager lock");
 551|      |
 552|     9|    if (!current_snapshot_id_.empty()) {
 553|     0|      resp->set_current_snapshot_id(current_snapshot_id_);
 554|     0|    }
 555|      |
 556|     9|    auto setup_snapshot_pb_lambda = [resp](scoped_refptr<SnapshotInfo> snapshot_info) {
 557|     0|      auto snapshot_lock = snapshot_info->LockForRead();
 558|      |
 559|     0|      SnapshotInfoPB* const snapshot = resp->add_snapshots();
 560|     0|      snapshot->set_id(snapshot_info->id());
 561|     0|      *snapshot->mutable_entry() = snapshot_info->metadata().state().pb;
 562|     0|    };
 563|      |
 564|     9|    if (req->has_snapshot_id()) {
 565|     8|      if (!txn_snapshot_id) {
 566|     0|        TRACE("Looking up snapshot");
 567|     0|        scoped_refptr<SnapshotInfo> snapshot_info =
 568|     0|            FindPtrOrNull(non_txn_snapshot_ids_map_, req->snapshot_id());
 569|     0|        if (snapshot_info == nullptr) {
 570|     0|          return STATUS(InvalidArgument, "Could not find snapshot", req->snapshot_id(),
 571|     0|                        MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
 572|     0|        }
 573|      |
 574|     0|        setup_snapshot_pb_lambda(snapshot_info);
 575|     0|      }
 576|     8|    } else {
 577|     1|      for (const SnapshotInfoMap::value_type& entry : non_txn_snapshot_ids_map_) {
 578|     0|        setup_snapshot_pb_lambda(entry.second);
 579|     0|      }
 580|     1|    }
 581|     9|  }
 582|      |
 583|     9|  if (req->prepare_for_backup() && (!req->has_snapshot_id() || !txn_snapshot_id)) {
 584|     0|    return STATUS(
 585|     0|        InvalidArgument, "Request must have correct snapshot_id", (req->has_snapshot_id() ?
 586|     0|        req->snapshot_id() : "None"), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
 587|     0|  }
 588|      |
 589|     9|  RETURN_NOT_OK(snapshot_coordinator_.ListSnapshots(
 590|     9|      txn_snapshot_id, req->list_deleted_snapshots(), resp));
 591|      |
 592|     9|  if (req->prepare_for_backup()) {
 593|     0|    SharedLock lock(mutex_);
 594|     0|    TRACE("Acquired catalog manager lock");
 595|      |
 596|      |    // Repack & extend the backup row entries.
 597|     0|    for (SnapshotInfoPB& snapshot : *resp->mutable_snapshots()) {
 598|     0|      snapshot.set_format_version(2);
 599|     0|      SysSnapshotEntryPB& sys_entry = *snapshot.mutable_entry();
 600|     0|      snapshot.mutable_backup_entries()->Reserve(sys_entry.entries_size());
 601|      |
 602|     0|      for (SysRowEntry& entry : *sys_entry.mutable_entries()) {
 603|     0|        BackupRowEntryPB* const backup_entry = snapshot.add_backup_entries();
 604|      |        // Setup BackupRowEntryPB fields.
 605|      |        // Set BackupRowEntryPB::pg_schema_name for YSQL table to disambiguate in case tables
 606|      |        // in different schema have same name.
 607|     0|        if (entry.type() == SysRowEntryType::TABLE) {
 608|     0|          TRACE("Looking up table");
 609|     0|          scoped_refptr<TableInfo> table_info = FindPtrOrNull(*table_ids_map_, entry.id());
 610|     0|          if (table_info == nullptr) {
 611|     0|            return STATUS(
 612|     0|                InvalidArgument, "Table not found by ID", entry.id(),
 613|     0|                MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
 614|     0|          }
 615|      |
 616|     0|          TRACE("Locking table");
 617|     0|          auto l = table_info->LockForRead();
 618|      |          // PG schema name is available for YSQL table only.
 619|      |          // Except '<uuid>.colocated.parent.uuid' table ID.
 620|     0|          if (l->table_type() == PGSQL_TABLE_TYPE && !IsColocatedParentTableId(entry.id())) {
 621|     0|            const string pg_schema_name = VERIFY_RESULT(GetPgSchemaName(table_info));
 622|     0|            VLOG(1) << "PG Schema: " << pg_schema_name << " for table " << table_info->ToString();
 623|     0|            backup_entry->set_pg_schema_name(pg_schema_name);
 624|     0|          }
 625|     0|        }
 626|      |
 627|      |        // Init BackupRowEntryPB::entry.
 628|     0|        backup_entry->mutable_entry()->Swap(&entry);
 629|     0|      }
 630|      |
 631|     0|      sys_entry.clear_entries();
 632|     0|    }
 633|     0|  }
 634|      |
 635|     9|  return Status::OK();
 636|     9|}
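Note: when prepare_for_backup is set, ListSnapshots() rewrites each snapshot from flat `entries` into `backup_entries` wrappers (format_version 2), attaching the YSQL schema name where needed, and uses Swap() to move each entry without copying. A minimal sketch of that repacking step, with plain structs standing in for the protobufs:

    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    struct RowEntry { std::string id, data; };
    struct BackupRowEntry { RowEntry entry; std::string pg_schema_name; };

    struct Snapshot {
      int format_version = 1;
      std::vector<RowEntry> entries;
      std::vector<BackupRowEntry> backup_entries;
    };

    void RepackForBackup(Snapshot* snapshot) {
      snapshot->format_version = 2;
      snapshot->backup_entries.reserve(snapshot->entries.size());
      for (RowEntry& entry : snapshot->entries) {
        BackupRowEntry backup;
        backup.entry = std::move(entry);  // the protobuf code uses Swap() for the same effect
        snapshot->backup_entries.push_back(std::move(backup));
      }
      snapshot->entries.clear();          // mirrors sys_entry.clear_entries()
    }

    int main() {
      Snapshot s{1, {{"table-1", "..."}}, {}};
      RepackForBackup(&s);
      std::cout << s.format_version << ' ' << s.backup_entries.size() << '\n';  // 2 1
    }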
 637|      |
 638|      |Status CatalogManager::ListSnapshotRestorations(const ListSnapshotRestorationsRequestPB* req,
 639|     6|                                                ListSnapshotRestorationsResponsePB* resp) {
 640|     6|  TxnSnapshotRestorationId restoration_id = TxnSnapshotRestorationId::Nil();
 641|     6|  if (!req->restoration_id().empty()) {
 642|     5|    restoration_id = VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId(req->restoration_id()));
 643|     5|  }
 644|     6|  TxnSnapshotId snapshot_id = TxnSnapshotId::Nil();
 645|     6|  if (!req->snapshot_id().empty()) {
 646|     0|    snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(req->snapshot_id()));
 647|     0|  }
 648|      |
 649|     6|  return snapshot_coordinator_.ListRestorations(restoration_id, snapshot_id, resp);
 650|     6|}
 651|      |
 652|      |Status CatalogManager::RestoreSnapshot(const RestoreSnapshotRequestPB* req,
 653|     3|                                       RestoreSnapshotResponsePB* resp) {
 654|     3|  LOG(INFO) << "Servicing RestoreSnapshot request: " << req->ShortDebugString();
 655|      |
 656|     3|  auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id());
 657|     3|  if (txn_snapshot_id) {
 658|     3|    HybridTime ht;
 659|     3|    if (req->has_restore_ht()) {
 660|     3|      ht = HybridTime(req->restore_ht());
 661|     3|    }
 662|     3|    TxnSnapshotRestorationId id = VERIFY_RESULT(snapshot_coordinator_.Restore(
 663|     3|        txn_snapshot_id, ht, leader_ready_term()));
 664|     0|    resp->set_restoration_id(id.data(), id.size());
 665|     3|    return Status::OK();
 666|     3|  }
 667|      |
 668|     0|  return RestoreNonTransactionAwareSnapshot(req->snapshot_id());
 669|     3|}
 670|      |
 671|     0|Status CatalogManager::RestoreNonTransactionAwareSnapshot(const string& snapshot_id) {
 672|     0|  LockGuard lock(mutex_);
 673|     0|  TRACE("Acquired catalog manager lock");
 674|      |
 675|     0|  if (!current_snapshot_id_.empty()) {
 676|     0|    return STATUS(
 677|     0|        IllegalState,
 678|     0|        Format(
 679|     0|            "Current snapshot id: $0. Parallel snapshot operations are not supported: $1",
 680|     0|            current_snapshot_id_, snapshot_id),
 681|     0|        MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION));
 682|     0|  }
 683|      |
 684|     0|  TRACE("Looking up snapshot");
 685|     0|  scoped_refptr<SnapshotInfo> snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, snapshot_id);
 686|     0|  if (snapshot == nullptr) {
 687|     0|    return STATUS(InvalidArgument, "Could not find snapshot", snapshot_id,
 688|     0|                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
 689|     0|  }
 690|      |
 691|     0|  auto snapshot_lock = snapshot->LockForWrite();
 692|      |
 693|     0|  if (snapshot_lock->started_deleting()) {
 694|     0|    return STATUS(NotFound, "The snapshot was deleted", snapshot_id,
 695|     0|                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
 696|     0|  }
 697|      |
 698|     0|  if (!snapshot_lock->is_complete()) {
 699|     0|    return STATUS(IllegalState, "The snapshot state is not complete", snapshot_id,
 700|     0|                  MasterError(MasterErrorPB::SNAPSHOT_IS_NOT_READY));
 701|     0|  }
 702|      |
 703|     0|  TRACE("Updating snapshot metadata on disk");
 704|     0|  SysSnapshotEntryPB& snapshot_pb = snapshot_lock.mutable_data()->pb;
 705|     0|  snapshot_pb.set_state(SysSnapshotEntryPB::RESTORING);
 706|      |
 707|      |  // Update tablet states.
 708|     0|  SetTabletSnapshotsState(SysSnapshotEntryPB::RESTORING, &snapshot_pb);
 709|      |
 710|      |  // Update sys-catalog with the updated snapshot state.
 711|      |  // The mutation will be aborted when 'l' exits the scope on early return.
 712|     0|  RETURN_NOT_OK(CheckLeaderStatus(
 713|     0|      sys_catalog_->Upsert(leader_ready_term(), snapshot),
 714|     0|      "updating snapshot in sys-catalog"));
 715|      |
 716|      |  // CatalogManager lock 'lock_' is still locked here.
 717|     0|  current_snapshot_id_ = snapshot_id;
 718|      |
 719|      |  // Restore all entries.
 720|     0|  for (const SysRowEntry& entry : snapshot_pb.entries()) {
 721|     0|    RETURN_NOT_OK(RestoreEntry(entry, snapshot_id));
 722|     0|  }
 723|      |
 724|      |  // Commit in memory snapshot data descriptor.
 725|     0|  TRACE("Committing in-memory snapshot state");
 726|     0|  snapshot_lock.Commit();
 727|      |
 728|     0|  LOG(INFO) << "Successfully started snapshot " << snapshot->ToString() << " restoring";
 729|     0|  return Status::OK();
 730|     0|}
 731|      |
 732|     0|Status CatalogManager::RestoreEntry(const SysRowEntry& entry, const SnapshotId& snapshot_id) {
 733|     0|  switch (entry.type()) {
 734|     0|    case SysRowEntryType::NAMESPACE: { // Restore NAMESPACES.
 735|     0|      TRACE("Looking up namespace");
 736|     0|      scoped_refptr<NamespaceInfo> ns = FindPtrOrNull(namespace_ids_map_, entry.id());
 737|     0|      if (ns == nullptr) {
 738|      |        // Restore Namespace.
 739|      |        // TODO: implement
 740|     0|        LOG(INFO) << "Restoring: NAMESPACE id = " << entry.id();
 741|      |
 742|     0|        return STATUS(NotSupported, Substitute(
 743|     0|            "Not implemented: restoring namespace: id=$0", entry.type()));
 744|     0|      }
 745|     0|      break;
 746|     0|    }
 747|     0|    case SysRowEntryType::TABLE: { // Restore TABLES.
 748|     0|      TRACE("Looking up table");
 749|     0|      scoped_refptr<TableInfo> table = FindPtrOrNull(*table_ids_map_, entry.id());
 750|     0|      if (table == nullptr) {
 751|      |        // Restore Table.
 752|      |        // TODO: implement
 753|     0|        LOG(INFO) << "Restoring: TABLE id = " << entry.id();
 754|      |
 755|     0|        return STATUS(NotSupported, Substitute(
 756|     0|            "Not implemented: restoring table: id=$0", entry.type()));
 757|     0|      }
 758|     0|      break;
 759|     0|    }
 760|     0|    case SysRowEntryType::TABLET: { // Restore TABLETS.
 761|     0|      TRACE("Looking up tablet");
 762|     0|      scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id());
 763|     0|      if (tablet == nullptr) {
 764|      |        // Restore Tablet.
 765|      |        // TODO: implement
 766|     0|        LOG(INFO) << "Restoring: TABLET id = " << entry.id();
 767|      |
 768|     0|        return STATUS(NotSupported, Substitute(
 769|     0|            "Not implemented: restoring tablet: id=$0", entry.type()));
 770|     0|      } else {
 771|     0|        TRACE("Locking tablet");
 772|     0|        auto l = tablet->LockForRead();
 773|      |
 774|     0|        LOG(INFO) << "Sending RestoreTabletSnapshot to tablet: " << tablet->ToString();
 775|      |        // Send RestoreSnapshot requests to all TServers (one tablet - one request).
 776|     0|        auto task = CreateAsyncTabletSnapshotOp(
 777|     0|            tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::RESTORE_ON_TABLET,
 778|     0|            TabletSnapshotOperationCallback());
 779|     0|        ScheduleTabletSnapshotOp(task);
 780|     0|      }
 781|     0|      break;
 782|     0|    }
 783|     0|    default:
 784|     0|      return STATUS_FORMAT(
 785|     0|          InternalError, "Unexpected entry type in the snapshot: $0", entry.type());
 786|     0|  }
 787|      |
 788|     0|  return Status::OK();
 789|     0|}
 790|      |
 791|      |Status CatalogManager::DeleteSnapshot(const DeleteSnapshotRequestPB* req,
 792|      |                                      DeleteSnapshotResponsePB* resp,
 793|     0|                                      RpcContext* rpc) {
 794|     0|  LOG(INFO) << "Servicing DeleteSnapshot request: " << req->ShortDebugString();
 795|      |
 796|     0|  auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id());
 797|     0|  if (txn_snapshot_id) {
 798|     0|    return snapshot_coordinator_.Delete(
 799|     0|        txn_snapshot_id, leader_ready_term(), rpc->GetClientDeadline());
 800|     0|  }
 801|      |
 802|     0|  return DeleteNonTransactionAwareSnapshot(req->snapshot_id());
 803|     0|}
 804|      |
 805|     0|Status CatalogManager::DeleteNonTransactionAwareSnapshot(const SnapshotId& snapshot_id) {
 806|     0|  LockGuard lock(mutex_);
 807|     0|  TRACE("Acquired catalog manager lock");
 808|      |
 809|     0|  TRACE("Looking up snapshot");
 810|     0|  scoped_refptr<SnapshotInfo> snapshot = FindPtrOrNull(
 811|     0|      non_txn_snapshot_ids_map_, snapshot_id);
 812|     0|  if (snapshot == nullptr) {
 813|     0|    return STATUS(InvalidArgument, "Could not find snapshot", snapshot_id,
 814|     0|                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
 815|     0|  }
 816|      |
 817|     0|  auto snapshot_lock = snapshot->LockForWrite();
 818|      |
 819|     0|  if (snapshot_lock->started_deleting()) {
 820|     0|    return STATUS(NotFound, "The snapshot was deleted", snapshot_id,
 821|     0|                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
 822|     0|  }
 823|      |
 824|     0|  if (snapshot_lock->is_restoring()) {
 825|     0|    return STATUS(InvalidArgument, "The snapshot is being restored now", snapshot_id,
 826|     0|                  MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION));
 827|     0|  }
 828|      |
 829|     0|  TRACE("Updating snapshot metadata on disk");
 830|     0|  SysSnapshotEntryPB& snapshot_pb = snapshot_lock.mutable_data()->pb;
 831|     0|  snapshot_pb.set_state(SysSnapshotEntryPB::DELETING);
 832|      |
 833|      |  // Update tablet states.
 834|     0|  SetTabletSnapshotsState(SysSnapshotEntryPB::DELETING, &snapshot_pb);
 835|      |
 836|      |  // Update sys-catalog with the updated snapshot state.
 837|      |  // The mutation will be aborted when 'l' exits the scope on early return.
 838|     0|  RETURN_NOT_OK(CheckStatus(
 839|     0|      sys_catalog_->Upsert(leader_ready_term(), snapshot),
 840|     0|      "updating snapshot in sys-catalog"));
 841|      |
 842|      |  // Send DeleteSnapshot requests to all TServers (one tablet - one request).
 843|     0|  for (const SysRowEntry& entry : snapshot_pb.entries()) {
 844|     0|    if (entry.type() == SysRowEntryType::TABLET) {
 845|     0|      TRACE("Looking up tablet");
 846|     0|      scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id());
 847|     0|      if (tablet == nullptr) {
 848|     0|        LOG(WARNING) << "Deleting tablet not found " << entry.id();
 849|     0|      } else {
 850|     0|        TRACE("Locking tablet");
 851|     0|        auto l = tablet->LockForRead();
 852|      |
 853|     0|        LOG(INFO) << "Sending DeleteTabletSnapshot to tablet: " << tablet->ToString();
 854|      |        // Send DeleteSnapshot requests to all TServers (one tablet - one request).
 855|     0|        auto task = CreateAsyncTabletSnapshotOp(
 856|     0|            tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::DELETE_ON_TABLET,
 857|     0|            TabletSnapshotOperationCallback());
 858|     0|        ScheduleTabletSnapshotOp(task);
 859|     0|      }
 860|     0|    }
 861|     0|  }
 862|      |
 863|      |  // Commit in memory snapshot data descriptor.
 864|     0|  TRACE("Committing in-memory snapshot state");
 865|     0|  snapshot_lock.Commit();
 866|      |
 867|     0|  LOG(INFO) << "Successfully started snapshot " << snapshot->ToString() << " deletion";
 868|     0|  return Status::OK();
 869|     0|}
 870|      |
 871|      |Status CatalogManager::ImportSnapshotPreprocess(const SnapshotInfoPB& snapshot_pb,
 872|      |                                                ImportSnapshotMetaResponsePB* resp,
 873|      |                                                NamespaceMap* namespace_map,
 874|     0|                                                ExternalTableSnapshotDataMap* tables_data) {
 875|     0|  for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
 876|     0|    const SysRowEntry& entry = backup_entry.entry();
 877|     0|    switch (entry.type()) {
 878|     0|      case SysRowEntryType::NAMESPACE: // Recreate NAMESPACE.
 879|     0|        RETURN_NOT_OK(ImportNamespaceEntry(entry, namespace_map));
 880|     0|        break;
 881|     0|      case SysRowEntryType::TABLE: { // Create TABLE metadata.
 882|     0|          LOG_IF(DFATAL, entry.id().empty()) << "Empty entry id";
 883|     0|          ExternalTableSnapshotData& data = (*tables_data)[entry.id()];
 884|      |
 885|     0|          if (data.old_table_id.empty()) {
 886|     0|            data.old_table_id = entry.id();
 887|     0|            data.table_meta = resp->mutable_tables_meta()->Add();
 888|     0|            data.tablet_id_map = data.table_meta->mutable_tablets_ids();
 889|     0|            data.table_entry_pb = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data()));
 890|      |
 891|     0|            if (backup_entry.has_pg_schema_name()) {
 892|     0|              data.pg_schema_name = backup_entry.pg_schema_name();
 893|     0|            }
 894|     0|          } else {
 895|     0|            LOG_WITH_FUNC(WARNING) << "Ignoring duplicate table with id " << entry.id();
 896|     0|          }
 897|      |
 898|     0|          LOG_IF(DFATAL, data.old_table_id.empty()) << "Not initialized table id";
 899|     0|        }
 900|     0|        break;
 901|     0|      case SysRowEntryType::TABLET: // Preprocess original tablets.
 902|     0|        RETURN_NOT_OK(PreprocessTabletEntry(entry, tables_data));
 903|     0|        break;
 904|     0|      case SysRowEntryType::CLUSTER_CONFIG: FALLTHROUGH_INTENDED;
 905|     0|      case SysRowEntryType::REDIS_CONFIG: FALLTHROUGH_INTENDED;
 906|     0|      case SysRowEntryType::UDTYPE: FALLTHROUGH_INTENDED;
 907|     0|      case SysRowEntryType::ROLE: FALLTHROUGH_INTENDED;
 908|     0|      case SysRowEntryType::SYS_CONFIG: FALLTHROUGH_INTENDED;
 909|     0|      case SysRowEntryType::CDC_STREAM: FALLTHROUGH_INTENDED;
 910|     0|      case SysRowEntryType::UNIVERSE_REPLICATION: FALLTHROUGH_INTENDED;
 911|     0|      case SysRowEntryType::SNAPSHOT:  FALLTHROUGH_INTENDED;
 912|     0|      case SysRowEntryType::SNAPSHOT_SCHEDULE: FALLTHROUGH_INTENDED;
 913|     0|      case SysRowEntryType::DDL_LOG_ENTRY: FALLTHROUGH_INTENDED;
 914|     0|      case SysRowEntryType::UNKNOWN:
 915|     0|        FATAL_INVALID_ENUM_VALUE(SysRowEntryType, entry.type());
 916|     0|    }
 917|     0|  }
 918|      |
 919|     0|  return Status::OK();
 920|     0|}
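Note: the switch above lists every SysRowEntryType member with no default, so adding an enum value without updating this code surfaces as a compiler warning, and the types that cannot appear in a backup funnel into one fatal case. A sketch of the same exhaustive-switch idiom in standard C++, with an illustrative enum:

    #include <cstdlib>
    #include <iostream>

    enum class EntryType { kNamespace, kTable, kTablet, kRole, kUnknown };

    void Preprocess(EntryType type) {
      switch (type) {  // no default: -Wswitch flags any newly added enum member
        case EntryType::kNamespace:
          std::cout << "import namespace\n";
          break;
        case EntryType::kTable:
          std::cout << "import table\n";
          break;
        case EntryType::kTablet:
          std::cout << "preprocess tablet\n";
          break;
        case EntryType::kRole:
          [[fallthrough]];  // same intent as FALLTHROUGH_INTENDED
        case EntryType::kUnknown:
          std::cerr << "unexpected entry type in backup\n";  // FATAL_INVALID_ENUM_VALUE analogue
          std::abort();
      }
    }

    int main() { Preprocess(EntryType::kTable); }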
 921|      |
 922|      |Status CatalogManager::ImportSnapshotCreateObject(const SnapshotInfoPB& snapshot_pb,
 923|      |                                                  ImportSnapshotMetaResponsePB* resp,
 924|      |                                                  NamespaceMap* namespace_map,
 925|      |                                                  ExternalTableSnapshotDataMap* tables_data,
 926|     0|                                                  CreateObjects create_objects) {
 927|      |  // Create ONLY TABLES or ONLY INDEXES in accordance with the argument.
 928|     0|  for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
 929|     0|    const SysRowEntry& entry = backup_entry.entry();
 930|     0|    if (entry.type() == SysRowEntryType::TABLE) {
 931|     0|      ExternalTableSnapshotData& data = (*tables_data)[entry.id()];
 932|     0|      if ((create_objects == CreateObjects::kOnlyIndexes) == data.is_index()) {
 933|     0|        RETURN_NOT_OK(ImportTableEntry(*namespace_map, *tables_data, &data));
 934|     0|      }
 935|     0|    }
 936|     0|  }
 937|      |
 938|     0|  return Status::OK();
 939|     0|}
 940|      |
 941|      |Status CatalogManager::ImportSnapshotWaitForTables(const SnapshotInfoPB& snapshot_pb,
 942|      |                                                   ImportSnapshotMetaResponsePB* resp,
 943|      |                                                   ExternalTableSnapshotDataMap* tables_data,
 944|     0|                                                   CoarseTimePoint deadline) {
 945|     0|  for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
 946|     0|    const SysRowEntry& entry = backup_entry.entry();
 947|     0|    if (entry.type() == SysRowEntryType::TABLE) {
 948|     0|      ExternalTableSnapshotData& data = (*tables_data)[entry.id()];
 949|     0|      if (!data.is_index()) {
 950|     0|        RETURN_NOT_OK(WaitForCreateTableToFinish(data.new_table_id, deadline));
 951|     0|      }
 952|     0|    }
 953|     0|  }
 954|      |
 955|     0|  return Status::OK();
 956|     0|}
 957|      |
 958|      |Status CatalogManager::ImportSnapshotProcessTablets(const SnapshotInfoPB& snapshot_pb,
 959|      |                                                    ImportSnapshotMetaResponsePB* resp,
 960|     0|                                                    ExternalTableSnapshotDataMap* tables_data) {
 961|     0|  for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
 962|     0|    const SysRowEntry& entry = backup_entry.entry();
 963|     0|    if (entry.type() == SysRowEntryType::TABLET) {
 964|      |      // Create tablets IDs map.
 965|     0|      RETURN_NOT_OK(ImportTabletEntry(entry, tables_data));
 966|     0|    }
 967|     0|  }
 968|      |
 969|     0|  return Status::OK();
 970|     0|}
 971|      |
 972|      |template <class RespClass>
 973|      |void ProcessDeleteObjectStatus(const string& obj_name,
 974|      |                               const string& id,
 975|      |                               const RespClass& resp,
 976|     0|                               const Status& s) {
 977|     0|  Status result = s;
 978|     0|  if (result.ok() && resp.has_error()) {
 979|     0|    result = StatusFromPB(resp.error().status());
 980|     0|    LOG_IF(DFATAL, result.ok()) << "Expecting error status";
 981|     0|  }
 982|      |
 983|     0|  if (!result.ok()) {
 984|     0|    LOG_WITH_FUNC(WARNING) << "Failed to delete new " << obj_name << " with id=" << id
 985|     0|                           << ": " << result;
 986|     0|  }
 987|     0|}

Unexecuted instantiation: void yb::master::enterprise::ProcessDeleteObjectStatus<yb::master::DeleteTableResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteTableResponsePB const&, yb::Status const&)
Unexecuted instantiation: void yb::master::enterprise::ProcessDeleteObjectStatus<yb::master::DeleteNamespaceResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteNamespaceResponsePB const&, yb::Status const&)

 988|      |
 989|      |void CatalogManager::DeleteNewSnapshotObjects(const NamespaceMap& namespace_map,
 990|     0|                                              const ExternalTableSnapshotDataMap& tables_data) {
 991|     0|  for (const ExternalTableSnapshotDataMap::value_type& entry : tables_data) {
 992|     0|    const TableId& old_id = entry.first;
 993|     0|    const TableId& new_id = entry.second.new_table_id;
 994|     0|    const TableType type = entry.second.table_entry_pb.table_type();
 995|      |
 996|      |    // Do not delete YSQL objects - they must be deleted via the PG API.
 997|     0|    if (new_id.empty() || new_id == old_id || type == TableType::PGSQL_TABLE_TYPE) {
 998|     0|      continue;
 999|     0|    }
1000|      |
1001|     0|    LOG_WITH_FUNC(INFO) << "Deleting new table with id=" << new_id << " old id=" << old_id;
1002|     0|    DeleteTableRequestPB req;
1003|     0|    DeleteTableResponsePB resp;
1004|     0|    req.mutable_table()->set_table_id(new_id);
1005|     0|    req.set_is_index_table(entry.second.is_index());
1006|     0|    ProcessDeleteObjectStatus("table", new_id, resp, DeleteTable(&req, &resp, nullptr));
1007|     0|  }
1008|      |
1009|     0|  for (const NamespaceMap::value_type& entry : namespace_map) {
1010|     0|    const NamespaceId& old_id = entry.first;
1011|     0|    const NamespaceId& new_id = entry.second.first;
1012|     0|    const YQLDatabase& db_type = entry.second.second;
1013|      |
1014|      |    // Do not delete YSQL objects - they must be deleted via the PG API.
1015|     0|    if (new_id.empty() || new_id == old_id || db_type == YQL_DATABASE_PGSQL) {
1016|     0|      continue;
1017|     0|    }
1018|      |
1019|     0|    LOG_WITH_FUNC(INFO) << "Deleting new namespace with id=" << new_id << " old id=" << old_id;
1020|     0|    DeleteNamespaceRequestPB req;
1021|     0|    DeleteNamespaceResponsePB resp;
1022|     0|    req.mutable_namespace_()->set_id(new_id);
1023|     0|    ProcessDeleteObjectStatus(
1024|     0|        "namespace", new_id, resp, DeleteNamespace(&req, &resp, nullptr));
1025|     0|  }
1026|     0|}
1027|      |
1028|      |Status CatalogManager::ImportSnapshotMeta(const ImportSnapshotMetaRequestPB* req,
1029|      |                                          ImportSnapshotMetaResponsePB* resp,
1030|     0|                                          rpc::RpcContext* rpc) {
1031|     0|  LOG(INFO) << "Servicing ImportSnapshotMeta request: " << req->ShortDebugString();
1032|      |
1033|     0|  NamespaceMap namespace_map;
1034|     0|  ExternalTableSnapshotDataMap tables_data;
1035|     0|  bool successful_exit = false;
1036|      |
1037|     0|  auto se = ScopeExit([this, &namespace_map, &tables_data, &successful_exit] {
1038|     0|    if (!successful_exit) {
1039|     0|      DeleteNewSnapshotObjects(namespace_map, tables_data);
1040|     0|    }
1041|     0|  });
1042|      |
1043|     0|  const SnapshotInfoPB& snapshot_pb = req->snapshot();
1044|      |
1045|     0|  if (!snapshot_pb.has_format_version() || snapshot_pb.format_version() != 2) {
1046|     0|    return STATUS(InternalError, "Expected snapshot data in format 2",
1047|     0|        snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1048|     0|  }
1049|      |
1050|     0|  if (snapshot_pb.backup_entries_size() == 0) {
1051|     0|    return STATUS(InternalError, "Expected snapshot data prepared for backup",
1052|     0|        snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1053|     0|  }
1054|      |
1055|      |  // PHASE 1: Recreate namespaces, create tables' metadata.
1056|     0|  RETURN_NOT_OK(ImportSnapshotPreprocess(snapshot_pb, resp, &namespace_map, &tables_data));
1057|      |
1058|      |  // PHASE 2: Recreate ONLY tables.
1059|     0|  RETURN_NOT_OK(ImportSnapshotCreateObject(
1060|     0|      snapshot_pb, resp, &namespace_map, &tables_data, CreateObjects::kOnlyTables));
1061|      |
1062|      |  // PHASE 3: Wait for all table creation to complete.
1063|     0|  RETURN_NOT_OK(ImportSnapshotWaitForTables(
1064|     0|      snapshot_pb, resp, &tables_data, rpc->GetClientDeadline()));
1065|      |
1066|      |  // PHASE 4: Recreate ONLY indexes.
1067|     0|  RETURN_NOT_OK(ImportSnapshotCreateObject(
1068|     0|      snapshot_pb, resp, &namespace_map, &tables_data, CreateObjects::kOnlyIndexes));
1069|      |
1070|      |  // PHASE 5: Restore tablets.
1071|     0|  RETURN_NOT_OK(ImportSnapshotProcessTablets(snapshot_pb, resp, &tables_data));
1072|      |
1073|     0|  successful_exit = true;
1074|     0|  return Status::OK();
1075|     0|}
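Note: ImportSnapshotMeta() relies on ScopeExit to undo partially created objects if any of the five phases fails: the cleanup lambda runs on every exit path, and setting `successful_exit = true` at the end defuses it. A self-contained sketch of that guard, illustrative rather than yb's ScopeExit:

    #include <iostream>
    #include <utility>

    template <class F>
    class ScopeExit {
     public:
      explicit ScopeExit(F f) : f_(std::move(f)) {}
      ~ScopeExit() { f_(); }  // runs on return, exception, or fall-through
     private:
      F f_;
    };

    int main() {
      bool successful_exit = false;
      {
        ScopeExit guard([&] {
          if (!successful_exit) std::cout << "rolling back partially imported objects\n";
        });
        // ... phases 1-5 would run here; any early return leaves successful_exit false ...
        successful_exit = true;  // commit: the guard becomes a no-op
      }
      return 0;
    }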
1076|      |
1077|      |Status CatalogManager::ChangeEncryptionInfo(const ChangeEncryptionInfoRequestPB* req,
1078|    17|                                            ChangeEncryptionInfoResponsePB* resp) {
1079|    17|  auto cluster_config = ClusterConfig();
1080|    17|  auto l = cluster_config->LockForWrite();
1081|    17|  auto encryption_info = l.mutable_data()->pb.mutable_encryption_info();
1082|      |
1083|    17|  RETURN_NOT_OK(encryption_manager_->ChangeEncryptionInfo(req, encryption_info));
1084|      |
1085|    17|  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
1086|    17|  RETURN_NOT_OK(CheckStatus(
1087|    17|      sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
1088|    17|      "updating cluster config in sys-catalog"));
1089|    17|  l.Commit();
1090|      |
1091|    17|  std::lock_guard<simple_spinlock> lock(should_send_universe_key_registry_mutex_);
1092|    17|  for (auto& entry : should_send_universe_key_registry_) {
1093|     1|    entry.second = true;
1094|     1|  }
1095|      |
1096|    17|  return Status::OK();
1097|    17|}
1098|      |
1099|      |Status CatalogManager::IsEncryptionEnabled(const IsEncryptionEnabledRequestPB* req,
1100|    17|                                           IsEncryptionEnabledResponsePB* resp) {
1101|    17|  return encryption_manager_->IsEncryptionEnabled(
1102|    17|      ClusterConfig()->LockForRead()->pb.encryption_info(), resp);
1103|    17|}
1104|      |
1105|      |Status CatalogManager::ImportNamespaceEntry(const SysRowEntry& entry,
1106|     0|                                            NamespaceMap* namespace_map) {
1107|     0|  LOG_IF(DFATAL, entry.type() != SysRowEntryType::NAMESPACE)
1108|     0|      << "Unexpected entry type: " << entry.type();
1109|      |
1110|     0|  SysNamespaceEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysNamespaceEntryPB>(entry.data()));
1111|     0|  const YQLDatabase db_type = GetDatabaseType(meta);
1112|     0|  NamespaceData& ns_data = (*namespace_map)[entry.id()];
1113|     0|  ns_data.second = db_type;
1114|      |
1115|     0|  TRACE("Looking up namespace");
1116|      |  // First of all try to find the namespace by ID. It will work if we are restoring the backup
1117|      |  // on the original cluster where the backup was created.
1118|     0|  scoped_refptr<NamespaceInfo> ns;
1119|     0|  {
1120|     0|    SharedLock lock(mutex_);
1121|     0|    ns = FindPtrOrNull(namespace_ids_map_, entry.id());
1122|     0|  }
1123|      |
1124|     0|  if (ns != nullptr && ns->name() == meta.name() && ns->state() == SysNamespaceEntryPB::RUNNING) {
1125|     0|    ns_data.first = entry.id();
1126|     0|    return Status::OK();
1127|     0|  }
1128|      |
1129|      |  // If the namespace was not found by ID, it's ok on a new cluster OR if the namespace was
1130|      |  // deleted and created again. In both cases the namespace can be found by NAME.
1131|     0|  if (db_type == YQL_DATABASE_PGSQL) {
1132|      |    // YSQL database must be created via external call. Find it by name.
1133|     0|    {
1134|     0|      SharedLock lock(mutex_);
1135|     0|      ns = FindPtrOrNull(namespace_names_mapper_[db_type], meta.name());
1136|     0|    }
1137|      |
1138|     0|    if (ns == nullptr) {
1139|     0|      const string msg = Format("YSQL database must exist: $0", meta.name());
1140|     0|      LOG_WITH_FUNC(WARNING) << msg;
1141|     0|      return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND));
1142|     0|    }
1143|     0|    if (ns->state() != SysNamespaceEntryPB::RUNNING) {
1144|     0|      const string msg = Format("Found YSQL database must be running: $0", meta.name());
1145|     0|      LOG_WITH_FUNC(WARNING) << msg;
1146|     0|      return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND));
1147|     0|    }
1148|      |
1149|     0|    auto ns_lock = ns->LockForRead();
1150|     0|    ns_data.first = ns->id();
1151|     0|  } else {
1152|     0|    CreateNamespaceRequestPB req;
1153|     0|    CreateNamespaceResponsePB resp;
1154|     0|    req.set_name(meta.name());
1155|     0|    const Status s = CreateNamespace(&req, &resp, nullptr);
1156|      |
1157|     0|    if (!s.ok() && !s.IsAlreadyPresent()) {
1158|     0|      return s.CloneAndAppend("Failed to create namespace");
1159|     0|    }
1160|      |
1161|     0|    if (s.IsAlreadyPresent()) {
1162|     0|      LOG_WITH_FUNC(INFO) << "Using existing namespace '" << meta.name() << "': " << resp.id();
1163|     0|    }
1164|      |
1165|     0|    ns_data.first = resp.id();
1166|     0|  }
1167|     0|  return Status::OK();
1168|     0|}
1169
1170
Status CatalogManager::RecreateTable(const NamespaceId& new_namespace_id,
1171
                                     const ExternalTableSnapshotDataMap& table_map,
1172
0
                                     ExternalTableSnapshotData* table_data) {
1173
0
  const SysTablesEntryPB& meta = DCHECK_NOTNULL(table_data)->table_entry_pb;
1174
1175
0
  CreateTableRequestPB req;
1176
0
  CreateTableResponsePB resp;
1177
0
  req.set_name(meta.name());
1178
0
  req.set_table_type(meta.table_type());
1179
0
  req.set_num_tablets(narrow_cast<int32_t>(table_data->num_tablets));
1180
0
  for (const auto& p : table_data->partitions) {
1181
0
    *req.add_partitions() = p;
1182
0
  }
1183
0
  req.mutable_namespace_()->set_id(new_namespace_id);
1184
0
  *req.mutable_partition_schema() = meta.partition_schema();
1185
0
  *req.mutable_replication_info() = meta.replication_info();
1186
1187
0
  SchemaPB* const schema = req.mutable_schema();
1188
0
  *schema = meta.schema();
1189
1190
  // Setup Index info.
1191
0
  if (table_data->is_index()) {
1192
0
    TRACE("Looking up indexed table");
1193
    // First of all try to attach to the new copy of the referenced table,
1194
    // because the table restored from the snapshot is preferred.
1195
    // For that try to map old indexed table ID into new table ID.
1196
0
    ExternalTableSnapshotDataMap::const_iterator it = table_map.find(meta.indexed_table_id());
1197
0
    const bool using_existing_table = (it == table_map.end());
1198
1199
0
    if (using_existing_table) {
1200
0
      LOG_WITH_FUNC(INFO) << "Try to use old indexed table id " << meta.indexed_table_id();
1201
0
      req.set_indexed_table_id(meta.indexed_table_id());
1202
0
    } else {
1203
0
      LOG_WITH_FUNC(INFO) << "Found new table id " << it->second.new_table_id
1204
0
                          << " for old table id " << meta.indexed_table_id()
1205
0
                          << " from the snapshot";
1206
0
      req.set_indexed_table_id(it->second.new_table_id);
1207
0
    }
1208
1209
0
    scoped_refptr<TableInfo> indexed_table;
1210
0
    {
1211
0
      SharedLock lock(mutex_);
1212
      // Try to find the specified indexed table by id.
1213
0
      indexed_table = FindPtrOrNull(*table_ids_map_, req.indexed_table_id());
1214
0
    }
1215
1216
0
    if (indexed_table == nullptr) {
1217
0
      const string msg = Format("Indexed table not found by id: $0", req.indexed_table_id());
1218
0
      LOG_WITH_FUNC(WARNING) << msg;
1219
0
      return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
1220
0
    }
1221
1222
0
    LOG_WITH_FUNC(INFO) << "Found indexed table by id " << req.indexed_table_id();
1223
1224
    // Ensure the main table schema (including column ids) was not changed.
1225
0
    if (!using_existing_table) {
1226
0
      Schema new_indexed_schema, src_indexed_schema;
1227
0
      RETURN_NOT_OK(indexed_table->GetSchema(&new_indexed_schema));
1228
0
      RETURN_NOT_OK(SchemaFromPB(it->second.table_entry_pb.schema(), &src_indexed_schema));
1229
1230
0
      if (!new_indexed_schema.Equals(src_indexed_schema)) {
1231
0
          const string msg = Format(
1232
0
              "Recreated table has changes in schema: new schema={$0}, source schema={$1}",
1233
0
              new_indexed_schema.ToString(), src_indexed_schema.ToString());
1234
0
          LOG_WITH_FUNC(WARNING) << msg;
1235
0
          return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1236
0
      }
1237
1238
0
      if (new_indexed_schema.column_ids() != src_indexed_schema.column_ids()) {
1239
0
          const string msg = Format(
1240
0
              "Recreated table has changes in column ids: new ids=$0, source ids=$1",
1241
0
              ToString(new_indexed_schema.column_ids()), ToString(src_indexed_schema.column_ids()));
1242
0
          LOG_WITH_FUNC(WARNING) << msg;
1243
0
          return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1244
0
      }
1245
0
    }
1246
1247
0
    req.set_is_local_index(meta.is_local_index());
1248
0
    req.set_is_unique_index(meta.is_unique_index());
1249
0
    req.set_skip_index_backfill(true);
1250
    // Set up IndexInfoPB - self descriptor.
1251
0
    IndexInfoPB* const index_info_pb = req.mutable_index_info();
1252
0
    *index_info_pb = meta.index_info();
1253
0
    index_info_pb->clear_table_id();
1254
0
    index_info_pb->set_indexed_table_id(req.indexed_table_id());
1255
1256
    // Reset column ids.
1257
0
    for (int i = 0; i < index_info_pb->columns_size(); ++i) {
1258
0
      index_info_pb->mutable_columns(i)->clear_column_id();
1259
0
    }
1260
0
  }
1261
1262
0
  RETURN_NOT_OK(CreateTable(&req, &resp, /* RpcContext */nullptr));
1263
0
  table_data->new_table_id = resp.table_id();
1264
0
  LOG_WITH_FUNC(INFO) << "New table id " << table_data->new_table_id << " for "
1265
0
                      << table_data->old_table_id;
1266
0
  return Status::OK();
1267
0
}
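
The index branch above only reuses a previously restored indexed table if its schema, including the separately assigned column ids, is unchanged. A minimal standalone sketch of that two-step comparison (the Column and SimpleSchema types are illustrative stand-ins, not the YB Schema API):

#include <string>
#include <vector>

// Illustrative stand-in for a schema: logical columns plus separately
// assigned column ids, mirroring the two checks in RecreateTable above.
struct Column { std::string name; std::string type; };
struct SimpleSchema {
  std::vector<Column> columns;
  std::vector<int> column_ids;
};

// True only when both the logical schema (names, types) and the physical
// column ids match; either mismatch fails the snapshot import.
bool SchemasMatchForImport(const SimpleSchema& a, const SimpleSchema& b) {
  if (a.columns.size() != b.columns.size()) return false;
  for (size_t i = 0; i < a.columns.size(); ++i) {
    if (a.columns[i].name != b.columns[i].name ||
        a.columns[i].type != b.columns[i].type) {
      return false;  // Logical schema changed.
    }
  }
  return a.column_ids == b.column_ids;  // Ids must also be unchanged.
}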
1268
1269
Status CatalogManager::RepartitionTable(const scoped_refptr<TableInfo> table,
1270
0
                                        const ExternalTableSnapshotData* table_data) {
1271
0
  DCHECK_EQ(table->id(), table_data->new_table_id);
1272
0
  if (table->GetTableType() != PGSQL_TABLE_TYPE) {
1273
0
    return STATUS_FORMAT(InvalidArgument,
1274
0
                         "Cannot repartition non-YSQL table: got $0",
1275
0
                         TableType_Name(table->GetTableType()));
1276
0
  }
1277
0
  LOG_WITH_FUNC(INFO) << "Repartition table " << table->id()
1278
0
                      << " using external snapshot table " << table_data->old_table_id;
1279
1280
  // Get partitions from external snapshot.
1281
0
  size_t i = 0;
1282
0
  vector<Partition> partitions(table_data->partitions.size());
1283
0
  for (const auto& partition_pb : table_data->partitions) {
1284
0
    Partition::FromPB(partition_pb, &partitions[i++]);
1285
0
  }
1286
0
  VLOG_WITH_FUNC(3) << "Got " << partitions.size()
1287
0
                    << " partitions from external snapshot for table " << table->id();
1288
1289
  // Change TableInfo to point to the new tablets.
1290
0
  string deletion_msg;
1291
0
  vector<scoped_refptr<TabletInfo>> new_tablets;
1292
0
  vector<scoped_refptr<TabletInfo>> old_tablets;
1293
0
  {
1294
    // Acquire the TableInfo pb write lock. Although it is not required for some of the individual
1295
    // steps, we want to hold it through so that we guarantee the state does not change during the
1296
    // whole process. Consequently, we hold it through some steps that require mutex_, but since
1297
    // taking mutex_ after TableInfo pb lock is prohibited for deadlock reasons, acquire mutex_
1298
    // first, then release it when it is no longer needed, still holding table pb lock.
1299
0
    TableInfo::WriteLock table_lock;
1300
0
    {
1301
0
      LockGuard lock(mutex_);
1302
0
      TRACE("Acquired catalog manager lock");
1303
1304
      // Make sure the table is in RUNNING state.
1305
      // This by itself doesn't need a pb write lock: just a read lock. However, we want to prevent
1306
      // other writers from entering from this point forward, so take the write lock now.
1307
0
      table_lock = table->LockForWrite();
1308
0
      if (table->old_pb().state() != SysTablesEntryPB::RUNNING) {
1309
0
        return STATUS_FORMAT(IllegalState,
1310
0
                             "Table $0 not running: $1",
1311
0
                             table->ToString(),
1312
0
                             SysTablesEntryPB_State_Name(table->old_pb().state()));
1313
0
      }
1314
      // Make sure the table's tablets can be deleted.
1315
0
      RETURN_NOT_OK_PREPEND(CheckIfForbiddenToDeleteTabletOf(table),
1316
0
                            Format("Cannot repartition table $0", table->id()));
1317
1318
      // Create and mark new tablets for creation.
1319
1320
      // Use partitions from external snapshot to create new tablets in state PREPARING. The tablets
1321
      // will start CREATING once they are committed in memory.
1322
0
      for (const auto& partition : partitions) {
1323
0
        PartitionPB partition_pb;
1324
0
        partition.ToPB(&partition_pb);
1325
0
        new_tablets.push_back(CreateTabletInfo(table.get(), partition_pb));
1326
0
      }
1327
1328
      // Add tablets to catalog manager tablet_map_. This should be safe to do after creating
1329
      // tablets since we're still holding mutex_.
1330
0
      auto tablet_map_checkout = tablet_map_.CheckOut();
1331
0
      for (auto& new_tablet : new_tablets) {
1332
0
        InsertOrDie(tablet_map_checkout.get_ptr(), new_tablet->tablet_id(), new_tablet);
1333
0
      }
1334
0
      VLOG_WITH_FUNC(3) << "Prepared creation of " << new_tablets.size()
1335
0
                        << " new tablets for table " << table->id();
1336
1337
      // mutex_ is no longer needed, so release by going out of scope.
1338
0
    }
1339
    // The table pb write lock is still held, ensuring that the table state does not change. Later
1340
    // steps, like GetTablets or AddTablets, will acquire/release the TableInfo lock_, but it's
1341
    // probably fine that they are released between the steps since the table pb write lock is held
1342
    // throughout. In other words, there should be no risk that TableInfo tablets_ changes between
1343
    // GetTablets and RemoveTablets.
1344
1345
    // Abort tablet mutations in case of early returns.
1346
0
    ScopedInfoCommitter<TabletInfo> unlocker_new(&new_tablets);
1347
1348
    // Mark old tablets for deletion.
1349
0
    old_tablets = table->GetTablets(IncludeInactive::kTrue);
1350
    // Sort so that locking can be done in a deterministic order.
1351
0
    std::sort(old_tablets.begin(), old_tablets.end(), [](const auto& lhs, const auto& rhs) {
1352
0
      return lhs->tablet_id() < rhs->tablet_id();
1353
0
    });
1354
0
    deletion_msg = Format("Old tablets of table $0 deleted at $1",
1355
0
                          table->id(), LocalTimeAsString());
1356
0
    for (auto& old_tablet : old_tablets) {
1357
0
      old_tablet->mutable_metadata()->StartMutation();
1358
0
      old_tablet->mutable_metadata()->mutable_dirty()->set_state(
1359
0
          SysTabletsEntryPB::DELETED, deletion_msg);
1360
0
    }
1361
0
    VLOG_WITH_FUNC(3) << "Prepared deletion of " << old_tablets.size() << " old tablets for table "
1362
0
                      << table->id();
1363
1364
    // Abort tablet mutations in case of early returns.
1365
0
    ScopedInfoCommitter<TabletInfo> unlocker_old(&old_tablets);
1366
1367
    // Change table's partition schema to the external snapshot's.
1368
0
    auto& table_pb = table_lock.mutable_data()->pb;
1369
0
    table_pb.mutable_partition_schema()->CopyFrom(
1370
0
        table_data->table_entry_pb.partition_schema());
1371
0
    table_pb.set_partition_list_version(table_pb.partition_list_version() + 1);
1372
1373
    // Remove old tablets from TableInfo.
1374
0
    table->RemoveTablets(old_tablets);
1375
    // Add new tablets to TableInfo. This must be done after removing tablets because
1376
    // TableInfo::partitions_ has key PartitionKey, which old and new tablets may conflict on.
1377
0
    table->AddTablets(new_tablets);
1378
1379
    // Commit table and tablets to disk.
1380
0
    RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table, new_tablets, old_tablets));
1381
0
    VLOG_WITH_FUNC(2) << "Committed to disk: table " << table->id() << " repartition from "
1382
0
                      << old_tablets.size() << " tablets to " << new_tablets.size() << " tablets";
1383
1384
    // Commit to memory. Commit new tablets (addition) first since that doesn't break anything.
1385
    // Commit table next since new tablets are already committed and ready to be referenced. Commit
1386
    // old tablets (deletion) last since the table is not referencing them anymore.
1387
0
    unlocker_new.Commit();
1388
0
    table_lock.Commit();
1389
0
    unlocker_old.Commit();
1390
0
    VLOG_WITH_FUNC(1) << "Committed to memory: table " << table->id() << " repartition from "
1391
0
                      << old_tablets.size() << " tablets to " << new_tablets.size() << " tablets";
1392
0
  }
1393
1394
  // Finally, now that everything is committed, send the delete tablet requests.
1395
0
  for (auto& old_tablet : old_tablets) {
1396
0
    DeleteTabletReplicas(old_tablet.get(), deletion_msg, HideOnly::kFalse);
1397
0
  }
1398
0
  VLOG_WITH_FUNC(2) << "Sent delete tablet requests for " << old_tablets.size() << " old tablets"
1399
0
                    << " of table " << table->id();
1400
  // The create tablet requests should be handled by background tasks that find the PREPARING
1401
  // tablets after commit.
1402
1403
0
  return Status::OK();
1404
0
}
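
RepartitionTable sorts the old tablets by id before locking them so that any two operations locking overlapping tablet sets acquire locks in the same global order. A self-contained sketch of that deadlock-avoidance pattern, with an illustrative Tablet type rather than the YB TabletInfo class:

#include <algorithm>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Illustrative tablet: a stable id plus its own mutex.
struct Tablet {
  std::string id;
  std::mutex mutex;
};

// Lock every tablet, always in ascending id order. Two callers locking
// overlapping sets take the shared mutexes in the same order, so neither
// can hold one lock while waiting on the other's.
std::vector<std::unique_lock<std::mutex>> LockAllSorted(
    std::vector<std::shared_ptr<Tablet>> tablets) {
  std::sort(tablets.begin(), tablets.end(),
            [](const auto& lhs, const auto& rhs) { return lhs->id < rhs->id; });
  std::vector<std::unique_lock<std::mutex>> locks;
  locks.reserve(tablets.size());
  for (const auto& tablet : tablets) {
    locks.emplace_back(tablet->mutex);
  }
  return locks;
}

The same ordering discipline shows up in the commit sequence above: new tablets first, then the table, then the old tablets, so nothing is ever referenced before it is committed.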
1405
1406
// Helper function for ImportTableEntry.
1407
//
1408
// Given an internal table and an external table snapshot, do some checks to determine if we should
1409
// move forward with using this internal table for import.
1410
//
1411
// table: internal table's info
1412
// snapshot_data: external table's snapshot data
1413
Result<bool> CatalogManager::CheckTableForImport(scoped_refptr<TableInfo> table,
1414
0
                                                 ExternalTableSnapshotData* snapshot_data) {
1415
0
  auto table_lock = table->LockForRead();
1416
1417
  // Check if table is live.
1418
0
  if (!table_lock->visible_to_client()) {
1419
0
    VLOG_WITH_FUNC(2) << "Table not visible to client: " << table->ToString();
1420
0
    return false;
1421
0
  }
1422
  // Check if table names match.
1423
0
  const string& external_table_name = snapshot_data->table_entry_pb.name();
1424
0
  if (table_lock->name() != external_table_name) {
1425
0
    VLOG_WITH_FUNC(2) << "Table names do not match: "
1426
0
                      << table_lock->name() << " vs " << external_table_name
1427
0
                      << " for " << table->ToString();
1428
0
    return false;
1429
0
  }
1430
  // Check index vs table.
1431
0
  if (snapshot_data->is_index() ? table->indexed_table_id().empty()
1432
0
                                : !table->indexed_table_id().empty()) {
1433
0
    VLOG_WITH_FUNC(2) << "External snapshot table is " << (snapshot_data->is_index() ? "" : "not ")
1434
0
                      << "index but internal table is the opposite: " << table->ToString();
1435
0
    return false;
1436
0
  }
1437
  // Check if table schemas match (if present in snapshot).
1438
0
  if (!snapshot_data->pg_schema_name.empty()) {
1439
0
    if (table->GetTableType() != PGSQL_TABLE_TYPE) {
1440
0
      LOG_WITH_FUNC(DFATAL) << "ExternalTableSnapshotData.pg_schema_name set when table type is not"
1441
0
          << " PGSQL: schema name: " << snapshot_data->pg_schema_name
1442
0
          << ", table type: " << TableType_Name(table->GetTableType());
1443
      // If not a debug build, ignore pg_schema_name.
1444
0
    } else {
1445
0
      const string internal_schema_name = VERIFY_RESULT(GetPgSchemaName(table));
1446
0
      const string& external_schema_name = snapshot_data->pg_schema_name;
1447
0
      if (internal_schema_name != external_schema_name) {
1448
0
        LOG_WITH_FUNC(INFO) << "Schema names do not match: "
1449
0
                            << internal_schema_name << " vs " << external_schema_name
1450
0
                            << " for " << table->ToString();
1451
0
        return false;
1452
0
      }
1453
0
    }
1454
0
  }
1455
1456
0
  return true;
1457
0
}
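
The index-vs-table check in CheckTableForImport is a ternary that reduces to an exclusive-or: the snapshot entry and the internal table must either both be indexes or both be plain tables. A small sketch of the equivalent condition, assuming (as the code above does) that a table is an index exactly when its indexed_table_id is non-empty:

#include <string>

bool IndexnessMatches(bool snapshot_is_index, const std::string& indexed_table_id) {
  const bool table_is_index = !indexed_table_id.empty();
  // Equivalent to rejecting when
  // (snapshot_is_index ? !table_is_index : table_is_index) holds.
  return snapshot_is_index == table_is_index;
}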
1458
1459
Status CatalogManager::ImportTableEntry(const NamespaceMap& namespace_map,
1460
                                        const ExternalTableSnapshotDataMap& table_map,
1461
0
                                        ExternalTableSnapshotData* table_data) {
1462
0
  const SysTablesEntryPB& meta = DCHECK_NOTNULL(table_data)->table_entry_pb;
1463
0
  bool is_parent_colocated_table = false;
1464
1465
0
  table_data->old_namespace_id = meta.namespace_id();
1466
0
  LOG_IF(DFATAL, table_data->old_namespace_id.empty()) << "No namespace id";
1467
1468
0
  LOG_IF(DFATAL, namespace_map.find(table_data->old_namespace_id) == namespace_map.end())
1469
0
      << "Namespace not found: " << table_data->old_namespace_id;
1470
0
  const NamespaceId new_namespace_id =
1471
0
      namespace_map.find(table_data->old_namespace_id)->second.first;
1472
0
  LOG_IF(DFATAL, new_namespace_id.empty()) << "No namespace id";
1473
1474
0
  Schema schema;
1475
0
  RETURN_NOT_OK(SchemaFromPB(meta.schema(), &schema));
1476
0
  const vector<ColumnId>& column_ids = schema.column_ids();
1477
0
  scoped_refptr<TableInfo> table;
1478
1479
  // First, check if namespace id and table id match. If, in addition, other properties match, we
1480
  // found the destination table.
1481
0
  if (new_namespace_id == table_data->old_namespace_id) {
1482
0
    TRACE("Looking up table");
1483
0
    {
1484
0
      SharedLock lock(mutex_);
1485
0
      table = FindPtrOrNull(*table_ids_map_, table_data->old_table_id);
1486
0
    }
1487
1488
0
    if (table != nullptr) {
1489
0
      VLOG_WITH_PREFIX(3) << "Begin first search";
1490
      // At this point, namespace id and table id match. Check other properties, like whether the
1491
      // table is active and whether table name matches.
1492
0
      SharedLock lock(mutex_);
1493
0
      if (VERIFY_RESULT(CheckTableForImport(table, table_data))) {
1494
0
        LOG_WITH_FUNC(INFO) << "Found existing table: '" << table->ToString() << "'";
1495
0
      } else {
1496
        // A property did not match, so this search by ids failed.
1497
0
        auto table_lock = table->LockForRead();
1498
0
        LOG_WITH_FUNC(WARNING) << "Existing table " << table->ToString() << " not suitable: "
1499
0
                               << table_lock->pb.ShortDebugString()
1500
0
                               << ", name: " << table->name() << " vs " << meta.name();
1501
0
        table.reset();
1502
0
      }
1503
0
    }
1504
0
  }
1505
1506
  // Second, if we still didn't find a match...
1507
0
  if (table == nullptr) {
1508
0
    VLOG_WITH_PREFIX(3) << "Begin second search";
1509
0
    switch (meta.table_type()) {
1510
0
      case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
1511
0
      case TableType::REDIS_TABLE_TYPE: {
1512
        // For YCQL and YEDIS, simply create the missing table.
1513
0
        RETURN_NOT_OK(RecreateTable(new_namespace_id, table_map, table_data));
1514
0
        break;
1515
0
      }
1516
0
      case TableType::PGSQL_TABLE_TYPE: {
1517
        // For YSQL, the table must be created via external call. Therefore, continue the search for
1518
        // the table, this time checking for name matches rather than id matches.
1519
1520
0
        if (meta.colocated() && IsColocatedParentTableId(table_data->old_table_id)) {
1521
          // For the parent colocated table we need to generate the new_table_id ourselves
1522
          // since the names will not match.
1523
          // For normal colocated tables, we are still able to follow the normal table flow, so no
1524
          // need to generate the new_table_id ourselves.
1525
0
          table_data->new_table_id = new_namespace_id + kColocatedParentTableIdSuffix;
1526
0
          is_parent_colocated_table = true;
1527
0
        } else {
1528
0
          if (!table_data->new_table_id.empty()) {
1529
0
            const string msg = Format(
1530
0
                "$0 expected empty new table id but $1 found", __func__, table_data->new_table_id);
1531
0
            LOG_WITH_FUNC(WARNING) << msg;
1532
0
            return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1533
0
          }
1534
0
          SharedLock lock(mutex_);
1535
1536
0
          for (const auto& entry : *table_ids_map_) {
1537
0
            table = entry.second;
1538
1539
0
            if (new_namespace_id != table->namespace_id()) {
1540
0
              VLOG_WITH_FUNC(3) << "Namespace ids do not match: "
1541
0
                                << table->namespace_id() << " vs " << new_namespace_id
1542
0
                                << " for " << table->ToString();
1543
0
              continue;
1544
0
            }
1545
0
            if (!VERIFY_RESULT(CheckTableForImport(table, table_data))) {
1546
              // Some other check failed.
1547
0
              continue;
1548
0
            }
1549
            // Also check if table is user-created.
1550
0
            if (!IsUserCreatedTableUnlocked(*table)) {
1551
0
              VLOG_WITH_FUNC(2) << "Table not user created: " << table->ToString();
1552
0
              continue;
1553
0
            }
1554
1555
            // Found the new YSQL table by name.
1556
0
            if (table_data->new_table_id.empty()) {
1557
0
              LOG_WITH_FUNC(INFO) << "Found existing table " << entry.first << " for "
1558
0
                                  << new_namespace_id << "/" << meta.name() << " (old table "
1559
0
                                  << table_data->old_table_id << ") with schema "
1560
0
                                  << table_data->pg_schema_name;
1561
0
              table_data->new_table_id = entry.first;
1562
0
            } else if (table_data->new_table_id != entry.first) {
1563
0
              const string msg = Format(
1564
0
                  "Found 2 YSQL tables with the same name: $0 - $1, $2",
1565
0
                  meta.name(), table_data->new_table_id, entry.first);
1566
0
              LOG_WITH_FUNC(WARNING) << msg;
1567
0
              return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1568
0
            }
1569
0
          }
1570
1571
0
          if (table_data->new_table_id.empty()) {
1572
0
            const string msg = Format("YSQL table not found: $0", meta.name());
1573
0
            LOG_WITH_FUNC(WARNING) << msg;
1574
0
            return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
1575
0
          }
1576
0
        }
1577
0
        break;
1578
0
      }
1579
0
      case TableType::TRANSACTION_STATUS_TABLE_TYPE: {
1580
0
        return STATUS(
1581
0
            InvalidArgument,
1582
0
            Format("Unexpected table type: $0", TableType_Name(meta.table_type())),
1583
0
            MasterError(MasterErrorPB::INVALID_TABLE_TYPE));
1584
0
      }
1585
0
    }
1586
0
  } else {
1587
0
    table_data->new_table_id = table_data->old_table_id;
1588
0
    LOG_WITH_FUNC(INFO) << "Use existing table " << table_data->new_table_id;
1589
0
  }
1590
1591
  // The destination table should be found or created by now.
1592
0
  TRACE("Looking up new table");
1593
0
  {
1594
0
    SharedLock lock(mutex_);
1595
0
    table = FindPtrOrNull(*table_ids_map_, table_data->new_table_id);
1596
0
  }
1597
0
  if (table == nullptr) {
1598
0
    const string msg = Format("Created table not found: $0", table_data->new_table_id);
1599
0
    LOG_WITH_FUNC(WARNING) << msg;
1600
0
    return STATUS(InternalError, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
1601
0
  }
1602
1603
  // Don't do schema validation/column updates on the parent colocated table.
1604
  // However, still do the validation for regular colocated tables.
1605
0
  if (!is_parent_colocated_table) {
1606
0
    Schema persisted_schema;
1607
0
    size_t new_num_tablets = 0;
1608
0
    {
1609
0
      TRACE("Locking table");
1610
0
      auto table_lock = table->LockForRead();
1611
0
      RETURN_NOT_OK(table->GetSchema(&persisted_schema));
1612
0
      new_num_tablets = table->NumPartitions();
1613
0
    }
1614
1615
    // Ignore the 'nullable' attribute, due to a difference in implementation
1616
    // between PgCreateTable::AddColumn() and PgAlterTable::AddColumn().
1617
0
    struct CompareColumnsExceptNullable {
1618
0
      bool operator ()(const ColumnSchema& a, const ColumnSchema& b) {
1619
0
        return ColumnSchema::CompHashKey(a, b) && ColumnSchema::CompSortingType(a, b) &&
1620
0
            ColumnSchema::CompTypeInfo(a, b) && ColumnSchema::CompName(a, b);
1621
0
      }
1622
0
    } comparator;
1623
    // Schema::Equals() compares only column names & types. It does not compare the column ids.
1624
0
    if (!persisted_schema.Equals(schema, comparator)
1625
0
        || persisted_schema.column_ids().size() != column_ids.size()) {
1626
0
      const string msg = Format(
1627
0
          "Invalid created $0 table '$1' in namespace id $2: schema={$3}, expected={$4}",
1628
0
          TableType_Name(meta.table_type()), meta.name(), new_namespace_id,
1629
0
          persisted_schema, schema);
1630
0
      LOG_WITH_FUNC(WARNING) << msg;
1631
0
      return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1632
0
    }
1633
1634
0
    if (table_data->num_tablets > 0) {
1635
0
      if (meta.table_type() == TableType::PGSQL_TABLE_TYPE) {
1636
0
        bool partitions_match = true;
1637
0
        if (new_num_tablets != table_data->num_tablets) {
1638
0
          partitions_match = false;
1639
0
        } else {
1640
          // Check if partition boundaries match.  Only check the starts; assume the ends are fine.
1641
0
          size_t i = 0;
1642
0
          vector<PartitionKey> partition_starts(table_data->num_tablets);
1643
0
          for (const auto& partition_pb : table_data->partitions) {
1644
0
            partition_starts[i] = partition_pb.partition_key_start();
1645
0
            LOG_IF(DFATAL, (i == 0) ? partition_starts[i] != ""
1646
0
                                    : partition_starts[i] <= partition_starts[i-1])
1647
0
                << "Wrong partition key start: " << b2a_hex(partition_starts[i]);
1648
0
            i++;
1649
0
          }
1650
0
          if (!table->HasPartitions(partition_starts)) {
1651
0
            LOG_WITH_FUNC(INFO) << "Partition boundaries mismatch for table " << table->id();
1652
0
            partitions_match = false;
1653
0
          }
1654
0
        }
1655
1656
0
        if (!partitions_match) {
1657
0
          RETURN_NOT_OK(RepartitionTable(table, table_data));
1658
0
        }
1659
0
      } else { // not PGSQL_TABLE_TYPE
1660
0
        if (new_num_tablets != table_data->num_tablets) {
1661
0
          const string msg = Format(
1662
0
              "Wrong number of tablets in created $0 table '$1' in namespace id $2:"
1663
0
              " $3 (expected $4)",
1664
0
              TableType_Name(meta.table_type()), meta.name(), new_namespace_id,
1665
0
              new_num_tablets, table_data->num_tablets);
1666
0
          LOG_WITH_FUNC(WARNING) << msg;
1667
0
          return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1668
0
        }
1669
0
      }
1670
0
    }
1671
1672
    // Update the table column ids if they do not match the stored ids.
1673
0
    if (persisted_schema.column_ids() != column_ids) {
1674
0
      if (meta.table_type() != TableType::PGSQL_TABLE_TYPE) {
1675
0
        LOG_WITH_FUNC(WARNING) << "Unexpected wrong column ids in "
1676
0
                               << TableType_Name(meta.table_type()) << " table '" << meta.name()
1677
0
                               << "' in namespace id " << new_namespace_id;
1678
0
      }
1679
1680
0
      LOG_WITH_FUNC(INFO) << "Restoring column ids in " << TableType_Name(meta.table_type())
1681
0
                          << " table '" << meta.name() << "' in namespace id "
1682
0
                          << new_namespace_id;
1683
0
      auto l = table->LockForWrite();
1684
0
      size_t col_idx = 0;
1685
0
      for (auto& column : *l.mutable_data()->pb.mutable_schema()->mutable_columns()) {
1686
        // We expect a correct schema here (column order, names, types), with only the column
1687
        // ids wrong. Column order and column names are checked below.
1688
0
        if (column.name() != schema.column(col_idx).name()) {
1689
0
          const string msg = Format(
1690
0
              "Unexpected column name for index=$0: name=$1, expected name=$2",
1691
0
              col_idx, schema.column(col_idx).name(), column.name());
1692
0
          LOG_WITH_FUNC(WARNING) << msg;
1693
0
          return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1694
0
        }
1695
        // Copy the column id from imported (original) schema.
1696
0
        column.set_id(column_ids[col_idx++]);
1697
0
      }
1698
1699
0
      l.mutable_data()->pb.set_next_column_id(schema.max_col_id() + 1);
1700
0
      l.mutable_data()->pb.set_version(l->pb.version() + 1);
1701
      // Update sys-catalog with the new table schema.
1702
0
      RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table));
1703
0
      l.Commit();
1704
      // Update the new table schema in tablets.
1705
0
      RETURN_NOT_OK(SendAlterTableRequest(table));
1706
0
    }
1707
0
  }
1708
1709
  // Set the type of the table in the response pb (default is TABLE so only set if colocated).
1710
0
  if (meta.colocated()) {
1711
0
    if (is_parent_colocated_table) {
1712
0
      table_data->table_meta->set_table_type(
1713
0
          ImportSnapshotMetaResponsePB_TableType_PARENT_COLOCATED_TABLE);
1714
0
    } else {
1715
0
      table_data->table_meta->set_table_type(
1716
0
          ImportSnapshotMetaResponsePB_TableType_COLOCATED_TABLE);
1717
0
    }
1718
0
  }
1719
1720
0
  TabletInfos new_tablets;
1721
0
  {
1722
0
    TRACE("Locking table");
1723
0
    auto table_lock = table->LockForRead();
1724
0
    new_tablets = table->GetTablets();
1725
0
  }
1726
1727
0
  for (const scoped_refptr<TabletInfo>& tablet : new_tablets) {
1728
0
    auto tablet_lock = tablet->LockForRead();
1729
0
    const PartitionPB& partition_pb = tablet->metadata().state().pb.partition();
1730
0
    const ExternalTableSnapshotData::PartitionKeys key(
1731
0
        partition_pb.partition_key_start(), partition_pb.partition_key_end());
1732
0
    table_data->new_tablets_map[key] = tablet->id();
1733
0
  }
1734
1735
0
  IdPairPB* const namespace_ids = table_data->table_meta->mutable_namespace_ids();
1736
0
  namespace_ids->set_new_id(new_namespace_id);
1737
0
  namespace_ids->set_old_id(table_data->old_namespace_id);
1738
1739
0
  IdPairPB* const table_ids = table_data->table_meta->mutable_table_ids();
1740
0
  table_ids->set_new_id(table_data->new_table_id);
1741
0
  table_ids->set_old_id(table_data->old_table_id);
1742
1743
0
  return Status::OK();
1744
0
}
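
When deciding whether to repartition, ImportTableEntry compares only the partition start keys: the first must be the empty (unbounded) key and the rest must be strictly increasing, which is what the LOG_IF(DFATAL, ...) above asserts. A standalone sketch of that validation:

#include <string>
#include <vector>

// Validate partition start keys the way ImportTableEntry expects them:
// the first range is unbounded below (empty start key) and every later
// start is strictly greater than its predecessor.
bool PartitionStartsValid(const std::vector<std::string>& starts) {
  for (size_t i = 0; i < starts.size(); ++i) {
    if (i == 0) {
      if (!starts[i].empty()) return false;  // First start must be "".
    } else if (starts[i] <= starts[i - 1]) {
      return false;  // Starts must be strictly increasing.
    }
  }
  return true;
}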
1745
1746
Status CatalogManager::PreprocessTabletEntry(const SysRowEntry& entry,
1747
0
                                             ExternalTableSnapshotDataMap* table_map) {
1748
0
  LOG_IF(DFATAL, entry.type() != SysRowEntryType::TABLET)
1749
0
      << "Unexpected entry type: " << entry.type();
1750
1751
0
  SysTabletsEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysTabletsEntryPB>(entry.data()));
1752
1753
0
  ExternalTableSnapshotData& table_data = (*table_map)[meta.table_id()];
1754
0
  ++table_data.num_tablets;
1755
0
  if (meta.has_partition()) {
1756
0
    table_data.partitions.push_back(meta.partition());
1757
0
  }
1758
0
  return Status::OK();
1759
0
}
1760
1761
Status CatalogManager::ImportTabletEntry(const SysRowEntry& entry,
1762
0
                                         ExternalTableSnapshotDataMap* table_map) {
1763
0
  LOG_IF(DFATAL, entry.type() != SysRowEntryType::TABLET)
1764
0
      << "Unexpected entry type: " << entry.type();
1765
1766
0
  SysTabletsEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysTabletsEntryPB>(entry.data()));
1767
1768
0
  LOG_IF(DFATAL, table_map->find(meta.table_id()) == table_map->end())
1769
0
      << "Table not found: " << meta.table_id();
1770
0
  ExternalTableSnapshotData& table_data = (*table_map)[meta.table_id()];
1771
1772
0
  if (meta.colocated() && table_data.tablet_id_map->size() >= 1) {
1773
0
    LOG_WITH_FUNC(INFO) << "Already processed this colocated tablet: " << entry.id();
1774
0
    return Status::OK();
1775
0
  }
1776
1777
  // Update the tablet IDs map.
1778
0
  if (table_data.new_table_id == table_data.old_table_id) {
1779
0
    TRACE("Looking up tablet");
1780
0
    SharedLock lock(mutex_);
1781
0
    scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id());
1782
1783
0
    if (tablet != nullptr) {
1784
0
      IdPairPB* const pair = table_data.tablet_id_map->Add();
1785
0
      pair->set_old_id(entry.id());
1786
0
      pair->set_new_id(entry.id());
1787
0
      return Status::OK();
1788
0
    }
1789
0
  }
1790
1791
0
  const PartitionPB& partition_pb = meta.partition();
1792
0
  const ExternalTableSnapshotData::PartitionKeys key(
1793
0
      partition_pb.partition_key_start(), partition_pb.partition_key_end());
1794
0
  const ExternalTableSnapshotData::PartitionToIdMap::const_iterator it =
1795
0
      table_data.new_tablets_map.find(key);
1796
1797
0
  if (it == table_data.new_tablets_map.end()) {
1798
0
    const string msg = Format(
1799
0
        "For new table $0 (old table $1, expecting $2 tablets) not found new tablet with "
1800
0
        "expected [$3]", table_data.new_table_id, table_data.old_table_id,
1801
0
        table_data.num_tablets, partition_pb);
1802
0
    LOG_WITH_FUNC(WARNING) << msg;
1803
0
    return STATUS(NotFound, msg, MasterError(MasterErrorPB::INTERNAL_ERROR));
1804
0
  }
1805
1806
0
  IdPairPB* const pair = table_data.tablet_id_map->Add();
1807
0
  pair->set_old_id(entry.id());
1808
0
  pair->set_new_id(it->second);
1809
0
  return Status::OK();
1810
0
}
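
ImportTabletEntry maps an old tablet to its new counterpart by the exact (partition_key_start, partition_key_end) pair. A sketch of that lookup using a plain std::map keyed on a pair of strings (the aliases mirror ExternalTableSnapshotData but are illustrative):

#include <map>
#include <string>
#include <utility>

using PartitionKeys = std::pair<std::string, std::string>;  // (start, end)
using PartitionToIdMap = std::map<PartitionKeys, std::string>;

// Resolve the new tablet id for an old tablet by its partition bounds.
// Returns an empty string when no new tablet covers exactly that range,
// which the caller above turns into a NotFound status.
std::string ResolveNewTabletId(const PartitionToIdMap& new_tablets,
                               const std::string& start, const std::string& end) {
  auto it = new_tablets.find(PartitionKeys(start, end));
  return it == new_tablets.end() ? std::string() : it->second;
}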
1811
1812
284
const Schema& CatalogManager::schema() {
1813
284
  return sys_catalog()->schema();
1814
284
}
1815
1816
17
TabletInfos CatalogManager::GetTabletInfos(const std::vector<TabletId>& ids) {
1817
17
  TabletInfos result;
1818
17
  result.reserve(ids.size());
1819
17
  SharedLock lock(mutex_);
1820
45
  for (const auto& id : ids) {
1821
45
    auto it = tablet_map_->find(id);
1822
45
  result.push_back(it != tablet_map_->end() ? it->second : nullptr);
1823
45
  }
1824
17
  return result;
1825
17
}
1826
1827
AsyncTabletSnapshotOpPtr CatalogManager::CreateAsyncTabletSnapshotOp(
1828
    const TabletInfoPtr& tablet, const std::string& snapshot_id,
1829
    tserver::TabletSnapshotOpRequestPB::Operation operation,
1830
45
    TabletSnapshotOperationCallback callback) {
1831
45
  auto result = std::make_shared<AsyncTabletSnapshotOp>(
1832
45
      master_, AsyncTaskPool(), tablet, snapshot_id, operation);
1833
45
  result->SetCallback(std::move(callback));
1834
45
  tablet->table()->AddTask(result);
1835
45
  return result;
1836
45
}
1837
1838
45
void CatalogManager::ScheduleTabletSnapshotOp(const AsyncTabletSnapshotOpPtr& task) {
1839
45
  WARN_NOT_OK(ScheduleTask(task), "Failed to send create snapshot request");
1840
45
}
1841
1842
Status CatalogManager::RestoreSysCatalog(
1843
9
    SnapshotScheduleRestoration* restoration, tablet::Tablet* tablet, Status* complete_status) {
1844
9
  
  VLOG_WITH_PREFIX_AND_FUNC(1) << restoration->restoration_id;
1845
  // Restore the master snapshot and load it into RocksDB.
1846
9
  auto dir = VERIFY_RESULT(tablet->snapshots().RestoreToTemporary(
1847
9
      restoration->snapshot_id, restoration->restore_at));
1848
0
  rocksdb::Options rocksdb_options;
1849
9
  std::string log_prefix = LogPrefix();
1850
  // Remove ": " to patch suffix.
1851
9
  log_prefix.erase(log_prefix.size() - 2);
1852
9
  tablet->InitRocksDBOptions(&rocksdb_options, log_prefix + " [TMP]: ");
1853
9
  auto db = VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, dir));
1854
1855
0
  auto doc_db = docdb::DocDB::FromRegularUnbounded(db.get());
1856
1857
  // Load objects to restore and determine obsolete objects.
1858
9
  RestoreSysCatalogState state(restoration);
1859
9
  RETURN_NOT_OK(state.LoadRestoringObjects(schema(), doc_db));
1860
  // Load existing objects from RocksDB because on followers they are NOT present in loaded sys
1861
  // catalog state.
1862
9
  RETURN_NOT_OK(state.LoadExistingObjects(schema(), tablet->doc_db()));
1863
9
  RETURN_NOT_OK(state.Process());
1864
1865
9
  docdb::DocWriteBatch write_batch(
1866
9
      tablet->doc_db(), docdb::InitMarkerBehavior::kOptional);
1867
1868
9
  if (FLAGS_enable_ysql) {
1869
    // Restore the pg_catalog tables.
1870
6
    const auto* meta = tablet->metadata();
1871
6
    const auto& pg_yb_catalog_version_schema =
1872
6
        *VERIFY_RESULT(meta->GetTableInfo(kPgYbCatalogVersionTableId))->schema;
1873
6
    RETURN_NOT_OK(state.ProcessPgCatalogRestores(
1874
6
        pg_yb_catalog_version_schema, doc_db, tablet->doc_db(), &write_batch));
1875
6
  }
1876
1877
  // Restore the other tables.
1878
9
  RETURN_NOT_OK(state.PrepareWriteBatch(schema(), &write_batch));
1879
1880
  // Apply write batch to RocksDB.
1881
9
  state.WriteToRocksDB(&write_batch, restoration->write_time, restoration->op_id, tablet);
1882
1883
  // TODO(pitr) Handle master leader failover.
1884
9
  RETURN_NOT_OK(ElectedAsLeaderCb());
1885
1886
9
  return Status::OK();
1887
9
}
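
The log-prefix patching above strips the trailing ": " before appending " [TMP]: " so the temporary RocksDB instance is distinguishable in the logs. A tiny runnable sketch of that string surgery (PatchLogPrefix is a hypothetical helper, not a YB function):

#include <cassert>
#include <string>

// Turn a prefix like "T 1234 P abcd: " into "T 1234 P abcd [TMP]: " by
// erasing the trailing ": " before appending the tagged suffix.
std::string PatchLogPrefix(std::string prefix, const std::string& tag) {
  assert(prefix.size() >= 2);       // Caller guarantees the ": " suffix.
  prefix.erase(prefix.size() - 2);  // Drop ": ".
  return prefix + " [" + tag + "]: ";
}

int main() {
  assert(PatchLogPrefix("T 1234 P abcd: ", "TMP") == "T 1234 P abcd [TMP]: ");
}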
1888
1889
3
Status CatalogManager::VerifyRestoredObjects(const SnapshotScheduleRestoration& restoration) {
1890
3
  auto entries = VERIFY_RESULT(CollectEntriesForSnapshot(
1891
3
      restoration.schedules[0].second.tables().tables()));
1892
0
  auto objects_to_restore = restoration.non_system_objects_to_restore;
1893
3
  
  VLOG_WITH_PREFIX(1) << "Objects to restore: " << AsString(objects_to_restore);
1894
17
  for (const auto& entry : entries.entries()) {
1895
17
    
    VLOG_WITH_PREFIX(1)
1896
0
        << "Alive " << SysRowEntryType_Name(entry.type()) << ": " << entry.id();
1897
17
    auto it = objects_to_restore.find(entry.id());
1898
17
    if (it == objects_to_restore.end()) {
1899
0
      return STATUS_FORMAT(IllegalState, "Object $0/$1 present, but should not be restored",
1900
0
                           SysRowEntryType_Name(entry.type()), entry.id());
1901
0
    }
1902
17
    if (it->second != entry.type()) {
1903
0
      return STATUS_FORMAT(
1904
0
          IllegalState, "Restored object $0 has wrong type $1, while $2 expected",
1905
0
          entry.id(), SysRowEntryType_Name(entry.type()), SysRowEntryType_Name(it->second));
1906
0
    }
1907
17
    objects_to_restore.erase(it);
1908
17
  }
1909
3
  for (const auto& id_and_type : objects_to_restore) {
1910
0
    return STATUS_FORMAT(
1911
0
        IllegalState, "Expected to restore $0/$1, but it is not present after restoration",
1912
0
        SysRowEntryType_Name(id_and_type.second), id_and_type.first);
1913
0
  }
1914
3
  return Status::OK();
1915
3
}
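
VerifyRestoredObjects reconciles what exists after the restoration against what was supposed to be restored: every live entry must be expected with the matching type, and anything left unmatched is an error. A self-contained sketch of that erase-as-you-match reconciliation (Entry and the string-keyed maps are illustrative):

#include <map>
#include <string>
#include <vector>

struct Entry { std::string id; std::string type; };

// Returns an empty string on success, otherwise a description of the
// first mismatch: an unexpected object, a type mismatch, or an object
// that was expected but never found.
std::string Reconcile(const std::vector<Entry>& found,
                      std::map<std::string, std::string> expected) {
  for (const auto& entry : found) {
    auto it = expected.find(entry.id);
    if (it == expected.end()) {
      return "present, but should not be restored: " + entry.id;
    }
    if (it->second != entry.type) {
      return "wrong type for " + entry.id + ": got " + entry.type +
             ", expected " + it->second;
    }
    expected.erase(it);  // Matched; whatever remains at the end is missing.
  }
  if (!expected.empty()) {
    return "expected to restore " + expected.begin()->first +
           ", but it is not present after restoration";
  }
  return std::string();
}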
1916
1917
310k
void CatalogManager::CleanupHiddenObjects(const ScheduleMinRestoreTime& schedule_min_restore_time) {
1918
310k
  
  VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(schedule_min_restore_time);
1919
1920
310k
  std::vector<TabletInfoPtr> hidden_tablets;
1921
310k
  std::vector<TableInfoPtr> tables;
1922
310k
  {
1923
310k
    SharedLock lock(mutex_);
1924
310k
    hidden_tablets = hidden_tablets_;
1925
310k
    tables.reserve(table_ids_map_->size());
1926
17.1M
    for (const auto& p : *table_ids_map_) {
1927
17.1M
      if (!p.second->is_system()) {
1928
291k
        tables.push_back(p.second);
1929
291k
      }
1930
17.1M
    }
1931
310k
  }
1932
310k
  CleanupHiddenTablets(hidden_tablets, schedule_min_restore_time);
1933
310k
  CleanupHiddenTables(std::move(tables));
1934
310k
}
1935
1936
void CatalogManager::CleanupHiddenTablets(
1937
    const std::vector<TabletInfoPtr>& hidden_tablets,
1938
310k
    const ScheduleMinRestoreTime& schedule_min_restore_time) {
1939
310k
  if (hidden_tablets.empty()) {
1940
310k
    return;
1941
310k
  }
1942
54
  std::vector<TabletInfoPtr> tablets_to_delete;
1943
54
  std::vector<TabletInfoPtr> tablets_to_remove_from_hidden;
1944
1945
162
  for (const auto& tablet : hidden_tablets) {
1946
162
    auto lock = tablet->LockForRead();
1947
162
    if (!lock->ListedAsHidden()) {
1948
0
      tablets_to_remove_from_hidden.push_back(tablet);
1949
0
      continue;
1950
0
    }
1951
162
    auto hide_hybrid_time = HybridTime::FromPB(lock->pb.hide_hybrid_time());
1952
162
    bool cleanup = true;
1953
162
    for (const auto& schedule_id_str : lock->pb.retained_by_snapshot_schedules()) {
1954
162
      auto schedule_id = TryFullyDecodeSnapshotScheduleId(schedule_id_str);
1955
162
      auto it = schedule_min_restore_time.find(schedule_id);
1956
      // If the schedule is not present in schedule_min_restore_time, it means the schedule
1957
      // was deleted, so it should not retain the tablet.
1958
162
      if (it != schedule_min_restore_time.end() && it->second <= hide_hybrid_time) {
1959
162
        
        VLOG_WITH_PREFIX(1)
1960
0
            << "Retaining tablet: " << tablet->tablet_id() << ", hide hybrid time: "
1961
0
            << hide_hybrid_time << ", because of schedule: " << schedule_id
1962
0
            << ", min restore time: " << it->second;
1963
162
        cleanup = false;
1964
162
        break;
1965
162
      }
1966
162
    }
1967
162
    if (cleanup) {
1968
0
      SharedLock read_lock(mutex_);
1969
0
      if (IsTableCdcProducer(*tablet->table())) {
1970
        // We also need to check if this tablet is being kept for xcluster replication.
1971
0
        auto l = tablet->table()->LockForRead();
1972
0
        cleanup = hide_hybrid_time.AddSeconds(l->pb.wal_retention_secs()) < master_->clock()->Now();
1973
0
      }
1974
0
    }
1975
162
    if (cleanup) {
1976
0
      tablets_to_delete.push_back(tablet);
1977
0
    }
1978
162
  }
1979
54
  if (!tablets_to_delete.empty()) {
1980
0
    LOG_WITH_PREFIX(INFO) << "Cleanup hidden tablets: " << AsString(tablets_to_delete);
1981
0
    WARN_NOT_OK(DeleteTabletListAndSendRequests(tablets_to_delete, "Cleanup hidden tablets", {}),
1982
0
                "Failed to cleanup hidden tablets");
1983
0
  }
1984
1985
54
  if (!tablets_to_remove_from_hidden.empty()) {
1986
0
    auto it = tablets_to_remove_from_hidden.begin();
1987
0
    LockGuard lock(mutex_);
1988
    // The order of tablets in tablets_to_remove_from_hidden matches the order in hidden_tablets_,
1989
    // so we can avoid searching in tablets_to_remove_from_hidden.
1990
0
    auto filter = [&it, end = tablets_to_remove_from_hidden.end()](const TabletInfoPtr& tablet) {
1991
0
      if (it != end && tablet.get() == it->get()) {
1992
0
        ++it;
1993
0
        return true;
1994
0
      }
1995
0
      return false;
1996
0
    };
1997
0
    hidden_tablets_.erase(std::remove_if(hidden_tablets_.begin(), hidden_tablets_.end(), filter),
1998
0
                          hidden_tablets_.end());
1999
0
  }
2000
54
}
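
The final block of CleanupHiddenTablets removes a subsequence from hidden_tablets_ in a single pass, relying on tablets_to_remove_from_hidden preserving the relative order of hidden_tablets_. A runnable sketch of that idiom over plain ints:

#include <algorithm>
#include <cassert>
#include <vector>

// Remove 'subseq' from 'items' in one pass. Precondition, as in the code
// above: subseq lists its elements in the same relative order as items,
// so one advancing iterator is enough to match every victim.
void EraseOrderedSubsequence(std::vector<int>* items, const std::vector<int>& subseq) {
  auto it = subseq.begin();
  auto filter = [&it, end = subseq.end()](int value) {
    if (it != end && value == *it) {
      ++it;           // Matched the next victim; drop it.
      return true;
    }
    return false;
  };
  items->erase(std::remove_if(items->begin(), items->end(), filter), items->end());
}

int main() {
  std::vector<int> items = {1, 2, 3, 4, 5};
  EraseOrderedSubsequence(&items, {2, 4});
  assert((items == std::vector<int>{1, 3, 5}));
}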
2001
2002
310k
void CatalogManager::CleanupHiddenTables(std::vector<TableInfoPtr> tables) {
2003
310k
  std::vector<TableInfo::WriteLock> locks;
2004
310k
  EraseIf([this, &locks](const TableInfoPtr& table) {
2005
291k
    {
2006
291k
      auto lock = table->LockForRead();
2007
291k
      if (!lock->is_hidden() || lock->started_deleting() || !table->AreAllTabletsDeleted()) {
2008
291k
        return true;
2009
291k
      }
2010
291k
    }
2011
0
    auto lock = table->LockForWrite();
2012
0
    if (lock->started_deleting()) {
2013
0
      return true;
2014
0
    }
2015
0
    LOG_WITH_PREFIX(INFO) << "Should delete table: " << AsString(table);
2016
0
    lock.mutable_data()->set_state(
2017
0
        SysTablesEntryPB::DELETED, Format("Cleanup hidden table at $0", LocalTimeAsString()));
2018
0
    locks.push_back(std::move(lock));
2019
0
    return false;
2020
0
  }, &tables);
2021
310k
  if (tables.empty()) {
2022
310k
    return;
2023
310k
  }
2024
2025
0
  Status s = sys_catalog_->Upsert(leader_ready_term(), tables);
2026
0
  if (!s.ok()) {
2027
0
    LOG_WITH_PREFIX(WARNING) << "Failed to mark tables as deleted: " << s;
2028
0
    return;
2029
0
  }
2030
0
  for (auto& lock : locks) {
2031
0
    lock.Commit();
2032
0
  }
2033
0
}
2034
2035
7.94k
rpc::Scheduler& CatalogManager::Scheduler() {
2036
7.94k
  return master_->messenger()->scheduler();
2037
7.94k
}
2038
2039
695k
int64_t CatalogManager::LeaderTerm() {
2040
695k
  auto peer = tablet_peer();
2041
695k
  if (!peer) {
2042
159
    return false;
2043
159
  }
2044
694k
  auto consensus = peer->shared_consensus();
2045
694k
  if (!consensus) {
2046
2
    return false;
2047
2
  }
2048
694k
  return consensus->GetLeaderState(/* allow_stale= */ true).term;
2049
694k
}
2050
2051
25
void CatalogManager::HandleCreateTabletSnapshotResponse(TabletInfo *tablet, bool error) {
2052
25
  LOG(INFO) << "Handling Create Tablet Snapshot Response for tablet "
2053
25
            << DCHECK_NOTNULL(tablet)->ToString() << (error ? " ERROR" : " OK");
2054
2055
  // Get the snapshot data descriptor from the catalog manager.
2056
25
  scoped_refptr<SnapshotInfo> snapshot;
2057
25
  {
2058
25
    LockGuard lock(mutex_);
2059
25
    TRACE("Acquired catalog manager lock");
2060
2061
25
    if (current_snapshot_id_.empty()) {
2062
25
      LOG(WARNING) << "No active snapshot: " << current_snapshot_id_;
2063
25
      return;
2064
25
    }
2065
2066
0
    snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, current_snapshot_id_);
2067
2068
0
    if (!snapshot) {
2069
0
      LOG(WARNING) << "Snapshot not found: " << current_snapshot_id_;
2070
0
      return;
2071
0
    }
2072
0
  }
2073
2074
0
  if (!snapshot->IsCreateInProgress()) {
2075
0
    LOG(WARNING) << "Snapshot is not in creating state: " << snapshot->id();
2076
0
    return;
2077
0
  }
2078
2079
0
  auto tablet_l = tablet->LockForRead();
2080
0
  auto l = snapshot->LockForWrite();
2081
0
  RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots =
2082
0
      l.mutable_data()->pb.mutable_tablet_snapshots();
2083
0
  int num_tablets_complete = 0;
2084
2085
0
  for (int i = 0; i < tablet_snapshots->size(); ++i) {
2086
0
    SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i);
2087
2088
0
    if (tablet_info->id() == tablet->id()) {
2089
0
      tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::COMPLETE);
2090
0
    }
2091
2092
0
    if (tablet_info->state() == SysSnapshotEntryPB::COMPLETE) {
2093
0
      ++num_tablets_complete;
2094
0
    }
2095
0
  }
2096
2097
  // Finish the snapshot.
2098
0
  bool finished = true;
2099
0
  if (error) {
2100
0
    l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED);
2101
0
    LOG(WARNING) << "Failed snapshot " << snapshot->id() << " on tablet " << tablet->id();
2102
0
  } else if (num_tablets_complete == tablet_snapshots->size()) {
2103
0
    l.mutable_data()->pb.set_state(SysSnapshotEntryPB::COMPLETE);
2104
0
    LOG(INFO) << "Completed snapshot " << snapshot->id();
2105
0
  } else {
2106
0
    finished = false;
2107
0
  }
2108
2109
0
  if (finished) {
2110
0
    LockGuard lock(mutex_);
2111
0
    TRACE("Acquired catalog manager lock");
2112
0
    current_snapshot_id_ = "";
2113
0
  }
2114
2115
0
  VLOG(1) << "Snapshot: " << snapshot->id()
2116
0
          << " PB: " << l.mutable_data()->pb.DebugString()
2117
0
          << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size();
2118
2119
0
  const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot);
2120
2121
0
  l.CommitOrWarn(s, "updating snapshot in sys-catalog");
2122
0
}
2123
2124
10
void CatalogManager::HandleRestoreTabletSnapshotResponse(TabletInfo *tablet, bool error) {
2125
10
  LOG(INFO) << "Handling Restore Tablet Snapshot Response for tablet "
2126
10
            << DCHECK_NOTNULL(tablet)->ToString() << (error ? " ERROR" : " OK");
2127
2128
  // Get the snapshot data descriptor from the catalog manager.
2129
10
  scoped_refptr<SnapshotInfo> snapshot;
2130
10
  {
2131
10
    LockGuard lock(mutex_);
2132
10
    TRACE("Acquired catalog manager lock");
2133
2134
10
    if (current_snapshot_id_.empty()) {
2135
10
      LOG(WARNING) << "No restoring snapshot: " << current_snapshot_id_;
2136
10
      return;
2137
10
    }
2138
2139
0
    snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, current_snapshot_id_);
2140
2141
0
    if (!snapshot) {
2142
0
      LOG(WARNING) << "Restoring snapshot not found: " << current_snapshot_id_;
2143
0
      return;
2144
0
    }
2145
0
  }
2146
2147
0
  if (!snapshot->IsRestoreInProgress()) {
2148
0
    LOG(WARNING) << "Snapshot is not in restoring state: " << snapshot->id();
2149
0
    return;
2150
0
  }
2151
2152
0
  auto tablet_l = tablet->LockForRead();
2153
0
  auto l = snapshot->LockForWrite();
2154
0
  RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots =
2155
0
      l.mutable_data()->pb.mutable_tablet_snapshots();
2156
0
  int num_tablets_complete = 0;
2157
2158
0
  for (int i = 0; i < tablet_snapshots->size(); ++i) {
2159
0
    SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i);
2160
2161
0
    if (tablet_info->id() == tablet->id()) {
2162
0
      tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::COMPLETE);
2163
0
    }
2164
2165
0
    if (tablet_info->state() == SysSnapshotEntryPB::COMPLETE) {
2166
0
      ++num_tablets_complete;
2167
0
    }
2168
0
  }
2169
2170
  // Finish the snapshot.
2171
0
  if (error || num_tablets_complete == tablet_snapshots->size()) {
2172
0
    if (error) {
2173
0
      l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED);
2174
0
      LOG(WARNING) << "Failed restoring snapshot " << snapshot->id()
2175
0
                   << " on tablet " << tablet->id();
2176
0
    } else {
2177
0
      LOG_IF(DFATAL, num_tablets_complete != tablet_snapshots->size())
2178
0
          << "Wrong number of tablets";
2179
0
      l.mutable_data()->pb.set_state(SysSnapshotEntryPB::COMPLETE);
2180
0
      LOG(INFO) << "Restored snapshot " << snapshot->id();
2181
0
    }
2182
2183
0
    LockGuard lock(mutex_);
2184
0
    TRACE("Acquired catalog manager lock");
2185
0
    current_snapshot_id_ = "";
2186
0
  }
2187
2188
0
  VLOG(1) << "Snapshot: " << snapshot->id()
2189
0
          << " PB: " << l.mutable_data()->pb.DebugString()
2190
0
          << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size();
2191
2192
0
  const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot);
2193
2194
0
  l.CommitOrWarn(s, "updating snapshot in sys-catalog");
2195
0
}
2196
2197
void CatalogManager::HandleDeleteTabletSnapshotResponse(
2198
0
    const SnapshotId& snapshot_id, TabletInfo *tablet, bool error) {
2199
0
  LOG(INFO) << "Handling Delete Tablet Snapshot Response for tablet "
2200
0
            << DCHECK_NOTNULL(tablet)->ToString() << (error ? "  ERROR" : "  OK");
2201
2202
  // Get the snapshot data descriptor from the catalog manager.
2203
0
  scoped_refptr<SnapshotInfo> snapshot;
2204
0
  {
2205
0
    LockGuard lock(mutex_);
2206
0
    TRACE("Acquired catalog manager lock");
2207
2208
0
    snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, snapshot_id);
2209
2210
0
    if (!snapshot) {
2211
0
      LOG(WARNING) << __func__ << " Snapshot not found: " << snapshot_id;
2212
0
      return;
2213
0
    }
2214
0
  }
2215
2216
0
  if (!snapshot->IsDeleteInProgress()) {
2217
0
    LOG(WARNING) << "Snapshot is not in deleting state: " << snapshot->id();
2218
0
    return;
2219
0
  }
2220
2221
0
  auto tablet_l = tablet->LockForRead();
2222
0
  auto l = snapshot->LockForWrite();
2223
0
  RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots =
2224
0
      l.mutable_data()->pb.mutable_tablet_snapshots();
2225
0
  int num_tablets_complete = 0;
2226
2227
0
  for (int i = 0; i < tablet_snapshots->size(); ++i) {
2228
0
    SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i);
2229
2230
0
    if (tablet_info->id() == tablet->id()) {
2231
0
      tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::DELETED);
2232
0
    }
2233
2234
0
    if (tablet_info->state() != SysSnapshotEntryPB::DELETING) {
2235
0
      ++num_tablets_complete;
2236
0
    }
2237
0
  }
2238
2239
0
  if (num_tablets_complete == tablet_snapshots->size()) {
2240
    // Delete the snapshot.
2241
0
    l.mutable_data()->pb.set_state(SysSnapshotEntryPB::DELETED);
2242
0
    LOG(INFO) << "Deleted snapshot " << snapshot->id();
2243
2244
0
    const Status s = sys_catalog_->Delete(leader_ready_term(), snapshot);
2245
2246
0
    LockGuard lock(mutex_);
2247
0
    TRACE("Acquired catalog manager lock");
2248
2249
0
    if (current_snapshot_id_ == snapshot_id) {
2250
0
      current_snapshot_id_ = "";
2251
0
    }
2252
2253
    // Remove it from the maps.
2254
0
    TRACE("Removing from maps");
2255
0
    if (non_txn_snapshot_ids_map_.erase(snapshot_id) < 1) {
2256
0
      LOG(WARNING) << "Could not remove snapshot " << snapshot_id << " from map";
2257
0
    }
2258
2259
0
    l.CommitOrWarn(s, "deleting snapshot from sys-catalog");
2260
0
  } else if (error) {
2261
0
    l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED);
2262
0
    LOG(WARNING) << "Failed snapshot " << snapshot->id() << " deletion on tablet " << tablet->id();
2263
2264
0
    const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot);
2265
0
    l.CommitOrWarn(s, "updating snapshot in sys-catalog");
2266
0
  }
2267
2268
0
  VLOG(1) << "Deleting snapshot: " << snapshot->id()
2269
0
          << " PB: " << l.mutable_data()->pb.DebugString()
2270
0
          << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size();
2271
0
}
2272
2273
Status CatalogManager::CreateSnapshotSchedule(const CreateSnapshotScheduleRequestPB* req,
2274
                                              CreateSnapshotScheduleResponsePB* resp,
2275
8
                                              rpc::RpcContext* rpc) {
2276
8
  auto id = VERIFY_RESULT(snapshot_coordinator_.CreateSchedule(
2277
8
      *req, leader_ready_term(), rpc->GetClientDeadline()));
2278
0
  resp->set_snapshot_schedule_id(id.data(), id.size());
2279
8
  return Status::OK();
2280
8
}
2281
2282
Status CatalogManager::ListSnapshotSchedules(const ListSnapshotSchedulesRequestPB* req,
2283
                                             ListSnapshotSchedulesResponsePB* resp,
2284
11
                                             rpc::RpcContext* rpc) {
2285
11
  auto snapshot_schedule_id = TryFullyDecodeSnapshotScheduleId(req->snapshot_schedule_id());
2286
2287
11
  return snapshot_coordinator_.ListSnapshotSchedules(snapshot_schedule_id, resp);
2288
11
}
2289
2290
Status CatalogManager::DeleteSnapshotSchedule(const DeleteSnapshotScheduleRequestPB* req,
2291
                                              DeleteSnapshotScheduleResponsePB* resp,
2292
0
                                              rpc::RpcContext* rpc) {
2293
0
  auto snapshot_schedule_id = TryFullyDecodeSnapshotScheduleId(req->snapshot_schedule_id());
2294
2295
0
  return snapshot_coordinator_.DeleteSnapshotSchedule(
2296
0
      snapshot_schedule_id, leader_ready_term(), rpc->GetClientDeadline());
2297
0
}
2298
2299
0
void CatalogManager::DumpState(std::ostream* out, bool on_disk_dump) const {
2300
0
  super::DumpState(out, on_disk_dump);
2301
2302
  // TODO: dump snapshots
2303
0
}
2304
2305
template <typename Registry, typename Mutex>
2306
bool ShouldResendRegistry(
2307
144k
    const std::string& ts_uuid, bool has_registration, Registry* registry, Mutex* mutex) {
2308
144k
  bool should_resend_registry;
2309
144k
  {
2310
144k
    std::lock_guard<Mutex> lock(*mutex);
2311
144k
    auto it = registry->find(ts_uuid);
2312
144k
    should_resend_registry = (it == registry->end() || it->second || has_registration);
2313
144k
    if (it == registry->end()) {
2314
40
      registry->emplace(ts_uuid, false);
2315
144k
    } else {
2316
144k
      it->second = false;
2317
144k
    }
2318
144k
  }
2319
144k
  return should_resend_registry;
2320
144k
}
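
ShouldResendRegistry resends when the tserver is new, was flagged since the last heartbeat, or has just re-registered, and in all cases leaves the stored flag false. A compact equivalent using try_emplace (a sketch over a plain unordered_map, not the templated registry above):

#include <mutex>
#include <string>
#include <unordered_map>

bool ShouldResend(const std::string& ts_uuid, bool has_registration,
                  std::unordered_map<std::string, bool>* registry, std::mutex* mutex) {
  std::lock_guard<std::mutex> lock(*mutex);
  // try_emplace inserts {uuid, false} only when absent; 'inserted' plays
  // the role of the it == registry->end() check above.
  auto [it, inserted] = registry->try_emplace(ts_uuid, false);
  const bool should_resend = inserted || it->second || has_registration;
  it->second = false;  // Consume the flag either way.
  return should_resend;
}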
2321
2322
Status CatalogManager::FillHeartbeatResponse(const TSHeartbeatRequestPB* req,
2323
4.81M
                                             TSHeartbeatResponsePB* resp) {
2324
4.81M
  SysClusterConfigEntryPB cluster_config;
2325
4.81M
  RETURN_NOT_OK(GetClusterConfig(&cluster_config));
2326
4.81M
  RETURN_NOT_OK(FillHeartbeatResponseEncryption(cluster_config, req, resp));
2327
4.81M
  RETURN_NOT_OK(snapshot_coordinator_.FillHeartbeatResponse(resp));
2328
4.81M
  return FillHeartbeatResponseCDC(cluster_config, req, resp);
2329
4.81M
}
2330
2331
Status CatalogManager::FillHeartbeatResponseCDC(const SysClusterConfigEntryPB& cluster_config,
2332
                                                const TSHeartbeatRequestPB* req,
2333
4.81M
                                                TSHeartbeatResponsePB* resp) {
2334
4.81M
  resp->set_cluster_config_version(cluster_config.version());
2335
4.81M
  if (!cluster_config.has_consumer_registry() ||
2336
4.81M
      
req->cluster_config_version() >= cluster_config.version()0
) {
2337
4.81M
    return Status::OK();
2338
4.81M
  }
2339
70
  *resp->mutable_consumer_registry() = cluster_config.consumer_registry();
2340
70
  return Status::OK();
2341
4.81M
}
2342
2343
Status CatalogManager::FillHeartbeatResponseEncryption(
2344
    const SysClusterConfigEntryPB& cluster_config,
2345
    const TSHeartbeatRequestPB* req,
2346
4.81M
    TSHeartbeatResponsePB* resp) {
2347
4.81M
  const auto& ts_uuid = req->common().ts_instance().permanent_uuid();
2348
4.81M
  if (!cluster_config.has_encryption_info() ||
2349
4.81M
      !ShouldResendRegistry(ts_uuid, req->has_registration(), &should_send_universe_key_registry_,
2350
4.81M
                            &should_send_universe_key_registry_mutex_)) {
2351
4.81M
    return Status::OK();
2352
4.81M
  }
2353
2354
519
  const auto& encryption_info = cluster_config.encryption_info();
2355
519
  RETURN_NOT_OK(encryption_manager_->FillHeartbeatResponseEncryption(encryption_info, resp));
2356
2357
519
  return Status::OK();
2358
519
}
2359
2360
void CatalogManager::SetTabletSnapshotsState(SysSnapshotEntryPB::State state,
2361
0
                                             SysSnapshotEntryPB* snapshot_pb) {
2362
0
  RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots =
2363
0
      snapshot_pb->mutable_tablet_snapshots();
2364
2365
0
  for (int i = 0; i < tablet_snapshots->size(); ++i) {
2366
0
    SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i);
2367
0
    tablet_info->set_state(state);
2368
0
  }
2369
0
}
2370
2371
310
Status CatalogManager::CreateCdcStateTableIfNeeded(rpc::RpcContext *rpc) {
2372
  // If the CDC state table exists, do nothing; otherwise create it.
2373
310
  if (VERIFY_RESULT(TableExists(kSystemNamespaceName, kCdcStateTableName))) {
2374
208
    return Status::OK();
2375
208
  }
2376
  // Set up a CreateTable request internally.
2377
102
  CreateTableRequestPB req;
2378
102
  CreateTableResponsePB resp;
2379
102
  req.set_name(kCdcStateTableName);
2380
102
  req.mutable_namespace_()->set_name(kSystemNamespaceName);
2381
102
  req.set_table_type(TableType::YQL_TABLE_TYPE);
2382
2383
102
  client::YBSchemaBuilder schema_builder;
2384
102
  schema_builder.AddColumn(master::kCdcTabletId)->HashPrimaryKey()->Type(DataType::STRING);
2385
102
  schema_builder.AddColumn(master::kCdcStreamId)->PrimaryKey()->Type(DataType::STRING);
2386
102
  schema_builder.AddColumn(master::kCdcCheckpoint)->Type(DataType::STRING);
2387
102
  schema_builder.AddColumn(master::kCdcData)->Type(QLType::CreateTypeMap(
2388
102
      DataType::STRING, DataType::STRING));
2389
102
  schema_builder.AddColumn(master::kCdcLastReplicationTime)->Type(DataType::TIMESTAMP);
2390
2391
102
  client::YBSchema yb_schema;
2392
102
  CHECK_OK(schema_builder.Build(&yb_schema));
2393
2394
102
  auto schema = yb::client::internal::GetSchema(yb_schema);
2395
102
  SchemaToPB(schema, req.mutable_schema());
2396
  // Explicitly set the number of tablets if the corresponding flag is set; otherwise CreateTable
2397
  // will use the same defaults as for regular tables.
2398
102
  if (FLAGS_cdc_state_table_num_tablets > 0) {
2399
4
    req.mutable_schema()->mutable_table_properties()->set_num_tablets(
2400
4
        FLAGS_cdc_state_table_num_tablets);
2401
4
  }
2402
2403
102
  Status s = CreateTable(&req, &resp, rpc);
2404
  // We do not lock here so it is technically possible that the table was already created.
2405
  // If so, there is nothing to do so we just ignore the "AlreadyPresent" error.
2406
102
  if (!s.ok() && !s.IsAlreadyPresent()) {
2407
0
    return s;
2408
0
  }
2409
102
  return Status::OK();
2410
102
}
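
CreateCdcStateTableIfNeeded deliberately tolerates a race: the TableExists check and the CreateTable call are not atomic, so a concurrent creator can win and the resulting AlreadyPresent error is treated as success. A sketch of that idempotent-create pattern, with a minimal hypothetical Status type standing in for yb::Status:

#include <string>

// Minimal stand-in for yb::Status: ok, already-present, or other error.
struct Status {
  enum Code { kOk, kAlreadyPresent, kError };
  Code code = kOk;
  bool ok() const { return code == kOk; }
  bool IsAlreadyPresent() const { return code == kAlreadyPresent; }
  static Status OK() { return Status{kOk}; }
};

// Create-if-needed without a lock across check and create: losing the
// race to a concurrent creator is benign, so AlreadyPresent is success.
template <class ExistsFn, class CreateFn>
Status EnsureCreated(ExistsFn exists, CreateFn create) {
  if (exists()) {
    return Status::OK();
  }
  Status s = create();
  if (!s.ok() && !s.IsAlreadyPresent()) {
    return s;  // A real failure.
  }
  return Status::OK();  // Created here, or already created elsewhere.
}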
2411
2412
0
Status CatalogManager::IsCdcStateTableCreated(IsCreateTableDoneResponsePB* resp) {
2413
0
  IsCreateTableDoneRequestPB req;
2414
2415
0
  req.mutable_table()->set_table_name(kCdcStateTableName);
2416
0
  req.mutable_table()->mutable_namespace_()->set_name(kSystemNamespaceName);
2417
2418
0
  return IsCreateTableDone(&req, resp);
2419
0
}
2420
2421
// Helper function to print a vector of CDCStreamInfo pointers.
2422
namespace {
2423
2424
template<class CDCStreamInfoPointer>
2425
192
std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) {
2426
192
  std::vector<CDCStreamId> cdc_stream_ids;
2427
192
  for (const auto& cdc_stream : cdc_streams) {
2428
192
    cdc_stream_ids.push_back(cdc_stream->id());
2429
192
  }
2430
192
  return JoinCSVLine(cdc_stream_ids);
2431
192
}
catalog_manager_ent.cc:std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > yb::master::enterprise::(anonymous namespace)::JoinStreamsCSVLine<scoped_refptr<yb::master::CDCStreamInfo> >(std::__1::vector<scoped_refptr<yb::master::CDCStreamInfo>, std::__1::allocator<scoped_refptr<yb::master::CDCStreamInfo> > >)
Line
Count
Source
2425
2
std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) {
2426
2
  std::vector<CDCStreamId> cdc_stream_ids;
2427
2
  for (const auto& cdc_stream : cdc_streams) {
2428
2
    cdc_stream_ids.push_back(cdc_stream->id());
2429
2
  }
2430
2
  return JoinCSVLine(cdc_stream_ids);
2431
2
}
catalog_manager_ent.cc:std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > yb::master::enterprise::(anonymous namespace)::JoinStreamsCSVLine<yb::master::CDCStreamInfo*>(std::__1::vector<yb::master::CDCStreamInfo*, std::__1::allocator<yb::master::CDCStreamInfo*> >)
Line
Count
Source
2425
190
std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) {
2426
190
  std::vector<CDCStreamId> cdc_stream_ids;
2427
190
  for (const auto& cdc_stream : cdc_streams) {
2428
190
    cdc_stream_ids.push_back(cdc_stream->id());
2429
190
  }
2430
190
  return JoinCSVLine(cdc_stream_ids);
2431
190
}
2432
2433
} // namespace
2434
2435
5.58k
Status CatalogManager::DeleteCDCStreamsForTable(const TableId& table_id) {
2436
5.58k
  return DeleteCDCStreamsForTables({table_id});
2437
5.58k
}
2438
2439
5.67k
Status CatalogManager::DeleteCDCStreamsForTables(const vector<TableId>& table_ids) {
2440
5.67k
  std::ostringstream tid_stream;
2441
13.6k
  for (const auto& tid : table_ids) {
2442
13.6k
    tid_stream << " " << tid;
2443
13.6k
  }
2444
5.67k
  LOG(INFO) << "Deleting CDC streams for tables:" << tid_stream.str();
2445
2446
5.67k
  std::vector<scoped_refptr<CDCStreamInfo>> streams;
2447
13.6k
  for (const auto& tid : table_ids) {
2448
13.6k
    auto newstreams = FindCDCStreamsForTable(tid);
2449
13.6k
    streams.insert(streams.end(), newstreams.begin(), newstreams.end());
2450
13.6k
  }
2451
2452
5.67k
  if (streams.empty()) {
2453
5.61k
    return Status::OK();
2454
5.61k
  }
2455
2456
  // Do not delete them here, just mark them as DELETING and the catalog manager background thread
2457
  // will handle the deletion.
2458
63
  return MarkCDCStreamsAsDeleting(streams);
2459
5.67k
}
2460
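As the comment above notes, deletion is deferred: streams are only marked DELETING here. A minimal sketch of how the background pass composes the two helpers defined later in this file (both signatures appear below; the surrounding task loop is assumed):

// Minimal sketch, assuming this runs inside the catalog manager's background
// task; both functions are defined later in this file.
std::vector<scoped_refptr<CDCStreamInfo>> deleting_streams;
RETURN_NOT_OK(FindCDCStreamsMarkedAsDeleting(&deleting_streams));
if (!deleting_streams.empty()) {
  // Removes cdc_state rows, then sys-catalog entries, then in-memory map entries.
  RETURN_NOT_OK(CleanUpDeletedCDCStreams(deleting_streams));
}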
2461
std::vector<scoped_refptr<CDCStreamInfo>> CatalogManager::FindCDCStreamsForTable(
2462
13.8k
    const TableId& table_id) const {
2463
13.8k
  std::vector<scoped_refptr<CDCStreamInfo>> streams;
2464
13.8k
  SharedLock lock(mutex_);
2465
2466
13.8k
  for (const auto& entry : cdc_stream_map_) {
2467
72
    auto ltm = entry.second->LockForRead();
2468
    // For xCluster, the first entry will be the table_id.
2469
72
    if (ltm->table_id().Get(0) == table_id && !ltm->started_deleting()) {
2470
62
      streams.push_back(entry.second);
2471
62
    }
2472
72
  }
2473
13.8k
  return streams;
2474
13.8k
}
2475
2476
0
void CatalogManager::GetAllCDCStreams(std::vector<scoped_refptr<CDCStreamInfo>>* streams) {
2477
0
  streams->clear();
2478
0
  SharedLock lock(mutex_);
2479
0
  streams->reserve(cdc_stream_map_.size());
2480
0
  for (const CDCStreamInfoMap::value_type& e : cdc_stream_map_) {
2481
0
    if (!e.second->LockForRead()->is_deleting()) {
2482
0
      streams->push_back(e.second);
2483
0
    }
2484
0
  }
2485
0
}
2486
2487
Status CatalogManager::CreateCDCStream(const CreateCDCStreamRequestPB* req,
2488
                                       CreateCDCStreamResponsePB* resp,
2489
5.50k
                                       rpc::RpcContext* rpc) {
2490
5.50k
  LOG(INFO) << "CreateCDCStream from " << RequestorString(rpc) << ": " << req->ShortDebugString();
2491
2492
5.50k
  std::string id_type_option_value(cdc::kTableId);
2493
2494
22.2k
  for (auto option : req->options()) {
2495
22.2k
    if (option.key() == cdc::kIdType) {
2496
306
      id_type_option_value = option.value();
2497
306
    }
2498
22.2k
  }
2499
2500
2501
5.50k
  if (id_type_option_value != cdc::kNamespaceId) {
2502
5.19k
    scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTableById(req->table_id()));
2503
0
    {
2504
5.19k
      auto l = table->LockForRead();
2505
5.19k
      if (l->started_deleting()) {
2506
0
        return STATUS(
2507
0
            NotFound, "Table does not exist", req->table_id(),
2508
0
            MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2509
0
      }
2510
5.19k
    }
2511
2512
5.19k
    AlterTableRequestPB alter_table_req;
2513
5.19k
    alter_table_req.mutable_table()->set_table_id(req->table_id());
2514
5.19k
    alter_table_req.set_wal_retention_secs(FLAGS_cdc_wal_retention_time_secs);
2515
5.19k
    AlterTableResponsePB alter_table_resp;
2516
5.19k
    Status s = this->AlterTable(&alter_table_req, &alter_table_resp, rpc);
2517
5.19k
    if (!s.ok()) {
2518
0
      return STATUS(InternalError,
2519
0
                    "Unable to change the WAL retention time for table", req->table_id(),
2520
0
                    MasterError(MasterErrorPB::INTERNAL_ERROR));
2521
0
    }
2522
5.19k
  }
2523
2524
5.49k
  scoped_refptr<CDCStreamInfo> stream;
2525
5.49k
  if (!req->has_db_stream_id()) {
2526
310
    {
2527
310
      TRACE("Acquired catalog manager lock");
2528
310
      LockGuard lock(mutex_);
2529
      // Construct the CDC stream if the producer wasn't bootstrapped.
2530
310
      CDCStreamId stream_id;
2531
310
      stream_id = GenerateIdUnlocked(SysRowEntryType::CDC_STREAM);
2532
2533
310
      stream = make_scoped_refptr<CDCStreamInfo>(stream_id);
2534
310
      stream->mutable_metadata()->StartMutation();
2535
310
      SysCDCStreamEntryPB* metadata = &stream->mutable_metadata()->mutable_dirty()->pb;
2536
310
      bool create_namespace = id_type_option_value == cdc::kNamespaceId;
2537
310
      if (create_namespace) {
2538
306
        metadata->set_namespace_id(req->table_id());
2539
306
      } else {
2540
4
        metadata->add_table_id(req->table_id());
2541
4
      }
2542
310
      metadata->mutable_options()->CopyFrom(req->options());
2543
310
      metadata->set_state(
2544
310
          req->has_initial_state() ? req->initial_state() : SysCDCStreamEntryPB::ACTIVE);
2545
2546
      // Add the stream to the in-memory map.
2547
310
      cdc_stream_map_[stream->id()] = stream;
2548
310
      if (!create_namespace) {
2549
4
        cdc_stream_tables_count_map_[req->table_id()]++;
2550
4
      }
2551
310
      resp->set_stream_id(stream->id());
2552
310
    }
2553
310
    TRACE("Inserted new CDC stream into CatalogManager maps");
2554
2555
    // Update the on-disk system catalog.
2556
310
    RETURN_NOT_OK(CheckLeaderStatusAndSetupError(
2557
310
        sys_catalog_->Upsert(leader_ready_term(), stream),
2558
310
        "inserting CDC stream into sys-catalog", resp));
2559
310
    TRACE("Wrote CDC stream to sys-catalog");
2560
2561
    // Commit the in-memory state.
2562
310
    stream->mutable_metadata()->CommitMutation();
2563
310
    LOG(INFO) << "Created CDC stream " << stream->ToString();
2564
2565
310
    RETURN_NOT_OK(CreateCdcStateTableIfNeeded(rpc));
2566
310
    if (!PREDICT_FALSE(FLAGS_TEST_disable_cdc_state_insert_on_setup) &&
2567
310
        (!req->has_initial_state() ||
2568
306
         (req->initial_state() == master::SysCDCStreamEntryPB::ACTIVE)) &&
2569
310
        (id_type_option_value != cdc::kNamespaceId)) {
2570
      // Create the cdc state entries for the tablets in this table from scratch since we have no
2571
      // data to bootstrap. If we have data to bootstrap, let the BootstrapProducer logic take care
2572
      // populating entries in cdc_state.
2573
0
      auto ybclient = master_->cdc_state_client_initializer().client();
2574
0
      if (!ybclient) {
2575
0
        return STATUS(IllegalState, "Client not initialized or shutting down");
2576
0
      }
2577
0
      client::TableHandle cdc_table;
2578
0
      const client::YBTableName cdc_state_table_name(
2579
0
          YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
2580
0
      RETURN_NOT_OK(ybclient->WaitForCreateTableToFinish(cdc_state_table_name));
2581
0
      RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient));
2582
0
      std::shared_ptr<client::YBSession> session = ybclient->NewSession();
2583
0
      scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTableById(req->table_id()));
2584
0
      auto tablets = table->GetTablets();
2585
0
      for (const auto& tablet : tablets) {
2586
0
        const auto op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
2587
0
        auto* const req = op->mutable_request();
2588
0
        QLAddStringHashValue(req, tablet->id());
2589
0
        QLAddStringRangeValue(req, stream->id());
2590
0
        cdc_table.AddStringColumnValue(req, master::kCdcCheckpoint, OpId().ToString());
2591
0
        cdc_table.AddTimestampColumnValue(
2592
0
            req, master::kCdcLastReplicationTime, GetCurrentTimeMicros());
2593
0
        session->Apply(op);
2594
0
      }
2595
0
      RETURN_NOT_OK(session->Flush());
2596
0
    }
2597
5.18k
  } else {
2598
    // Update and add table_id.
2599
5.18k
    {
2600
5.18k
      SharedLock lock(mutex_);
2601
5.18k
      stream = FindPtrOrNull(cdc_stream_map_, req->db_stream_id());
2602
5.18k
    }
2603
2604
5.18k
    if (stream == nullptr) {
2605
0
      return STATUS(
2606
0
          NotFound, "Could not find CDC stream", req->ShortDebugString(),
2607
0
          MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2608
0
    }
2609
2610
5.18k
    auto stream_lock = stream->LockForWrite();
2611
5.18k
    if (stream_lock->is_deleting()) {
2612
0
      return STATUS(
2613
0
          NotFound, "CDC stream has been deleted", req->ShortDebugString(),
2614
0
          MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2615
0
    }
2616
5.18k
    stream_lock.mutable_data()->pb.add_table_id(req->table_id());
2617
2618
    // Also need to persist changes in sys catalog.
2619
5.18k
    RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream));
2620
5.18k
    stream_lock.Commit();
2621
5.18k
  }
2622
5.49k
  return Status::OK();
2623
5.49k
}
2624
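A hedged sketch of the request fields this handler inspects. For a namespace-level (CDCSDK) stream, the kIdType option is set to kNamespaceId and, as the code above shows, req->table_id() then carries the namespace id; the id values below are placeholders.

// Sketch only: request shape read by CreateCDCStream above.
CreateCDCStreamRequestPB req;
req.set_table_id("<namespace-id>");  // reused as the namespace id for kNamespaceId streams
auto* opt = req.add_options();
opt->set_key(cdc::kIdType);
opt->set_value(cdc::kNamespaceId);
// Alternatively, set db_stream_id to attach a table to an existing DB stream:
// req.set_db_stream_id("<db-stream-id>");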
2625
Status CatalogManager::DeleteCDCStream(const DeleteCDCStreamRequestPB* req,
2626
                                       DeleteCDCStreamResponsePB* resp,
2627
2
                                       rpc::RpcContext* rpc) {
2628
2
  LOG(INFO) << "Servicing DeleteCDCStream request from " << RequestorString(rpc)
2629
2
            << ": " << req->ShortDebugString();
2630
2631
2
  if (req->stream_id_size() < 1) {
2632
0
    return STATUS(InvalidArgument, "No CDC Stream ID given", req->ShortDebugString(),
2633
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
2634
0
  }
2635
2636
2
  std::vector<scoped_refptr<CDCStreamInfo>> streams;
2637
2
  {
2638
2
    SharedLock lock(mutex_);
2639
2
    for (const auto& stream_id : req->stream_id()) {
2640
2
      auto stream = FindPtrOrNull(cdc_stream_map_, stream_id);
2641
2642
2
      if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2643
0
        resp->add_not_found_stream_ids(stream_id);
2644
0
        LOG(WARNING) << "CDC stream does not exist: " << stream_id;
2645
2
      } else {
2646
2
        auto ltm = stream->LockForRead();
2647
2
        bool active = (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE);
2648
2
        bool is_WAL = false;
2649
5
        for (const auto& option : ltm->pb.options()) {
2650
5
          if (option.key() == "record_format" && option.value() == "WAL") {
2651
0
            is_WAL = true;
2652
0
          }
2653
5
        }
2654
2
        if (!req->force_delete() && is_WAL && active) {
2655
0
          return STATUS(NotSupported,
2656
0
                        "Cannot delete an xCluster Stream in replication. "
2657
0
                        "Use 'force_delete' to override",
2658
0
                        req->ShortDebugString(),
2659
0
                        MasterError(MasterErrorPB::INVALID_REQUEST));
2660
0
        }
2661
2
        streams.push_back(stream);
2662
2
      }
2663
2
    }
2664
2
  }
2665
2666
2
  if (!resp->not_found_stream_ids().empty() && !req->ignore_errors()) {
2667
0
    string missing_streams = JoinElementsIterator(resp->not_found_stream_ids().begin(),
2668
0
                                                  resp->not_found_stream_ids().end(),
2669
0
                                                  ",");
2670
0
    return STATUS(
2671
0
        NotFound,
2672
0
        Substitute("Did not find all requested CDC streams. Missing streams: [$0]. Request: $1",
2673
0
                   missing_streams, req->ShortDebugString()),
2674
0
        MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2675
0
  }
2676
2677
  // Do not delete them here, just mark them as DELETING and the catalog manager background thread
2678
  // will handle the deletion.
2679
2
  Status s = MarkCDCStreamsAsDeleting(streams);
2680
2
  if (!s.ok()) {
2681
0
    if (s.IsIllegalState()) {
2682
0
      PANIC_RPC(rpc, s.message().ToString());
2683
0
    }
2684
0
    return CheckIfNoLongerLeaderAndSetupError(s, resp);
2685
0
  }
2686
2687
2
  LOG(INFO) << "Successfully deleted CDC streams " << JoinStreamsCSVLine(streams)
2688
2
            << " per request from " << RequestorString(rpc);
2689
2690
2
  return Status::OK();
2691
2
}
2692
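Per the checks above, an ACTIVE stream whose record_format option is WAL (an xCluster stream) can only be removed with force_delete. A sketch of the request, using only fields this handler reads; the stream id is a placeholder.

// Sketch only: request shape accepted by DeleteCDCStream above.
DeleteCDCStreamRequestPB req;
req.add_stream_id("<stream-uuid>");
req.set_force_delete(true);    // required for an ACTIVE xCluster (WAL) stream
req.set_ignore_errors(false);  // fail if any requested stream is not found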
2693
Status CatalogManager::MarkCDCStreamsAsDeleting(
2694
64
    const std::vector<scoped_refptr<CDCStreamInfo>>& streams) {
2695
64
  if (streams.empty()) {
2696
0
    return Status::OK();
2697
0
  }
2698
64
  std::vector<CDCStreamInfo::WriteLock> locks;
2699
64
  std::vector<CDCStreamInfo*> streams_to_mark;
2700
64
  locks.reserve(streams.size());
2701
64
  for (auto& stream : streams) {
2702
64
    auto l = stream->LockForWrite();
2703
64
    l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING);
2704
64
    locks.push_back(std::move(l));
2705
64
    streams_to_mark.push_back(stream.get());
2706
64
  }
2707
  // The mutation will be aborted when 'l' exits the scope on early return.
2708
64
  RETURN_NOT_OK(CheckStatus(
2709
64
      sys_catalog_->Upsert(leader_ready_term(), streams_to_mark),
2710
64
      "updating CDC streams in sys-catalog"));
2711
64
  LOG(INFO) << "Successfully marked streams " << JoinStreamsCSVLine(streams_to_mark)
2712
64
            << " as DELETING in sys catalog";
2713
64
  for (auto& lock : locks) {
2714
64
    lock.Commit();
2715
64
  }
2716
64
  return Status::OK();
2717
64
}
2718
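MarkCDCStreamsAsDeleting is a compact example of the copy-on-write metadata idiom used throughout this file: mutate a dirty copy under a write lock, persist it, and commit the in-memory state only after the sys-catalog write succeeds. Condensed to a single stream, a sketch of the pattern:

// Condensed sketch of the write pattern above; all calls appear in this file.
auto l = stream->LockForWrite();                                // take dirty copy
l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING);  // mutate dirty state
RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream));  // persist first
l.Commit();  // publish in-memory; an early return aborts the mutation instead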
2719
Status CatalogManager::FindCDCStreamsMarkedAsDeleting(
2720
1.56M
    std::vector<scoped_refptr<CDCStreamInfo>>* streams) {
2721
1.56M
  TRACE("Acquired catalog manager lock");
2722
1.56M
  SharedLock lock(mutex_);
2723
1.56M
  for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) {
2724
1.71k
    auto ltm = entry.second->LockForRead();
2725
1.71k
    if (ltm->is_deleting()) {
2726
63
      LOG(INFO) << "Stream " << entry.second->id() << " was marked as DELETING";
2727
63
      streams->push_back(entry.second);
2728
63
    }
2729
1.71k
  }
2730
1.56M
  return Status::OK();
2731
1.56M
}
2732
2733
Status CatalogManager::CleanUpDeletedCDCStreams(
2734
63
    const std::vector<scoped_refptr<CDCStreamInfo>>& streams) {
2735
63
  auto ybclient = master_->cdc_state_client_initializer().client();
2736
63
  if (!ybclient) {
2737
0
    return STATUS(IllegalState, "Client not initialized or shutting down");
2738
0
  }
2739
2740
  // First, for each deleted stream, delete the cdc state rows.
2741
  // Delete all the entries in cdc_state table that contain all the deleted cdc streams.
2742
63
  client::TableHandle cdc_table;
2743
63
  const client::YBTableName cdc_state_table_name(
2744
63
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
2745
63
  Status s = cdc_table.Open(cdc_state_table_name, ybclient);
2746
63
  if (!s.ok()) {
2747
0
    LOG(WARNING) << "Unable to open table " << master::kCdcStateTableName
2748
0
                 << " to delete stream ids entries: " << s;
2749
0
    return s.CloneAndPrepend("Unable to open cdc_state table");
2750
0
  }
2751
2752
63
  std::shared_ptr<client::YBSession> session = ybclient->NewSession();
2753
63
  std::vector<std::pair<CDCStreamId, std::shared_ptr<client::YBqlWriteOp>>> stream_ops;
2754
63
  std::set<CDCStreamId> failed_streams;
2755
63
  for (const auto& stream : streams) {
2756
63
    LOG(INFO) << "Deleting rows for stream " << stream->id();
2757
63
    for (const auto& table_id : stream->table_id()) {
2758
62
      TabletInfos tablets;
2759
62
      scoped_refptr<TableInfo> table;
2760
62
      {
2761
62
        TRACE("Acquired catalog manager lock");
2762
62
        SharedLock lock(mutex_);
2763
62
        table = FindPtrOrNull(*table_ids_map_, table_id);
2764
62
      }
2765
      // GetTablets locks lock_ in shared mode.
2766
62
      if (table) {
2767
62
        tablets = table->GetTablets();
2768
62
      }
2769
2770
62
      for (const auto& tablet : tablets) {
2771
0
        const auto delete_op = cdc_table.NewDeleteOp();
2772
0
        auto* delete_req = delete_op->mutable_request();
2773
2774
0
        QLAddStringHashValue(delete_req, tablet->tablet_id());
2775
0
        QLAddStringRangeValue(delete_req, stream->id());
2776
0
        session->Apply(delete_op);
2777
0
        stream_ops.push_back(std::make_pair(stream->id(), delete_op));
2778
0
        LOG(INFO) << "Deleting stream " << stream->id() << " for tablet " << tablet->tablet_id()
2779
0
                  << " with request " << delete_req->ShortDebugString();
2780
0
      }
2781
62
    }
2782
63
  }
2783
  // Flush all the delete operations.
2784
63
  s = session->Flush();
2785
63
  if (!s.ok()) {
2786
0
    LOG(ERROR) << "Unable to flush operations to delete cdc streams: " << s;
2787
0
    return s.CloneAndPrepend("Error deleting cdc stream rows from cdc_state table");
2788
0
  }
2789
2790
63
  for (const auto& e : stream_ops) {
2791
0
    if (!e.second->succeeded()) {
2792
0
      LOG(WARNING) << "Error deleting cdc_state row with tablet id "
2793
0
                   << e.second->request().hashed_column_values(0).value().string_value()
2794
0
                   << " and stream id "
2795
0
                   << e.second->request().range_column_values(0).value().string_value()
2796
0
                   << ": " << e.second->response().status();
2797
0
      failed_streams.insert(e.first);
2798
0
    }
2799
0
  }
2800
2801
  // TODO: Read the cdc_state table and verify that there are no rows with the specified cdc stream
2802
  // and keep those in the map in the DELETED state to retry later.
2803
2804
63
  std::vector<CDCStreamInfo::WriteLock> locks;
2805
63
  locks.reserve(streams.size() - failed_streams.size());
2806
63
  std::vector<CDCStreamInfo*> streams_to_delete;
2807
63
  streams_to_delete.reserve(streams.size() - failed_streams.size());
2808
2809
  // Delete from sys catalog only those streams that were successfully deleted from cdc_state.
2810
63
  for (auto& stream : streams) {
2811
63
    if (failed_streams.find(stream->id()) == failed_streams.end()) {
2812
63
      locks.push_back(stream->LockForWrite());
2813
63
      streams_to_delete.push_back(stream.get());
2814
63
    }
2815
63
  }
2816
2817
  // The mutation will be aborted when 'l' exits the scope on early return.
2818
63
  RETURN_NOT_OK(CheckStatus(
2819
63
      sys_catalog_->Delete(leader_ready_term(), streams_to_delete),
2820
63
      "deleting CDC streams from sys-catalog"));
2821
63
  LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete)
2822
63
            << " from sys catalog";
2823
2824
  // Remove it from the map.
2825
63
  TRACE("Removing from CDC stream maps");
2826
63
  {
2827
63
    LockGuard lock(mutex_);
2828
63
    for (const auto& stream : streams_to_delete) {
2829
63
      if (cdc_stream_map_.erase(stream->id()) < 1) {
2830
0
        return STATUS(IllegalState, "Could not remove CDC stream from map", stream->id());
2831
0
      }
2832
63
      for (auto& id : stream->table_id()) {
2833
62
        cdc_stream_tables_count_map_[id]--;
2834
62
      }
2835
63
    }
2836
63
  }
2837
63
  LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete)
2838
63
            << " from stream map";
2839
2840
63
  for (auto& lock : locks) {
2841
63
    lock.Commit();
2842
63
  }
2843
63
  return Status::OK();
2844
63
}
2845
2846
Status CatalogManager::GetCDCStream(const GetCDCStreamRequestPB* req,
2847
                                    GetCDCStreamResponsePB* resp,
2848
6
                                    rpc::RpcContext* rpc) {
2849
6
  LOG(INFO) << "GetCDCStream from " << RequestorString(rpc)
2850
6
            << ": " << req->DebugString();
2851
2852
6
  if (!req->has_stream_id()) {
2853
0
    return STATUS(InvalidArgument, "CDC Stream ID must be provided", req->ShortDebugString(),
2854
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
2855
0
  }
2856
2857
6
  scoped_refptr<CDCStreamInfo> stream;
2858
6
  {
2859
6
    SharedLock lock(mutex_);
2860
6
    stream = FindPtrOrNull(cdc_stream_map_, req->stream_id());
2861
6
  }
2862
2863
6
  if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2864
2
    return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(),
2865
2
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2866
2
  }
2867
2868
4
  auto stream_lock = stream->LockForRead();
2869
2870
4
  CDCStreamInfoPB* stream_info = resp->mutable_stream();
2871
2872
4
  stream_info->set_stream_id(stream->id());
2873
4
  std::string id_type_option_value(cdc::kTableId);
2874
2875
4
  for (auto option : stream_lock->options()) {
2876
0
    if (option.has_key() && option.key() == cdc::kIdType)
2877
0
      id_type_option_value = option.value();
2878
0
  }
2879
4
  if (id_type_option_value == cdc::kNamespaceId) {
2880
0
    stream_info->set_namespace_id(stream_lock->namespace_id());
2881
0
  }
2882
2883
4
  for (auto& table_id : stream_lock->table_id()) {
2884
4
    stream_info->add_table_id(table_id);
2885
4
  }
2886
2887
4
  stream_info->mutable_options()->CopyFrom(stream_lock->options());
2888
2889
4
  return Status::OK();
2890
6
}
2891
2892
Status CatalogManager::GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req,
2893
22
                                          GetCDCDBStreamInfoResponsePB* resp) {
2894
2895
22
  if (!req->has_db_stream_id()) {
2896
0
    return STATUS(InvalidArgument, "CDC DB Stream ID must be provided", req->ShortDebugString(),
2897
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
2898
0
  }
2899
2900
22
  scoped_refptr<CDCStreamInfo> stream;
2901
22
  {
2902
22
    SharedLock lock(mutex_);
2903
22
    stream = FindPtrOrNull(cdc_stream_map_, req->db_stream_id());
2904
22
  }
2905
2906
22
  if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2907
0
    return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(),
2908
0
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2909
0
  }
2910
2911
22
  auto stream_lock = stream->LockForRead();
2912
2913
22
  if (!stream->namespace_id().empty()) {
2914
22
    resp->set_namespace_id(stream->namespace_id());
2915
22
  }
2916
2917
22
  for (const auto& table_id : stream_lock->table_id()) {
2918
22
    const auto table_info = resp->add_table_info();
2919
22
    table_info->set_stream_id(req->db_stream_id());
2920
22
    table_info->set_table_id(table_id);
2921
22
  }
2922
2923
22
  return Status::OK();
2924
22
}
2925
2926
Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req,
2927
1
                                      ListCDCStreamsResponsePB* resp) {
2928
2929
1
  scoped_refptr<TableInfo> table;
2930
1
  bool filter_table = req->has_table_id();
2931
1
  if (filter_table) {
2932
0
    table = VERIFY_RESULT(FindTableById(req->table_id()));
2933
0
  }
2934
2935
1
  SharedLock lock(mutex_);
2936
2937
1
  for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) {
2938
1
    bool skip_stream = false;
2939
1
    bool id_type_option_present = false;
2940
2941
    // If the request is to list the DB streams of a specific namespace, then the other namespaces
2943
    // should not be considered.
2943
1
    if (req->has_namespace_id() && (req->namespace_id() != entry.second->namespace_id())) {
2944
0
      continue;
2945
0
    }
2946
2947
1
    if (filter_table && table->id() != entry.second->table_id().Get(0)) {
2948
0
      continue; // Skip deleting/deleted streams and streams from other tables.
2949
0
    }
2950
2951
1
    auto ltm = entry.second->LockForRead();
2952
2953
1
    if (ltm->is_deleting()) {
2954
0
      continue;
2955
0
    }
2956
2957
1
    for (const auto& option : ltm->options()) {
2958
0
      if (option.key() == cdc::kIdType) {
2959
0
        id_type_option_present = true;
2960
0
        if (req->has_id_type()) {
2961
0
          if (req->id_type() == IdTypePB::NAMESPACE_ID &&
2962
0
              option.value() != cdc::kNamespaceId) {
2963
0
            skip_stream = true;
2964
0
            break;
2965
0
          }
2966
0
          if (req->id_type() == IdTypePB::TABLE_ID &&
2967
0
              option.value() == cdc::kNamespaceId) {
2968
0
            skip_stream = true;
2969
0
            break;
2970
0
          }
2971
0
        }
2972
0
      }
2973
0
    }
2974
2975
1
    if ((!id_type_option_present && req->id_type() == IdTypePB::NAMESPACE_ID) ||
2976
1
        skip_stream)
2977
0
      continue;
2978
2979
1
    CDCStreamInfoPB* stream = resp->add_streams();
2980
1
    stream->set_stream_id(entry.second->id());
2981
1
    for (const auto& table_id : ltm->table_id()) {
2982
1
      stream->add_table_id(table_id);
2983
1
    }
2984
1
    stream->mutable_options()->CopyFrom(ltm->options());
2985
    // Also add an option for the current state.
2986
1
    if (ltm->pb.has_state()) {
2987
1
      auto state_option = stream->add_options();
2988
1
      state_option->set_key("state");
2989
1
      state_option->set_value(master::SysCDCStreamEntryPB::State_Name(ltm->pb.state()));
2990
1
    }
2991
1
  }
2992
1
  return Status::OK();
2993
1
}
2994
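A sketch of a filtered listing request, using the filters this handler honors (table_id, namespace_id, id_type); the namespace id is a placeholder.

// Sketch only: list namespace-scoped (CDCSDK) streams of one namespace.
ListCDCStreamsRequestPB req;
req.set_namespace_id("<namespace-id>");
req.set_id_type(IdTypePB::NAMESPACE_ID);
ListCDCStreamsResponsePB resp;
RETURN_NOT_OK(ListCDCStreams(&req, &resp));
// resp.streams() now holds stream_id, the table_id list, the stored options,
// and a synthetic "state" option added by the handler above.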
2995
310
bool CatalogManager::CDCStreamExistsUnlocked(const CDCStreamId& stream_id) {
2996
310
  scoped_refptr<CDCStreamInfo> stream = FindPtrOrNull(cdc_stream_map_, stream_id);
2997
310
  if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2998
310
    return false;
2999
310
  }
3000
0
  return true;
3001
310
}
3002
3003
Status CatalogManager::UpdateCDCStream(const UpdateCDCStreamRequestPB *req,
3004
                                       UpdateCDCStreamResponsePB* resp,
3005
0
                                       rpc::RpcContext* rpc) {
3006
0
  LOG(INFO) << "UpdateCDCStream from " << RequestorString(rpc)
3007
0
            << ": " << req->DebugString();
3008
3009
  // Check fields.
3010
0
  if (!req->has_stream_id()) {
3011
0
    return STATUS(InvalidArgument, "Stream ID must be provided",
3012
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3013
0
  }
3014
0
  if (!req->has_entry()) {
3015
0
    return STATUS(InvalidArgument, "CDC Stream Entry must be provided",
3016
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3017
0
  }
3018
3019
0
  scoped_refptr<CDCStreamInfo> stream;
3020
0
  {
3021
0
    SharedLock lock(mutex_);
3022
0
    stream = FindPtrOrNull(cdc_stream_map_, req->stream_id());
3023
0
  }
3024
0
  if (stream == nullptr) {
3025
0
    return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(),
3026
0
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
3027
0
  }
3028
3029
0
  auto stream_lock = stream->LockForWrite();
3030
0
  if (stream_lock->is_deleting()) {
3031
0
    return STATUS(NotFound, "CDC stream has been deleted", req->ShortDebugString(),
3032
0
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
3033
0
  }
3034
3035
0
  stream_lock.mutable_data()->pb.CopyFrom(req->entry());
3036
  // Also need to persist changes in sys catalog.
3037
0
  RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream));
3038
3039
0
  stream_lock.Commit();
3040
3041
0
  return Status::OK();
3042
0
}
3043
3044
/*
3045
 * UniverseReplication is set up in 4 stages within the Catalog Manager:
3046
 * 1. SetupUniverseReplication: Validates user input & requests Producer schema.
3047
 * 2. GetTableSchemaCallback:   Validates Schema compatibility & requests Producer CDC init.
3048
 * 3. AddCDCStreamToUniverseAndInitConsumer:  Setup RPC connections for CDC Streaming
3049
 * 4. InitCDCConsumer:          Initializes the Consumer settings to begin tailing data
3050
 */
3051
Status CatalogManager::SetupUniverseReplication(const SetupUniverseReplicationRequestPB* req,
3052
                                                SetupUniverseReplicationResponsePB* resp,
3053
2
                                                rpc::RpcContext* rpc) {
3054
2
  LOG(INFO) << "SetupUniverseReplication from " << RequestorString(rpc)
3055
2
            << ": " << req->DebugString();
3056
3057
  // Sanity checking section.
3058
2
  if (!req->has_producer_id()) {
3059
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
3060
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3061
0
  }
3062
3063
2
  if (req->producer_master_addresses_size() <= 0) {
3064
0
    return STATUS(InvalidArgument, "Producer master address must be provided",
3065
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3066
0
  }
3067
3068
2
  if (req->producer_bootstrap_ids().size() > 0 &&
3069
2
      req->producer_bootstrap_ids().size() != req->producer_table_ids().size()) {
3070
0
    return STATUS(InvalidArgument, "Number of bootstrap ids must be equal to number of tables",
3071
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3072
0
  }
3073
3074
2
  {
3075
2
    auto l = ClusterConfig()->LockForRead();
3076
2
    if (l->pb.cluster_uuid() == req->producer_id()) {
3077
0
      return STATUS(InvalidArgument, "The request UUID and cluster UUID are identical.",
3078
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3079
0
    }
3080
2
  }
3081
3082
2
  {
3083
2
    auto request_master_addresses = req->producer_master_addresses();
3084
2
    std::vector<ServerEntryPB> cluster_master_addresses;
3085
2
    RETURN_NOT_OK(master_->ListMasters(&cluster_master_addresses));
3086
2
    for (const auto &req_elem : request_master_addresses) {
3087
2
      for (const auto &cluster_elem : cluster_master_addresses) {
3088
2
        if (cluster_elem.has_registration()) {
3089
2
          auto p_rpc_addresses = cluster_elem.registration().private_rpc_addresses();
3090
2
          for (const auto &p_rpc_elem : p_rpc_addresses) {
3091
2
            if (req_elem.host() == p_rpc_elem.host() && req_elem.port() == p_rpc_elem.port()) {
3092
0
              return STATUS(InvalidArgument,
3093
0
                "Duplicate between request master addresses and private RPC addresses",
3094
0
                req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3095
0
            }
3096
2
          }
3097
3098
2
          auto broadcast_addresses = cluster_elem.registration().broadcast_addresses();
3099
2
          for (const auto &bc_elem : broadcast_addresses) {
3100
2
            if (req_elem.host() == bc_elem.host() && req_elem.port() == bc_elem.port()) {
3101
0
              return STATUS(InvalidArgument,
3102
0
                "Duplicate between request master addresses and broadcast addresses",
3103
0
                req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3104
0
            }
3105
2
          }
3106
2
        }
3107
2
      }
3108
2
    }
3109
2
  }
3110
3111
2
  std::unordered_map<TableId, std::string> table_id_to_bootstrap_id;
3112
3113
2
  if (req->producer_bootstrap_ids_size() > 0) {
3114
0
    for (int i = 0; i < req->producer_table_ids().size(); i++) {
3115
0
      table_id_to_bootstrap_id[req->producer_table_ids(i)] = req->producer_bootstrap_ids(i);
3116
0
    }
3117
0
  }
3118
3119
  // We assume that the list of table ids is unique.
3120
2
  if (req->producer_bootstrap_ids().size() > 0 &&
3121
2
      implicit_cast<size_t>(req->producer_table_ids().size()) != table_id_to_bootstrap_id.size()) {
3122
0
    return STATUS(InvalidArgument, "When providing bootstrap ids, "
3123
0
                  "the list of tables must be unique", req->ShortDebugString(),
3124
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
3125
0
  }
3126
3127
2
  scoped_refptr<UniverseReplicationInfo> ri;
3128
2
  {
3129
2
    TRACE("Acquired catalog manager lock");
3130
2
    SharedLock lock(mutex_);
3131
3132
2
    if (FindPtrOrNull(universe_replication_map_, req->producer_id()) != nullptr) {
3133
0
      return STATUS(InvalidArgument, "Producer already present", req->producer_id(),
3134
0
                    MasterError(MasterErrorPB::INVALID_REQUEST));
3135
0
    }
3136
2
  }
3137
3138
  // Create an entry in the system catalog DocDB for this new universe replication.
3139
2
  ri = new UniverseReplicationInfo(req->producer_id());
3140
2
  ri->mutable_metadata()->StartMutation();
3141
2
  SysUniverseReplicationEntryPB *metadata = &ri->mutable_metadata()->mutable_dirty()->pb;
3142
2
  metadata->set_producer_id(req->producer_id());
3143
2
  metadata->mutable_producer_master_addresses()->CopyFrom(req->producer_master_addresses());
3144
2
  metadata->mutable_tables()->CopyFrom(req->producer_table_ids());
3145
2
  metadata->set_state(SysUniverseReplicationEntryPB::INITIALIZING);
3146
3147
2
  RETURN_NOT_OK(CheckLeaderStatusAndSetupError(
3148
2
      sys_catalog_->Upsert(leader_ready_term(), ri),
3149
2
      "inserting universe replication info into sys-catalog", resp));
3150
2
  TRACE("Wrote universe replication info to sys-catalog");
3151
3152
  // Commit the in-memory state now that it's added to the persistent catalog.
3153
2
  ri->mutable_metadata()->CommitMutation();
3154
2
  LOG(INFO) << "Setup universe replication from producer " << ri->ToString();
3155
3156
2
  {
3157
2
    LockGuard lock(mutex_);
3158
2
    universe_replication_map_[ri->id()] = ri;
3159
2
  }
3160
3161
  // Initialize the CDC Stream by querying the Producer server for RPC sanity checks.
3162
2
  auto result = ri->GetOrCreateCDCRpcTasks(req->producer_master_addresses());
3163
2
  LOG(INFO) << "GetOrCreateCDCRpcTasks: " << result.ok();
3164
2
  if (!result.ok()) {
3165
0
    MarkUniverseReplicationFailed(ri, ResultToStatus(result));
3166
0
    return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, result.status());
3167
0
  }
3168
2
  std::shared_ptr<CDCRpcTasks> cdc_rpc = *result;
3169
3170
  // For each table, run an async RPC task to verify a sufficient Producer:Consumer schema match.
3171
2
  for (int i = 0; i < req->producer_table_ids_size(); i++) {
3172
3173
    // SETUP CONTINUES after this async call.
3174
0
    Status s;
3175
0
    if (IsColocatedParentTableId(req->producer_table_ids(i))) {
3176
0
      auto tables_info = std::make_shared<std::vector<client::YBTableInfo>>();
3177
0
      s = cdc_rpc->client()->GetColocatedTabletSchemaById(
3178
0
          req->producer_table_ids(i), tables_info,
3179
0
          Bind(&enterprise::CatalogManager::GetColocatedTabletSchemaCallback, Unretained(this),
3180
0
               ri->id(), tables_info, table_id_to_bootstrap_id));
3181
0
    } else if (IsTablegroupParentTableId(req->producer_table_ids(i))) {
3182
0
      auto tablegroup_info = std::make_shared<std::vector<client::YBTableInfo>>();
3183
0
      s = cdc_rpc->client()->GetTablegroupSchemaById(
3184
0
          req->producer_table_ids(i), tablegroup_info,
3185
0
          Bind(&enterprise::CatalogManager::GetTablegroupSchemaCallback, Unretained(this),
3186
0
               ri->id(), tablegroup_info, req->producer_table_ids(i), table_id_to_bootstrap_id));
3187
0
    } else {
3188
0
      auto table_info = std::make_shared<client::YBTableInfo>();
3189
0
      s = cdc_rpc->client()->GetTableSchemaById(
3190
0
          req->producer_table_ids(i), table_info,
3191
0
          Bind(&enterprise::CatalogManager::GetTableSchemaCallback, Unretained(this),
3192
0
               ri->id(), table_info, table_id_to_bootstrap_id));
3193
0
    }
3194
3195
0
    if (!s.ok()) {
3196
0
      MarkUniverseReplicationFailed(ri, s);
3197
0
      return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
3198
0
    }
3199
0
  }
3200
3201
2
  LOG(INFO) << "Started schema validation for universe replication " << ri->ToString();
3202
2
  return Status::OK();
3203
2
}
3204
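A hedged sketch of the request validated above. The sanity checks require a producer_id distinct from the local cluster UUID, at least one producer master address that does not collide with the local masters, and, if bootstrap ids are supplied, a one-to-one pairing with producer_table_ids; all values below are placeholders.

// Sketch only: minimal SetupUniverseReplication request per the checks above.
SetupUniverseReplicationRequestPB req;
req.set_producer_id("<producer-universe-uuid>");
auto* addr = req.add_producer_master_addresses();
addr->set_host("producer-master.example");
addr->set_port(7100);
req.add_producer_table_ids("<producer-table-id>");
// Optional, parallel to producer_table_ids:
// req.add_producer_bootstrap_ids("<bootstrap-id>");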
3205
void CatalogManager::MarkUniverseReplicationFailed(
3206
0
    scoped_refptr<UniverseReplicationInfo> universe, const Status& failure_status) {
3207
0
  auto l = universe->LockForWrite();
3208
0
  if (l->pb.state() == SysUniverseReplicationEntryPB::DELETED) {
3209
0
    l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED_ERROR);
3210
0
  } else {
3211
0
    l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
3212
0
  }
3213
3214
0
  universe->SetSetupUniverseReplicationErrorStatus(failure_status);
3215
3216
  // Update sys_catalog.
3217
0
  const Status s = sys_catalog_->Upsert(leader_ready_term(), universe);
3218
3219
0
  l.CommitOrWarn(s, "updating universe replication info in sys-catalog");
3220
0
}
3221
3222
Status CatalogManager::ValidateTableSchema(
3223
    const std::shared_ptr<client::YBTableInfo>& info,
3224
    const std::unordered_map<TableId, std::string>& table_bootstrap_ids,
3225
0
    GetTableSchemaResponsePB* resp) {
3226
  // Get corresponding table schema on local universe.
3227
0
  GetTableSchemaRequestPB req;
3228
3229
0
  auto* table = req.mutable_table();
3230
0
  table->set_table_name(info->table_name.table_name());
3231
0
  table->mutable_namespace_()->set_name(info->table_name.namespace_name());
3232
0
  table->mutable_namespace_()->set_database_type(
3233
0
      GetDatabaseTypeForTable(client::ClientToPBTableType(info->table_type)));
3234
3235
  // Since YSQL tables are not present in table map, we first need to list tables to get the table
3236
  // ID and then get table schema.
3237
  // Remove this once table maps are fixed for YSQL.
3238
0
  ListTablesRequestPB list_req;
3239
0
  ListTablesResponsePB list_resp;
3240
3241
0
  list_req.set_name_filter(info->table_name.table_name());
3242
0
  Status status = ListTables(&list_req, &list_resp);
3243
0
  if (!status.ok() || list_resp.has_error()) {
3244
0
    return STATUS(NotFound, Substitute("Error while listing table: $0", status.ToString()));
3245
0
  }
3246
3247
  // TODO: This does not work for situation where tables in different YSQL schemas have the same
3248
  // name. This will be fixed as part of #1476.
3249
0
  for (const auto& t : list_resp.tables()) {
3250
0
    if (t.name() == info->table_name.table_name() &&
3251
0
        t.namespace_().name() == info->table_name.namespace_name()) {
3252
0
      table->set_table_id(t.id());
3253
0
      break;
3254
0
    }
3255
0
  }
3256
3257
0
  if (!table->has_table_id()) {
3258
0
    return STATUS(NotFound,
3259
0
        Substitute("Could not find matching table for $0", info->table_name.ToString()));
3260
0
  }
3261
3262
  // We have a table match.  Now get the table schema and validate.
3263
0
  status = GetTableSchema(&req, resp);
3264
0
  if (!status.ok() || resp->has_error()) {
3265
0
    return STATUS(NotFound, Substitute("Error while getting table schema: $0", status.ToString()));
3266
0
  }
3267
3268
0
  auto result = info->schema.EquivalentForDataCopy(resp->schema());
3269
0
  if (!result.ok() || !*result) {
3270
0
    return STATUS(IllegalState,
3271
0
        Substitute("Source and target schemas don't match: "
3272
0
                   "Source: $0, Target: $1, Source schema: $2, Target schema: $3",
3273
0
                   info->table_id, resp->identifier().table_id(),
3274
0
                   info->schema.ToString(), resp->schema().DebugString()));
3275
0
  }
3276
3277
  // Still need to make map of table id to resp table id (to add to validated map)
3278
  // For colocated tables, only add the parent table since we only added the parent table to the
3279
  // original pb (we use the number of tables in the pb to determine when validation is done).
3280
0
  if (info->colocated) {
3281
    // We require that colocated tables have the same colocation ID.
3282
    //
3283
    // Backward compatibility: tables created prior to #7378 use YSQL table OID as a colocation ID.
3284
0
    auto source_clc_id = info->schema.has_colocation_id()
3285
0
        ? info->schema.colocation_id()
3286
0
        : CHECK_RESULT(GetPgsqlTableOid(info->table_id));
3287
0
    auto target_clc_id = (resp->schema().has_colocated_table_id() &&
3288
0
                          resp->schema().colocated_table_id().has_colocation_id())
3289
0
        ? resp->schema().colocated_table_id().colocation_id()
3290
0
        : CHECK_RESULT(GetPgsqlTableOid(resp->identifier().table_id()));
3291
0
    if (source_clc_id != target_clc_id) {
3292
0
      return STATUS(IllegalState,
3293
0
          Substitute("Source and target colocation IDs don't match for colocated table: "
3294
0
                     "Source: $0, Target: $1, Source colocation ID: $2, Target colocation ID: $3",
3295
0
                     info->table_id, resp->identifier().table_id(), source_clc_id, target_clc_id));
3296
0
    }
3297
0
  }
3298
3299
0
  return Status::OK();
3300
0
}
3301
3302
Status CatalogManager::AddValidatedTableAndCreateCdcStreams(
3303
      scoped_refptr<UniverseReplicationInfo> universe,
3304
      const std::unordered_map<TableId, std::string>& table_bootstrap_ids,
3305
      const TableId& producer_table,
3306
0
      const TableId& consumer_table) {
3307
0
  auto l = universe->LockForWrite();
3308
0
  auto master_addresses = l->pb.producer_master_addresses();
3309
3310
0
  auto res = universe->GetOrCreateCDCRpcTasks(master_addresses);
3311
0
  if (!res.ok()) {
3312
0
    l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
3313
0
    const Status s = sys_catalog_->Upsert(leader_ready_term(), universe);
3314
0
    if (!s.ok()) {
3315
0
      return CheckStatus(s, "updating universe replication info in sys-catalog");
3316
0
    }
3317
0
    l.Commit();
3318
0
    return STATUS(InternalError,
3319
0
        Substitute("Error while setting up client for producer $0", universe->id()));
3320
0
  }
3321
0
  std::shared_ptr<CDCRpcTasks> cdc_rpc = *res;
3322
0
  vector<TableId> validated_tables;
3323
3324
0
  if (l->is_deleted_or_failed()) {
3325
    // Nothing to do since universe is being deleted.
3326
0
    return STATUS(Aborted, "Universe is being deleted");
3327
0
  }
3328
3329
0
  auto map = l.mutable_data()->pb.mutable_validated_tables();
3330
0
  (*map)[producer_table] = consumer_table;
3331
3332
  // Now, all tables are validated.
3333
0
  if (l.mutable_data()->pb.validated_tables_size() == l.mutable_data()->pb.tables_size()) {
3334
0
    l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::VALIDATED);
3335
0
    auto tbl_iter = l->pb.tables();
3336
0
    validated_tables.insert(validated_tables.begin(), tbl_iter.begin(), tbl_iter.end());
3337
0
  }
3338
3339
  // TODO: end of config validation should be where SetupUniverseReplication exits back to user
3340
0
  LOG(INFO) << "UpdateItem in AddValidatedTable";
3341
3342
  // Update sys_catalog.
3343
0
  Status status = sys_catalog_->Upsert(leader_ready_term(), universe);
3344
0
  if (!status.ok()) {
3345
0
    LOG(ERROR) << "Error during UpdateItem: " << status;
3346
0
    return CheckStatus(status, "updating universe replication info in sys-catalog");
3347
0
  }
3348
0
  l.Commit();
3349
3350
  // Create CDC stream for each validated table, after persisting the replication state change.
3351
0
  if (!validated_tables.empty()) {
3352
0
    std::unordered_map<std::string, std::string> options;
3353
0
    options.reserve(4);
3354
0
    options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE));
3355
0
    options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL));
3356
0
    options.emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));
3357
0
    options.emplace(cdc::kCheckpointType, CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT));
3358
3359
0
    for (const auto& table : validated_tables) {
3360
0
      string producer_bootstrap_id;
3361
0
      auto it = table_bootstrap_ids.find(table);
3362
0
      if (it != table_bootstrap_ids.end()) {
3363
0
        producer_bootstrap_id = it->second;
3364
0
      }
3365
0
      if (!producer_bootstrap_id.empty()) {
3366
0
        auto table_id = std::make_shared<TableId>();
3367
0
        auto stream_options = std::make_shared<std::unordered_map<std::string, std::string>>();
3368
0
        cdc_rpc->client()->GetCDCStream(producer_bootstrap_id, table_id, stream_options,
3369
0
            std::bind(&enterprise::CatalogManager::GetCDCStreamCallback, this,
3370
0
                producer_bootstrap_id, table_id, stream_options, universe->id(), table, cdc_rpc,
3371
0
                std::placeholders::_1));
3372
0
      } else {
3373
0
        cdc_rpc->client()->CreateCDCStream(
3374
0
            table, options,
3375
0
            std::bind(&enterprise::CatalogManager::AddCDCStreamToUniverseAndInitConsumer, this,
3376
0
                universe->id(), table, std::placeholders::_1, nullptr /* on_success_cb */));
3377
0
      }
3378
0
    }
3379
0
  }
3380
0
  return Status::OK();
3381
0
}
3382
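The per-table loop above reduces to one decision: reuse a pre-bootstrapped producer stream when a bootstrap id was supplied, otherwise create a fresh stream with the xCluster options assembled earlier. In outline:

// Condensed sketch of the branch above (no new behavior).
auto it = table_bootstrap_ids.find(table);
if (it != table_bootstrap_ids.end() && !it->second.empty()) {
  // Bootstrapped: verify the existing stream via GetCDCStream, then finish
  // setup in GetCDCStreamCallback.
} else {
  // Not bootstrapped: CreateCDCStream with {CHANGE, WAL, XCLUSTER, IMPLICIT},
  // then finish setup in AddCDCStreamToUniverseAndInitConsumer.
}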
3383
void CatalogManager::GetTableSchemaCallback(
3384
    const std::string& universe_id, const std::shared_ptr<client::YBTableInfo>& info,
3385
0
    const std::unordered_map<TableId, std::string>& table_bootstrap_ids, const Status& s) {
3386
  // First get the universe.
3387
0
  scoped_refptr<UniverseReplicationInfo> universe;
3388
0
  {
3389
0
    SharedLock lock(mutex_);
3390
0
    TRACE("Acquired catalog manager lock");
3391
3392
0
    universe = FindPtrOrNull(universe_replication_map_, universe_id);
3393
0
    if (universe == nullptr) {
3394
0
      LOG(ERROR) << "Universe not found: " << universe_id;
3395
0
      return;
3396
0
    }
3397
0
  }
3398
3399
0
  if (!s.ok()) {
3400
0
    MarkUniverseReplicationFailed(universe, s);
3401
0
    LOG(ERROR) << "Error getting schema for table " << info->table_id << ": " << s;
3402
0
    return;
3403
0
  }
3404
3405
  // Validate the table schema.
3406
0
  GetTableSchemaResponsePB resp;
3407
0
  Status status = ValidateTableSchema(info, table_bootstrap_ids, &resp);
3408
0
  if (!status.ok()) {
3409
0
    MarkUniverseReplicationFailed(universe, status);
3410
0
    LOG(ERROR) << "Found error while validating table schema for table " << info->table_id
3411
0
               << ": " << status;
3412
0
    return;
3413
0
  }
3414
3415
0
  status = AddValidatedTableAndCreateCdcStreams(universe,
3416
0
                                                table_bootstrap_ids,
3417
0
                                                info->table_id,
3418
0
                                                resp.identifier().table_id());
3419
0
  if (!status.ok()) {
3420
0
    LOG(ERROR) << "Found error while adding validated table to system catalog: " << info->table_id
3421
0
               << ": " << status;
3422
0
    return;
3423
0
  }
3424
0
}
3425
3426
void CatalogManager::GetTablegroupSchemaCallback(
3427
    const std::string& universe_id,
3428
    const std::shared_ptr<std::vector<client::YBTableInfo>>& infos,
3429
    const TablegroupId& producer_tablegroup_id,
3430
    const std::unordered_map<TableId, std::string>& table_bootstrap_ids,
3431
0
    const Status& s) {
3432
  // First get the universe.
3433
0
  scoped_refptr<UniverseReplicationInfo> universe;
3434
0
  {
3435
0
    SharedLock lock(mutex_);
3436
0
    TRACE("Acquired catalog manager lock");
3437
3438
0
    universe = FindPtrOrNull(universe_replication_map_, universe_id);
3439
0
    if (universe == nullptr) {
3440
0
      LOG(ERROR) << "Universe not found: " << universe_id;
3441
0
      return;
3442
0
    }
3443
0
  }
3444
3445
0
  if (!s.ok()) {
3446
0
    MarkUniverseReplicationFailed(universe, s);
3447
0
    std::ostringstream oss;
3448
0
    for (size_t i = 0; i < infos->size(); ++i) {
3449
0
      oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id;
3450
0
    }
3451
0
    LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s;
3452
0
    return;
3453
0
  }
3454
3455
0
  if (infos->empty()) {
3456
0
    LOG(WARNING) << "Received empty list of tables to validate: " << s;
3457
0
    return;
3458
0
  }
3459
3460
  // validated_consumer_tables contains the consumer table IDs corresponding to
3462
  // the producer tables.
3462
0
  std::unordered_set<TableId> validated_consumer_tables;
3463
0
  for (const auto& info : *infos) {
3464
    // Validate each of the member table in the tablegroup.
3465
0
    GetTableSchemaResponsePB resp;
3466
0
    Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info),
3467
0
                                              table_bootstrap_ids,
3468
0
                                              &resp);
3469
3470
0
    if (!table_status.ok()) {
3471
0
      MarkUniverseReplicationFailed(universe, table_status);
3472
0
      LOG(ERROR) << "Found error while validating table schema for table " << info.table_id
3473
0
                 << ": " << table_status;
3474
0
      return;
3475
0
    }
3476
3477
0
    validated_consumer_tables.insert(resp.identifier().table_id());
3478
0
  }
3479
3480
  // Get the consumer tablegroup ID. Since this call is expensive (one needs to reverse lookup
3481
  // the tablegroup ID from table ID), we only do this call once and do validation afterward.
3482
0
  TablegroupId consumer_tablegroup_id;
3483
0
  {
3484
0
    const auto& result = FindTablegroupByTableId(*validated_consumer_tables.begin());
3485
0
    if (!result.has_value()) {
3486
0
      std::string message =
3487
0
          Format("No consumer tablegroup found for producer tablegroup: $0",
3488
0
                 producer_tablegroup_id);
3489
0
      MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message));
3490
0
      LOG(ERROR) << message;
3491
0
      return;
3492
0
    }
3493
0
    consumer_tablegroup_id = result.value();
3494
0
  }
3495
3496
  // tables_in_consumer_tablegroup are the tables listed within the consumer_tablegroup_id.
3497
  // We need validated_consumer_tables and tables_in_consumer_tablegroup to be identical.
3498
0
  std::unordered_set<TableId> tables_in_consumer_tablegroup;
3499
0
  {
3500
0
    GetTablegroupSchemaRequestPB req;
3501
0
    GetTablegroupSchemaResponsePB resp;
3502
0
    req.mutable_parent_tablegroup()->set_id(consumer_tablegroup_id);
3503
0
    Status status = GetTablegroupSchema(&req, &resp);
3504
0
    if (!status.ok() || resp.has_error()) {
3505
0
      std::string message = Format("Error when getting consumer tablegroup schema: $0",
3506
0
                                   consumer_tablegroup_id);
3507
0
      MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message));
3508
0
      LOG(ERROR) << message;
3509
0
      return;
3510
0
    }
3511
3512
0
    for (const auto& info : resp.get_table_schema_response_pbs()) {
3513
0
      tables_in_consumer_tablegroup.insert(info.identifier().table_id());
3514
0
    }
3515
0
  }
3516
3517
0
  if (validated_consumer_tables != tables_in_consumer_tablegroup) {
3518
0
    std::ostringstream validated_tables_oss;
3519
0
    for (auto it = validated_consumer_tables.begin();
3520
0
        it != validated_consumer_tables.end(); it++) {
3521
0
      validated_tables_oss << (it == validated_consumer_tables.begin() ? "" : ",") << *it;
3522
0
    }
3523
0
    std::ostringstream consumer_tables_oss;
3524
0
    for (auto it = tables_in_consumer_tablegroup.begin();
3525
0
        it != tables_in_consumer_tablegroup.end(); it++) {
3526
0
      consumer_tables_oss << (it == tables_in_consumer_tablegroup.begin() ? "" : ",") << *it;
3527
0
    }
3528
3529
0
    std::string message =
3530
0
        Format("Mismatch between tables associated with producer tablegroup $0 and "
3531
0
               "tables in consumer tablegroup $1: ($2) vs ($3).",
3532
0
               producer_tablegroup_id, consumer_tablegroup_id,
3533
0
               validated_tables_oss.str(), consumer_tables_oss.str());
3534
0
    MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message));
3535
0
    LOG(ERROR) << message;
3536
0
    return;
3537
0
  }
3538
3539
0
  Status status = AddValidatedTableAndCreateCdcStreams(universe,
3540
0
                                                       table_bootstrap_ids,
3541
0
                                                       producer_tablegroup_id,
3542
0
                                                       consumer_tablegroup_id);
3543
0
  if (!status.ok()) {
3544
0
    LOG(ERROR) << "Found error while adding validated table to system catalog: "
3545
0
               << producer_tablegroup_id << ": " << status;
3546
0
    return;
3547
0
  }
3548
0
}
3549
3550
void CatalogManager::GetColocatedTabletSchemaCallback(
3551
    const std::string& universe_id, const std::shared_ptr<std::vector<client::YBTableInfo>>& infos,
3552
0
    const std::unordered_map<TableId, std::string>& table_bootstrap_ids, const Status& s) {
3553
  // First get the universe.
3554
0
  scoped_refptr<UniverseReplicationInfo> universe;
3555
0
  {
3556
0
    SharedLock lock(mutex_);
3557
0
    TRACE("Acquired catalog manager lock");
3558
3559
0
    universe = FindPtrOrNull(universe_replication_map_, universe_id);
3560
0
    if (universe == nullptr) {
3561
0
      LOG(ERROR) << "Universe not found: " << universe_id;
3562
0
      return;
3563
0
    }
3564
0
  }
3565
3566
0
  if (!s.ok()) {
3567
0
    MarkUniverseReplicationFailed(universe, s);
3568
0
    std::ostringstream oss;
3569
0
    for (size_t i = 0; i < infos->size(); ++i) {
3570
0
      oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id;
3571
0
    }
3572
0
    LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s;
3573
0
    return;
3574
0
  }
3575
3576
0
  if (infos->empty()) {
3577
0
    LOG(WARNING) << "Received empty list of tables to validate: " << s;
3578
0
    return;
3579
0
  }
3580
3581
  // Validate table schemas.
3582
0
  std::unordered_set<TableId> producer_parent_table_ids;
3583
0
  std::unordered_set<TableId> consumer_parent_table_ids;
3584
0
  for (const auto& info : *infos) {
3585
    // Verify that we have a colocated table.
3586
0
    if (!info.colocated) {
3587
0
      MarkUniverseReplicationFailed(universe,
3588
0
          STATUS(InvalidArgument, Substitute("Received non-colocated table: $0", info.table_id)));
3589
0
      LOG(ERROR) << "Received non-colocated table: " << info.table_id;
3590
0
      return;
3591
0
    }
3592
    // Validate each table, and get the parent colocated table id for the consumer.
3593
0
    GetTableSchemaResponsePB resp;
3594
0
    Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info),
3595
0
                                              table_bootstrap_ids,
3596
0
                                              &resp);
3597
0
    if (!table_status.ok()) {
3598
0
      MarkUniverseReplicationFailed(universe, table_status);
3599
0
      LOG(ERROR) << "Found error while validating table schema for table " << info.table_id
3600
0
                 << ": " << table_status;
3601
0
      return;
3602
0
    }
3603
    // Store the parent table ids.
3604
0
    producer_parent_table_ids.insert(
3605
0
        info.table_name.namespace_id() + kColocatedParentTableIdSuffix);
3606
0
    consumer_parent_table_ids.insert(
3607
0
        resp.identifier().namespace_().id() + kColocatedParentTableIdSuffix);
3608
0
  }
3609
3610
  // Verify that we only found one producer and one consumer colocated parent table id.
3611
0
  if (producer_parent_table_ids.size() != 1) {
3612
0
    std::ostringstream oss;
3613
0
    for (auto it = producer_parent_table_ids.begin(); it != producer_parent_table_ids.end(); ++it) {
3614
0
      oss << ((it == producer_parent_table_ids.begin()) ? "" : ", ") << *it;
3615
0
    }
3616
0
    MarkUniverseReplicationFailed(universe, STATUS(InvalidArgument,
3617
0
        Substitute("Found incorrect number of producer colocated parent table ids. "
3618
0
                   "Expected 1, but found: [ $0 ]", oss.str())));
3619
0
    LOG(ERROR) << "Found incorrect number of producer colocated parent table ids. "
3620
0
               << "Expected 1, but found: [ " << oss.str() << " ]";
3621
0
    return;
3622
0
  }
3623
0
  if (consumer_parent_table_ids.size() != 1) {
3624
0
    std::ostringstream oss;
3625
0
    for (auto it = consumer_parent_table_ids.begin(); it != consumer_parent_table_ids.end(); ++it) {
3626
0
      oss << ((it == consumer_parent_table_ids.begin()) ? "" : ", ") << *it;
3627
0
    }
3628
0
    MarkUniverseReplicationFailed(universe, STATUS(InvalidArgument,
3629
0
        Substitute("Found incorrect number of consumer colocated parent table ids. "
3630
0
                   "Expected 1, but found: [ $0 ]", oss.str())));
3631
0
    LOG(ERROR) << "Found incorrect number of consumer colocated parent table ids. "
3632
0
               << "Expected 1, but found: [ " << oss.str() << " ]";
3633
0
    return;
3634
0
  }
3635
3636
0
  Status status = AddValidatedTableAndCreateCdcStreams(universe,
3637
0
                                                       table_bootstrap_ids,
3638
0
                                                       *producer_parent_table_ids.begin(),
3639
0
                                                       *consumer_parent_table_ids.begin());
3640
0
  if (!status.ok()) {
3641
0
    LOG(ERROR) << "Found error while adding validated table to system catalog: "
3642
0
               << *producer_parent_table_ids.begin() << ": " << status;
3643
0
    return;
3644
0
  }
3645
0
}
3646
3647
void CatalogManager::GetCDCStreamCallback(
3648
    const CDCStreamId& bootstrap_id,
3649
    std::shared_ptr<TableId> table_id,
3650
    std::shared_ptr<std::unordered_map<std::string, std::string>> options,
3651
    const std::string& universe_id,
3652
    const TableId& table,
3653
    std::shared_ptr<CDCRpcTasks> cdc_rpc,
3654
0
    const Status& s) {
3655
0
  if (!s.ok()) {
3656
0
    LOG(ERROR) << "Unable to find bootstrap id " << bootstrap_id;
3657
0
    AddCDCStreamToUniverseAndInitConsumer(universe_id, table, s);
3658
0
  } else {
3659
0
    if (*table_id != table) {
3660
0
      const Status invalid_bootstrap_id_status = STATUS_FORMAT(
3661
0
          InvalidArgument, "Invalid bootstrap id for table $0. Bootstrap id $1 belongs to table $2",
3662
0
          table, bootstrap_id, *table_id);
3663
0
      LOG(ERROR) << invalid_bootstrap_id_status;
3664
0
      AddCDCStreamToUniverseAndInitConsumer(universe_id, table, invalid_bootstrap_id_status);
3665
0
      return;
3666
0
    }
3667
    // TODO: check options.
3668
0
    AddCDCStreamToUniverseAndInitConsumer(universe_id, table, bootstrap_id, [&] () {
3669
        // Extra callback on universe setup success - update the producer to let it know that
3670
        // the bootstrapping is complete.
3671
0
        SysCDCStreamEntryPB new_entry;
3672
0
        new_entry.add_table_id(*table_id);
3673
0
        new_entry.mutable_options()->Reserve(narrow_cast<int>(options->size()));
3674
0
        for (const auto& option : *options) {
3675
0
          auto new_option = new_entry.add_options();
3676
0
          new_option->set_key(option.first);
3677
0
          new_option->set_value(option.second);
3678
0
        }
3679
0
        new_entry.set_state(master::SysCDCStreamEntryPB::ACTIVE);
3680
3681
0
        WARN_NOT_OK(cdc_rpc->client()->UpdateCDCStream(bootstrap_id, new_entry),
3682
0
                  "Unable to update CDC stream options");
3683
0
      });
3684
0
  }
3685
0
}
3686
3687
void CatalogManager::AddCDCStreamToUniverseAndInitConsumer(
3688
    const std::string& universe_id, const TableId& table_id, const Result<CDCStreamId>& stream_id,
3689
0
    std::function<void()> on_success_cb) {
3690
0
  scoped_refptr<UniverseReplicationInfo> universe;
3691
0
  {
3692
0
    SharedLock lock(mutex_);
3693
0
    TRACE("Acquired catalog manager lock");
3694
3695
0
    universe = FindPtrOrNull(universe_replication_map_, universe_id);
3696
0
    if (universe == nullptr) {
3697
0
      LOG(ERROR) << "Universe not found: " << universe_id;
3698
0
      return;
3699
0
    }
3700
0
  }
3701
3702
0
  if (!stream_id.ok()) {
3703
0
    LOG(ERROR) << "Error setting up CDC stream for table " << table_id;
3704
0
    MarkUniverseReplicationFailed(universe, ResultToStatus(stream_id));
3705
0
    return;
3706
0
  }
3707
3708
0
  bool merge_alter = false;
3709
0
  bool validated_all_tables = false;
3710
0
  {
3711
0
    auto l = universe->LockForWrite();
3712
0
    if (l->is_deleted_or_failed()) {
3713
      // Nothing to do if universe is being deleted.
3714
0
      return;
3715
0
    }
3716
3717
0
    auto map = l.mutable_data()->pb.mutable_table_streams();
3718
0
    (*map)[table_id] = *stream_id;
3719
3720
    // This functions as a barrier: waiting for the last RPC call from GetTableSchemaCallback.
3721
0
    if (l.mutable_data()->pb.table_streams_size() == l->pb.tables_size()) {
3722
      // All tables successfully validated! Register CDC consumers & start replication.
3723
0
      validated_all_tables = true;
3724
0
      LOG(INFO) << "Registering CDC consumers for universe " << universe->id();
3725
3726
0
      auto& validated_tables = l->pb.validated_tables();
3727
3728
0
      std::vector<CDCConsumerStreamInfo> consumer_info;
3729
0
      consumer_info.reserve(l->pb.tables_size());
3730
0
      for (const auto& table : validated_tables) {
3731
0
        CDCConsumerStreamInfo info;
3732
0
        info.producer_table_id = table.first;
3733
0
        info.consumer_table_id = table.second;
3734
0
        info.stream_id = (*map)[info.producer_table_id];
3735
0
        consumer_info.push_back(info);
3736
0
      }
3737
3738
0
      std::vector<HostPort> hp;
3739
0
      HostPortsFromPBs(l->pb.producer_master_addresses(), &hp);
3740
3741
0
      auto cdc_rpc_tasks_result =
3742
0
          universe->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses());
3743
0
      if (!cdc_rpc_tasks_result.ok()) {
3744
0
        LOG(WARNING) << "CDC streams won't be created: " << cdc_rpc_tasks_result;
3745
0
        l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
3746
0
      } else {
3747
0
        auto cdc_rpc_tasks = *cdc_rpc_tasks_result;
3748
0
        Status s = InitCDCConsumer(
3749
0
            consumer_info, HostPort::ToCommaSeparatedString(hp), l->pb.producer_id(),
3750
0
            cdc_rpc_tasks);
3751
0
        if (!s.ok()) {
3752
0
          LOG(ERROR) << "Error registering subscriber: " << s;
3753
0
          l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
3754
0
        } else {
3755
0
          GStringPiece original_producer_id(universe->id());
3756
0
          if (original_producer_id.ends_with(".ALTER")) {
3757
            // Don't enable ALTER universes, merge them into the main universe instead.
3758
0
            merge_alter = true;
3759
0
          } else {
3760
0
            l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE);
3761
0
          }
3762
0
        }
3763
0
      }
3764
0
    }
3765
3766
    // Update sys_catalog with new producer table id info.
3767
0
    Status status = sys_catalog_->Upsert(leader_ready_term(), universe);
3768
3769
    // Before committing, run any callbacks on success.
3770
0
    if (status.ok() && on_success_cb &&
3771
0
        (l.mutable_data()->pb.state() == SysUniverseReplicationEntryPB::ACTIVE || merge_alter)) {
3772
0
      on_success_cb();
3773
0
    }
3774
3775
0
    l.CommitOrWarn(status, "updating universe replication info in sys-catalog");
3776
0
  }
3777
3778
0
  if (validated_all_tables) {
3779
    // Also update the set of consumer tables.
3780
0
    LockGuard lock(mutex_);
3781
0
    auto l = universe->LockForRead();
3782
0
    for (const auto& table : l->pb.validated_tables()) {
3783
0
      xcluster_consumer_tables_to_stream_map_[table.second].emplace(universe->id(), *stream_id);
3784
0
    }
3785
0
  }
3786
3787
  // If this is an 'alter', merge it back into the primary universe now that setup succeeded.
3788
0
  if (merge_alter) {
3789
0
    MergeUniverseReplication(universe);
3790
0
  }
3791
0
}
3792
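// [Editor's note - illustrative sketch, not part of the original source.] The "barrier"
// inside AddCDCStreamToUniverseAndInitConsumer above relies on each validated table adding
// exactly one table_streams entry, so only the callback that completes the set performs
// the one-time consumer registration:
//
//   // After k of n per-table callbacks have landed:
//   //   l.mutable_data()->pb.table_streams_size() == k,  l->pb.tables_size() == n
//   // Only the callback that makes k == n falls into the InitCDCConsumer() branch.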
3793
/*
3794
 * UpdateXClusterConsumerOnTabletSplit updates the consumer -> producer tablet mapping after a local
3795
 * tablet split.
3796
 */
3797
Status CatalogManager::UpdateXClusterConsumerOnTabletSplit(
3798
    const TableId& consumer_table_id,
3799
140
    const SplitTabletIds& split_tablet_ids) {
3800
  // Check if this table is consuming a stream.
3801
140
  XClusterConsumerTableStreamInfoMap stream_infos =
3802
140
      GetXClusterStreamInfoForConsumerTable(consumer_table_id);
3803
3804
140
  if (stream_infos.empty()) {
3805
140
    return Status::OK();
3806
140
  }
3807
3808
0
  auto cluster_config = ClusterConfig();
3809
0
  auto l = cluster_config->LockForWrite();
3810
0
  for (const auto& stream_info : stream_infos) {
3811
0
    std::string universe_id = stream_info.first;
3812
0
    CDCStreamId stream_id = stream_info.second;
3813
    // Fetch the stream entry so we can update the mappings.
3814
0
    auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
3815
0
    auto producer_entry = FindOrNull(*producer_map, universe_id);
3816
0
    if (!producer_entry) {
3817
0
      return STATUS_FORMAT(NotFound,
3818
0
                           "Unable to find the producer entry for universe $0",
3819
0
                           universe_id);
3820
0
    }
3821
0
    auto stream_entry = FindOrNull(*producer_entry->mutable_stream_map(), stream_id);
3822
0
    if (!stream_entry) {
3823
0
      return STATUS_FORMAT(NotFound,
3824
0
                           "Unable to find the stream entry for universe $0, stream $1",
3825
0
                           universe_id,
3826
0
                           stream_id);
3827
0
    }
3828
3829
0
    RETURN_NOT_OK(UpdateTableMappingOnTabletSplit(stream_entry, split_tablet_ids));
3830
3831
    // We also need to mark this stream as no longer being able to perform 1-1 mappings.
3832
0
    stream_entry->set_same_num_producer_consumer_tablets(false);
3833
0
  }
3834
3835
  // Also bump the cluster_config_ version so that changes are propagated to tservers.
3836
0
  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
3837
3838
0
  RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
3839
0
                            "Updating cluster config in sys-catalog"));
3840
0
  l.Commit();
3841
3842
0
  return Status::OK();
3843
0
}
3844
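// [Editor's note - hedged usage sketch, not original source.] Callers of the function
// above pass the consumer table id plus the SplitTabletIds of the local split; only the
// 'children' pair of SplitTabletIds is dereferenced in this file, so the call shape is
// assumed to be roughly:
//
//   SplitTabletIds split_ids = ...;  // children.first / children.second from the split
//   RETURN_NOT_OK(UpdateXClusterConsumerOnTabletSplit(consumer_table_id, split_ids));
//
// The early Status::OK() return keeps this cheap in the common case where the table
// consumes no xCluster stream.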
3845
Status CatalogManager::UpdateXClusterProducerOnTabletSplit(
3846
    const TableId& producer_table_id,
3847
140
    const SplitTabletIds& split_tablet_ids) {
3848
  // First check if this table has any streams associated with it.
3849
140
  auto streams = FindCDCStreamsForTable(producer_table_id);
3850
3851
140
  if (!streams.empty()) {
3852
    // For each stream, we need to add the children's entries to the cdc_state table.
3853
0
    client::TableHandle cdc_table;
3854
0
    const client::YBTableName cdc_state_table_name(
3855
0
        YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
3856
0
    auto ybclient = master_->cdc_state_client_initializer().client();
3857
0
    if (!ybclient) {
3858
0
      return STATUS(IllegalState, "Client not initialized or shutting down");
3859
0
    }
3860
0
    RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient));
3861
0
    std::shared_ptr<client::YBSession> session = ybclient->NewSession();
3862
3863
0
    for (const auto& stream : streams) {
3864
0
      for (const auto& child_tablet_id :
3865
0
           {split_tablet_ids.children.first, split_tablet_ids.children.second}) {
3866
0
        const auto insert_op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
3867
0
        auto* insert_req = insert_op->mutable_request();
3868
0
        QLAddStringHashValue(insert_req, child_tablet_id);
3869
0
        QLAddStringRangeValue(insert_req, stream->id());
3870
        // TODO(JHE) set the checkpoint to a different value? OpId of the SPLIT_OP itself perhaps?
3871
0
        cdc_table.AddStringColumnValue(insert_req, master::kCdcCheckpoint, OpId().ToString());
3872
        // TODO(JHE) what to set the time to?
3873
        // cdc_table.AddTimestampColumnValue(
3874
        //     insert_req, master::kCdcLastReplicationTime, GetCurrentTimeMicros());
3875
0
        session->Apply(insert_op);
3876
0
      }
3877
0
    }
3878
0
    RETURN_NOT_OK(session->Flush());
3879
0
  }
3880
3881
140
  return Status::OK();
3882
140
}
3883
3884
Status CatalogManager::InitCDCConsumer(
3885
    const std::vector<CDCConsumerStreamInfo>& consumer_info,
3886
    const std::string& master_addrs,
3887
    const std::string& producer_universe_uuid,
3888
0
    std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks) {
3889
3890
0
  std::unordered_set<HostPort, HostPortHash> tserver_addrs;
3891
  // Get the tablets in the consumer table.
3892
0
  cdc::ProducerEntryPB producer_entry;
3893
0
  for (const auto& stream_info : consumer_info) {
3894
0
    GetTableLocationsRequestPB consumer_table_req;
3895
0
    consumer_table_req.set_max_returned_locations(std::numeric_limits<int32_t>::max());
3896
0
    GetTableLocationsResponsePB consumer_table_resp;
3897
0
    TableIdentifierPB table_identifier;
3898
0
    table_identifier.set_table_id(stream_info.consumer_table_id);
3899
0
    *(consumer_table_req.mutable_table()) = table_identifier;
3900
0
    RETURN_NOT_OK(GetTableLocations(&consumer_table_req, &consumer_table_resp));
3901
3902
0
    cdc::StreamEntryPB stream_entry;
3903
    // Get producer tablets and map them to the consumer tablets
3904
0
    RETURN_NOT_OK(CreateTabletMapping(
3905
0
        stream_info.producer_table_id, stream_info.consumer_table_id, producer_universe_uuid,
3906
0
        master_addrs, consumer_table_resp, &tserver_addrs, &stream_entry, cdc_rpc_tasks));
3907
0
    (*producer_entry.mutable_stream_map())[stream_info.stream_id] = std::move(stream_entry);
3908
0
  }
3909
3910
  // Record the network topology of the producer cluster.
3911
0
  auto master_addrs_list = StringSplit(master_addrs, ',');
3912
0
  producer_entry.mutable_master_addrs()->Reserve(narrow_cast<int>(master_addrs_list.size()));
3913
0
  for (const auto& addr : master_addrs_list) {
3914
0
    auto hp = VERIFY_RESULT(HostPort::FromString(addr, 0));
3915
0
    HostPortToPB(hp, producer_entry.add_master_addrs());
3916
0
  }
3917
3918
0
  producer_entry.mutable_tserver_addrs()->Reserve(narrow_cast<int>(tserver_addrs.size()));
3919
0
  for (const auto& addr : tserver_addrs) {
3920
0
    HostPortToPB(addr, producer_entry.add_tserver_addrs());
3921
0
  }
3922
3923
0
  auto cluster_config = ClusterConfig();
3924
0
  auto l = cluster_config->LockForWrite();
3925
0
  auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
3926
0
  auto it = producer_map->find(producer_universe_uuid);
3927
0
  if (it != producer_map->end()) {
3928
0
    return STATUS(InvalidArgument, "Already created a consumer for this universe");
3929
0
  }
3930
3931
  // TServers will use the ClusterConfig to create CDC Consumers for applicable local tablets.
3932
0
  (*producer_map)[producer_universe_uuid] = std::move(producer_entry);
3933
0
  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
3934
0
  RETURN_NOT_OK(CheckStatus(
3935
0
      sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
3936
0
      "updating cluster config in sys-catalog"));
3937
0
  l.Commit();
3938
3939
0
  return Status::OK();
3940
0
}
3941
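// [Editor's note - assumption-labeled sketch, not original source.] The registry built by
// InitCDCConsumer above nests one StreamEntryPB per stream under one ProducerEntryPB per
// producer universe; TServers read this from the ClusterConfig on heartbeat. Roughly
// (where consumer_registry stands for l.mutable_data()->pb.mutable_consumer_registry()):
//
//   cdc::StreamEntryPB stream_entry;      // tablet mapping for one stream
//   cdc::ProducerEntryPB producer_entry;  // one per producer universe
//   (*producer_entry.mutable_stream_map())["stream-uuid"] = stream_entry;
//   (*consumer_registry->mutable_producer_map())["producer-uuid"] = producer_entry;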
3942
0
void CatalogManager::MergeUniverseReplication(scoped_refptr<UniverseReplicationInfo> universe) {
3943
  // Merge back into the primary universe now that setup succeeded.
3944
0
  GStringPiece original_producer_id(universe->id());
3945
0
  if (!original_producer_id.ends_with(".ALTER")) {
3946
0
    return;
3947
0
  }
3948
0
  original_producer_id.remove_suffix(sizeof(".ALTER")-1 /* exclude \0 ending */);
3949
0
  LOG(INFO) << "Merging CDC universe: " << universe->id()
3950
0
            << " into " << original_producer_id.ToString();
3951
3952
0
  scoped_refptr<UniverseReplicationInfo> original_universe;
3953
0
  {
3954
0
    SharedLock lock(mutex_);
3955
0
    TRACE("Acquired catalog manager lock");
3956
3957
0
    original_universe = FindPtrOrNull(universe_replication_map_, original_producer_id.ToString());
3958
0
    if (original_universe == nullptr) {
3959
0
      LOG(ERROR) << "Universe not found: " << original_producer_id.ToString();
3960
0
      return;
3961
0
    }
3962
0
  }
3963
  // Merge Cluster Config for TServers.
3964
0
  {
3965
0
    auto cluster_config = ClusterConfig();
3966
0
    auto cl = cluster_config->LockForWrite();
3967
0
    auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
3968
0
    auto original_producer_entry = pm->find(original_universe->id());
3969
0
    auto alter_producer_entry = pm->find(universe->id());
3970
0
    if (original_producer_entry != pm->end() && alter_producer_entry != pm->end()) {
3971
      // Merge the Tables from the Alter into the original.
3972
0
      auto as = alter_producer_entry->second.stream_map();
3973
0
      original_producer_entry->second.mutable_stream_map()->insert(as.begin(), as.end());
3974
      // Delete the Alter
3975
0
      pm->erase(alter_producer_entry);
3976
0
    } else {
3977
0
      LOG(WARNING) << "Could not find both universes in Cluster Config: " << universe->id();
3978
0
    }
3979
0
    cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
3980
0
    const Status s = sys_catalog_->Upsert(leader_ready_term(), cluster_config.get());
3981
0
    cl.CommitOrWarn(s, "updating cluster config in sys-catalog");
3982
0
  }
3983
  // Merge the master config on the consumer. (No producer changes are needed, since the
  // producer keys on stream_id.)
3984
0
  {
3985
0
    auto original_lock = original_universe->LockForWrite();
3986
0
    auto alter_lock = universe->LockForWrite();
3987
    // Merge Table->StreamID mapping.
3988
0
    auto at = alter_lock.mutable_data()->pb.mutable_tables();
3989
0
    original_lock.mutable_data()->pb.mutable_tables()->MergeFrom(*at);
3990
0
    at->Clear();
3991
0
    auto as = alter_lock.mutable_data()->pb.mutable_table_streams();
3992
0
    original_lock.mutable_data()->pb.mutable_table_streams()->insert(as->begin(), as->end());
3993
0
    as->clear();
3994
0
    auto av = alter_lock.mutable_data()->pb.mutable_validated_tables();
3995
0
    original_lock.mutable_data()->pb.mutable_validated_tables()->insert(av->begin(), av->end());
3996
0
    av->clear();
3997
0
    alter_lock.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED);
3998
3999
0
    vector<UniverseReplicationInfo*> universes{original_universe.get(), universe.get()};
4000
0
    const Status s = sys_catalog_->Upsert(leader_ready_term(), universes);
4001
0
    alter_lock.CommitOrWarn(s, "updating universe replication entries in sys-catalog");
4002
0
    if (s.ok()) {
4003
0
      original_lock.Commit();
4004
0
    }
4005
0
  }
4006
  // TODO: universe_replication_map_.erase(universe->id()) at a later time.
4007
  //       TwoDCTest.AlterUniverseReplicationTables crashes due to undiagnosed race right now.
4008
0
  LOG(INFO) << "Done with Merging " << universe->id() << " into " << original_universe->id();
4009
0
}
4010
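// [Editor's note - minimal illustration, not original source.] The sizeof(".ALTER") - 1
// arithmetic in MergeUniverseReplication above works because sizeof on a string literal
// counts the trailing '\0':
//
//   GStringPiece id("producer-uuid.ALTER");
//   static_assert(sizeof(".ALTER") == 7, "6 characters plus the NUL terminator");
//   if (id.ends_with(".ALTER")) {
//     id.remove_suffix(sizeof(".ALTER") - 1);  // id now reads "producer-uuid"
//   }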
4011
Status ReturnErrorOrAddWarning(const Status& s,
4012
                               const DeleteUniverseReplicationRequestPB* req,
4013
0
                               DeleteUniverseReplicationResponsePB* resp) {
4014
0
  if (!s.ok()) {
4015
0
    if (req->ignore_errors()) {
4016
      // Continue executing, save the status as a warning.
4017
0
      AppStatusPB* warning = resp->add_warnings();
4018
0
      StatusToPB(s, warning);
4019
0
      return Status::OK();
4020
0
    }
4021
0
    return s.CloneAndAppend("\nUse 'ignore-errors' to ignore this error.");
4022
0
  }
4023
0
  return s;
4024
0
}
4025
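// [Editor's note - hedged usage sketch, not original source.] ReturnErrorOrAddWarning lets
// the multi-step teardown below keep going when the request sets ignore_errors: the failure
// is downgraded to a response warning instead of aborting the RPC.
//
//   DeleteUniverseReplicationRequestPB req;
//   req.set_ignore_errors(true);
//   DeleteUniverseReplicationResponsePB resp;
//   Status s = STATUS(NotFound, "stream already gone");
//   // Returns OK and appends s to resp.warnings(); without ignore_errors it would
//   // return s with the 'ignore-errors' hint appended.
//   Status result = ReturnErrorOrAddWarning(s, &req, &resp);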
4026
Status CatalogManager::DeleteUniverseReplication(const DeleteUniverseReplicationRequestPB* req,
4027
                                                 DeleteUniverseReplicationResponsePB* resp,
4028
0
                                                 rpc::RpcContext* rpc) {
4029
0
  LOG(INFO) << "Servicing DeleteUniverseReplication request from " << RequestorString(rpc)
4030
0
            << ": " << req->ShortDebugString();
4031
4032
0
  if (!req->has_producer_id()) {
4033
0
    return STATUS(InvalidArgument, "Producer universe ID required", req->ShortDebugString(),
4034
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
4035
0
  }
4036
4037
0
  scoped_refptr<UniverseReplicationInfo> ri;
4038
0
  {
4039
0
    SharedLock lock(mutex_);
4040
0
    TRACE("Acquired catalog manager lock");
4041
4042
0
    ri = FindPtrOrNull(universe_replication_map_, req->producer_id());
4043
0
    if (ri == nullptr) {
4044
0
      return STATUS(NotFound, "Universe replication info does not exist",
4045
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4046
0
    }
4047
0
  }
4048
4049
0
  auto l = ri->LockForWrite();
4050
0
  l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED);
4051
4052
  // Delete subscribers on the Consumer Registry (removes from TServers).
4053
0
  LOG(INFO) << "Deleting subscribers for producer " << req->producer_id();
4054
0
  {
4055
0
    auto cluster_config = ClusterConfig();
4056
0
    auto cl = cluster_config->LockForWrite();
4057
0
    auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4058
0
    auto it = producer_map->find(req->producer_id());
4059
0
    if (it != producer_map->end()) {
4060
0
      producer_map->erase(it);
4061
0
      cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
4062
0
      RETURN_NOT_OK(CheckStatus(
4063
0
          sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
4064
0
          "updating cluster config in sys-catalog"));
4065
0
      cl.Commit();
4066
0
    }
4067
0
  }
4068
4069
  // Delete CDC stream config on the Producer.
4070
0
  if (!l->pb.table_streams().empty()) {
4071
0
    auto result = ri->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses());
4072
0
    if (!result.ok()) {
4073
0
      LOG(WARNING) << "Unable to create cdc rpc task. CDC streams won't be deleted: " << result;
4074
0
    } else {
4075
0
      auto cdc_rpc = *result;
4076
0
      vector<CDCStreamId> streams;
4077
0
      std::unordered_map<CDCStreamId, TableId> stream_to_producer_table_id;
4078
0
      for (const auto& table : l->pb.table_streams()) {
4079
0
        streams.push_back(table.second);
4080
0
        stream_to_producer_table_id.emplace(table.second, table.first);
4081
0
      }
4082
4083
0
      DeleteCDCStreamResponsePB delete_cdc_stream_resp;
4084
      // Set force_delete=true since we are deleting active xCluster streams.
4085
0
      auto s = cdc_rpc->client()->DeleteCDCStream(streams,
4086
0
                                                  true, /* force_delete */
4087
0
                                                  req->ignore_errors(),
4088
0
                                                  &delete_cdc_stream_resp);
4089
4090
0
      if (delete_cdc_stream_resp.not_found_stream_ids().size() > 0) {
4091
0
        std::ostringstream missing_streams;
4092
0
        for (auto it = delete_cdc_stream_resp.not_found_stream_ids().begin();
4093
0
               it != delete_cdc_stream_resp.not_found_stream_ids().end();
4094
0
               ++it) {
4095
0
          if (it != delete_cdc_stream_resp.not_found_stream_ids().begin()) {
4096
0
            missing_streams << ",";
4097
0
          }
4098
0
          missing_streams << *it << " (table_id: " << stream_to_producer_table_id[*it] << ")";
4099
0
        }
4100
0
        if (s.ok()) {
4101
          // The delete call succeeded, but some streams were not found, so still warn the
          // user about those.
4102
0
          s = STATUS(NotFound,
4103
0
                     "Could not find the following streams: [" + missing_streams.str() + "].");
4104
0
        } else {
4105
0
          s = s.CloneAndPrepend(
4106
0
              "Could not find the following streams: [" + missing_streams.str() + "].");
4107
0
        }
4108
0
      }
4109
4110
0
      RETURN_NOT_OK(ReturnErrorOrAddWarning(s, req, resp));
4111
0
    }
4112
0
  }
4113
4114
  // Delete universe in the Universe Config.
4115
0
  RETURN_NOT_OK(ReturnErrorOrAddWarning(DeleteUniverseReplicationUnlocked(ri), req, resp));
4116
0
  l.Commit();
4117
4118
0
  LOG(INFO) << "Processed delete universe replication " << ri->ToString()
4119
0
            << " per request from " << RequestorString(rpc);
4120
4121
0
  return Status::OK();
4122
0
}
4123
4124
Status CatalogManager::DeleteUniverseReplicationUnlocked(
4125
0
    scoped_refptr<UniverseReplicationInfo> universe) {
4126
  // Assumes that caller has locked universe.
4127
0
  RETURN_NOT_OK_PREPEND(
4128
0
      sys_catalog_->Delete(leader_ready_term(), universe),
4129
0
      Substitute("An error occurred while updating sys-catalog, universe_id: $0", universe->id()));
4130
4131
  // Remove it from the map.
4132
0
  LockGuard lock(mutex_);
4133
0
  if (universe_replication_map_.erase(universe->id()) < 1) {
4134
0
    LOG(WARNING) << "Failed to remove replication info from map: universe_id: " << universe->id();
4135
0
  }
4136
  // Also update the mapping of consumer tables.
4137
0
  for (const auto& table : universe->metadata().state().pb.validated_tables()) {
4138
0
    if (xcluster_consumer_tables_to_stream_map_[table.second].erase(universe->id()) < 1) {
4139
0
      LOG(WARNING) << "Failed to remove consumer table from mapping. "
4140
0
                   << "table_id: " << table.second << ": universe_id: " << universe->id();
4141
0
    }
4142
0
    if (xcluster_consumer_tables_to_stream_map_[table.second].empty()) {
4143
0
      xcluster_consumer_tables_to_stream_map_.erase(table.second);
4144
0
    }
4145
0
  }
4146
0
  return Status::OK();
4147
0
}
4148
4149
Status CatalogManager::SetUniverseReplicationEnabled(
4150
    const SetUniverseReplicationEnabledRequestPB* req,
4151
    SetUniverseReplicationEnabledResponsePB* resp,
4152
0
    rpc::RpcContext* rpc) {
4153
0
  LOG(INFO) << "Servicing SetUniverseReplicationEnabled request from " << RequestorString(rpc)
4154
0
            << ": " << req->ShortDebugString();
4155
4156
  // Sanity Checking Cluster State and Input.
4157
0
  if (!req->has_producer_id()) {
4158
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4159
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4160
0
  }
4161
0
  if (!req->has_is_enabled()) {
4162
0
    return STATUS(InvalidArgument, "Must explicitly set whether to enable",
4163
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4164
0
  }
4165
4166
0
  scoped_refptr<UniverseReplicationInfo> universe;
4167
0
  {
4168
0
    SharedLock lock(mutex_);
4169
4170
0
    universe = FindPtrOrNull(universe_replication_map_, req->producer_id());
4171
0
    if (universe == nullptr) {
4172
0
      return STATUS(NotFound, "Could not find CDC producer universe",
4173
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4174
0
    }
4175
0
  }
4176
4177
  // Update the Master's Universe Config with the new state.
4178
0
  {
4179
0
    auto l = universe->LockForWrite();
4180
0
    if (l->pb.state() != SysUniverseReplicationEntryPB::DISABLED &&
4181
0
        l->pb.state() != SysUniverseReplicationEntryPB::ACTIVE) {
4182
0
      return STATUS(
4183
0
          InvalidArgument,
4184
0
          Format("Universe Replication in invalid state: $0.  Retry or Delete.",
4185
0
              SysUniverseReplicationEntryPB::State_Name(l->pb.state())),
4186
0
          req->ShortDebugString(),
4187
0
          MasterError(MasterErrorPB::INVALID_REQUEST));
4188
0
    }
4189
0
    if (req->is_enabled()) {
4190
0
        l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE);
4191
0
    } else { // DISABLE.
4192
0
        l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DISABLED);
4193
0
    }
4194
0
    RETURN_NOT_OK(CheckStatus(
4195
0
        sys_catalog_->Upsert(leader_ready_term(), universe),
4196
0
        "updating universe replication info in sys-catalog"));
4197
0
    l.Commit();
4198
0
  }
4199
4200
  // Modify the Consumer Registry, which will fan out this info to all TServers on heartbeat.
4201
0
  {
4202
0
    auto cluster_config = ClusterConfig();
4203
0
    auto l = cluster_config->LockForWrite();
4204
0
    auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4205
0
    auto it = producer_map->find(req->producer_id());
4206
0
    if (it == producer_map->end()) {
4207
0
      LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id();
4208
0
      return STATUS(NotFound, "Could not find CDC producer universe",
4209
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4210
0
    }
4211
0
    (*it).second.set_disable_stream(!req->is_enabled());
4212
0
    l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
4213
0
    RETURN_NOT_OK(CheckStatus(
4214
0
        sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
4215
0
        "updating cluster config in sys-catalog"));
4216
0
    l.Commit();
4217
0
  }
4218
4219
0
  return Status::OK();
4220
0
}
4221
4222
Status CatalogManager::AlterUniverseReplication(const AlterUniverseReplicationRequestPB* req,
4223
                                                AlterUniverseReplicationResponsePB* resp,
4224
0
                                                rpc::RpcContext* rpc) {
4225
0
  LOG(INFO) << "Servicing AlterUniverseReplication request from " << RequestorString(rpc)
4226
0
            << ": " << req->ShortDebugString();
4227
4228
  // Sanity Checking Cluster State and Input.
4229
0
  if (!req->has_producer_id()) {
4230
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4231
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4232
0
  }
4233
4234
  // Verify that there is an existing Universe config
4235
0
  scoped_refptr<UniverseReplicationInfo> original_ri;
4236
0
  {
4237
0
    SharedLock lock(mutex_);
4238
4239
0
    original_ri = FindPtrOrNull(universe_replication_map_, req->producer_id());
4240
0
    if (original_ri == nullptr) {
4241
0
      return STATUS(NotFound, "Could not find CDC producer universe",
4242
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4243
0
    }
4244
0
  }
4245
4246
  // Currently, config options are mutually exclusive to simplify transactionality.
4247
0
  int config_count = (req->producer_master_addresses_size() > 0 ? 1 : 0) +
4248
0
                     (req->producer_table_ids_to_remove_size() > 0 ? 1 : 0) +
4249
0
                     (req->producer_table_ids_to_add_size() > 0 ? 1 : 0) +
4250
0
                     (req->has_new_producer_universe_id() ? 1 : 0);
4251
0
  if (config_count != 1) {
4252
0
    return STATUS(InvalidArgument, "Only 1 Alter operation per request currently supported",
4253
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4254
0
  }
4255
4256
  // Config logic...
4257
0
  if (req->producer_master_addresses_size() > 0) {
4258
    // 'set_master_addresses'
4259
    // TODO: Verify the input. Setup an RPC Task, ListTables, ensure same.
4260
4261
    // 1a. Persistent Config: Update the Universe Config for Master.
4262
0
    {
4263
0
      auto l = original_ri->LockForWrite();
4264
0
      l.mutable_data()->pb.mutable_producer_master_addresses()->CopyFrom(
4265
0
          req->producer_master_addresses());
4266
0
      RETURN_NOT_OK(CheckStatus(
4267
0
          sys_catalog_->Upsert(leader_ready_term(), original_ri),
4268
0
          "updating universe replication info in sys-catalog"));
4269
0
      l.Commit();
4270
0
    }
4271
    // 1b. Persistent Config: Update the Consumer Registry (updates TServers)
4272
0
    {
4273
0
      auto cluster_config = ClusterConfig();
4274
0
      auto l = cluster_config->LockForWrite();
4275
0
      auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4276
0
      auto it = producer_map->find(req->producer_id());
4277
0
      if (it == producer_map->end()) {
4278
0
        LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id();
4279
0
        return STATUS(NotFound, "Could not find CDC producer universe",
4280
0
                      req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4281
0
      }
4282
0
      (*it).second.mutable_master_addrs()->CopyFrom(req->producer_master_addresses());
4283
0
      l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
4284
0
      RETURN_NOT_OK(CheckStatus(
4285
0
          sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
4286
0
          "updating cluster config in sys-catalog"));
4287
0
      l.Commit();
4288
0
    }
4289
    // 2. Memory Update: Change cdc_rpc_tasks (Master cache)
4290
0
    {
4291
0
      auto result = original_ri->GetOrCreateCDCRpcTasks(req->producer_master_addresses());
4292
0
      if (!result.ok()) {
4293
0
        return SetupError(resp->mutable_error(), MasterErrorPB::INTERNAL_ERROR, result.status());
4294
0
      }
4295
0
    }
4296
0
  } else if (req->producer_table_ids_to_remove_size() > 0) {
4297
    // 'remove_table'
4298
0
    auto it = req->producer_table_ids_to_remove();
4299
0
    std::set<string> table_ids_to_remove(it.begin(), it.end());
4300
    // Filter out any tables that aren't in the existing replication config.
4301
0
    {
4302
0
      auto l = original_ri->LockForRead();
4303
0
      auto tbl_iter = l->pb.tables();
4304
0
      std::set<string> existing_tables(tbl_iter.begin(), tbl_iter.end()), filtered_list;
4305
0
      set_intersection(table_ids_to_remove.begin(), table_ids_to_remove.end(),
4306
0
                       existing_tables.begin(), existing_tables.end(),
4307
0
                       std::inserter(filtered_list, filtered_list.begin()));
4308
0
      filtered_list.swap(table_ids_to_remove);
4309
0
    }
4310
4311
0
    vector<CDCStreamId> streams_to_remove;
4312
    // 1. Update the Consumer Registry (removes from TServers).
4313
0
    {
4314
0
      auto cluster_config = ClusterConfig();
4315
0
      auto cl = cluster_config->LockForWrite();
4316
0
      auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4317
0
      auto producer_entry = pm->find(req->producer_id());
4318
0
      if (producer_entry != pm->end()) {
4319
        // Remove the Tables Specified (not part of the key).
4320
0
        auto stream_map = producer_entry->second.mutable_stream_map();
4321
0
        for (auto& p : *stream_map) {
4322
0
          if (table_ids_to_remove.count(p.second.producer_table_id()) > 0) {
4323
0
            streams_to_remove.push_back(p.first);
4324
0
          }
4325
0
        }
4326
0
        if (streams_to_remove.size() == stream_map->size()) {
4327
          // If this ends with an empty Map, disallow and force user to delete.
4328
0
          LOG(WARNING) << "CDC 'remove_table' tried to remove all tables." << req->producer_id();
4329
0
          return STATUS(
4330
0
              InvalidArgument,
4331
0
              "Cannot remove all tables with alter. Use delete_universe_replication instead.",
4332
0
              req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4333
0
        } else if (streams_to_remove.empty()) {
4334
          // If this doesn't delete anything, notify the user.
4335
0
          return STATUS(InvalidArgument, "Removal matched no entries.",
4336
0
                        req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4337
0
        }
4338
0
        for (auto& key : streams_to_remove) {
4339
0
          stream_map->erase(stream_map->find(key));
4340
0
        }
4341
0
      }
4342
0
      cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
4343
0
      RETURN_NOT_OK(CheckStatus(
4344
0
          sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
4345
0
          "updating cluster config in sys-catalog"));
4346
0
      cl.Commit();
4347
0
    }
4348
    // 2. Remove from Master Configs on Producer and Consumer.
4349
0
    {
4350
0
      auto l = original_ri->LockForWrite();
4351
0
      if (!l->pb.table_streams().empty()) {
4352
        // Delete Relevant Table->StreamID mappings on Consumer.
4353
0
        auto table_streams = l.mutable_data()->pb.mutable_table_streams();
4354
0
        auto validated_tables = l.mutable_data()->pb.mutable_validated_tables();
4355
0
        for (auto& key : table_ids_to_remove) {
4356
0
          table_streams->erase(table_streams->find(key));
4357
0
          validated_tables->erase(validated_tables->find(key));
4358
0
        }
4359
0
        for (int i = 0; i < l.mutable_data()->pb.tables_size(); i++) {
4360
0
          if (table_ids_to_remove.count(l.mutable_data()->pb.tables(i)) > 0) {
4361
0
            l.mutable_data()->pb.mutable_tables()->DeleteSubrange(i, 1);
4362
0
            --i;
4363
0
          }
4364
0
        }
4365
        // Delete CDC stream config on the Producer.
4366
0
        auto result = original_ri->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses());
4367
0
        if (!result.ok()) {
4368
0
          LOG(WARNING) << "Unable to create cdc rpc task. CDC streams won't be deleted: " << result;
4369
0
        } else {
4370
0
          auto s = (*result)->client()->DeleteCDCStream(streams_to_remove,
4371
0
                                                        true /* force_delete */);
4372
0
          if (!s.ok()) {
4373
0
            std::stringstream os;
4374
0
            std::copy(streams_to_remove.begin(), streams_to_remove.end(),
4375
0
                      std::ostream_iterator<CDCStreamId>(os, ", "));
4376
0
            LOG(WARNING) << "Unable to delete CDC streams: " << os.str() << s;
4377
0
          }
4378
0
        }
4379
0
      }
4380
0
      RETURN_NOT_OK(CheckStatus(
4381
0
          sys_catalog_->Upsert(leader_ready_term(), original_ri),
4382
0
          "updating universe replication info in sys-catalog"));
4383
0
      l.Commit();
4384
0
    }
4385
0
  } else if (req->producer_table_ids_to_add_size() > 0) {
4386
    // 'add_table'
4387
0
    string alter_producer_id = req->producer_id() + ".ALTER";
4388
4389
    // If user passed in bootstrap ids, check that there is a bootstrap id for every table.
4390
0
    if (req->producer_bootstrap_ids_to_add().size() > 0 &&
4391
0
      req->producer_table_ids_to_add().size() != req->producer_bootstrap_ids_to_add().size()) {
4392
0
      return STATUS(InvalidArgument, "Number of bootstrap ids must be equal to number of tables",
4393
0
        req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4394
0
    }
4395
4396
    // Verify that no 'alter' command is running.
4397
0
    scoped_refptr<UniverseReplicationInfo> alter_ri;
4398
0
    {
4399
0
      SharedLock lock(mutex_);
4400
0
      alter_ri = FindPtrOrNull(universe_replication_map_, alter_producer_id);
4401
0
    }
4402
0
    {
4403
0
      if (alter_ri != nullptr) {
4404
0
        LOG(INFO) << "Found " << alter_producer_id << "... Removing";
4405
0
        if (alter_ri->LockForRead()->is_deleted_or_failed()) {
4406
          // Delete previous Alter if it's completed but failed.
4407
0
          master::DeleteUniverseReplicationRequestPB delete_req;
4408
0
          delete_req.set_producer_id(alter_ri->id());
4409
0
          master::DeleteUniverseReplicationResponsePB delete_resp;
4410
0
          Status s = DeleteUniverseReplication(&delete_req, &delete_resp, rpc);
4411
0
          if (!s.ok()) {
4412
0
            if (delete_resp.has_error()) {
4413
0
              resp->mutable_error()->Swap(delete_resp.mutable_error());
4414
0
              return s;
4415
0
            }
4416
0
            return SetupError(resp->mutable_error(), s);
4417
0
          }
4418
0
        } else {
4419
0
          return STATUS(InvalidArgument, "Alter for CDC producer currently running",
4420
0
                        req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4421
0
        }
4422
0
      }
4423
0
    }
4424
4425
    // Map each table id to its corresponding bootstrap id.
4426
0
    std::unordered_map<TableId, std::string> table_id_to_bootstrap_id;
4427
0
    if (req->producer_bootstrap_ids_to_add().size() > 0) {
4428
0
      for (int i = 0; i < req->producer_table_ids_to_add().size(); i++) {
4429
0
        table_id_to_bootstrap_id[req->producer_table_ids_to_add(i)]
4430
0
          = req->producer_bootstrap_ids_to_add(i);
4431
0
      }
4432
4433
      // Ensure that table ids are unique. We need to do this here even though
4434
      // the same check is performed by SetupUniverseReplication because
4435
      // duplicate table ids can cause a bootstrap id entry in table_id_to_bootstrap_id
4436
      // to be overwritten.
4437
0
      if (table_id_to_bootstrap_id.size() !=
4438
0
              implicit_cast<size_t>(req->producer_table_ids_to_add().size())) {
4439
0
        return STATUS(InvalidArgument, "When providing bootstrap ids, "
4440
0
                      "the list of tables must be unique", req->ShortDebugString(),
4441
0
                      MasterError(MasterErrorPB::INVALID_REQUEST));
4442
0
      }
4443
0
    }
4444
4445
    // Only add new tables.  Ignore tables that are currently being replicated.
4446
0
    auto tid_iter = req->producer_table_ids_to_add();
4447
0
    std::unordered_set<string> new_tables(tid_iter.begin(), tid_iter.end());
4448
0
    {
4449
0
      auto l = original_ri->LockForRead();
4450
0
      for (auto t : l->pb.tables()) {
4451
0
        auto pos = new_tables.find(t);
4452
0
        if (pos != new_tables.end()) {
4453
0
          new_tables.erase(pos);
4454
0
        }
4455
0
      }
4456
0
    }
4457
0
    if (new_tables.empty()) {
4458
0
      return STATUS(InvalidArgument, "CDC producer already contains all requested tables",
4459
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4460
0
    }
4461
4462
    // 1. Create an ALTER table request that mirrors the original 'setup_replication'.
4463
0
    master::SetupUniverseReplicationRequestPB setup_req;
4464
0
    master::SetupUniverseReplicationResponsePB setup_resp;
4465
0
    setup_req.set_producer_id(alter_producer_id);
4466
0
    setup_req.mutable_producer_master_addresses()->CopyFrom(
4467
0
        original_ri->LockForRead()->pb.producer_master_addresses());
4468
0
    for (auto t : new_tables) {
4469
0
      setup_req.add_producer_table_ids(t);
4470
4471
      // Add bootstrap id to request if it exists.
4472
0
      auto bootstrap_id_lookup_result = table_id_to_bootstrap_id.find(t);
4473
0
      if (bootstrap_id_lookup_result != table_id_to_bootstrap_id.end()) {
4474
0
        setup_req.add_producer_bootstrap_ids(bootstrap_id_lookup_result->second);
4475
0
      }
4476
0
    }
4477
4478
    // 2. Run the 'setup_replication' pipeline on the ALTER table.
4479
0
    Status s = SetupUniverseReplication(&setup_req, &setup_resp, rpc);
4480
0
    if (!s.ok()) {
4481
0
      if (setup_resp.has_error()) {
4482
0
        resp->mutable_error()->Swap(setup_resp.mutable_error());
4483
0
        return s;
4484
0
      }
4485
0
      return SetupError(resp->mutable_error(), s);
4486
0
    }
4487
    // NOTE: ALTER merges back into original after completion.
4488
0
  } else if (req->has_new_producer_universe_id()) {
4489
0
    Status s = RenameUniverseReplication(original_ri, req, resp, rpc);
4490
0
    if (!s.ok()) {
4491
0
      return SetupError(resp->mutable_error(), s);
4492
0
    }
4493
0
  }
4494
4495
0
  return Status::OK();
4496
0
}
4497
4498
Status CatalogManager::RenameUniverseReplication(
4499
    scoped_refptr<UniverseReplicationInfo> universe,
4500
    const AlterUniverseReplicationRequestPB* req,
4501
    AlterUniverseReplicationResponsePB* resp,
4502
0
    rpc::RpcContext* rpc) {
4503
0
  const string old_universe_replication_id = universe->id();
4504
0
  const string new_producer_universe_id = req->new_producer_universe_id();
4505
0
  if (old_universe_replication_id == new_producer_universe_id) {
4506
0
    return STATUS(InvalidArgument, "Old and new replication ids must be different",
4507
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4508
0
  }
4509
4510
0
  {
4511
0
    LockGuard lock(mutex_);
4512
0
    auto l = universe->LockForWrite();
4513
0
    scoped_refptr<UniverseReplicationInfo> new_ri;
4514
4515
    // Assert that new_producer_universe_id isn't already in use.
4516
0
    if (FindPtrOrNull(universe_replication_map_, new_producer_universe_id) != nullptr) {
4517
0
      return STATUS(InvalidArgument, "New replication id is already in use",
4518
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4519
0
    }
4520
4521
    // Since the producer_id is used as the key, we need to create a new UniverseReplicationInfo.
4522
0
    new_ri = new UniverseReplicationInfo(new_producer_universe_id);
4523
0
    new_ri->mutable_metadata()->StartMutation();
4524
0
    SysUniverseReplicationEntryPB *metadata = &new_ri->mutable_metadata()->mutable_dirty()->pb;
4525
0
    metadata->CopyFrom(l->pb);
4526
0
    metadata->set_producer_id(new_producer_universe_id);
4527
4528
    // Also need to update internal maps.
4529
0
    auto cluster_config = ClusterConfig();
4530
0
    auto cl = cluster_config->LockForWrite();
4531
0
    auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4532
0
    (*producer_map)[new_producer_universe_id] =
4533
0
        std::move((*producer_map)[old_universe_replication_id]);
4534
0
    producer_map->erase(old_universe_replication_id);
4535
4536
0
    {
4537
      // Need both these updates to be atomic.
4538
0
      auto w = sys_catalog_->NewWriter(leader_ready_term());
4539
0
      RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_DELETE, universe.get()));
4540
0
      RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_UPDATE,
4541
0
                              new_ri.get(),
4542
0
                              cluster_config.get()));
4543
0
      RETURN_NOT_OK(CheckStatus(
4544
0
          sys_catalog_->SyncWrite(w.get()),
4545
0
          "Updating universe replication info and cluster config in sys-catalog"));
4546
0
    }
4547
0
    new_ri->mutable_metadata()->CommitMutation();
4548
0
    cl.Commit();
4549
4550
    // Update universe_replication_map after persistent data is saved.
4551
0
    universe_replication_map_[new_producer_universe_id] = new_ri;
4552
0
    universe_replication_map_.erase(old_universe_replication_id);
4553
0
  }
4554
4555
0
  return Status::OK();
4556
0
}
4557
4558
Status CatalogManager::GetUniverseReplication(const GetUniverseReplicationRequestPB* req,
4559
                                              GetUniverseReplicationResponsePB* resp,
4560
0
                                              rpc::RpcContext* rpc) {
4561
0
  LOG(INFO) << "GetUniverseReplication from " << RequestorString(rpc)
4562
0
            << ": " << req->DebugString();
4563
4564
0
  if (!req->has_producer_id()) {
4565
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4566
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4567
0
  }
4568
4569
0
  scoped_refptr<UniverseReplicationInfo> universe;
4570
0
  {
4571
0
    SharedLock lock(mutex_);
4572
4573
0
    universe = FindPtrOrNull(universe_replication_map_, req->producer_id());
4574
0
    if (universe == nullptr) {
4575
0
      return STATUS(NotFound, "Could not find CDC producer universe",
4576
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4577
0
    }
4578
0
  }
4579
4580
0
  resp->mutable_entry()->CopyFrom(universe->LockForRead()->pb);
4581
0
  return Status::OK();
4582
0
}
4583
4584
/*
4585
 * Checks if the universe replication setup has completed.
4586
 * Returns Status::OK() if this call succeeds, and uses resp->done() to determine if the setup has
4587
 * completed (either failed or succeeded). If the setup has failed, then resp->replication_error()
4588
 * is also set.
4589
 */
4590
Status CatalogManager::IsSetupUniverseReplicationDone(
4591
    const IsSetupUniverseReplicationDoneRequestPB* req,
4592
    IsSetupUniverseReplicationDoneResponsePB* resp,
4593
0
    rpc::RpcContext* rpc) {
4594
0
  LOG(INFO) << "IsSetupUniverseReplicationDone from " << RequestorString(rpc)
4595
0
            << ": " << req->DebugString();
4596
4597
0
  if (!req->has_producer_id()) {
4598
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4599
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4600
0
  }
4601
4602
0
  GetUniverseReplicationRequestPB universe_req;
4603
0
  GetUniverseReplicationResponsePB universe_resp;
4604
0
  universe_req.set_producer_id(req->producer_id());
4605
4606
0
  RETURN_NOT_OK(GetUniverseReplication(&universe_req, &universe_resp, /* RpcContext */ nullptr));
4607
0
  if (universe_resp.has_error()) {
4608
0
    RETURN_NOT_OK(StatusFromPB(universe_resp.error().status()));
4609
0
  }
4610
4611
  // Two cases for completion:
4612
  //  - For a regular SetupUniverseReplication, we want to wait for the universe to become ACTIVE.
4613
  //  - For an AlterUniverseReplication, we need to wait until the .ALTER universe gets merged with
4614
  //    the main universe - at which point the .ALTER universe is deleted.
4615
0
  bool isAlterRequest = GStringPiece(req->producer_id()).ends_with(".ALTER");
4616
0
  if ((!isAlterRequest && universe_resp.entry().state() == SysUniverseReplicationEntryPB::ACTIVE) ||
4617
0
      (isAlterRequest && universe_resp.entry().state() == SysUniverseReplicationEntryPB::DELETED)) {
4618
0
    resp->set_done(true);
4619
0
    StatusToPB(Status::OK(), resp->mutable_replication_error());
4620
0
    return Status::OK();
4621
0
  }
4622
4623
  // Otherwise we have either failed (see MarkUniverseReplicationFailed), or are still working.
4624
0
  if (universe_resp.entry().state() == SysUniverseReplicationEntryPB::DELETED_ERROR ||
4625
0
      universe_resp.entry().state() == SysUniverseReplicationEntryPB::FAILED) {
4626
0
    resp->set_done(true);
4627
4628
    // Get the more detailed error.
4629
0
    scoped_refptr<UniverseReplicationInfo> universe;
4630
0
    {
4631
0
      SharedLock lock(mutex_);
4632
0
      universe = FindPtrOrNull(universe_replication_map_, req->producer_id());
4633
0
      if (universe == nullptr) {
4634
0
        StatusToPB(
4635
0
            STATUS(InternalError, "Could not find CDC producer universe after having failed."),
4636
0
            resp->mutable_replication_error());
4637
0
        return Status::OK();
4638
0
      }
4639
0
    }
4640
0
    if (!universe->GetSetupUniverseReplicationErrorStatus().ok()) {
4641
0
      StatusToPB(universe->GetSetupUniverseReplicationErrorStatus(),
4642
0
                 resp->mutable_replication_error());
4643
0
    } else {
4644
0
      LOG(WARNING) << "Did not find setup universe replication error status.";
4645
0
      StatusToPB(STATUS(InternalError, "unknown error"), resp->mutable_replication_error());
4646
0
    }
4647
0
    return Status::OK();
4648
0
  }
4649
4650
  // Not done yet.
4651
0
  resp->set_done(false);
4652
0
  return Status::OK();
4653
0
}
4654
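// [Editor's note - illustrative caller-side sketch, not original source.] The completion
// semantics documented above suggest a polling loop of roughly this shape; the loop and
// error handling are assumptions, not prescribed by this file:
//
//   IsSetupUniverseReplicationDoneRequestPB req;
//   req.set_producer_id(producer_id);  // append ".ALTER" when waiting on an alter
//   IsSetupUniverseReplicationDoneResponsePB resp;
//   do {
//     RETURN_NOT_OK(IsSetupUniverseReplicationDone(&req, &resp, /* RpcContext */ nullptr));
//   } while (!resp.done());  // a real caller would back off between polls
//   RETURN_NOT_OK(StatusFromPB(resp.replication_error()));  // OK on success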
4655
Status CatalogManager::UpdateConsumerOnProducerSplit(
4656
    const UpdateConsumerOnProducerSplitRequestPB* req,
4657
    UpdateConsumerOnProducerSplitResponsePB* resp,
4658
0
    rpc::RpcContext* rpc) {
4659
0
  LOG(INFO) << "UpdateConsumerOnProducerSplit from " << RequestorString(rpc)
4660
0
            << ": " << req->DebugString();
4661
4662
0
  if (!req->has_producer_id()) {
4663
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4664
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4665
0
  }
4666
0
  if (!req->has_stream_id()) {
4667
0
    return STATUS(InvalidArgument, "Stream ID must be provided",
4668
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4669
0
  }
4670
0
  if (!req->has_producer_split_tablet_info()) {
4671
0
    return STATUS(InvalidArgument, "Producer split tablet info must be provided",
4672
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4673
0
  }
4674
4675
0
  auto cluster_config = ClusterConfig();
4676
0
  auto l = cluster_config->LockForWrite();
4677
0
  auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4678
0
  auto producer_entry = FindOrNull(*producer_map, req->producer_id());
4679
0
  if (!producer_entry) {
4680
0
    return STATUS_FORMAT(
4681
0
        NotFound, "Unable to find the producer entry for universe $0", req->producer_id());
4682
0
  }
4683
0
  auto stream_entry = FindOrNull(*producer_entry->mutable_stream_map(), req->stream_id());
4684
0
  if (!stream_entry) {
4685
0
    return STATUS_FORMAT(
4686
0
        NotFound, "Unable to find the stream entry for universe $0, stream $1",
4687
0
        req->producer_id(), req->stream_id());
4688
0
  }
4689
4690
  // Find the parent tablet in the tablet mapping.
4691
0
  bool found = false;
4692
0
  auto mutable_map = stream_entry->mutable_consumer_producer_tablet_map();
4693
  // Also keep track of whether we see the split children tablets.
4694
0
  vector<string> split_child_tablet_ids{req->producer_split_tablet_info().new_tablet1_id(),
4695
0
                                        req->producer_split_tablet_info().new_tablet2_id()};
4696
0
  for (auto& consumer_tablet_to_producer_tablets : *mutable_map) {
4697
0
    auto& producer_tablets_list = consumer_tablet_to_producer_tablets.second;
4698
0
    auto producer_tablets = producer_tablets_list.mutable_tablets();
4699
0
    for (auto tablet = producer_tablets->begin(); tablet < producer_tablets->end(); ++tablet) {
4700
0
      if (*tablet == req->producer_split_tablet_info().tablet_id()) {
4701
        // Remove the parent tablet id.
4702
0
        producer_tablets->erase(tablet);
4703
        // For now we add the children tablets to the same consumer tablet.
4704
        // See github issue #10186 for further improvements.
4705
0
        producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet1_id());
4706
0
        producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet2_id());
4707
        // There should only be one copy of each producer tablet per stream, so we can exit early.
4708
0
        found = true;
4709
0
        break;
4710
0
      }
4711
      // Check if this is one of the child split tablets.
4712
0
      auto it = std::find(split_child_tablet_ids.begin(), split_child_tablet_ids.end(), *tablet);
4713
0
      if (it != split_child_tablet_ids.end()) {
4714
0
        split_child_tablet_ids.erase(it);
4715
0
      }
4716
0
    }
4717
0
    if (found) {
4718
0
      break;
4719
0
    }
4720
0
  }
4721
4722
0
  if (!found) {
4723
    // Did not find the source tablet, but did find the children - means that we have already
4724
    // processed this SPLIT_OP, so for idempotency, we can return OK.
4725
0
    if (split_child_tablet_ids.empty()) {
4726
0
      LOG(INFO) << "Already processed this tablet split: " << req->DebugString();
4727
0
      return Status::OK();
4728
0
    }
4729
4730
    // When there are sequential SPLIT_OPs, we may try to reprocess an older SPLIT_OP. However, if
4731
    // one or both of those children have also already been split and processed, then we'll end up
4732
    // here (!found && !split_child_tablet_ids.empty()).
4733
    // This is alright; log a warning and continue (so as not to block later records).
4734
0
    LOG(WARNING)
4735
0
        << "Unable to find matching source tablet " << req->producer_split_tablet_info().tablet_id()
4736
0
        << " for universe " << req->producer_id() << " stream " << req->stream_id();
4737
4738
0
    return Status::OK();
4739
0
  }
4740
4741
  // Also make sure that we switch off the 1-1 mapping optimization.
4742
0
  stream_entry->set_same_num_producer_consumer_tablets(false);
4743
4744
  // Also bump the cluster_config_ version so that changes are propagated to tservers (and new
4745
  // pollers are created for the new tablets).
4746
0
  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
4747
4748
0
  RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()),
4749
0
                            "Updating cluster config in sys-catalog"));
4750
0
  l.Commit();
4751
4752
0
  return Status::OK();
4753
0
}
4754
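// [Editor's note - not original source.] The idempotency rule in the function above -
// parent tablet gone but both children present means the SPLIT_OP was already applied -
// is what makes replays of the same split safe:
//
//   if (!found && split_child_tablet_ids.empty()) {
//     return Status::OK();  // already processed this split; nothing to redo
//   }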
4755
93.3M
bool CatalogManager::IsTableCdcProducer(const TableInfo& table_info) const {
4756
93.3M
  auto it = cdc_stream_tables_count_map_.find(table_info.id());
4757
93.3M
  if (it == cdc_stream_tables_count_map_.end()) {
4758
93.3M
    return false;
4759
93.3M
  } else if (it->second > 0) {
4760
5
    auto tid = table_info.id();
4761
5
    for (const auto& entry : cdc_stream_map_) {
4762
5
      auto s = entry.second->LockForRead();
4763
      // For xCluster, the first entry will be the table_id.
4764
5
      const auto& table_id = s->table_id();
4765
5
      if (!table_id.empty() && table_id.Get(0) == tid && !(s->is_deleting() || s->is_deleted())) {
4766
5
        return true;
4767
5
      }
4768
5
    }
4769
5
  }
4770
18.4E
  return false;
4771
93.3M
}
4772
4773
93.3M
bool CatalogManager::IsTableCdcConsumer(const TableInfo& table_info) const {
4774
93.3M
  auto it = xcluster_consumer_tables_to_stream_map_.find(table_info.id());
4775
93.3M
  if (it == xcluster_consumer_tables_to_stream_map_.end()) {
4776
93.3M
    return false;
4777
93.3M
  }
4778
2
  return !it->second.empty();
4779
93.3M
}
4780
4781
CatalogManager::XClusterConsumerTableStreamInfoMap
4782
140
    CatalogManager::GetXClusterStreamInfoForConsumerTable(const TableId& table_id) const {
4783
140
  SharedLock lock(mutex_);
4784
140
  auto it = xcluster_consumer_tables_to_stream_map_.find(table_id);
4785
140
  if (it == xcluster_consumer_tables_to_stream_map_.end()) {
4786
140
    return {};
4787
140
  }
4788
4789
0
  return it->second;
4790
140
}
4791
4792
bool CatalogManager::IsCdcEnabled(
4793
93.3M
    const TableInfo& table_info) const {
4794
93.3M
  SharedLock lock(mutex_);
4795
93.3M
  return IsTableCdcProducer(table_info) || IsTableCdcConsumer(table_info);
4796
93.3M
}
4797
4798
7.94k
void CatalogManager::Started() {
4799
7.94k
  snapshot_coordinator_.Start();
4800
7.94k
}
4801
4802
Result<SnapshotSchedulesToObjectIdsMap> CatalogManager::MakeSnapshotSchedulesToObjectIdsMap(
4803
15.4k
    SysRowEntryType type) {
4804
15.4k
  return snapshot_coordinator_.MakeSnapshotSchedulesToObjectIdsMap(type);
4805
15.4k
}
4806
4807
0
Result<bool> CatalogManager::IsTablePartOfSomeSnapshotSchedule(const TableInfo& table_info) {
4808
0
  return snapshot_coordinator_.IsTableCoveredBySomeSnapshotSchedule(table_info);
4809
0
}
4810
4811
3.00k
void CatalogManager::SysCatalogLoaded(int64_t term) {
4812
3.00k
  return snapshot_coordinator_.SysCatalogLoaded(term);
4813
3.00k
}
4814
4815
270
Result<size_t> CatalogManager::GetNumLiveTServersForActiveCluster() {
4816
270
  BlacklistSet blacklist = VERIFY_RESULT(BlacklistSetFromPB());
4817
0
  TSDescriptorVector ts_descs;
4818
268
  auto uuid = VERIFY_RESULT(placement_uuid());
4819
0
  master_->ts_manager()->GetAllLiveDescriptorsInCluster(&ts_descs, uuid, blacklist);
4820
268
  return ts_descs.size();
4821
268
}
4822
4823
}  // namespace enterprise
4824
}  // namespace master
4825
}  // namespace yb