YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/master/catalog_manager_ent.cc
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include <memory>
14
#include <regex>
15
#include <set>
16
#include <unordered_set>
17
#include <google/protobuf/util/message_differencer.h>
18
19
#include "yb/common/common_fwd.h"
20
#include "yb/master/catalog_entity_info.h"
21
#include "yb/master/catalog_manager-internal.h"
22
#include "yb/master/cdc_consumer_registry_service.h"
23
#include "yb/master/cdc_rpc_tasks.h"
24
#include "yb/master/cluster_balance.h"
25
#include "yb/master/master.h"
26
#include "yb/master/master_backup.pb.h"
27
#include "yb/master/master_error.h"
28
29
#include "yb/cdc/cdc_consumer.pb.h"
30
#include "yb/cdc/cdc_service.h"
31
32
#include "yb/client/client-internal.h"
33
#include "yb/client/schema.h"
34
#include "yb/client/session.h"
35
#include "yb/client/table.h"
36
#include "yb/client/table_alterer.h"
37
#include "yb/client/table_handle.h"
38
#include "yb/client/table_info.h"
39
#include "yb/client/yb_op.h"
40
#include "yb/client/yb_table_name.h"
41
42
#include "yb/common/common.pb.h"
43
#include "yb/common/entity_ids.h"
44
#include "yb/common/ql_name.h"
45
#include "yb/common/ql_type.h"
46
#include "yb/common/schema.h"
47
#include "yb/common/wire_protocol.h"
48
#include "yb/consensus/consensus.h"
49
50
#include "yb/docdb/consensus_frontier.h"
51
#include "yb/docdb/doc_write_batch.h"
52
53
#include "yb/gutil/bind.h"
54
#include "yb/gutil/casts.h"
55
#include "yb/gutil/strings/join.h"
56
#include "yb/gutil/strings/substitute.h"
57
#include "yb/master/master_client.pb.h"
58
#include "yb/master/master_ddl.pb.h"
59
#include "yb/master/master_defaults.h"
60
#include "yb/master/master_heartbeat.pb.h"
61
#include "yb/master/master_replication.pb.h"
62
#include "yb/master/master_util.h"
63
#include "yb/master/sys_catalog.h"
64
#include "yb/master/sys_catalog-internal.h"
65
#include "yb/master/async_snapshot_tasks.h"
66
#include "yb/master/async_rpc_tasks.h"
67
#include "yb/master/encryption_manager.h"
68
#include "yb/master/restore_sys_catalog_state.h"
69
#include "yb/master/scoped_leader_shared_lock.h"
70
#include "yb/master/scoped_leader_shared_lock-internal.h"
71
72
#include "yb/rpc/messenger.h"
73
74
#include "yb/tablet/operations/snapshot_operation.h"
75
#include "yb/tablet/tablet_metadata.h"
76
#include "yb/tablet/tablet_snapshots.h"
77
78
#include "yb/tserver/backup.proxy.h"
79
#include "yb/tserver/service_util.h"
80
81
#include "yb/util/cast.h"
82
#include "yb/util/date_time.h"
83
#include "yb/util/flag_tags.h"
84
#include "yb/util/format.h"
85
#include "yb/util/logging.h"
86
#include "yb/util/random_util.h"
87
#include "yb/util/scope_exit.h"
88
#include "yb/util/service_util.h"
89
#include "yb/util/status.h"
90
#include "yb/util/status_format.h"
91
#include "yb/util/status_log.h"
92
#include "yb/util/tostring.h"
93
#include "yb/util/string_util.h"
94
#include "yb/util/trace.h"
95
96
#include "yb/yql/cql/ql/util/statement_result.h"
97
98
using namespace std::literals;
99
using namespace std::placeholders;
100
101
using std::string;
102
using std::unique_ptr;
103
using std::vector;
104
105
using google::protobuf::RepeatedPtrField;
106
using google::protobuf::util::MessageDifferencer;
107
using strings::Substitute;
108
109
DEFINE_int32(cdc_state_table_num_tablets, 0,
110
             "Number of tablets to use when creating the CDC state table. "
111
             "0 to use the same default num tablets as for regular tables.");
112
113
DEFINE_int32(cdc_wal_retention_time_secs, 4 * 3600,
114
             "WAL retention time in seconds to be used for tables for which a CDC stream was "
115
             "created.");
116
DECLARE_int32(master_rpc_timeout_ms);
117
118
DEFINE_bool(enable_transaction_snapshots, true,
119
            "The flag enables usage of transaction aware snapshots.");
120
TAG_FLAG(enable_transaction_snapshots, hidden);
121
TAG_FLAG(enable_transaction_snapshots, advanced);
122
TAG_FLAG(enable_transaction_snapshots, runtime);
123
124
DEFINE_test_flag(bool, disable_cdc_state_insert_on_setup, false,
125
                 "Disable inserting new entries into cdc state as part of the setup flow.");
126
127
DEFINE_bool(allow_consecutive_restore, true,
128
            "Is it allowed to restore to a time before the last restoration was done.");
129
TAG_FLAG(allow_consecutive_restore, runtime);
130
131
namespace yb {
132
133
using rpc::RpcContext;
134
using pb_util::ParseFromSlice;
135
136
namespace master {
137
namespace enterprise {
138
139
namespace {
140
141
167
CHECKED_STATUS CheckStatus(const Status& status, const char* action) {
142
167
  if (status.ok()) {
143
167
    return status;
144
167
  }
145
146
0
  const Status s = status.CloneAndPrepend(std::string("An error occurred while ") + action);
147
0
  LOG(WARNING) << s;
148
0
  return s;
149
0
}
150
151
0
CHECKED_STATUS CheckLeaderStatus(const Status& status, const char* action) {
152
0
  return CheckIfNoLongerLeader(CheckStatus(status, action));
153
0
}
154
155
template<class RespClass>
156
CHECKED_STATUS CheckLeaderStatusAndSetupError(
157
159
    const Status& status, const char* action, RespClass* resp) {
158
159
  return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp);
159
159
}
catalog_manager_ent.cc: yb::Status yb::master::enterprise::(anonymous namespace)::CheckLeaderStatusAndSetupError<yb::master::CreateCDCStreamResponsePB>(yb::Status const&, char const*, yb::master::CreateCDCStreamResponsePB*)
Line
Count
Source
157
157
    const Status& status, const char* action, RespClass* resp) {
158
157
  return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp);
159
157
}
catalog_manager_ent.cc: yb::Status yb::master::enterprise::(anonymous namespace)::CheckLeaderStatusAndSetupError<yb::master::SetupUniverseReplicationResponsePB>(yb::Status const&, char const*, yb::master::SetupUniverseReplicationResponsePB*)
Line
Count
Source
157
2
    const Status& status, const char* action, RespClass* resp) {
158
2
  return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp);
159
2
}
160
161
} // namespace
162
163
////////////////////////////////////////////////////////////
164
// Snapshot Loader
165
////////////////////////////////////////////////////////////
166
167
class SnapshotLoader : public Visitor<PersistentSnapshotInfo> {
168
 public:
169
2.37k
  explicit SnapshotLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {}
170
171
0
  CHECKED_STATUS Visit(const SnapshotId& snapshot_id, const SysSnapshotEntryPB& metadata) override {
172
0
    if (TryFullyDecodeTxnSnapshotId(snapshot_id)) {
173
      // Transaction-aware snapshots should already be loaded.
174
0
      return Status::OK();
175
0
    }
176
0
    return VisitNonTransactionAwareSnapshot(snapshot_id, metadata);
177
0
  }
178
179
  CHECKED_STATUS VisitNonTransactionAwareSnapshot(
180
0
      const SnapshotId& snapshot_id, const SysSnapshotEntryPB& metadata) {
181
182
    // Setup the snapshot info.
183
0
    auto snapshot_info = make_scoped_refptr<SnapshotInfo>(snapshot_id);
184
0
    auto l = snapshot_info->LockForWrite();
185
0
    l.mutable_data()->pb.CopyFrom(metadata);
186
187
    // Add the snapshot to the IDs map (if the snapshot is not deleted).
188
0
    auto emplace_result = catalog_manager_->non_txn_snapshot_ids_map_.emplace(
189
0
        snapshot_id, std::move(snapshot_info));
190
0
    CHECK(emplace_result.second) << "Snapshot already exists: " << snapshot_id;
191
192
0
    LOG(INFO) << "Loaded metadata for snapshot (id=" << snapshot_id << "): "
193
0
              << emplace_result.first->second->ToString() << ": " << metadata.ShortDebugString();
194
0
    l.Commit();
195
0
    return Status::OK();
196
0
  }
197
198
 private:
199
  CatalogManager *catalog_manager_;
200
201
  DISALLOW_COPY_AND_ASSIGN(SnapshotLoader);
202
};
203
204
205
////////////////////////////////////////////////////////////
206
// CDC Stream Loader
207
////////////////////////////////////////////////////////////
208
209
class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
210
 public:
211
2.37k
  explicit CDCStreamLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {}
212
213
  Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata)
214
0
      REQUIRES(catalog_manager_->mutex_) {
215
0
    DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id))
216
0
        << "CDC stream already exists: " << stream_id;
217
218
0
    scoped_refptr<NamespaceInfo> ns;
219
0
    scoped_refptr<TableInfo> table;
220
221
0
    if (metadata.has_namespace_id()) {
222
0
      ns = FindPtrOrNull(catalog_manager_->namespace_ids_map_, metadata.namespace_id());
223
224
0
      if (!ns) {
225
0
        LOG(DFATAL) << "Invalid namespace ID " << metadata.namespace_id() << " for stream "
226
0
                   << stream_id;
227
        // TODO (#2059): Potentially signals a race condition where the namespace got deleted
228
        // while the stream was being created.
229
        // Log error and continue without loading the stream.
230
0
        return Status::OK();
231
0
      }
232
0
    } else {
233
0
      table = FindPtrOrNull(*catalog_manager_->table_ids_map_, metadata.table_id(0));
234
0
      if (!table) {
235
0
        LOG(ERROR) << "Invalid table ID " << metadata.table_id(0) << " for stream " << stream_id;
236
        // TODO (#2059): Potentially signals a race condition where the table got deleted while the stream
237
        //  was being created.
238
        // Log error and continue without loading the stream.
239
0
        return Status::OK();
240
0
      }
241
0
    }
242
243
    // Setup the CDC stream info.
244
0
    auto stream = make_scoped_refptr<CDCStreamInfo>(stream_id);
245
0
    auto l = stream->LockForWrite();
246
0
    l.mutable_data()->pb.CopyFrom(metadata);
247
248
    // If the table has been deleted, then mark this stream as DELETING so it can be deleted by the
249
    // catalog manager background thread. Otherwise, if this stream is missing an entry
250
    // for state, then mark its state as Active.
251
252
0
    if (((table && table->LockForRead()->is_deleting()) ||
253
0
         (ns && ns->state() == SysNamespaceEntryPB::DELETING)) &&
254
0
        !l.data().is_deleting()) {
255
0
      l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING);
256
0
    } else if (!l.mutable_data()->pb.has_state()) {
257
0
      l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::ACTIVE);
258
0
    }
259
260
    // Add the CDC stream to the CDC stream map.
261
0
    catalog_manager_->cdc_stream_map_[stream->id()] = stream;
262
0
    if (table) {
263
0
      catalog_manager_->cdc_stream_tables_count_map_[metadata.table_id(0)]++;
264
0
    }
265
266
0
    l.Commit();
267
268
0
    LOG(INFO) << "Loaded metadata for CDC stream " << stream->ToString() << ": "
269
0
              << metadata.ShortDebugString();
270
271
0
    return Status::OK();
272
0
  }
273
274
 private:
275
  CatalogManager *catalog_manager_;
276
277
  DISALLOW_COPY_AND_ASSIGN(CDCStreamLoader);
278
};
279
280
////////////////////////////////////////////////////////////
281
// Universe Replication Loader
282
////////////////////////////////////////////////////////////
283
284
class UniverseReplicationLoader : public Visitor<PersistentUniverseReplicationInfo> {
285
 public:
286
  explicit UniverseReplicationLoader(CatalogManager* catalog_manager)
287
2.37k
      : catalog_manager_(catalog_manager) {}
288
289
  Status Visit(const std::string& producer_id, const SysUniverseReplicationEntryPB& metadata)
290
0
      REQUIRES(catalog_manager_->mutex_) {
291
0
    DCHECK(!ContainsKey(catalog_manager_->universe_replication_map_, producer_id))
292
0
        << "Producer universe already exists: " << producer_id;
293
294
    // Setup the universe replication info.
295
0
    UniverseReplicationInfo* const ri = new UniverseReplicationInfo(producer_id);
296
0
    {
297
0
      auto l = ri->LockForWrite();
298
0
      l.mutable_data()->pb.CopyFrom(metadata);
299
300
0
      if (!l->is_active() && !l->is_deleted_or_failed()) {
301
        // Replication was not fully set up.
302
0
        LOG(WARNING) << "Universe replication in transient state: " << producer_id;
303
304
        // TODO: Should we delete all failed universe replication items?
305
0
      }
306
307
      // Add universe replication info to the universe replication map.
308
0
      catalog_manager_->universe_replication_map_[ri->id()] = ri;
309
310
0
      l.Commit();
311
0
    }
312
313
    // Also keep track of consumer tables.
314
0
    for (const auto& table : metadata.validated_tables()) {
315
0
      CDCStreamId stream_id = FindWithDefault(metadata.table_streams(), table.first, "");
316
0
      if (stream_id.empty()) {
317
0
        LOG(WARNING) << "Unable to find stream id for table: " << table.first;
318
0
        continue;
319
0
      }
320
0
      catalog_manager_->xcluster_consumer_tables_to_stream_map_[table.second].emplace(
321
0
          metadata.producer_id(), stream_id);
322
0
    }
323
324
0
    LOG(INFO) << "Loaded metadata for universe replication " << ri->ToString();
325
0
    VLOG(1) << "Metadata for universe replication " << ri->ToString() << ": "
326
0
            << metadata.ShortDebugString();
327
328
0
    return Status::OK();
329
0
  }
330
331
 private:
332
  CatalogManager *catalog_manager_;
333
334
  DISALLOW_COPY_AND_ASSIGN(UniverseReplicationLoader);
335
};
336
337
////////////////////////////////////////////////////////////
338
// CatalogManager
339
////////////////////////////////////////////////////////////
340
341
92
CatalogManager::~CatalogManager() {
342
92
  if (StartShutdown()) {
343
7
    CompleteShutdown();
344
7
  }
345
92
}
346
347
92
void CatalogManager::CompleteShutdown() {
348
92
  snapshot_coordinator_.Shutdown();
349
  // Call shutdown on base class before exiting derived class destructor
350
  // because BgTasks is part of base & uses this derived class on Shutdown.
351
92
  super::CompleteShutdown();
352
92
}
353
354
2.37k
Status CatalogManager::RunLoaders(int64_t term) {
355
2.37k
  RETURN_NOT_OK(super::RunLoaders(term));
356
357
  // Clear the snapshots.
358
2.37k
  non_txn_snapshot_ids_map_.clear();
359
360
  // Clear CDC stream map.
361
2.37k
  cdc_stream_map_.clear();
362
2.37k
  cdc_stream_tables_count_map_.clear();
363
364
  // Clear universe replication map.
365
2.37k
  universe_replication_map_.clear();
366
2.37k
  xcluster_consumer_tables_to_stream_map_.clear();
367
368
2.37k
  LOG_WITH_FUNC(INFO) << "Loading snapshots into memory.";
369
2.37k
  unique_ptr<SnapshotLoader> snapshot_loader(new SnapshotLoader(this));
370
2.37k
  RETURN_NOT_OK_PREPEND(
371
2.37k
      sys_catalog_->Visit(snapshot_loader.get()),
372
2.37k
      "Failed while visiting snapshots in sys catalog");
373
374
2.37k
  LOG_WITH_FUNC(INFO) << "Loading CDC streams into memory.";
375
2.37k
  auto cdc_stream_loader = std::make_unique<CDCStreamLoader>(this);
376
2.37k
  RETURN_NOT_OK_PREPEND(
377
2.37k
      sys_catalog_->Visit(cdc_stream_loader.get()),
378
2.37k
      "Failed while visiting CDC streams in sys catalog");
379
380
2.37k
  LOG_WITH_FUNC(INFO) << "Loading universe replication info into memory.";
381
2.37k
  auto universe_replication_loader = std::make_unique<UniverseReplicationLoader>(this);
382
2.37k
  RETURN_NOT_OK_PREPEND(
383
2.37k
      sys_catalog_->Visit(universe_replication_loader.get()),
384
2.37k
      "Failed while visiting universe replication info in sys catalog");
385
386
2.37k
  return Status::OK();
387
2.37k
}
388
389
Status CatalogManager::CreateSnapshot(const CreateSnapshotRequestPB* req,
390
                                      CreateSnapshotResponsePB* resp,
391
0
                                      RpcContext* rpc) {
392
0
  LOG(INFO) << "Servicing CreateSnapshot request: " << req->ShortDebugString();
393
394
0
  if (FLAGS_enable_transaction_snapshots && req->transaction_aware()) {
395
0
    return CreateTransactionAwareSnapshot(*req, resp, rpc);
396
0
  }
397
398
0
  if (req->has_schedule_id()) {
399
0
    auto schedule_id = VERIFY_RESULT(FullyDecodeSnapshotScheduleId(req->schedule_id()));
400
0
    auto snapshot_id = snapshot_coordinator_.CreateForSchedule(
401
0
        schedule_id, leader_ready_term(), rpc->GetClientDeadline());
402
0
    if (!snapshot_id.ok()) {
403
0
      LOG(INFO) << "Create snapshot failed: " << snapshot_id.status();
404
0
      return snapshot_id.status();
405
0
    }
406
0
    resp->set_snapshot_id(snapshot_id->data(), snapshot_id->size());
407
0
    return Status::OK();
408
0
  }
409
410
0
  return CreateNonTransactionAwareSnapshot(req, resp, rpc);
411
0
}
412
413
Status CatalogManager::CreateNonTransactionAwareSnapshot(
414
    const CreateSnapshotRequestPB* req,
415
    CreateSnapshotResponsePB* resp,
416
0
    RpcContext* rpc) {
417
0
  SnapshotId snapshot_id;
418
0
  {
419
0
    LockGuard lock(mutex_);
420
0
    TRACE("Acquired catalog manager lock");
421
422
    // Verify that the system is not in snapshot creating/restoring state.
423
0
    if (!current_snapshot_id_.empty()) {
424
0
      return STATUS(IllegalState,
425
0
                    Format(
426
0
                        "Current snapshot id: $0. Parallel snapshot operations are not supported"
427
0
                        ": $1", current_snapshot_id_, req),
428
0
                    MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION));
429
0
    }
430
431
    // Create a new snapshot UUID.
432
0
    snapshot_id = GenerateIdUnlocked(SysRowEntryType::SNAPSHOT);
433
0
  }
434
435
0
  vector<scoped_refptr<TabletInfo>> all_tablets;
436
437
  // Create the in-memory snapshot data descriptor.
438
0
  scoped_refptr<SnapshotInfo> snapshot(new SnapshotInfo(snapshot_id));
439
0
  snapshot->mutable_metadata()->StartMutation();
440
0
  snapshot->mutable_metadata()->mutable_dirty()->pb.set_state(SysSnapshotEntryPB::CREATING);
441
442
0
  auto tables = VERIFY_RESULT(CollectTables(req->tables(),
443
0
                                            req->add_indexes(),
444
0
                                            true /* include_parent_colocated_table */));
445
0
  std::unordered_set<NamespaceId> added_namespaces;
446
0
  for (const auto& table : tables) {
447
0
    snapshot->AddEntries(table, &added_namespaces);
448
0
    all_tablets.insert(all_tablets.end(), table.tablet_infos.begin(), table.tablet_infos.end());
449
0
  }
450
451
0
  VLOG(1) << "Snapshot " << snapshot->ToString()
452
0
          << ": PB=" << snapshot->mutable_metadata()->mutable_dirty()->pb.DebugString();
453
454
  // Write the snapshot data descriptor to the system catalog (in "creating" state).
455
0
  RETURN_NOT_OK(CheckLeaderStatus(
456
0
      sys_catalog_->Upsert(leader_ready_term(), snapshot),
457
0
      "inserting snapshot into sys-catalog"));
458
0
  TRACE("Wrote snapshot to system catalog");
459
460
  // Commit the in-memory snapshot data descriptor.
461
0
  snapshot->mutable_metadata()->CommitMutation();
462
463
  // Add the snapshot data descriptor to the catalog manager.
464
0
  {
465
0
    LockGuard lock(mutex_);
466
0
    TRACE("Acquired catalog manager lock");
467
468
    // Verify that the snapshot does not exist.
469
0
    auto inserted = non_txn_snapshot_ids_map_.emplace(snapshot_id, snapshot).second;
470
0
    RSTATUS_DCHECK(inserted, IllegalState, Format("Snapshot already exists: $0", snapshot_id));
471
0
    current_snapshot_id_ = snapshot_id;
472
0
  }
473
474
  // Send CreateSnapshot requests to all TServers (one tablet - one request).
475
0
  for (const scoped_refptr<TabletInfo>& tablet : all_tablets) {
476
0
    TRACE("Locking tablet");
477
0
    auto l = tablet->LockForRead();
478
479
0
    LOG(INFO) << "Sending CreateTabletSnapshot to tablet: " << tablet->ToString();
480
481
    // Send Create Tablet Snapshot request to each tablet leader.
482
0
    auto call = CreateAsyncTabletSnapshotOp(
483
0
        tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::CREATE_ON_TABLET,
484
0
        TabletSnapshotOperationCallback());
485
0
    ScheduleTabletSnapshotOp(call);
486
0
  }
487
488
0
  resp->set_snapshot_id(snapshot_id);
489
0
  LOG(INFO) << "Successfully started snapshot " << snapshot_id << " creation";
490
0
  return Status::OK();
491
0
}
492
493
0
void CatalogManager::Submit(std::unique_ptr<tablet::Operation> operation, int64_t leader_term) {
494
0
  operation->SetTablet(tablet_peer()->tablet());
495
0
  tablet_peer()->Submit(std::move(operation), leader_term);
496
0
}
497
498
Result<SysRowEntries> CatalogManager::CollectEntries(
499
    const google::protobuf::RepeatedPtrField<TableIdentifierPB>& table_identifiers,
500
0
    CollectFlags flags) {
501
0
  RETURN_NOT_OK(CheckIsLeaderAndReady());
502
0
  SysRowEntries entries;
503
0
  std::unordered_set<NamespaceId> namespaces;
504
0
  auto tables = VERIFY_RESULT(CollectTables(table_identifiers, flags, &namespaces));
505
0
  if (!namespaces.empty()) {
506
0
    SharedLock lock(mutex_);
507
0
    for (const auto& ns_id : namespaces) {
508
0
      auto ns_info = VERIFY_RESULT(FindNamespaceByIdUnlocked(ns_id));
509
0
      AddInfoEntry(ns_info.get(), entries.mutable_entries());
510
0
    }
511
0
  }
512
0
  for (const auto& table : tables) {
513
    // TODO(txn_snapshot) use single lock to resolve all tables to tablets
514
0
    SnapshotInfo::AddEntries(table, entries.mutable_entries(), /* tablet_infos= */ nullptr,
515
0
                             &namespaces);
516
0
  }
517
518
0
  return entries;
519
0
}
520
521
Result<SysRowEntries> CatalogManager::CollectEntriesForSnapshot(
522
0
    const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables) {
523
0
  return CollectEntries(
524
0
      tables,
525
0
      CollectFlags{CollectFlag::kAddIndexes, CollectFlag::kIncludeParentColocatedTable,
526
0
                   CollectFlag::kSucceedIfCreateInProgress});
527
0
}
528
529
16.5k
server::Clock* CatalogManager::Clock() {
530
16.5k
  return master_->clock();
531
16.5k
}
532
533
Status CatalogManager::CreateTransactionAwareSnapshot(
534
0
    const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) {
535
0
  CollectFlags flags{CollectFlag::kIncludeParentColocatedTable};
536
0
  flags.SetIf(CollectFlag::kAddIndexes, req.add_indexes());
537
0
  SysRowEntries entries = VERIFY_RESULT(CollectEntries(req.tables(), flags));
538
539
0
  auto snapshot_id = VERIFY_RESULT(snapshot_coordinator_.Create(
540
0
      entries, req.imported(), leader_ready_term(), rpc->GetClientDeadline()));
541
0
  resp->set_snapshot_id(snapshot_id.data(), snapshot_id.size());
542
0
  return Status::OK();
543
0
}
544
545
Status CatalogManager::ListSnapshots(const ListSnapshotsRequestPB* req,
546
0
                                     ListSnapshotsResponsePB* resp) {
547
0
  auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id());
548
0
  {
549
0
    SharedLock lock(mutex_);
550
0
    TRACE("Acquired catalog manager lock");
551
552
0
    if (!current_snapshot_id_.empty()) {
553
0
      resp->set_current_snapshot_id(current_snapshot_id_);
554
0
    }
555
556
0
    auto setup_snapshot_pb_lambda = [resp](scoped_refptr<SnapshotInfo> snapshot_info) {
557
0
      auto snapshot_lock = snapshot_info->LockForRead();
558
559
0
      SnapshotInfoPB* const snapshot = resp->add_snapshots();
560
0
      snapshot->set_id(snapshot_info->id());
561
0
      *snapshot->mutable_entry() = snapshot_info->metadata().state().pb;
562
0
    };
563
564
0
    if (req->has_snapshot_id()) {
565
0
      if (!txn_snapshot_id) {
566
0
        TRACE("Looking up snapshot");
567
0
        scoped_refptr<SnapshotInfo> snapshot_info =
568
0
            FindPtrOrNull(non_txn_snapshot_ids_map_, req->snapshot_id());
569
0
        if (snapshot_info == nullptr) {
570
0
          return STATUS(InvalidArgument, "Could not find snapshot", req->snapshot_id(),
571
0
                        MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
572
0
        }
573
574
0
        setup_snapshot_pb_lambda(snapshot_info);
575
0
      }
576
0
    } else {
577
0
      for (const SnapshotInfoMap::value_type& entry : non_txn_snapshot_ids_map_) {
578
0
        setup_snapshot_pb_lambda(entry.second);
579
0
      }
580
0
    }
581
0
  }
582
583
0
  if (req->prepare_for_backup() && (!req->has_snapshot_id() || !txn_snapshot_id)) {
584
0
    return STATUS(
585
0
        InvalidArgument, "Request must have correct snapshot_id", (req->has_snapshot_id() ?
586
0
        req->snapshot_id() : "None"), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
587
0
  }
588
589
0
  RETURN_NOT_OK(snapshot_coordinator_.ListSnapshots(
590
0
      txn_snapshot_id, req->list_deleted_snapshots(), resp));
591
592
0
  if (req->prepare_for_backup()) {
593
0
    SharedLock lock(mutex_);
594
0
    TRACE("Acquired catalog manager lock");
595
596
    // Repack & extend the backup row entries.
597
0
    for (SnapshotInfoPB& snapshot : *resp->mutable_snapshots()) {
598
0
      snapshot.set_format_version(2);
599
0
      SysSnapshotEntryPB& sys_entry = *snapshot.mutable_entry();
600
0
      snapshot.mutable_backup_entries()->Reserve(sys_entry.entries_size());
601
602
0
      for (SysRowEntry& entry : *sys_entry.mutable_entries()) {
603
0
        BackupRowEntryPB* const backup_entry = snapshot.add_backup_entries();
604
        // Setup BackupRowEntryPB fields.
605
        // Set BackupRowEntryPB::pg_schema_name for YSQL table to disambiguate in case tables
606
        // in different schemas have the same name.
607
0
        if (entry.type() == SysRowEntryType::TABLE) {
608
0
          TRACE("Looking up table");
609
0
          scoped_refptr<TableInfo> table_info = FindPtrOrNull(*table_ids_map_, entry.id());
610
0
          if (table_info == nullptr) {
611
0
            return STATUS(
612
0
                InvalidArgument, "Table not found by ID", entry.id(),
613
0
                MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
614
0
          }
615
616
0
          TRACE("Locking table");
617
0
          auto l = table_info->LockForRead();
618
          // The PG schema name is available for YSQL tables only,
619
          // Except '<uuid>.colocated.parent.uuid' table ID.
620
0
          if (l->table_type() == PGSQL_TABLE_TYPE && !IsColocatedParentTableId(entry.id())) {
621
0
            const string pg_schema_name = VERIFY_RESULT(GetPgSchemaName(table_info));
622
0
            VLOG(1) << "PG Schema: " << pg_schema_name << " for table " << table_info->ToString();
623
0
            backup_entry->set_pg_schema_name(pg_schema_name);
624
0
          }
625
0
        }
626
627
        // Init BackupRowEntryPB::entry.
628
0
        backup_entry->mutable_entry()->Swap(&entry);
629
0
      }
630
631
0
      sys_entry.clear_entries();
632
0
    }
633
0
  }
634
635
0
  return Status::OK();
636
0
}
637
638
Status CatalogManager::ListSnapshotRestorations(const ListSnapshotRestorationsRequestPB* req,
639
0
                                                ListSnapshotRestorationsResponsePB* resp) {
640
0
  TxnSnapshotRestorationId restoration_id = TxnSnapshotRestorationId::Nil();
641
0
  if (!req->restoration_id().empty()) {
642
0
    restoration_id = VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId(req->restoration_id()));
643
0
  }
644
0
  TxnSnapshotId snapshot_id = TxnSnapshotId::Nil();
645
0
  if (!req->snapshot_id().empty()) {
646
0
    snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(req->snapshot_id()));
647
0
  }
648
649
0
  return snapshot_coordinator_.ListRestorations(restoration_id, snapshot_id, resp);
650
0
}
651
652
Status CatalogManager::RestoreSnapshot(const RestoreSnapshotRequestPB* req,
653
0
                                       RestoreSnapshotResponsePB* resp) {
654
0
  LOG(INFO) << "Servicing RestoreSnapshot request: " << req->ShortDebugString();
655
656
0
  auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id());
657
0
  if (txn_snapshot_id) {
658
0
    HybridTime ht;
659
0
    if (req->has_restore_ht()) {
660
0
      ht = HybridTime(req->restore_ht());
661
0
    }
662
0
    TxnSnapshotRestorationId id = VERIFY_RESULT(snapshot_coordinator_.Restore(
663
0
        txn_snapshot_id, ht, leader_ready_term()));
664
0
    resp->set_restoration_id(id.data(), id.size());
665
0
    return Status::OK();
666
0
  }
667
668
0
  return RestoreNonTransactionAwareSnapshot(req->snapshot_id());
669
0
}
670
671
0
Status CatalogManager::RestoreNonTransactionAwareSnapshot(const string& snapshot_id) {
672
0
  LockGuard lock(mutex_);
673
0
  TRACE("Acquired catalog manager lock");
674
675
0
  if (!current_snapshot_id_.empty()) {
676
0
    return STATUS(
677
0
        IllegalState,
678
0
        Format(
679
0
            "Current snapshot id: $0. Parallel snapshot operations are not supported: $1",
680
0
            current_snapshot_id_, snapshot_id),
681
0
        MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION));
682
0
  }
683
684
0
  TRACE("Looking up snapshot");
685
0
  scoped_refptr<SnapshotInfo> snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, snapshot_id);
686
0
  if (snapshot == nullptr) {
687
0
    return STATUS(InvalidArgument, "Could not find snapshot", snapshot_id,
688
0
                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
689
0
  }
690
691
0
  auto snapshot_lock = snapshot->LockForWrite();
692
693
0
  if (snapshot_lock->started_deleting()) {
694
0
    return STATUS(NotFound, "The snapshot was deleted", snapshot_id,
695
0
                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
696
0
  }
697
698
0
  if (!snapshot_lock->is_complete()) {
699
0
    return STATUS(IllegalState, "The snapshot state is not complete", snapshot_id,
700
0
                  MasterError(MasterErrorPB::SNAPSHOT_IS_NOT_READY));
701
0
  }
702
703
0
  TRACE("Updating snapshot metadata on disk");
704
0
  SysSnapshotEntryPB& snapshot_pb = snapshot_lock.mutable_data()->pb;
705
0
  snapshot_pb.set_state(SysSnapshotEntryPB::RESTORING);
706
707
  // Update tablet states.
708
0
  SetTabletSnapshotsState(SysSnapshotEntryPB::RESTORING, &snapshot_pb);
709
710
  // Update sys-catalog with the updated snapshot state.
711
  // The mutation will be aborted when 'snapshot_lock' exits the scope on early return.
712
0
  RETURN_NOT_OK(CheckLeaderStatus(
713
0
      sys_catalog_->Upsert(leader_ready_term(), snapshot),
714
0
      "updating snapshot in sys-catalog"));
715
716
  // The CatalogManager lock (LockGuard on mutex_) is still held here.
717
0
  current_snapshot_id_ = snapshot_id;
718
719
  // Restore all entries.
720
0
  for (const SysRowEntry& entry : snapshot_pb.entries()) {
721
0
    RETURN_NOT_OK(RestoreEntry(entry, snapshot_id));
722
0
  }
723
724
  // Commit the in-memory snapshot data descriptor.
725
0
  TRACE("Committing in-memory snapshot state");
726
0
  snapshot_lock.Commit();
727
728
0
  LOG(INFO) << "Successfully started snapshot " << snapshot->ToString() << " restoring";
729
0
  return Status::OK();
730
0
}
731
732
0
Status CatalogManager::RestoreEntry(const SysRowEntry& entry, const SnapshotId& snapshot_id) {
733
0
  switch (entry.type()) {
734
0
    case SysRowEntryType::NAMESPACE: { // Restore NAMESPACES.
735
0
      TRACE("Looking up namespace");
736
0
      scoped_refptr<NamespaceInfo> ns = FindPtrOrNull(namespace_ids_map_, entry.id());
737
0
      if (ns == nullptr) {
738
        // Restore Namespace.
739
        // TODO: implement
740
0
        LOG(INFO) << "Restoring: NAMESPACE id = " << entry.id();
741
742
0
        return STATUS(NotSupported, Substitute(
743
0
            "Not implemented: restoring namespace: id=$0", entry.type()));
744
0
      }
745
0
      break;
746
0
    }
747
0
    case SysRowEntryType::TABLE: { // Restore TABLES.
748
0
      TRACE("Looking up table");
749
0
      scoped_refptr<TableInfo> table = FindPtrOrNull(*table_ids_map_, entry.id());
750
0
      if (table == nullptr) {
751
        // Restore Table.
752
        // TODO: implement
753
0
        LOG(INFO) << "Restoring: TABLE id = " << entry.id();
754
755
0
        return STATUS(NotSupported, Substitute(
756
0
            "Not implemented: restoring table: id=$0", entry.type()));
757
0
      }
758
0
      break;
759
0
    }
760
0
    case SysRowEntryType::TABLET: { // Restore TABLETS.
761
0
      TRACE("Looking up tablet");
762
0
      scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id());
763
0
      if (tablet == nullptr) {
764
        // Restore Tablet.
765
        // TODO: implement
766
0
        LOG(INFO) << "Restoring: TABLET id = " << entry.id();
767
768
0
        return STATUS(NotSupported, Substitute(
769
0
            "Not implemented: restoring tablet: id=$0", entry.type()));
770
0
      } else {
771
0
        TRACE("Locking tablet");
772
0
        auto l = tablet->LockForRead();
773
774
0
        LOG(INFO) << "Sending RestoreTabletSnapshot to tablet: " << tablet->ToString();
775
        // Send RestoreSnapshot requests to all TServers (one tablet - one request).
776
0
        auto task = CreateAsyncTabletSnapshotOp(
777
0
            tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::RESTORE_ON_TABLET,
778
0
            TabletSnapshotOperationCallback());
779
0
        ScheduleTabletSnapshotOp(task);
780
0
      }
781
0
      break;
782
0
    }
783
0
    default:
784
0
      return STATUS_FORMAT(
785
0
          InternalError, "Unexpected entry type in the snapshot: $0", entry.type());
786
0
  }
787
788
0
  return Status::OK();
789
0
}
790
791
Status CatalogManager::DeleteSnapshot(const DeleteSnapshotRequestPB* req,
792
                                      DeleteSnapshotResponsePB* resp,
793
0
                                      RpcContext* rpc) {
794
0
  LOG(INFO) << "Servicing DeleteSnapshot request: " << req->ShortDebugString();
795
796
0
  auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id());
797
0
  if (txn_snapshot_id) {
798
0
    return snapshot_coordinator_.Delete(
799
0
        txn_snapshot_id, leader_ready_term(), rpc->GetClientDeadline());
800
0
  }
801
802
0
  return DeleteNonTransactionAwareSnapshot(req->snapshot_id());
803
0
}
804
805
0
Status CatalogManager::DeleteNonTransactionAwareSnapshot(const SnapshotId& snapshot_id) {
806
0
  LockGuard lock(mutex_);
807
0
  TRACE("Acquired catalog manager lock");
808
809
0
  TRACE("Looking up snapshot");
810
0
  scoped_refptr<SnapshotInfo> snapshot = FindPtrOrNull(
811
0
      non_txn_snapshot_ids_map_, snapshot_id);
812
0
  if (snapshot == nullptr) {
813
0
    return STATUS(InvalidArgument, "Could not find snapshot", snapshot_id,
814
0
                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
815
0
  }
816
817
0
  auto snapshot_lock = snapshot->LockForWrite();
818
819
0
  if (snapshot_lock->started_deleting()) {
820
0
    return STATUS(NotFound, "The snapshot was deleted", snapshot_id,
821
0
                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
822
0
  }
823
824
0
  if (snapshot_lock->is_restoring()) {
825
0
    return STATUS(InvalidArgument, "The snapshot is being restored now", snapshot_id,
826
0
                  MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION));
827
0
  }
828
829
0
  TRACE("Updating snapshot metadata on disk");
830
0
  SysSnapshotEntryPB& snapshot_pb = snapshot_lock.mutable_data()->pb;
831
0
  snapshot_pb.set_state(SysSnapshotEntryPB::DELETING);
832
833
  // Update tablet states.
834
0
  SetTabletSnapshotsState(SysSnapshotEntryPB::DELETING, &snapshot_pb);
835
836
  // Update sys-catalog with the updated snapshot state.
837
  // The mutation will be aborted when 'snapshot_lock' exits the scope on early return.
838
0
  RETURN_NOT_OK(CheckStatus(
839
0
      sys_catalog_->Upsert(leader_ready_term(), snapshot),
840
0
      "updating snapshot in sys-catalog"));
841
842
  // Send DeleteSnapshot requests to all TServers (one tablet - one request).
843
0
  for (const SysRowEntry& entry : snapshot_pb.entries()) {
844
0
    if (entry.type() == SysRowEntryType::TABLET) {
845
0
      TRACE("Looking up tablet");
846
0
      scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id());
847
0
      if (tablet == nullptr) {
848
0
        LOG(WARNING) << "Deleting tablet not found " << entry.id();
849
0
      } else {
850
0
        TRACE("Locking tablet");
851
0
        auto l = tablet->LockForRead();
852
853
0
        LOG(INFO) << "Sending DeleteTabletSnapshot to tablet: " << tablet->ToString();
854
        // Send DeleteSnapshot requests to all TServers (one tablet - one request).
855
0
        auto task = CreateAsyncTabletSnapshotOp(
856
0
            tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::DELETE_ON_TABLET,
857
0
            TabletSnapshotOperationCallback());
858
0
        ScheduleTabletSnapshotOp(task);
859
0
      }
860
0
    }
861
0
  }
862
863
  // Commit the in-memory snapshot data descriptor.
864
0
  TRACE("Committing in-memory snapshot state");
865
0
  snapshot_lock.Commit();
866
867
0
  LOG(INFO) << "Successfully started snapshot " << snapshot->ToString() << " deletion";
868
0
  return Status::OK();
869
0
}
870
871
Status CatalogManager::ImportSnapshotPreprocess(const SnapshotInfoPB& snapshot_pb,
872
                                                ImportSnapshotMetaResponsePB* resp,
873
                                                NamespaceMap* namespace_map,
874
0
                                                ExternalTableSnapshotDataMap* tables_data) {
875
0
  for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
876
0
    const SysRowEntry& entry = backup_entry.entry();
877
0
    switch (entry.type()) {
878
0
      case SysRowEntryType::NAMESPACE: // Recreate NAMESPACE.
879
0
        RETURN_NOT_OK(ImportNamespaceEntry(entry, namespace_map));
880
0
        break;
881
0
      case SysRowEntryType::TABLE: { // Create TABLE metadata.
882
0
          LOG_IF(DFATAL, entry.id().empty()) << "Empty entry id";
883
0
          ExternalTableSnapshotData& data = (*tables_data)[entry.id()];
884
885
0
          if (data.old_table_id.empty()) {
886
0
            data.old_table_id = entry.id();
887
0
            data.table_meta = resp->mutable_tables_meta()->Add();
888
0
            data.tablet_id_map = data.table_meta->mutable_tablets_ids();
889
0
            data.table_entry_pb = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data()));
890
891
0
            if (backup_entry.has_pg_schema_name()) {
892
0
              data.pg_schema_name = backup_entry.pg_schema_name();
893
0
            }
894
0
          } else {
895
0
            LOG_WITH_FUNC(WARNING) << "Ignoring duplicate table with id " << entry.id();
896
0
          }
897
898
0
          LOG_IF(DFATAL, data.old_table_id.empty()) << "Not initialized table id";
899
0
        }
900
0
        break;
901
0
      case SysRowEntryType::TABLET: // Preprocess original tablets.
902
0
        RETURN_NOT_OK(PreprocessTabletEntry(entry, tables_data));
903
0
        break;
904
0
      case SysRowEntryType::CLUSTER_CONFIG: FALLTHROUGH_INTENDED;
905
0
      case SysRowEntryType::REDIS_CONFIG: FALLTHROUGH_INTENDED;
906
0
      case SysRowEntryType::UDTYPE: FALLTHROUGH_INTENDED;
907
0
      case SysRowEntryType::ROLE: FALLTHROUGH_INTENDED;
908
0
      case SysRowEntryType::SYS_CONFIG: FALLTHROUGH_INTENDED;
909
0
      case SysRowEntryType::CDC_STREAM: FALLTHROUGH_INTENDED;
910
0
      case SysRowEntryType::UNIVERSE_REPLICATION: FALLTHROUGH_INTENDED;
911
0
      case SysRowEntryType::SNAPSHOT:  FALLTHROUGH_INTENDED;
912
0
      case SysRowEntryType::SNAPSHOT_SCHEDULE: FALLTHROUGH_INTENDED;
913
0
      case SysRowEntryType::DDL_LOG_ENTRY: FALLTHROUGH_INTENDED;
914
0
      case SysRowEntryType::UNKNOWN:
915
0
        FATAL_INVALID_ENUM_VALUE(SysRowEntryType, entry.type());
916
0
    }
917
0
  }
918
919
0
  return Status::OK();
920
0
}
921
922
Status CatalogManager::ImportSnapshotCreateObject(const SnapshotInfoPB& snapshot_pb,
923
                                                  ImportSnapshotMetaResponsePB* resp,
924
                                                  NamespaceMap* namespace_map,
925
                                                  ExternalTableSnapshotDataMap* tables_data,
926
0
                                                  CreateObjects create_objects) {
927
  // Create ONLY TABLES or ONLY INDEXES in accordance with the argument.
928
0
  for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
929
0
    const SysRowEntry& entry = backup_entry.entry();
930
0
    if (entry.type() == SysRowEntryType::TABLE) {
931
0
      ExternalTableSnapshotData& data = (*tables_data)[entry.id()];
932
0
      if ((create_objects == CreateObjects::kOnlyIndexes) == data.is_index()) {
933
0
        RETURN_NOT_OK(ImportTableEntry(*namespace_map, *tables_data, &data));
934
0
      }
935
0
    }
936
0
  }
937
938
0
  return Status::OK();
939
0
}
940
941
Status CatalogManager::ImportSnapshotWaitForTables(const SnapshotInfoPB& snapshot_pb,
942
                                                   ImportSnapshotMetaResponsePB* resp,
943
0
                                                   ExternalTableSnapshotDataMap* tables_data) {
944
0
  for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
945
0
    const SysRowEntry& entry = backup_entry.entry();
946
0
    if (entry.type() == SysRowEntryType::TABLE) {
947
0
      ExternalTableSnapshotData& data = (*tables_data)[entry.id()];
948
0
      if (!data.is_index()) {
949
0
        RETURN_NOT_OK(WaitForCreateTableToFinish(data.new_table_id));
950
0
      }
951
0
    }
952
0
  }
953
954
0
  return Status::OK();
955
0
}
956
957
Status CatalogManager::ImportSnapshotProcessTablets(const SnapshotInfoPB& snapshot_pb,
958
                                                    ImportSnapshotMetaResponsePB* resp,
959
0
                                                    ExternalTableSnapshotDataMap* tables_data) {
960
0
  for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
961
0
    const SysRowEntry& entry = backup_entry.entry();
962
0
    if (entry.type() == SysRowEntryType::TABLET) {
963
      // Create the tablet ID map.
964
0
      RETURN_NOT_OK(ImportTabletEntry(entry, tables_data));
965
0
    }
966
0
  }
967
968
0
  return Status::OK();
969
0
}
970
971
template <class RespClass>
972
void ProcessDeleteObjectStatus(const string& obj_name,
973
                               const string& id,
974
                               const RespClass& resp,
975
0
                               const Status& s) {
976
0
  Status result = s;
977
0
  if (result.ok() && resp.has_error()) {
978
0
    result = StatusFromPB(resp.error().status());
979
0
    LOG_IF(DFATAL, result.ok()) << "Expecting error status";
980
0
  }
981
982
0
  if (!result.ok()) {
983
0
    LOG_WITH_FUNC(WARNING) << "Failed to delete new " << obj_name << " with id=" << id
984
0
                           << ": " << result;
985
0
  }
986
0
}
Unexecuted instantiation: void yb::master::enterprise::ProcessDeleteObjectStatus<yb::master::DeleteTableResponsePB>(std::string const&, std::string const&, yb::master::DeleteTableResponsePB const&, yb::Status const&)
Unexecuted instantiation: void yb::master::enterprise::ProcessDeleteObjectStatus<yb::master::DeleteNamespaceResponsePB>(std::string const&, std::string const&, yb::master::DeleteNamespaceResponsePB const&, yb::Status const&)
987
988
void CatalogManager::DeleteNewSnapshotObjects(const NamespaceMap& namespace_map,
989
0
                                              const ExternalTableSnapshotDataMap& tables_data) {
990
0
  for (const ExternalTableSnapshotDataMap::value_type& entry : tables_data) {
991
0
    const TableId& old_id = entry.first;
992
0
    const TableId& new_id = entry.second.new_table_id;
993
0
    const TableType type = entry.second.table_entry_pb.table_type();
994
995
    // Do not delete YSQL objects - they must be deleted via the PG API.
996
0
    if (new_id.empty() || new_id == old_id || type == TableType::PGSQL_TABLE_TYPE) {
997
0
      continue;
998
0
    }
999
1000
0
    LOG_WITH_FUNC(INFO) << "Deleting new table with id=" << new_id << " old id=" << old_id;
1001
0
    DeleteTableRequestPB req;
1002
0
    DeleteTableResponsePB resp;
1003
0
    req.mutable_table()->set_table_id(new_id);
1004
0
    req.set_is_index_table(entry.second.is_index());
1005
0
    ProcessDeleteObjectStatus("table", new_id, resp, DeleteTable(&req, &resp, nullptr));
1006
0
  }
1007
1008
0
  for (const NamespaceMap::value_type& entry : namespace_map) {
1009
0
    const NamespaceId& old_id = entry.first;
1010
0
    const NamespaceId& new_id = entry.second.first;
1011
0
    const YQLDatabase& db_type = entry.second.second;
1012
1013
    // Do not delete YSQL objects - they must be deleted via the PG API.
1014
0
    if (new_id.empty() || new_id == old_id || db_type == YQL_DATABASE_PGSQL) {
1015
0
      continue;
1016
0
    }
1017
1018
0
    LOG_WITH_FUNC(INFO) << "Deleting new namespace with id=" << new_id << " old id=" << old_id;
1019
0
    DeleteNamespaceRequestPB req;
1020
0
    DeleteNamespaceResponsePB resp;
1021
0
    req.mutable_namespace_()->set_id(new_id);
1022
0
    ProcessDeleteObjectStatus(
1023
0
        "namespace", new_id, resp, DeleteNamespace(&req, &resp, nullptr));
1024
0
  }
1025
0
}
1026
1027
Status CatalogManager::ImportSnapshotMeta(const ImportSnapshotMetaRequestPB* req,
1028
0
                                          ImportSnapshotMetaResponsePB* resp) {
1029
0
  LOG(INFO) << "Servicing ImportSnapshotMeta request: " << req->ShortDebugString();
1030
1031
0
  NamespaceMap namespace_map;
1032
0
  ExternalTableSnapshotDataMap tables_data;
1033
0
  bool successful_exit = false;
1034
1035
0
  auto se = ScopeExit([this, &namespace_map, &tables_data, &successful_exit] {
1036
0
    if (!successful_exit) {
1037
0
      DeleteNewSnapshotObjects(namespace_map, tables_data);
1038
0
    }
1039
0
  });
1040
1041
0
  const SnapshotInfoPB& snapshot_pb = req->snapshot();
1042
1043
0
  if (!snapshot_pb.has_format_version() || snapshot_pb.format_version() != 2) {
1044
0
    return STATUS(InternalError, "Expected snapshot data in format 2",
1045
0
        snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1046
0
  }
1047
1048
0
  if (snapshot_pb.backup_entries_size() == 0) {
1049
0
    return STATUS(InternalError, "Expected snapshot data prepared for backup",
1050
0
        snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1051
0
  }
1052
1053
  // PHASE 1: Recreate namespaces and create table metadata.
1054
0
  RETURN_NOT_OK(ImportSnapshotPreprocess(snapshot_pb, resp, &namespace_map, &tables_data));
1055
1056
  // PHASE 2: Recreate ONLY tables.
1057
0
  RETURN_NOT_OK(ImportSnapshotCreateObject(
1058
0
      snapshot_pb, resp, &namespace_map, &tables_data, CreateObjects::kOnlyTables));
1059
1060
  // PHASE 3: Wait for all table creation to complete.
1061
0
  RETURN_NOT_OK(ImportSnapshotWaitForTables(snapshot_pb, resp, &tables_data));
1062
1063
  // PHASE 4: Recreate ONLY indexes.
1064
0
  RETURN_NOT_OK(ImportSnapshotCreateObject(
1065
0
      snapshot_pb, resp, &namespace_map, &tables_data, CreateObjects::kOnlyIndexes));
1066
1067
  // PHASE 5: Restore tablets.
1068
0
  RETURN_NOT_OK(ImportSnapshotProcessTablets(snapshot_pb, resp, &tables_data));
1069
1070
0
  successful_exit = true;
1071
0
  return Status::OK();
1072
0
}
1073
1074
Status CatalogManager::ChangeEncryptionInfo(const ChangeEncryptionInfoRequestPB* req,
1075
6
                                            ChangeEncryptionInfoResponsePB* resp) {
1076
6
  auto l = cluster_config_->LockForWrite();
1077
6
  auto encryption_info = l.mutable_data()->pb.mutable_encryption_info();
1078
1079
6
  RETURN_NOT_OK(encryption_manager_->ChangeEncryptionInfo(req, encryption_info));
1080
1081
6
  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
1082
6
  RETURN_NOT_OK(CheckStatus(
1083
6
      sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
1084
6
      "updating cluster config in sys-catalog"));
1085
6
  l.Commit();
1086
1087
6
  std::lock_guard<simple_spinlock> lock(should_send_universe_key_registry_mutex_);
1088
2
  for (auto& entry : should_send_universe_key_registry_) {
1089
2
    entry.second = true;
1090
2
  }
1091
1092
6
  return Status::OK();
1093
6
}
1094
1095
Status CatalogManager::IsEncryptionEnabled(const IsEncryptionEnabledRequestPB* req,
1096
6
                                           IsEncryptionEnabledResponsePB* resp) {
1097
6
  return encryption_manager_->IsEncryptionEnabled(
1098
6
      cluster_config_->LockForRead()->pb.encryption_info(), resp);
1099
6
}
1100
1101
Status CatalogManager::ImportNamespaceEntry(const SysRowEntry& entry,
1102
0
                                            NamespaceMap* namespace_map) {
1103
0
  LOG_IF(DFATAL, entry.type() != SysRowEntryType::NAMESPACE)
1104
0
      << "Unexpected entry type: " << entry.type();
1105
1106
0
  SysNamespaceEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysNamespaceEntryPB>(entry.data()));
1107
0
  const YQLDatabase db_type = GetDatabaseType(meta);
1108
0
  NamespaceData& ns_data = (*namespace_map)[entry.id()];
1109
0
  ns_data.second = db_type;
1110
1111
0
  TRACE("Looking up namespace");
1112
  // First of all, try to find the namespace by ID. This will work if we are restoring the backup
1113
  // on the original cluster where the backup was created.
1114
0
  scoped_refptr<NamespaceInfo> ns;
1115
0
  {
1116
0
    SharedLock lock(mutex_);
1117
0
    ns = FindPtrOrNull(namespace_ids_map_, entry.id());
1118
0
  }
1119
1120
0
  if (ns != nullptr && ns->name() == meta.name() && ns->state() == SysNamespaceEntryPB::RUNNING) {
1121
0
    ns_data.first = entry.id();
1122
0
    return Status::OK();
1123
0
  }
1124
1125
  // If the namespace was not found by ID, it's ok on a new cluster OR if the namespace was
1126
  // deleted and created again. In both cases the namespace can be found by NAME.
1127
0
  if (db_type == YQL_DATABASE_PGSQL) {
1128
    // YSQL database must be created via external call. Find it by name.
1129
0
    {
1130
0
      SharedLock lock(mutex_);
1131
0
      ns = FindPtrOrNull(namespace_names_mapper_[db_type], meta.name());
1132
0
    }
1133
1134
0
    if (ns == nullptr) {
1135
0
      const string msg = Format("YSQL database must exist: $0", meta.name());
1136
0
      LOG_WITH_FUNC(WARNING) << msg;
1137
0
      return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND));
1138
0
    }
1139
0
    if (ns->state() != SysNamespaceEntryPB::RUNNING) {
1140
0
      const string msg = Format("Found YSQL database must be running: $0", meta.name());
1141
0
      LOG_WITH_FUNC(WARNING) << msg;
1142
0
      return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND));
1143
0
    }
1144
1145
0
    auto ns_lock = ns->LockForRead();
1146
0
    ns_data.first = ns->id();
1147
0
  } else {
1148
0
    CreateNamespaceRequestPB req;
1149
0
    CreateNamespaceResponsePB resp;
1150
0
    req.set_name(meta.name());
1151
0
    const Status s = CreateNamespace(&req, &resp, nullptr);
1152
1153
0
    if (!s.ok() && !s.IsAlreadyPresent()) {
1154
0
      return s.CloneAndAppend("Failed to create namespace");
1155
0
    }
1156
1157
0
    if (s.IsAlreadyPresent()) {
1158
0
      LOG_WITH_FUNC(INFO) << "Using existing namespace '" << meta.name() << "': " << resp.id();
1159
0
    }
1160
1161
0
    ns_data.first = resp.id();
1162
0
  }
1163
0
  return Status::OK();
1164
0
}
1165
1166
Status CatalogManager::RecreateTable(const NamespaceId& new_namespace_id,
1167
                                     const ExternalTableSnapshotDataMap& table_map,
1168
0
                                     ExternalTableSnapshotData* table_data) {
1169
0
  const SysTablesEntryPB& meta = DCHECK_NOTNULL(table_data)->table_entry_pb;
1170
1171
0
  CreateTableRequestPB req;
1172
0
  CreateTableResponsePB resp;
1173
0
  req.set_name(meta.name());
1174
0
  req.set_table_type(meta.table_type());
1175
0
  req.set_num_tablets(narrow_cast<int32_t>(table_data->num_tablets));
1176
0
  for (const auto& p : table_data->partitions) {
1177
0
    *req.add_partitions() = p;
1178
0
  }
1179
0
  req.mutable_namespace_()->set_id(new_namespace_id);
1180
0
  *req.mutable_partition_schema() = meta.partition_schema();
1181
0
  *req.mutable_replication_info() = meta.replication_info();
1182
1183
0
  SchemaPB* const schema = req.mutable_schema();
1184
0
  *schema = meta.schema();
1185
1186
  // Setup Index info.
1187
0
  if (table_data->is_index()) {
1188
0
    TRACE("Looking up indexed table");
1189
    // First of all, try to attach to the new copy of the referenced table,
1190
    // because the table restored from the snapshot is preferred.
1191
    // For that, try to map the old indexed table ID to the new table ID.
1192
0
    ExternalTableSnapshotDataMap::const_iterator it = table_map.find(meta.indexed_table_id());
1193
0
    const bool using_existing_table = (it == table_map.end());
1194
1195
0
    if (using_existing_table) {
1196
0
      LOG_WITH_FUNC(INFO) << "Try to use old indexed table id " << meta.indexed_table_id();
1197
0
      req.set_indexed_table_id(meta.indexed_table_id());
1198
0
    } else {
1199
0
      LOG_WITH_FUNC(INFO) << "Found new table id " << it->second.new_table_id
1200
0
                          << " for old table id " << meta.indexed_table_id()
1201
0
                          << " from the snapshot";
1202
0
      req.set_indexed_table_id(it->second.new_table_id);
1203
0
    }
1204
1205
0
    scoped_refptr<TableInfo> indexed_table;
1206
0
    {
1207
0
      SharedLock lock(mutex_);
1208
      // Try to find the specified indexed table by id.
1209
0
      indexed_table = FindPtrOrNull(*table_ids_map_, req.indexed_table_id());
1210
0
    }
1211
1212
0
    if (indexed_table == nullptr) {
1213
0
      const string msg = Format("Indexed table not found by id: $0", req.indexed_table_id());
1214
0
      LOG_WITH_FUNC(WARNING) << msg;
1215
0
      return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
1216
0
    }
1217
1218
0
    LOG_WITH_FUNC(INFO) << "Found indexed table by id " << req.indexed_table_id();
1219
1220
    // Ensure the main table schema (including column ids) was not changed.
1221
0
    if (!using_existing_table) {
1222
0
      Schema new_indexed_schema, src_indexed_schema;
1223
0
      RETURN_NOT_OK(indexed_table->GetSchema(&new_indexed_schema));
1224
0
      RETURN_NOT_OK(SchemaFromPB(it->second.table_entry_pb.schema(), &src_indexed_schema));
1225
1226
0
      if (!new_indexed_schema.Equals(src_indexed_schema)) {
1227
0
          const string msg = Format(
1228
0
              "Recreated table has changes in schema: new schema={$0}, source schema={$1}",
1229
0
              new_indexed_schema.ToString(), src_indexed_schema.ToString());
1230
0
          LOG_WITH_FUNC(WARNING) << msg;
1231
0
          return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1232
0
      }
1233
1234
0
      if (new_indexed_schema.column_ids() != src_indexed_schema.column_ids()) {
1235
0
          const string msg = Format(
1236
0
              "Recreated table has changes in column ids: new ids=$0, source ids=$1",
1237
0
              ToString(new_indexed_schema.column_ids()), ToString(src_indexed_schema.column_ids()));
1238
0
          LOG_WITH_FUNC(WARNING) << msg;
1239
0
          return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1240
0
      }
1241
0
    }
1242
1243
0
    req.set_is_local_index(meta.is_local_index());
1244
0
    req.set_is_unique_index(meta.is_unique_index());
1245
0
    req.set_skip_index_backfill(true);
1246
    // Setup IndexInfoPB - self descriptor.
1247
0
    IndexInfoPB* const index_info_pb = req.mutable_index_info();
1248
0
    *index_info_pb = meta.index_info();
1249
0
    index_info_pb->clear_table_id();
1250
0
    index_info_pb->set_indexed_table_id(req.indexed_table_id());
1251
1252
    // Reset column ids.
1253
0
    for (int i = 0; i < index_info_pb->columns_size(); ++i) {
1254
0
      index_info_pb->mutable_columns(i)->clear_column_id();
1255
0
    }
1256
0
  }
1257
1258
0
  RETURN_NOT_OK(CreateTable(&req, &resp, /* RpcContext */nullptr));
1259
0
  table_data->new_table_id = resp.table_id();
1260
0
  LOG_WITH_FUNC(INFO) << "New table id " << table_data->new_table_id << " for "
1261
0
                      << table_data->old_table_id;
1262
0
  return Status::OK();
1263
0
}
1264
1265
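A note on the final step of RecreateTable above: the snapshot's IndexInfoPB is copied verbatim and then the stale column ids are cleared, so that CreateTable can assign fresh ids in the new cluster. A minimal sketch of this copy-then-clear pattern, using hypothetical stand-in types rather than the real protobuf classes:

#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for an IndexInfoPB column: a name plus an optional,
// server-assigned id.
struct IndexColumn {
  std::string name;
  std::optional<int> column_id;
};

int main() {
  // "Copied from the snapshot": these ids belong to the old cluster.
  std::vector<IndexColumn> columns = {{"h", 10}, {"r", 11}};
  for (auto& c : columns) {
    c.column_id.reset();  // like index_info_pb->mutable_columns(i)->clear_column_id()
  }
  for (const auto& c : columns) {
    std::cout << c.name << (c.column_id ? " has an id\n" : " will get a fresh id\n");
  }
  return 0;
}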
Status CatalogManager::RepartitionTable(const scoped_refptr<TableInfo> table,
1266
0
                                        const ExternalTableSnapshotData* table_data) {
1267
0
  DCHECK_EQ(table->id(), table_data->new_table_id);
1268
0
  if (table->GetTableType() != PGSQL_TABLE_TYPE) {
1269
0
    return STATUS_FORMAT(InvalidArgument,
1270
0
                         "Cannot repartition non-YSQL table: got $0",
1271
0
                         TableType_Name(table->GetTableType()));
1272
0
  }
1273
0
  LOG_WITH_FUNC(INFO) << "Repartition table " << table->id()
1274
0
                      << " using external snapshot table " << table_data->old_table_id;
1275
1276
  // Get partitions from external snapshot.
1277
0
  size_t i = 0;
1278
0
  vector<Partition> partitions(table_data->partitions.size());
1279
0
  for (const auto& partition_pb : table_data->partitions) {
1280
0
    Partition::FromPB(partition_pb, &partitions[i++]);
1281
0
  }
1282
0
  VLOG_WITH_FUNC(3) << "Got " << partitions.size()
1283
0
                    << " partitions from external snapshot for table " << table->id();
1284
1285
  // Change TableInfo to point to the new tablets.
1286
0
  string deletion_msg;
1287
0
  vector<scoped_refptr<TabletInfo>> new_tablets;
1288
0
  vector<scoped_refptr<TabletInfo>> old_tablets;
1289
0
  {
1290
    // Acquire the TableInfo pb write lock. Although it is not required for some of the individual
1291
    // steps, we want to hold it through so that we guarantee the state does not change during the
1292
    // whole process. Consequently, we hold it through some steps that require mutex_, but since
1293
    // taking mutex_ after TableInfo pb lock is prohibited for deadlock reasons, acquire mutex_
1294
    // first, then release it when it is no longer needed, still holding table pb lock.
1295
0
    TableInfo::WriteLock table_lock;
1296
0
    {
1297
0
      LockGuard lock(mutex_);
1298
0
      TRACE("Acquired catalog manager lock");
1299
1300
      // Make sure the table is in RUNNING state.
1301
      // This by itself doesn't need a pb write lock: just a read lock. However, we want to prevent
1302
      // other writers from entering from this point forward, so take the write lock now.
1303
0
      table_lock = table->LockForWrite();
1304
0
      if (table->old_pb().state() != SysTablesEntryPB::RUNNING) {
1305
0
        return STATUS_FORMAT(IllegalState,
1306
0
                             "Table $0 not running: $1",
1307
0
                             table->ToString(),
1308
0
                             SysTablesEntryPB_State_Name(table->old_pb().state()));
1309
0
      }
1310
      // Make sure the table's tablets can be deleted.
1311
0
      RETURN_NOT_OK_PREPEND(CheckIfForbiddenToDeleteTabletOf(table),
1312
0
                            Format("Cannot repartition table $0", table->id()));
1313
1314
      // Create and mark new tablets for creation.
1315
1316
      // Use partitions from external snapshot to create new tablets in state PREPARING. The tablets
1317
      // will start CREATING once they are committed in memory.
1318
0
      for (const auto& partition : partitions) {
1319
0
        PartitionPB partition_pb;
1320
0
        partition.ToPB(&partition_pb);
1321
0
        new_tablets.push_back(CreateTabletInfo(table.get(), partition_pb));
1322
0
      }
1323
1324
      // Add tablets to catalog manager tablet_map_. This should be safe to do after creating
1325
      // tablets since we're still holding mutex_.
1326
0
      auto tablet_map_checkout = tablet_map_.CheckOut();
1327
0
      for (auto& new_tablet : new_tablets) {
1328
0
        InsertOrDie(tablet_map_checkout.get_ptr(), new_tablet->tablet_id(), new_tablet);
1329
0
      }
1330
0
      VLOG_WITH_FUNC(3) << "Prepared creation of " << new_tablets.size()
1331
0
                        << " new tablets for table " << table->id();
1332
1333
      // mutex_ is no longer needed, so release by going out of scope.
1334
0
    }
1335
    // The table pb write lock is still held, ensuring that the table state does not change. Later
1336
    // steps, like GetTablets or AddTablets, will acquire/release the TableInfo lock_, but it's
1337
    // probably fine that they are released between the steps since the table pb write lock is held
1338
    // throughout. In other words, there should be no risk that TableInfo tablets_ changes between
1339
    // GetTablets and RemoveTablets.
1340
1341
    // Abort tablet mutations in case of early returns.
1342
0
    ScopedInfoCommitter<TabletInfo> unlocker_new(&new_tablets);
1343
1344
    // Mark old tablets for deletion.
1345
0
    old_tablets = table->GetTablets(IncludeInactive::kTrue);
1346
    // Sort so that locking can be done in a deterministic order.
1347
0
    std::sort(old_tablets.begin(), old_tablets.end(), [](const auto& lhs, const auto& rhs) {
1348
0
      return lhs->tablet_id() < rhs->tablet_id();
1349
0
    });
1350
0
    deletion_msg = Format("Old tablets of table $0 deleted at $1",
1351
0
                          table->id(), LocalTimeAsString());
1352
0
    for (auto& old_tablet : old_tablets) {
1353
0
      old_tablet->mutable_metadata()->StartMutation();
1354
0
      old_tablet->mutable_metadata()->mutable_dirty()->set_state(
1355
0
          SysTabletsEntryPB::DELETED, deletion_msg);
1356
0
    }
1357
0
    VLOG_WITH_FUNC(3) << "Prepared deletion of " << old_tablets.size() << " old tablets for table "
1358
0
                      << table->id();
1359
1360
    // Abort tablet mutations in case of early returns.
1361
0
    ScopedInfoCommitter<TabletInfo> unlocker_old(&old_tablets);
1362
1363
    // Change table's partition schema to the external snapshot's.
1364
0
    auto& table_pb = table_lock.mutable_data()->pb;
1365
0
    table_pb.mutable_partition_schema()->CopyFrom(
1366
0
        table_data->table_entry_pb.partition_schema());
1367
0
    table_pb.set_partition_list_version(table_pb.partition_list_version() + 1);
1368
1369
    // Remove old tablets from TableInfo.
1370
0
    table->RemoveTablets(old_tablets);
1371
    // Add new tablets to TableInfo. This must be done after removing tablets because
1372
    // TableInfo::partitions_ has key PartitionKey, which old and new tablets may conflict on.
1373
0
    table->AddTablets(new_tablets);
1374
1375
    // Commit table and tablets to disk.
1376
0
    RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table, new_tablets, old_tablets));
1377
0
    VLOG_WITH_FUNC(2) << "Committed to disk: table " << table->id() << " repartition from "
1378
0
                      << old_tablets.size() << " tablets to " << new_tablets.size() << " tablets";
1379
1380
    // Commit to memory. Commit new tablets (addition) first since that doesn't break anything.
1381
    // Commit table next since new tablets are already committed and ready to be referenced. Commit
1382
    // old tablets (deletion) last since the table is not referencing them anymore.
1383
0
    unlocker_new.Commit();
1384
0
    table_lock.Commit();
1385
0
    unlocker_old.Commit();
1386
0
    VLOG_WITH_FUNC(1) << "Committed to memory: table " << table->id() << " repartition from "
1387
0
                      << old_tablets.size() << " tablets to " << new_tablets.size() << " tablets";
1388
0
  }
1389
1390
  // Finally, now that everything is committed, send the delete tablet requests.
1391
0
  for (auto& old_tablet : old_tablets) {
1392
0
    DeleteTabletReplicas(old_tablet.get(), deletion_msg, HideOnly::kFalse);
1393
0
  }
1394
0
  VLOG_WITH_FUNC(2) << "Sent delete tablet requests for " << old_tablets.size() << " old tablets"
1395
0
                    << " of table " << table->id();
1396
  // The create tablet requests should be handled by background tasks, which find the PREPARING
1397
  // tablets after commit.
1398
1399
0
  return Status::OK();
1400
0
}
1401
1402
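RepartitionTable above sorts the old tablets by id before taking their locks. This is the classic deadlock-avoidance idiom: every thread acquires a shared set of locks in one global order, so no two threads can wait on each other in a cycle. A self-contained sketch of that idiom (hypothetical Tablet type, not the YugabyteDB TabletInfo API):

#include <algorithm>
#include <mutex>
#include <string>
#include <vector>

struct Tablet {
  std::string id;
  std::mutex m;
};

// Lock every tablet, always in ascending id order, so two threads that share
// tablets can never deadlock on each other.
void LockAllInOrder(std::vector<Tablet*>* tablets) {
  std::sort(tablets->begin(), tablets->end(),
            [](const Tablet* a, const Tablet* b) { return a->id < b->id; });
  for (Tablet* t : *tablets) t->m.lock();
}

int main() {
  Tablet t1{"b"}, t2{"a"};
  std::vector<Tablet*> tablets = {&t1, &t2};
  LockAllInOrder(&tablets);             // locks "a" first, then "b"
  for (Tablet* t : tablets) t->m.unlock();
  return 0;
}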
// Helper function for ImportTableEntry.
1403
//
1404
// Given an internal table and an external table snapshot, do some checks to determine if we should
1405
// move forward with using this internal table for import.
1406
//
1407
// table: internal table's info
1408
// snapshot_data: external table's snapshot data
1409
Result<bool> CatalogManager::CheckTableForImport(scoped_refptr<TableInfo> table,
1410
0
                                                 ExternalTableSnapshotData* snapshot_data) {
1411
0
  auto table_lock = table->LockForRead();
1412
1413
  // Check if table is live.
1414
0
  if (!table_lock->visible_to_client()) {
1415
0
    VLOG_WITH_FUNC(2) << "Table not visible to client: " << table->ToString();
1416
0
    return false;
1417
0
  }
1418
  // Check if table names match.
1419
0
  const string& external_table_name = snapshot_data->table_entry_pb.name();
1420
0
  if (table_lock->name() != external_table_name) {
1421
0
    VLOG_WITH_FUNC(2) << "Table names do not match: "
1422
0
                      << table_lock->name() << " vs " << external_table_name
1423
0
                      << " for " << table->ToString();
1424
0
    return false;
1425
0
  }
1426
  // Check index vs table.
1427
0
  if (snapshot_data->is_index() ? table->indexed_table_id().empty()
1428
0
                                : !table->indexed_table_id().empty()) {
1429
0
    VLOG_WITH_FUNC(2) << "External snapshot table is " << (snapshot_data->is_index() ? "" : "not ")
1430
0
                      << "index but internal table is the opposite: " << table->ToString();
1431
0
    return false;
1432
0
  }
1433
  // Check if table schemas match (if present in snapshot).
1434
0
  if (!snapshot_data->pg_schema_name.empty()) {
1435
0
    if (table->GetTableType() != PGSQL_TABLE_TYPE) {
1436
0
      LOG_WITH_FUNC(DFATAL) << "ExternalTableSnapshotData.pg_schema_name set when table type is not"
1437
0
          << " PGSQL: schema name: " << snapshot_data->pg_schema_name
1438
0
          << ", table type: " << TableType_Name(table->GetTableType());
1439
      // If not a debug build, ignore pg_schema_name.
1440
0
    } else {
1441
0
      const string internal_schema_name = VERIFY_RESULT(GetPgSchemaName(table));
1442
0
      const string& external_schema_name = snapshot_data->pg_schema_name;
1443
0
      if (internal_schema_name != external_schema_name) {
1444
0
        LOG_WITH_FUNC(INFO) << "Schema names do not match: "
1445
0
                            << internal_schema_name << " vs " << external_schema_name
1446
0
                            << " for " << table->ToString();
1447
0
        return false;
1448
0
      }
1449
0
    }
1450
0
  }
1451
1452
0
  return true;
1453
0
}
1454
1455
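The index check in CheckTableForImport above is a parity test: the snapshot entry and the candidate table must either both be indexes or both be plain tables. The ternary chain reads more simply as a boolean inequality; a small sketch with hypothetical inputs:

#include <cassert>
#include <string>

bool Mismatched(bool snapshot_is_index, const std::string& indexed_table_id) {
  const bool table_is_index = !indexed_table_id.empty();
  return snapshot_is_index != table_is_index;  // equivalent to the ?: chain above
}

int main() {
  assert(!Mismatched(true, "tbl_123"));   // index vs index: ok
  assert(Mismatched(false, "tbl_123"));   // table vs index: mismatch
  assert(!Mismatched(false, ""));         // table vs table: ok
  return 0;
}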
Status CatalogManager::ImportTableEntry(const NamespaceMap& namespace_map,
1456
                                        const ExternalTableSnapshotDataMap& table_map,
1457
0
                                        ExternalTableSnapshotData* table_data) {
1458
0
  const SysTablesEntryPB& meta = DCHECK_NOTNULL(table_data)->table_entry_pb;
1459
0
  bool is_parent_colocated_table = false;
1460
1461
0
  table_data->old_namespace_id = meta.namespace_id();
1462
0
  LOG_IF(DFATAL, table_data->old_namespace_id.empty()) << "No namespace id";
1463
1464
0
  LOG_IF(DFATAL, namespace_map.find(table_data->old_namespace_id) == namespace_map.end())
1465
0
      << "Namespace not found: " << table_data->old_namespace_id;
1466
0
  const NamespaceId new_namespace_id =
1467
0
      namespace_map.find(table_data->old_namespace_id)->second.first;
1468
0
  LOG_IF(DFATAL, new_namespace_id.empty()) << "No namespace id";
1469
1470
0
  Schema schema;
1471
0
  RETURN_NOT_OK(SchemaFromPB(meta.schema(), &schema));
1472
0
  const vector<ColumnId>& column_ids = schema.column_ids();
1473
0
  scoped_refptr<TableInfo> table;
1474
1475
  // First, check if namespace id and table id match. If, in addition, other properties match, we
1476
  // found the destination table.
1477
0
  if (new_namespace_id == table_data->old_namespace_id) {
1478
0
    TRACE("Looking up table");
1479
0
    {
1480
0
      SharedLock lock(mutex_);
1481
0
      table = FindPtrOrNull(*table_ids_map_, table_data->old_table_id);
1482
0
    }
1483
1484
0
    if (table != nullptr) {
1485
0
      VLOG_WITH_PREFIX(3) << "Begin first search";
1486
      // At this point, namespace id and table id match. Check other properties, like whether the
1487
      // table is active and whether table name matches.
1488
0
      SharedLock lock(mutex_);
1489
0
      if (VERIFY_RESULT(CheckTableForImport(table, table_data))) {
1490
0
        LOG_WITH_FUNC(INFO) << "Found existing table: '" << table->ToString() << "'";
1491
0
      } else {
1492
        // A property did not match, so this search by ids failed.
1493
0
        auto table_lock = table->LockForRead();
1494
0
        LOG_WITH_FUNC(WARNING) << "Existing table " << table->ToString() << " not suitable: "
1495
0
                               << table_lock->pb.ShortDebugString()
1496
0
                               << ", name: " << table->name() << " vs " << meta.name();
1497
0
        table.reset();
1498
0
      }
1499
0
    }
1500
0
  }
1501
1502
  // Second, if we still didn't find a match...
1503
0
  if (table == nullptr) {
1504
0
    VLOG_WITH_PREFIX(3) << "Begin second search";
1505
0
    switch (meta.table_type()) {
1506
0
      case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
1507
0
      case TableType::REDIS_TABLE_TYPE: {
1508
        // For YCQL and YEDIS, simply create the missing table.
1509
0
        RETURN_NOT_OK(RecreateTable(new_namespace_id, table_map, table_data));
1510
0
        break;
1511
0
      }
1512
0
      case TableType::PGSQL_TABLE_TYPE: {
1513
        // For YSQL, the table must be created via an external call. Therefore, continue the search for
1514
        // the table, this time checking for name matches rather than id matches.
1515
1516
0
        if (meta.colocated() && IsColocatedParentTableId(table_data->old_table_id)) {
1517
          // For the parent colocated table we need to generate the new_table_id ourselves
1518
          // since the names will not match.
1519
          // For normal colocated tables, we are still able to follow the normal table flow, so no
1520
          // need to generate the new_table_id ourselves.
1521
0
          table_data->new_table_id = new_namespace_id + kColocatedParentTableIdSuffix;
1522
0
          is_parent_colocated_table = true;
1523
0
        } else {
1524
0
          if (!table_data->new_table_id.empty()) {
1525
0
            const string msg = Format(
1526
0
                "$0 expected empty new table id but $1 found", __func__, table_data->new_table_id);
1527
0
            LOG_WITH_FUNC(WARNING) << msg;
1528
0
            return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1529
0
          }
1530
0
          SharedLock lock(mutex_);
1531
1532
0
          for (const auto& entry : *table_ids_map_) {
1533
0
            table = entry.second;
1534
1535
0
            if (new_namespace_id != table->namespace_id()) {
1536
0
              VLOG_WITH_FUNC(3) << "Namespace ids do not match: "
1537
0
                                << table->namespace_id() << " vs " << new_namespace_id
1538
0
                                << " for " << table->ToString();
1539
0
              continue;
1540
0
            }
1541
0
            if (!VERIFY_RESULT(CheckTableForImport(table, table_data))) {
1542
              // Some other check failed.
1543
0
              continue;
1544
0
            }
1545
            // Also check if table is user-created.
1546
0
            if (!IsUserCreatedTableUnlocked(*table)) {
1547
0
              VLOG_WITH_FUNC(2) << "Table not user created: " << table->ToString();
1548
0
              continue;
1549
0
            }
1550
1551
            // Found the new YSQL table by name.
1552
0
            if (table_data->new_table_id.empty()) {
1553
0
              LOG_WITH_FUNC(INFO) << "Found existing table " << entry.first << " for "
1554
0
                                  << new_namespace_id << "/" << meta.name() << " (old table "
1555
0
                                  << table_data->old_table_id << ") with schema "
1556
0
                                  << table_data->pg_schema_name;
1557
0
              table_data->new_table_id = entry.first;
1558
0
            } else if (table_data->new_table_id != entry.first) {
1559
0
              const string msg = Format(
1560
0
                  "Found 2 YSQL tables with the same name: $0 - $1, $2",
1561
0
                  meta.name(), table_data->new_table_id, entry.first);
1562
0
              LOG_WITH_FUNC(WARNING) << msg;
1563
0
              return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1564
0
            }
1565
0
          }
1566
1567
0
          if (table_data->new_table_id.empty()) {
1568
0
            const string msg = Format("YSQL table not found: $0", meta.name());
1569
0
            LOG_WITH_FUNC(WARNING) << msg;
1570
0
            return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
1571
0
          }
1572
0
        }
1573
0
        break;
1574
0
      }
1575
0
      case TableType::TRANSACTION_STATUS_TABLE_TYPE: {
1576
0
        return STATUS(
1577
0
            InvalidArgument,
1578
0
            Format("Unexpected table type: $0", TableType_Name(meta.table_type())),
1579
0
            MasterError(MasterErrorPB::INVALID_TABLE_TYPE));
1580
0
      }
1581
0
    }
1582
0
  } else {
1583
0
    table_data->new_table_id = table_data->old_table_id;
1584
0
    LOG_WITH_FUNC(INFO) << "Use existing table " << table_data->new_table_id;
1585
0
  }
1586
1587
  // The destination table should be found or created by now.
1588
0
  TRACE("Looking up new table");
1589
0
  {
1590
0
    SharedLock lock(mutex_);
1591
0
    table = FindPtrOrNull(*table_ids_map_, table_data->new_table_id);
1592
0
  }
1593
0
  if (table == nullptr) {
1594
0
    const string msg = Format("Created table not found: $0", table_data->new_table_id);
1595
0
    LOG_WITH_FUNC(WARNING) << msg;
1596
0
    return STATUS(InternalError, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
1597
0
  }
1598
1599
  // Don't do schema validation/column updates on the parent colocated table.
1600
  // However, still do the validation for regular colocated tables.
1601
0
  if (!is_parent_colocated_table) {
1602
0
    Schema persisted_schema;
1603
0
    size_t new_num_tablets = 0;
1604
0
    {
1605
0
      TRACE("Locking table");
1606
0
      auto table_lock = table->LockForRead();
1607
0
      RETURN_NOT_OK(table->GetSchema(&persisted_schema));
1608
0
      new_num_tablets = table->NumPartitions();
1609
0
    }
1610
1611
    // Ignore 'nullable' attribute - due to difference in implementation
1612
    // of PgCreateTable::AddColumn() and PgAlterTable::AddColumn().
1613
0
    struct CompareColumnsExceptNullable {
1614
0
      bool operator ()(const ColumnSchema& a, const ColumnSchema& b) {
1615
0
        return ColumnSchema::CompHashKey(a, b) && ColumnSchema::CompSortingType(a, b) &&
1616
0
            ColumnSchema::CompTypeInfo(a, b) && ColumnSchema::CompName(a, b);
1617
0
      }
1618
0
    } comparator;
1619
    // Schema::Equals() compares only column names & types. It does not compare the column ids.
1620
0
    if (!persisted_schema.Equals(schema, comparator)
1621
0
        || persisted_schema.column_ids().size() != column_ids.size()) {
1622
0
      const string msg = Format(
1623
0
          "Invalid created $0 table '$1' in namespace id $2: schema={$3}, expected={$4}",
1624
0
          TableType_Name(meta.table_type()), meta.name(), new_namespace_id,
1625
0
          persisted_schema, schema);
1626
0
      LOG_WITH_FUNC(WARNING) << msg;
1627
0
      return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1628
0
    }
1629
1630
0
    if (table_data->num_tablets > 0) {
1631
0
      if (meta.table_type() == TableType::PGSQL_TABLE_TYPE) {
1632
0
        bool partitions_match = true;
1633
0
        if (new_num_tablets != table_data->num_tablets) {
1634
0
          partitions_match = false;
1635
0
        } else {
1636
          // Check if partition boundaries match.  Only check the starts; assume the ends are fine.
1637
0
          size_t i = 0;
1638
0
          vector<PartitionKey> partition_starts(table_data->num_tablets);
1639
0
          for (const auto& partition_pb : table_data->partitions) {
1640
0
            partition_starts[i] = partition_pb.partition_key_start();
1641
0
            LOG_IF(DFATAL, (i == 0) ? partition_starts[i] != ""
1642
0
                                    : partition_starts[i] <= partition_starts[i-1])
1643
0
                << "Wrong partition key start: " << b2a_hex(partition_starts[i]);
1644
0
            i++;
1645
0
          }
1646
0
          if (!table->HasPartitions(partition_starts)) {
1647
0
            LOG_WITH_FUNC(INFO) << "Partition boundaries mismatch for table " << table->id();
1648
0
            partitions_match = false;
1649
0
          }
1650
0
        }
1651
1652
0
        if (!partitions_match) {
1653
0
          RETURN_NOT_OK(RepartitionTable(table, table_data));
1654
0
        }
1655
0
      } else { // not PGSQL_TABLE_TYPE
1656
0
        if (new_num_tablets != table_data->num_tablets) {
1657
0
          const string msg = Format(
1658
0
              "Wrong number of tablets in created $0 table '$1' in namespace id $2:"
1659
0
              " $3 (expected $4)",
1660
0
              TableType_Name(meta.table_type()), meta.name(), new_namespace_id,
1661
0
              new_num_tablets, table_data->num_tablets);
1662
0
          LOG_WITH_FUNC(WARNING) << msg;
1663
0
          return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1664
0
        }
1665
0
      }
1666
0
    }
1667
1668
    // Update the table column ids if it's not equal to the stored ids.
1669
0
    if (persisted_schema.column_ids() != column_ids) {
1670
0
      if (meta.table_type() != TableType::PGSQL_TABLE_TYPE) {
1671
0
        LOG_WITH_FUNC(WARNING) << "Unexpected wrong column ids in "
1672
0
                               << TableType_Name(meta.table_type()) << " table '" << meta.name()
1673
0
                               << "' in namespace id " << new_namespace_id;
1674
0
      }
1675
1676
0
      LOG_WITH_FUNC(INFO) << "Restoring column ids in " << TableType_Name(meta.table_type())
1677
0
                          << " table '" << meta.name() << "' in namespace id "
1678
0
                          << new_namespace_id;
1679
0
      auto l = table->LockForWrite();
1680
0
      size_t col_idx = 0;
1681
0
      for (auto& column : *l.mutable_data()->pb.mutable_schema()->mutable_columns()) {
1682
        // We expect the schema to be correct here (column order, names, and types), with only
1683
        // the column ids wrong. Column order and names are verified below.
1684
0
        if (column.name() != schema.column(col_idx).name()) {
1685
0
          const string msg = Format(
1686
0
              "Unexpected column name for index=$0: name=$1, expected name=$2",
1687
0
              col_idx, column.name(), schema.column(col_idx).name());
1688
0
          LOG_WITH_FUNC(WARNING) << msg;
1689
0
          return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED));
1690
0
        }
1691
        // Copy the column id from imported (original) schema.
1692
0
        column.set_id(column_ids[col_idx++]);
1693
0
      }
1694
1695
0
      l.mutable_data()->pb.set_next_column_id(schema.max_col_id() + 1);
1696
0
      l.mutable_data()->pb.set_version(l->pb.version() + 1);
1697
      // Update sys-catalog with the new table schema.
1698
0
      RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table));
1699
0
      l.Commit();
1700
      // Update the new table schema in tablets.
1701
0
      RETURN_NOT_OK(SendAlterTableRequest(table));
1702
0
    }
1703
0
  }
1704
1705
  // Set the type of the table in the response pb (default is TABLE so only set if colocated).
1706
0
  if (meta.colocated()) {
1707
0
    if (is_parent_colocated_table) {
1708
0
      table_data->table_meta->set_table_type(
1709
0
          ImportSnapshotMetaResponsePB_TableType_PARENT_COLOCATED_TABLE);
1710
0
    } else {
1711
0
      table_data->table_meta->set_table_type(
1712
0
          ImportSnapshotMetaResponsePB_TableType_COLOCATED_TABLE);
1713
0
    }
1714
0
  }
1715
1716
0
  TabletInfos new_tablets;
1717
0
  {
1718
0
    TRACE("Locking table");
1719
0
    auto table_lock = table->LockForRead();
1720
0
    new_tablets = table->GetTablets();
1721
0
  }
1722
1723
0
  for (const scoped_refptr<TabletInfo>& tablet : new_tablets) {
1724
0
    auto tablet_lock = tablet->LockForRead();
1725
0
    const PartitionPB& partition_pb = tablet->metadata().state().pb.partition();
1726
0
    const ExternalTableSnapshotData::PartitionKeys key(
1727
0
        partition_pb.partition_key_start(), partition_pb.partition_key_end());
1728
0
    table_data->new_tablets_map[key] = tablet->id();
1729
0
  }
1730
1731
0
  IdPairPB* const namespace_ids = table_data->table_meta->mutable_namespace_ids();
1732
0
  namespace_ids->set_new_id(new_namespace_id);
1733
0
  namespace_ids->set_old_id(table_data->old_namespace_id);
1734
1735
0
  IdPairPB* const table_ids = table_data->table_meta->mutable_table_ids();
1736
0
  table_ids->set_new_id(table_data->new_table_id);
1737
0
  table_ids->set_old_id(table_data->old_table_id);
1738
1739
0
  return Status::OK();
1740
0
}
1741
1742
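The column-id restore step inside ImportTableEntry above walks the persisted columns in order, verifies each name against the imported schema, copies the imported id across, and bumps the schema version. A condensed stand-alone version with hypothetical Column/Schema structs:

#include <cstdio>
#include <string>
#include <vector>

struct Column { std::string name; int id; };
struct Schema { std::vector<Column> columns; int version = 0; };

bool RestoreColumnIds(const Schema& imported, Schema* persisted) {
  if (imported.columns.size() != persisted->columns.size()) return false;
  for (size_t i = 0; i < imported.columns.size(); ++i) {
    if (persisted->columns[i].name != imported.columns[i].name) return false;
    persisted->columns[i].id = imported.columns[i].id;  // copy the original id
  }
  ++persisted->version;  // like set_version(l->pb.version() + 1)
  return true;
}

int main() {
  Schema imported{{{"k", 0}, {"v", 1}}};
  Schema persisted{{{"k", 5}, {"v", 6}}};
  std::printf("restored: %d, version: %d\n",
              RestoreColumnIds(imported, &persisted), persisted.version);
  return 0;
}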
Status CatalogManager::PreprocessTabletEntry(const SysRowEntry& entry,
1743
0
                                             ExternalTableSnapshotDataMap* table_map) {
1744
0
  LOG_IF(DFATAL, entry.type() != SysRowEntryType::TABLET)
1745
0
      << "Unexpected entry type: " << entry.type();
1746
1747
0
  SysTabletsEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysTabletsEntryPB>(entry.data()));
1748
1749
0
  ExternalTableSnapshotData& table_data = (*table_map)[meta.table_id()];
1750
0
  ++table_data.num_tablets;
1751
0
  if (meta.has_partition()) {
1752
0
    table_data.partitions.push_back(meta.partition());
1753
0
  }
1754
0
  return Status::OK();
1755
0
}
1756
1757
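PreprocessTabletEntry above is a one-pass aggregation: for each tablet entry it bumps the owning table's tablet count and collects the tablet's partition. A minimal stand-alone equivalent with hypothetical types:

#include <cstdio>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct TableAgg {
  int num_tablets = 0;
  std::vector<std::string> partition_starts;
};

int main() {
  // (table_id, partition_key_start) pairs standing in for SysTabletsEntryPB.
  const std::vector<std::pair<std::string, std::string>> tablets = {
      {"t1", ""}, {"t1", "\x80"}, {"t2", ""}};
  std::map<std::string, TableAgg> table_map;
  for (const auto& [table_id, start] : tablets) {
    TableAgg& agg = table_map[table_id];  // default-constructed on first use
    ++agg.num_tablets;
    agg.partition_starts.push_back(start);
  }
  for (const auto& [id, agg] : table_map) {
    std::printf("%s: %d tablet(s)\n", id.c_str(), agg.num_tablets);
  }
  return 0;
}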
Status CatalogManager::ImportTabletEntry(const SysRowEntry& entry,
1758
0
                                         ExternalTableSnapshotDataMap* table_map) {
1759
0
  LOG_IF(DFATAL, entry.type() != SysRowEntryType::TABLET)
1760
0
      << "Unexpected entry type: " << entry.type();
1761
1762
0
  SysTabletsEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysTabletsEntryPB>(entry.data()));
1763
1764
0
  LOG_IF(DFATAL, table_map->find(meta.table_id()) == table_map->end())
1765
0
      << "Table not found: " << meta.table_id();
1766
0
  ExternalTableSnapshotData& table_data = (*table_map)[meta.table_id()];
1767
1768
0
  if (meta.colocated() && table_data.tablet_id_map->size() >= 1) {
1769
0
    LOG_WITH_FUNC(INFO) << "Already processed this colocated tablet: " << entry.id();
1770
0
    return Status::OK();
1771
0
  }
1772
1773
  // Update tablets IDs map.
1774
0
  if (table_data.new_table_id == table_data.old_table_id) {
1775
0
    TRACE("Looking up tablet");
1776
0
    SharedLock lock(mutex_);
1777
0
    scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id());
1778
1779
0
    if (tablet != nullptr) {
1780
0
      IdPairPB* const pair = table_data.tablet_id_map->Add();
1781
0
      pair->set_old_id(entry.id());
1782
0
      pair->set_new_id(entry.id());
1783
0
      return Status::OK();
1784
0
    }
1785
0
  }
1786
1787
0
  const PartitionPB& partition_pb = meta.partition();
1788
0
  const ExternalTableSnapshotData::PartitionKeys key(
1789
0
      partition_pb.partition_key_start(), partition_pb.partition_key_end());
1790
0
  const ExternalTableSnapshotData::PartitionToIdMap::const_iterator it =
1791
0
      table_data.new_tablets_map.find(key);
1792
1793
0
  if (it == table_data.new_tablets_map.end()) {
1794
0
    const string msg = Format(
1795
0
        "For new table $0 (old table $1, expecting $2 tablets) not found new tablet with "
1796
0
        "expected [$3]", table_data.new_table_id, table_data.old_table_id,
1797
0
        table_data.num_tablets, partition_pb);
1798
0
    LOG_WITH_FUNC(WARNING) << msg;
1799
0
    return STATUS(NotFound, msg, MasterError(MasterErrorPB::INTERNAL_ERROR));
1800
0
  }
1801
1802
0
  IdPairPB* const pair = table_data.tablet_id_map->Add();
1803
0
  pair->set_old_id(entry.id());
1804
0
  pair->set_new_id(it->second);
1805
0
  return Status::OK();
1806
0
}
1807
1808
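ImportTabletEntry above matches old tablets to new ones by partition bounds: the (partition_key_start, partition_key_end) pair identifies a tablet within a table even though tablet ids differ between clusters. A sketch of that lookup with hypothetical data:

#include <cstdio>
#include <map>
#include <string>
#include <utility>

using PartitionKeys = std::pair<std::string, std::string>;  // (start, end)

int main() {
  const std::map<PartitionKeys, std::string> new_tablets_map = {
      {{"", "\x80"}, "new-tablet-1"},
      {{"\x80", ""}, "new-tablet-2"},
  };
  const PartitionKeys old_tablet_bounds{"", "\x80"};
  auto it = new_tablets_map.find(old_tablet_bounds);
  if (it == new_tablets_map.end()) {
    std::puts("no matching new tablet: would return NotFound");
  } else {
    std::printf("old tablet maps to %s\n", it->second.c_str());
  }
  return 0;
}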
124
const Schema& CatalogManager::schema() {
1809
124
  return sys_catalog()->schema();
1810
124
}
1811
1812
0
TabletInfos CatalogManager::GetTabletInfos(const std::vector<TabletId>& ids) {
1813
0
  TabletInfos result;
1814
0
  result.reserve(ids.size());
1815
0
  SharedLock lock(mutex_);
1816
0
  for (const auto& id : ids) {
1817
0
    auto it = tablet_map_->find(id);
1818
0
    result.push_back(it != tablet_map_->end() ? it->second : nullptr);
1819
0
  }
1820
0
  return result;
1821
0
}
1822
1823
AsyncTabletSnapshotOpPtr CatalogManager::CreateAsyncTabletSnapshotOp(
1824
    const TabletInfoPtr& tablet, const std::string& snapshot_id,
1825
    tserver::TabletSnapshotOpRequestPB::Operation operation,
1826
0
    TabletSnapshotOperationCallback callback) {
1827
0
  auto result = std::make_shared<AsyncTabletSnapshotOp>(
1828
0
      master_, AsyncTaskPool(), tablet, snapshot_id, operation);
1829
0
  result->SetCallback(std::move(callback));
1830
0
  tablet->table()->AddTask(result);
1831
0
  return result;
1832
0
}
1833
1834
0
void CatalogManager::ScheduleTabletSnapshotOp(const AsyncTabletSnapshotOpPtr& task) {
1835
0
  WARN_NOT_OK(ScheduleTask(task), "Failed to send create snapshot request");
1836
0
}
1837
1838
Status CatalogManager::RestoreSysCatalog(
1839
0
    SnapshotScheduleRestoration* restoration, tablet::Tablet* tablet, Status* complete_status) {
1840
0
  VLOG_WITH_PREFIX_AND_FUNC(1) << restoration->restoration_id;
1841
  // Restore master snapshot and load it to RocksDB.
1842
0
  auto dir = VERIFY_RESULT(tablet->snapshots().RestoreToTemporary(
1843
0
      restoration->snapshot_id, restoration->restore_at));
1844
0
  rocksdb::Options rocksdb_options;
1845
0
  std::string log_prefix = LogPrefix();
1846
  // Remove ": " to patch suffix.
1847
0
  log_prefix.erase(log_prefix.size() - 2);
1848
0
  tablet->InitRocksDBOptions(&rocksdb_options, log_prefix + " [TMP]: ");
1849
0
  auto db = VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, dir));
1850
1851
0
  auto doc_db = docdb::DocDB::FromRegularUnbounded(db.get());
1852
1853
  // Load objects to restore and determine obsolete objects.
1854
0
  RestoreSysCatalogState state(restoration);
1855
0
  RETURN_NOT_OK(state.LoadRestoringObjects(schema(), doc_db));
1856
  // Load existing objects from RocksDB because on followers they are NOT present in loaded sys
1857
  // catalog state.
1858
0
  RETURN_NOT_OK(state.LoadExistingObjects(schema(), tablet->doc_db()));
1859
0
  RETURN_NOT_OK(state.Process());
1860
1861
0
  docdb::DocWriteBatch write_batch(
1862
0
      tablet->doc_db(), docdb::InitMarkerBehavior::kOptional);
1863
1864
0
  if (FLAGS_enable_ysql) {
1865
    // Restore the pg_catalog tables.
1866
0
    const auto* meta = tablet->metadata();
1867
0
    const auto& pg_yb_catalog_version_schema =
1868
0
        *VERIFY_RESULT(meta->GetTableInfo(kPgYbCatalogVersionTableId))->schema;
1869
0
    RETURN_NOT_OK(state.ProcessPgCatalogRestores(
1870
0
        pg_yb_catalog_version_schema, doc_db, tablet->doc_db(), &write_batch));
1871
0
  }
1872
1873
  // Restore the other tables.
1874
0
  RETURN_NOT_OK(state.PrepareWriteBatch(schema(), &write_batch));
1875
1876
  // Apply write batch to RocksDB.
1877
0
  state.WriteToRocksDB(&write_batch, restoration->write_time, restoration->op_id, tablet);
1878
1879
  // TODO(pitr) Handle master leader failover.
1880
0
  RETURN_NOT_OK(ElectedAsLeaderCb());
1881
1882
0
  return Status::OK();
1883
0
}
1884
1885
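The log-prefix edit in RestoreSysCatalog above assumes the prefix ends in ": " and splices in a " [TMP]: " suffix to tag the temporary RocksDB instance. A stand-alone illustration of that string surgery:

#include <cassert>
#include <string>

int main() {
  std::string log_prefix = "T tablet-1 P peer-1: ";
  log_prefix.erase(log_prefix.size() - 2);   // drop the trailing ": "
  log_prefix += " [TMP]: ";                  // tag the temporary RocksDB
  assert(log_prefix == "T tablet-1 P peer-1 [TMP]: ");
  return 0;
}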
0
Status CatalogManager::VerifyRestoredObjects(const SnapshotScheduleRestoration& restoration) {
1886
0
  auto entries = VERIFY_RESULT(CollectEntriesForSnapshot(
1887
0
      restoration.schedules[0].second.tables().tables()));
1888
0
  auto objects_to_restore = restoration.non_system_objects_to_restore;
1889
0
  VLOG_WITH_PREFIX(1) << "Objects to restore: " << AsString(objects_to_restore);
1890
0
  for (const auto& entry : entries.entries()) {
1891
0
    VLOG_WITH_PREFIX(1)
1892
0
        << "Alive " << SysRowEntryType_Name(entry.type()) << ": " << entry.id();
1893
0
    auto it = objects_to_restore.find(entry.id());
1894
0
    if (it == objects_to_restore.end()) {
1895
0
      return STATUS_FORMAT(IllegalState, "Object $0/$1 present, but should not be restored",
1896
0
                           SysRowEntryType_Name(entry.type()), entry.id());
1897
0
    }
1898
0
    if (it->second != entry.type()) {
1899
0
      return STATUS_FORMAT(
1900
0
          IllegalState, "Restored object $0 has wrong type $1, while $2 expected",
1901
0
          entry.id(), SysRowEntryType_Name(entry.type()), SysRowEntryType_Name(it->second));
1902
0
    }
1903
0
    objects_to_restore.erase(it);
1904
0
  }
1905
0
  for (const auto& id_and_type : objects_to_restore) {
1906
0
    return STATUS_FORMAT(
1907
0
        IllegalState, "Expected to restore $0/$1, but it is not present after restoration",
1908
0
        SysRowEntryType_Name(id_and_type.second), id_and_type.first);
1909
0
  }
1910
0
  return Status::OK();
1911
0
}
1912
1913
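VerifyRestoredObjects above performs a two-sided set comparison in one pass: each live entry is erased from the expected map as it is matched, and whatever remains in the map afterwards was expected but never restored. The same shape on plain strings (hypothetical object ids and types):

#include <cstdio>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

int main() {
  std::unordered_map<std::string, std::string> expected = {
      {"obj1", "TABLE"}, {"obj2", "TABLET"}};
  const std::vector<std::pair<std::string, std::string>> alive = {
      {"obj1", "TABLE"}};
  for (const auto& [id, type] : alive) {
    auto it = expected.find(id);
    if (it == expected.end() || it->second != type) {
      std::printf("unexpected or mistyped object: %s\n", id.c_str());
      return 1;
    }
    expected.erase(it);  // matched: remove from the expected set
  }
  for (const auto& [id, type] : expected) {
    std::printf("missing after restoration: %s/%s\n", type.c_str(), id.c_str());
  }
  return 0;
}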
11.1k
void CatalogManager::CleanupHiddenObjects(const ScheduleMinRestoreTime& schedule_min_restore_time) {
1914
0
  VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(schedule_min_restore_time);
1915
1916
11.1k
  std::vector<TabletInfoPtr> hidden_tablets;
1917
11.1k
  std::vector<TableInfoPtr> tables;
1918
11.1k
  {
1919
11.1k
    SharedLock lock(mutex_);
1920
11.1k
    hidden_tablets = hidden_tablets_;
1921
11.1k
    tables.reserve(table_ids_map_->size());
1922
1.21M
    for (const auto& p : *table_ids_map_) {
1923
1.21M
      if (!p.second->is_system()) {
1924
27.7k
        tables.push_back(p.second);
1925
27.7k
      }
1926
1.21M
    }
1927
11.1k
  }
1928
11.1k
  CleanupHiddenTablets(hidden_tablets, schedule_min_restore_time);
1929
11.1k
  CleanupHiddenTables(std::move(tables));
1930
11.1k
}
1931
1932
void CatalogManager::CleanupHiddenTablets(
1933
    const std::vector<TabletInfoPtr>& hidden_tablets,
1934
11.1k
    const ScheduleMinRestoreTime& schedule_min_restore_time) {
1935
11.1k
  if (hidden_tablets.empty()) {
1936
11.1k
    return;
1937
11.1k
  }
1938
0
  std::vector<TabletInfoPtr> tablets_to_delete;
1939
0
  std::vector<TabletInfoPtr> tablets_to_remove_from_hidden;
1940
1941
0
  for (const auto& tablet : hidden_tablets) {
1942
0
    auto lock = tablet->LockForRead();
1943
0
    if (!lock->ListedAsHidden()) {
1944
0
      tablets_to_remove_from_hidden.push_back(tablet);
1945
0
      continue;
1946
0
    }
1947
0
    auto hide_hybrid_time = HybridTime::FromPB(lock->pb.hide_hybrid_time());
1948
0
    bool cleanup = true;
1949
0
    for (const auto& schedule_id_str : lock->pb.retained_by_snapshot_schedules()) {
1950
0
      auto schedule_id = TryFullyDecodeSnapshotScheduleId(schedule_id_str);
1951
0
      auto it = schedule_min_restore_time.find(schedule_id);
1952
      // If the schedule is not present in schedule_min_restore_time, it means the schedule
1954
      // was deleted, so it should not retain the tablet.
1954
0
      if (it != schedule_min_restore_time.end() && it->second <= hide_hybrid_time) {
1955
0
        VLOG_WITH_PREFIX(1)
1956
0
            << "Retaining tablet: " << tablet->tablet_id() << ", hide hybrid time: "
1957
0
            << hide_hybrid_time << ", because of schedule: " << schedule_id
1958
0
            << ", min restore time: " << it->second;
1959
0
        cleanup = false;
1960
0
        break;
1961
0
      }
1962
0
    }
1963
0
    if (cleanup) {
1964
0
      SharedLock read_lock(mutex_);
1965
0
      if (IsTableCdcProducer(*tablet->table())) {
1966
        // We also need to check if this tablet is being kept for xcluster replication.
1967
0
        auto l = tablet->table()->LockForRead();
1968
0
        cleanup = hide_hybrid_time.AddSeconds(l->pb.wal_retention_secs()) < master_->clock()->Now();
1969
0
      }
1970
0
    }
1971
0
    if (cleanup) {
1972
0
      tablets_to_delete.push_back(tablet);
1973
0
    }
1974
0
  }
1975
0
  if (!tablets_to_delete.empty()) {
1976
0
    LOG_WITH_PREFIX(INFO) << "Cleanup hidden tablets: " << AsString(tablets_to_delete);
1977
0
    WARN_NOT_OK(DeleteTabletListAndSendRequests(tablets_to_delete, "Cleanup hidden tablets", {}),
1978
0
                "Failed to cleanup hidden tablets");
1979
0
  }
1980
1981
0
  if (!tablets_to_remove_from_hidden.empty()) {
1982
0
    auto it = tablets_to_remove_from_hidden.begin();
1983
0
    LockGuard lock(mutex_);
1984
    // The order of tablets in tablets_to_remove_from_hidden matches their order in
1985
    // hidden_tablets_, so we can advance a single iterator instead of searching.
1986
0
    auto filter = [&it, end = tablets_to_remove_from_hidden.end()](const TabletInfoPtr& tablet) {
1987
0
      if (it != end && tablet.get() == it->get()) {
1988
0
        ++it;
1989
0
        return true;
1990
0
      }
1991
0
      return false;
1992
0
    };
1993
0
    hidden_tablets_.erase(std::remove_if(hidden_tablets_.begin(), hidden_tablets_.end(), filter),
1994
0
                          hidden_tablets_.end());
1995
0
  }
1996
0
}
1997
1998
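The filter at the end of CleanupHiddenTablets above relies on tablets_to_remove_from_hidden preserving the order of hidden_tablets_, so a single advancing iterator replaces a per-element search. The same idiom on plain ints:

#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
  std::vector<int> all = {1, 2, 3, 4, 5};
  const std::vector<int> to_remove = {2, 4};  // a subsequence of `all`
  auto it = to_remove.begin();
  auto filter = [&it, end = to_remove.end()](int v) {
    if (it != end && v == *it) { ++it; return true; }
    return false;
  };
  all.erase(std::remove_if(all.begin(), all.end(), filter), all.end());
  for (int v : all) std::printf("%d ", v);  // prints: 1 3 5
  std::printf("\n");
  return 0;
}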
11.1k
void CatalogManager::CleanupHiddenTables(std::vector<TableInfoPtr> tables) {
1999
11.1k
  std::vector<TableInfo::WriteLock> locks;
2000
27.7k
  EraseIf([this, &locks](const TableInfoPtr& table) {
2001
27.7k
    {
2002
27.7k
      auto lock = table->LockForRead();
2003
27.7k
      if (!lock->is_hidden() || lock->started_deleting() || !table->AreAllTabletsDeleted()) {
2004
27.7k
        return true;
2005
27.7k
      }
2006
0
    }
2007
0
    auto lock = table->LockForWrite();
2008
0
    if (lock->started_deleting()) {
2009
0
      return true;
2010
0
    }
2011
0
    LOG_WITH_PREFIX(INFO) << "Should delete table: " << AsString(table);
2012
0
    lock.mutable_data()->set_state(
2013
0
        SysTablesEntryPB::DELETED, Format("Cleanup hidden table at $0", LocalTimeAsString()));
2014
0
    locks.push_back(std::move(lock));
2015
0
    return false;
2016
0
  }, &tables);
2017
11.1k
  if (tables.empty()) {
2018
11.1k
    return;
2019
11.1k
  }
2020
2021
0
  Status s = sys_catalog_->Upsert(leader_ready_term(), tables);
2022
0
  if (!s.ok()) {
2023
0
    LOG_WITH_PREFIX(WARNING) << "Failed to mark tables as deleted: " << s;
2024
0
    return;
2025
0
  }
2026
0
  for (auto& lock : locks) {
2027
0
    lock.Commit();
2028
0
  }
2029
0
}
2030
2031
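CleanupHiddenTables above follows the catalog manager's usual write discipline: stage changes under write locks, persist them to the sys catalog, and commit the in-memory locks only if persistence succeeded. A schematic version of that persist-then-commit ordering, with a hypothetical Persist() stand-in for the sys-catalog upsert:

#include <cstdio>
#include <vector>

struct PendingLock {
  bool committed = false;
  void Commit() { committed = true; }
};

bool Persist(const std::vector<PendingLock>&) { return true; }  // stand-in

int main() {
  std::vector<PendingLock> locks(3);
  if (!Persist(locks)) {
    std::puts("persist failed: locks dropped, in-memory state unchanged");
    return 1;
  }
  for (auto& lock : locks) lock.Commit();  // memory now matches disk
  std::puts("committed");
  return 0;
}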
5.35k
rpc::Scheduler& CatalogManager::Scheduler() {
2032
5.35k
  return master_->messenger()->scheduler();
2033
5.35k
}
2034
2035
24.1k
int64_t CatalogManager::LeaderTerm() {
2036
24.1k
  auto peer = tablet_peer();
2037
24.1k
  if (!peer) {
2038
92
    return false;
2039
92
  }
2040
24.0k
  auto consensus = peer->shared_consensus();
2041
24.0k
  if (!consensus) {
2042
2
    return false;
2043
2
  }
2044
24.0k
  return consensus->GetLeaderState(/* allow_stale= */ true).term;
2045
24.0k
}
2046
2047
0
void CatalogManager::HandleCreateTabletSnapshotResponse(TabletInfo *tablet, bool error) {
2048
0
  LOG(INFO) << "Handling Create Tablet Snapshot Response for tablet "
2049
0
            << DCHECK_NOTNULL(tablet)->ToString() << (error ? "  ERROR" : "  OK");
2050
2051
  // Get the snapshot data descriptor from the catalog manager.
2052
0
  scoped_refptr<SnapshotInfo> snapshot;
2053
0
  {
2054
0
    LockGuard lock(mutex_);
2055
0
    TRACE("Acquired catalog manager lock");
2056
2057
0
    if (current_snapshot_id_.empty()) {
2058
0
      LOG(WARNING) << "No active snapshot: " << current_snapshot_id_;
2059
0
      return;
2060
0
    }
2061
2062
0
    snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, current_snapshot_id_);
2063
2064
0
    if (!snapshot) {
2065
0
      LOG(WARNING) << "Snapshot not found: " << current_snapshot_id_;
2066
0
      return;
2067
0
    }
2068
0
  }
2069
2070
0
  if (!snapshot->IsCreateInProgress()) {
2071
0
    LOG(WARNING) << "Snapshot is not in creating state: " << snapshot->id();
2072
0
    return;
2073
0
  }
2074
2075
0
  auto tablet_l = tablet->LockForRead();
2076
0
  auto l = snapshot->LockForWrite();
2077
0
  RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots =
2078
0
      l.mutable_data()->pb.mutable_tablet_snapshots();
2079
0
  int num_tablets_complete = 0;
2080
2081
0
  for (int i = 0; i < tablet_snapshots->size(); ++i) {
2082
0
    SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i);
2083
2084
0
    if (tablet_info->id() == tablet->id()) {
2085
0
      tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::COMPLETE);
2086
0
    }
2087
2088
0
    if (tablet_info->state() == SysSnapshotEntryPB::COMPLETE) {
2089
0
      ++num_tablets_complete;
2090
0
    }
2091
0
  }
2092
2093
  // Finish the snapshot.
2094
0
  bool finished = true;
2095
0
  if (error) {
2096
0
    l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED);
2097
0
    LOG(WARNING) << "Failed snapshot " << snapshot->id() << " on tablet " << tablet->id();
2098
0
  } else if (num_tablets_complete == tablet_snapshots->size()) {
2099
0
    l.mutable_data()->pb.set_state(SysSnapshotEntryPB::COMPLETE);
2100
0
    LOG(INFO) << "Completed snapshot " << snapshot->id();
2101
0
  } else {
2102
0
    finished = false;
2103
0
  }
2104
2105
0
  if (finished) {
2106
0
    LockGuard lock(mutex_);
2107
0
    TRACE("Acquired catalog manager lock");
2108
0
    current_snapshot_id_ = "";
2109
0
  }
2110
2111
0
  VLOG(1) << "Snapshot: " << snapshot->id()
2112
0
          << " PB: " << l.mutable_data()->pb.DebugString()
2113
0
          << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size();
2114
2115
0
  const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot);
2116
2117
0
  l.CommitOrWarn(s, "updating snapshot in sys-catalog");
2118
0
}
2119
2120
0
void CatalogManager::HandleRestoreTabletSnapshotResponse(TabletInfo *tablet, bool error) {
2121
0
  LOG(INFO) << "Handling Restore Tablet Snapshot Response for tablet "
2122
0
            << DCHECK_NOTNULL(tablet)->ToString() << (error ? "  ERROR" : "  OK");
2123
2124
  // Get the snapshot data descriptor from the catalog manager.
2125
0
  scoped_refptr<SnapshotInfo> snapshot;
2126
0
  {
2127
0
    LockGuard lock(mutex_);
2128
0
    TRACE("Acquired catalog manager lock");
2129
2130
0
    if (current_snapshot_id_.empty()) {
2131
0
      LOG(WARNING) << "No restoring snapshot: " << current_snapshot_id_;
2132
0
      return;
2133
0
    }
2134
2135
0
    snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, current_snapshot_id_);
2136
2137
0
    if (!snapshot) {
2138
0
      LOG(WARNING) << "Restoring snapshot not found: " << current_snapshot_id_;
2139
0
      return;
2140
0
    }
2141
0
  }
2142
2143
0
  if (!snapshot->IsRestoreInProgress()) {
2144
0
    LOG(WARNING) << "Snapshot is not in restoring state: " << snapshot->id();
2145
0
    return;
2146
0
  }
2147
2148
0
  auto tablet_l = tablet->LockForRead();
2149
0
  auto l = snapshot->LockForWrite();
2150
0
  RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots =
2151
0
      l.mutable_data()->pb.mutable_tablet_snapshots();
2152
0
  int num_tablets_complete = 0;
2153
2154
0
  for (int i = 0; i < tablet_snapshots->size(); ++i) {
2155
0
    SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i);
2156
2157
0
    if (tablet_info->id() == tablet->id()) {
2158
0
      tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::COMPLETE);
2159
0
    }
2160
2161
0
    if (tablet_info->state() == SysSnapshotEntryPB::COMPLETE) {
2162
0
      ++num_tablets_complete;
2163
0
    }
2164
0
  }
2165
2166
  // Finish the snapshot.
2167
0
  if (error || num_tablets_complete == tablet_snapshots->size()) {
2168
0
    if (error) {
2169
0
      l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED);
2170
0
      LOG(WARNING) << "Failed restoring snapshot " << snapshot->id()
2171
0
                   << " on tablet " << tablet->id();
2172
0
    } else {
2173
0
      LOG_IF(DFATAL, num_tablets_complete != tablet_snapshots->size())
2174
0
          << "Wrong number of tablets";
2175
0
      l.mutable_data()->pb.set_state(SysSnapshotEntryPB::COMPLETE);
2176
0
      LOG(INFO) << "Restored snapshot " << snapshot->id();
2177
0
    }
2178
2179
0
    LockGuard lock(mutex_);
2180
0
    TRACE("Acquired catalog manager lock");
2181
0
    current_snapshot_id_ = "";
2182
0
  }
2183
2184
0
  VLOG(1) << "Snapshot: " << snapshot->id()
2185
0
          << " PB: " << l.mutable_data()->pb.DebugString()
2186
0
          << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size();
2187
2188
0
  const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot);
2189
2190
0
  l.CommitOrWarn(s, "updating snapshot in sys-catalog");
2191
0
}
2192
2193
void CatalogManager::HandleDeleteTabletSnapshotResponse(
2194
0
    const SnapshotId& snapshot_id, TabletInfo *tablet, bool error) {
2195
0
  LOG(INFO) << "Handling Delete Tablet Snapshot Response for tablet "
2196
0
            << DCHECK_NOTNULL(tablet)->ToString() << (error ? "  ERROR" : "  OK");
2197
2198
  // Get the snapshot data descriptor from the catalog manager.
2199
0
  scoped_refptr<SnapshotInfo> snapshot;
2200
0
  {
2201
0
    LockGuard lock(mutex_);
2202
0
    TRACE("Acquired catalog manager lock");
2203
2204
0
    snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, snapshot_id);
2205
2206
0
    if (!snapshot) {
2207
0
      LOG(WARNING) << __func__ << " Snapshot not found: " << snapshot_id;
2208
0
      return;
2209
0
    }
2210
0
  }
2211
2212
0
  if (!snapshot->IsDeleteInProgress()) {
2213
0
    LOG(WARNING) << "Snapshot is not in deleting state: " << snapshot->id();
2214
0
    return;
2215
0
  }
2216
2217
0
  auto tablet_l = tablet->LockForRead();
2218
0
  auto l = snapshot->LockForWrite();
2219
0
  RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots =
2220
0
      l.mutable_data()->pb.mutable_tablet_snapshots();
2221
0
  int num_tablets_complete = 0;
2222
2223
0
  for (int i = 0; i < tablet_snapshots->size(); ++i) {
2224
0
    SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i);
2225
2226
0
    if (tablet_info->id() == tablet->id()) {
2227
0
      tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::DELETED);
2228
0
    }
2229
2230
0
    if (tablet_info->state() != SysSnapshotEntryPB::DELETING) {
2231
0
      ++num_tablets_complete;
2232
0
    }
2233
0
  }
2234
2235
0
  if (num_tablets_complete == tablet_snapshots->size()) {
2236
    // Delete the snapshot.
2237
0
    l.mutable_data()->pb.set_state(SysSnapshotEntryPB::DELETED);
2238
0
    LOG(INFO) << "Deleted snapshot " << snapshot->id();
2239
2240
0
    const Status s = sys_catalog_->Delete(leader_ready_term(), snapshot);
2241
2242
0
    LockGuard lock(mutex_);
2243
0
    TRACE("Acquired catalog manager lock");
2244
2245
0
    if (current_snapshot_id_ == snapshot_id) {
2246
0
      current_snapshot_id_ = "";
2247
0
    }
2248
2249
    // Remove it from the maps.
2250
0
    TRACE("Removing from maps");
2251
0
    if (non_txn_snapshot_ids_map_.erase(snapshot_id) < 1) {
2252
0
      LOG(WARNING) << "Could not remove snapshot " << snapshot_id << " from map";
2253
0
    }
2254
2255
0
    l.CommitOrWarn(s, "deleting snapshot from sys-catalog");
2256
0
  } else if (error) {
2257
0
    l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED);
2258
0
    LOG(WARNING) << "Failed snapshot " << snapshot->id() << " deletion on tablet " << tablet->id();
2259
2260
0
    const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot);
2261
0
    l.CommitOrWarn(s, "updating snapshot in sys-catalog");
2262
0
  }
2263
2264
0
  VLOG(1) << "Deleting snapshot: " << snapshot->id()
2265
0
          << " PB: " << l.mutable_data()->pb.DebugString()
2266
0
          << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size();
2267
0
}
2268
2269
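All three tablet-snapshot response handlers above share one aggregation shape: record the reporting tablet's per-tablet state, count the tablets that reached a terminal state, and flip the snapshot-level state once the count covers every tablet. A condensed sketch with hypothetical enums:

#include <cstdio>
#include <vector>

enum class TabletState { kInProgress, kComplete, kFailed };

TabletState Aggregate(std::vector<TabletState>& tablets, size_t reporter,
                      bool error) {
  tablets[reporter] = error ? TabletState::kFailed : TabletState::kComplete;
  size_t complete = 0;
  for (TabletState s : tablets) {
    if (s == TabletState::kComplete) ++complete;
  }
  if (error) return TabletState::kFailed;
  return complete == tablets.size() ? TabletState::kComplete
                                    : TabletState::kInProgress;
}

int main() {
  std::vector<TabletState> tablets(2, TabletState::kInProgress);
  std::printf("%d\n", static_cast<int>(Aggregate(tablets, 0, false)));  // 0: still in progress
  std::printf("%d\n", static_cast<int>(Aggregate(tablets, 1, false)));  // 1: complete
  return 0;
}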
Status CatalogManager::CreateSnapshotSchedule(const CreateSnapshotScheduleRequestPB* req,
2270
                                              CreateSnapshotScheduleResponsePB* resp,
2271
0
                                              rpc::RpcContext* rpc) {
2272
0
  auto id = VERIFY_RESULT(snapshot_coordinator_.CreateSchedule(
2273
0
      *req, leader_ready_term(), rpc->GetClientDeadline()));
2274
0
  resp->set_snapshot_schedule_id(id.data(), id.size());
2275
0
  return Status::OK();
2276
0
}
2277
2278
Status CatalogManager::ListSnapshotSchedules(const ListSnapshotSchedulesRequestPB* req,
2279
                                             ListSnapshotSchedulesResponsePB* resp,
2280
0
                                             rpc::RpcContext* rpc) {
2281
0
  auto snapshot_schedule_id = TryFullyDecodeSnapshotScheduleId(req->snapshot_schedule_id());
2282
2283
0
  return snapshot_coordinator_.ListSnapshotSchedules(snapshot_schedule_id, resp);
2284
0
}
2285
2286
Status CatalogManager::DeleteSnapshotSchedule(const DeleteSnapshotScheduleRequestPB* req,
2287
                                              DeleteSnapshotScheduleResponsePB* resp,
2288
0
                                              rpc::RpcContext* rpc) {
2289
0
  auto snapshot_schedule_id = TryFullyDecodeSnapshotScheduleId(req->snapshot_schedule_id());
2290
2291
0
  return snapshot_coordinator_.DeleteSnapshotSchedule(
2292
0
      snapshot_schedule_id, leader_ready_term(), rpc->GetClientDeadline());
2293
0
}
2294
2295
0
void CatalogManager::DumpState(std::ostream* out, bool on_disk_dump) const {
2296
0
  super::DumpState(out, on_disk_dump);
2297
2298
  // TODO: dump snapshots
2299
0
}
2300
2301
template <typename Registry, typename Mutex>
2302
bool ShouldResendRegistry(
2303
3
    const std::string& ts_uuid, bool has_registration, Registry* registry, Mutex* mutex) {
2304
3
  bool should_resend_registry;
2305
3
  {
2306
3
    std::lock_guard<Mutex> lock(*mutex);
2307
3
    auto it = registry->find(ts_uuid);
2308
3
    should_resend_registry = (it == registry->end() || it->second || has_registration);
2309
3
    if (it == registry->end()) {
2310
3
      registry->emplace(ts_uuid, false);
2311
0
    } else {
2312
0
      it->second = false;
2313
0
    }
2314
3
  }
2315
3
  return should_resend_registry;
2316
3
}
2317
2318
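ShouldResendRegistry above resends when the tserver is unknown, when it was flagged for a resend, or when it has just (re)registered, and in every case re-arms the flag back to false. The same logic without the template machinery, as a sketch:

#include <cstdio>
#include <string>
#include <unordered_map>

bool ShouldResend(const std::string& ts_uuid, bool has_registration,
                  std::unordered_map<std::string, bool>* registry) {
  auto it = registry->find(ts_uuid);
  const bool resend = (it == registry->end() || it->second || has_registration);
  if (it == registry->end()) {
    registry->emplace(ts_uuid, false);
  } else {
    it->second = false;
  }
  return resend;
}

int main() {
  std::unordered_map<std::string, bool> registry;
  std::printf("%d\n", ShouldResend("ts-1", false, &registry));  // 1: unknown
  std::printf("%d\n", ShouldResend("ts-1", false, &registry));  // 0: already known
  std::printf("%d\n", ShouldResend("ts-1", true, &registry));   // 1: re-registered
  return 0;
}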
Status CatalogManager::FillHeartbeatResponse(const TSHeartbeatRequestPB* req,
2319
384k
                                             TSHeartbeatResponsePB* resp) {
2320
384k
  SysClusterConfigEntryPB cluster_config;
2321
384k
  RETURN_NOT_OK(GetClusterConfig(&cluster_config));
2322
384k
  RETURN_NOT_OK(FillHeartbeatResponseEncryption(cluster_config, req, resp));
2323
384k
  RETURN_NOT_OK(snapshot_coordinator_.FillHeartbeatResponse(resp));
2324
384k
  return FillHeartbeatResponseCDC(cluster_config, req, resp);
2325
384k
}
2326
2327
Status CatalogManager::FillHeartbeatResponseCDC(const SysClusterConfigEntryPB& cluster_config,
2328
                                                const TSHeartbeatRequestPB* req,
2329
384k
                                                TSHeartbeatResponsePB* resp) {
2330
384k
  resp->set_cluster_config_version(cluster_config.version());
2331
384k
  if (!cluster_config.has_consumer_registry() ||
2332
384k
      req->cluster_config_version() >= cluster_config.version()) {
2333
384k
    return Status::OK();
2334
384k
  }
2335
12
  *resp->mutable_consumer_registry() = cluster_config.consumer_registry();
2336
12
  return Status::OK();
2337
12
}
2338
2339
Status CatalogManager::FillHeartbeatResponseEncryption(
2340
    const SysClusterConfigEntryPB& cluster_config,
2341
    const TSHeartbeatRequestPB* req,
2342
384k
    TSHeartbeatResponsePB* resp) {
2343
384k
  const auto& ts_uuid = req->common().ts_instance().permanent_uuid();
2344
384k
  if (!cluster_config.has_encryption_info() ||
2345
3
      !ShouldResendRegistry(ts_uuid, req->has_registration(), &should_send_universe_key_registry_,
2346
383k
                            &should_send_universe_key_registry_mutex_)) {
2347
383k
    return Status::OK();
2348
383k
  }
2349
2350
142
  const auto& encryption_info = cluster_config.encryption_info();
2351
142
  RETURN_NOT_OK(encryption_manager_->FillHeartbeatResponseEncryption(encryption_info, resp));
2352
2353
142
  return Status::OK();
2354
142
}
2355
2356
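FillHeartbeatResponseCDC above only ships the consumer registry when the tserver's cluster-config version lags the master's, which keeps steady-state heartbeats small. The gate, reduced to its essentials with hypothetical types:

#include <cstdint>
#include <cstdio>
#include <optional>
#include <string>

std::optional<std::string> RegistryForHeartbeat(
    uint32_t ts_config_version, uint32_t master_config_version,
    const std::optional<std::string>& consumer_registry) {
  if (!consumer_registry || ts_config_version >= master_config_version) {
    return std::nullopt;  // tserver already up to date (or nothing to send)
  }
  return consumer_registry;
}

int main() {
  const std::optional<std::string> registry{"registry-v7"};
  auto r = RegistryForHeartbeat(6, 7, registry);
  std::printf("send: %s\n", r ? r->c_str() : "(nothing)");
  return 0;
}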
void CatalogManager::SetTabletSnapshotsState(SysSnapshotEntryPB::State state,
2357
0
                                             SysSnapshotEntryPB* snapshot_pb) {
2358
0
  RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots =
2359
0
      snapshot_pb->mutable_tablet_snapshots();
2360
2361
0
  for (int i = 0; i < tablet_snapshots->size(); ++i) {
2362
0
    SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i);
2363
0
    tablet_info->set_state(state);
2364
0
  }
2365
0
}
2366
2367
157
Status CatalogManager::CreateCdcStateTableIfNeeded(rpc::RpcContext *rpc) {
2368
  // If the CDC state table already exists, do nothing; otherwise create it.
2369
157
  if (VERIFY_RESULT(TableExists(kSystemNamespaceName, kCdcStateTableName))) {
2370
69
    return Status::OK();
2371
69
  }
2372
  // Set up a CreateTable request internally.
2373
88
  CreateTableRequestPB req;
2374
88
  CreateTableResponsePB resp;
2375
88
  req.set_name(kCdcStateTableName);
2376
88
  req.mutable_namespace_()->set_name(kSystemNamespaceName);
2377
88
  req.set_table_type(TableType::YQL_TABLE_TYPE);
2378
2379
88
  client::YBSchemaBuilder schema_builder;
2380
88
  schema_builder.AddColumn(master::kCdcTabletId)->HashPrimaryKey()->Type(DataType::STRING);
2381
88
  schema_builder.AddColumn(master::kCdcStreamId)->PrimaryKey()->Type(DataType::STRING);
2382
88
  schema_builder.AddColumn(master::kCdcCheckpoint)->Type(DataType::STRING);
2383
88
  schema_builder.AddColumn(master::kCdcData)->Type(QLType::CreateTypeMap(
2384
88
      DataType::STRING, DataType::STRING));
2385
88
  schema_builder.AddColumn(master::kCdcLastReplicationTime)->Type(DataType::TIMESTAMP);
2386
2387
88
  client::YBSchema yb_schema;
2388
88
  CHECK_OK(schema_builder.Build(&yb_schema));
2389
2390
88
  auto schema = yb::client::internal::GetSchema(yb_schema);
2391
88
  SchemaToPB(schema, req.mutable_schema());
2392
  // Explicitly set the number of tablets if the corresponding flag is set; otherwise CreateTable
2393
  // will use the same defaults as for regular tables.
2394
88
  if (FLAGS_cdc_state_table_num_tablets > 0) {
2395
4
    req.mutable_schema()->mutable_table_properties()->set_num_tablets(
2396
4
        FLAGS_cdc_state_table_num_tablets);
2397
4
  }
2398
2399
88
  Status s = CreateTable(&req, &resp, rpc);
2400
  // We do not lock here so it is technically possible that the table was already created.
2401
  // If so, there is nothing to do so we just ignore the "AlreadyPresent" error.
2402
88
  if (!s.ok() && !s.IsAlreadyPresent()) {
2403
0
    return s;
2404
0
  }
2405
88
  return Status::OK();
2406
88
}
2407
2408
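CreateCdcStateTableIfNeeded above is deliberately idempotent without a lock: a concurrent creator may win the race, so an "already present" error from the create call is treated as success. A schematic version with a hypothetical status enum standing in for yb::Status:

#include <cstdio>

enum class Status { kOk, kAlreadyPresent, kOtherError };

Status CreateTableOnce();  // stand-in for CreateTable(&req, &resp, rpc)

Status EnsureTableExists() {
  const Status s = CreateTableOnce();
  if (s != Status::kOk && s != Status::kAlreadyPresent) {
    return s;  // a real failure
  }
  return Status::kOk;  // created now, or created by someone else: both fine
}

Status CreateTableOnce() { return Status::kAlreadyPresent; }

int main() {
  std::printf("result ok: %d\n", EnsureTableExists() == Status::kOk);
  return 0;
}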
0
Status CatalogManager::IsCdcStateTableCreated(IsCreateTableDoneResponsePB* resp) {
2409
0
  IsCreateTableDoneRequestPB req;
2410
2411
0
  req.mutable_table()->set_table_name(kCdcStateTableName);
2412
0
  req.mutable_table()->mutable_namespace_()->set_name(kSystemNamespaceName);
2413
2414
0
  return IsCreateTableDone(&req, resp);
2415
0
}
2416
2417
// Helper class to print a vector of CDCStreamInfo pointers.
2418
namespace {
2419
2420
template<class CDCStreamInfoPointer>
2421
4
std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) {
2422
4
  std::vector<CDCStreamId> cdc_stream_ids;
2423
4
  for (const auto& cdc_stream : cdc_streams) {
2424
4
    cdc_stream_ids.push_back(cdc_stream->id());
2425
4
  }
2426
4
  return JoinCSVLine(cdc_stream_ids);
2427
4
}
catalog_manager_ent.cc:_ZN2yb6master10enterprise12_GLOBAL__N_118JoinStreamsCSVLineI13scoped_refptrINS0_13CDCStreamInfoEEEENSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEENS7_6vectorIT_NSB_ISF_EEEE
Line
Count
Source
2421
2
std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) {
2422
2
  std::vector<CDCStreamId> cdc_stream_ids;
2423
2
  for (const auto& cdc_stream : cdc_streams) {
2424
2
    cdc_stream_ids.push_back(cdc_stream->id());
2425
2
  }
2426
2
  return JoinCSVLine(cdc_stream_ids);
2427
2
}
catalog_manager_ent.cc:_ZN2yb6master10enterprise12_GLOBAL__N_118JoinStreamsCSVLineIPNS0_13CDCStreamInfoEEENSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEENS6_6vectorIT_NSA_ISE_EEEE
Line
Count
Source
2421
2
std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) {
2422
2
  std::vector<CDCStreamId> cdc_stream_ids;
2423
2
  for (const auto& cdc_stream : cdc_streams) {
2424
2
    cdc_stream_ids.push_back(cdc_stream->id());
2425
2
  }
2426
2
  return JoinCSVLine(cdc_stream_ids);
2427
2
}
2428
2429
} // namespace
2430
2431
2.46k
Status CatalogManager::DeleteCDCStreamsForTable(const TableId& table_id) {
2432
2.46k
  return DeleteCDCStreamsForTables({table_id});
2433
2.46k
}
2434
2435
2.50k
Status CatalogManager::DeleteCDCStreamsForTables(const vector<TableId>& table_ids) {
2436
2.50k
  std::ostringstream tid_stream;
2437
4.76k
  for (const auto& tid : table_ids) {
2438
4.76k
    tid_stream << " " << tid;
2439
4.76k
  }
2440
2.50k
  LOG(INFO) << "Deleting CDC streams for tables:" << tid_stream.str();
2441
2442
2.50k
  std::vector<scoped_refptr<CDCStreamInfo>> streams;
2443
4.76k
  for (const auto& tid : table_ids) {
2444
4.76k
    auto newstreams = FindCDCStreamsForTable(tid);
2445
4.76k
    streams.insert(streams.end(), newstreams.begin(), newstreams.end());
2446
4.76k
  }
2447
2448
2.50k
  if (streams.empty()) {
2449
2.50k
    return Status::OK();
2450
2.50k
  }
2451
2452
  // Do not delete them here; just mark them as DELETING, and the catalog manager background thread
2453
  // will handle the deletion.
2454
0
  return MarkCDCStreamsAsDeleting(streams);
2455
0
}
2456
2457
std::vector<scoped_refptr<CDCStreamInfo>> CatalogManager::FindCDCStreamsForTable(
2458
4.80k
    const TableId& table_id) const {
2459
4.80k
  std::vector<scoped_refptr<CDCStreamInfo>> streams;
2460
4.80k
  SharedLock lock(mutex_);
2461
2462
0
  for (const auto& entry : cdc_stream_map_) {
2463
0
    auto ltm = entry.second->LockForRead();
2464
    // For xCluster, the first entry will be the table_id.
2465
0
    if (ltm->table_id().Get(0) == table_id && !ltm->started_deleting()) {
2466
0
      streams.push_back(entry.second);
2467
0
    }
2468
0
  }
2469
4.80k
  return streams;
2470
4.80k
}
2471
2472
0
void CatalogManager::GetAllCDCStreams(std::vector<scoped_refptr<CDCStreamInfo>>* streams) {
2473
0
  streams->clear();
2474
0
  SharedLock lock(mutex_);
2475
0
  streams->reserve(cdc_stream_map_.size());
2476
0
  for (const CDCStreamInfoMap::value_type& e : cdc_stream_map_) {
2477
0
    if (!e.second->LockForRead()->is_deleting()) {
2478
0
      streams->push_back(e.second);
2479
0
    }
2480
0
  }
2481
0
}
2482
2483
Status CatalogManager::CreateCDCStream(const CreateCDCStreamRequestPB* req,
2484
                                       CreateCDCStreamResponsePB* resp,
2485
2.69k
                                       rpc::RpcContext* rpc) {
2486
2.69k
  LOG(INFO) << "CreateCDCStream from " << RequestorString(rpc) << ": " << req->ShortDebugString();
2487
2488
2.69k
  std::string id_type_option_value(cdc::kTableId);
2489
2490
10.9k
  for (auto option : req->options()) {
2491
10.9k
    if (option.key() == cdc::kIdType) {
2492
153
      id_type_option_value = option.value();
2493
153
    }
2494
10.9k
  }
2495
2496
2497
2.69k
  if (id_type_option_value != cdc::kNamespaceId) {
2498
2.54k
    scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTableById(req->table_id()));
2499
2.54k
    {
2500
2.54k
      auto l = table->LockForRead();
2501
2.54k
      if (l->started_deleting()) {
2502
0
        return STATUS(
2503
0
            NotFound, "Table does not exist", req->table_id(),
2504
0
            MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2505
0
      }
2506
2.54k
    }
2507
2508
2.54k
    AlterTableRequestPB alter_table_req;
2509
2.54k
    alter_table_req.mutable_table()->set_table_id(req->table_id());
2510
2.54k
    alter_table_req.set_wal_retention_secs(FLAGS_cdc_wal_retention_time_secs);
2511
2.54k
    AlterTableResponsePB alter_table_resp;
2512
2.54k
    Status s = this->AlterTable(&alter_table_req, &alter_table_resp, rpc);
2513
2.54k
    if (!s.ok()) {
2514
0
      return STATUS(InternalError,
2515
0
                    "Unable to change the WAL retention time for table", req->table_id(),
2516
0
                    MasterError(MasterErrorPB::INTERNAL_ERROR));
2517
0
    }
2518
2.69k
  }
2519
2520
2.69k
  scoped_refptr<CDCStreamInfo> stream;
2521
2.69k
  if (!req->has_db_stream_id()) {
2522
157
    {
2523
157
      TRACE("Acquired catalog manager lock");
2524
157
      LockGuard lock(mutex_);
2525
      // Construct the CDC stream if the producer wasn't bootstrapped.
2526
157
      CDCStreamId stream_id;
2527
157
      stream_id = GenerateIdUnlocked(SysRowEntryType::CDC_STREAM);
2528
2529
157
      stream = make_scoped_refptr<CDCStreamInfo>(stream_id);
2530
157
      stream->mutable_metadata()->StartMutation();
2531
157
      SysCDCStreamEntryPB* metadata = &stream->mutable_metadata()->mutable_dirty()->pb;
2532
157
      bool create_namespace = id_type_option_value == cdc::kNamespaceId;
2533
157
      if (create_namespace) {
2534
153
        metadata->set_namespace_id(req->table_id());
2535
4
      } else {
2536
4
        metadata->add_table_id(req->table_id());
2537
4
      }
2538
157
      metadata->mutable_options()->CopyFrom(req->options());
2539
157
      metadata->set_state(
2540
153
          req->has_initial_state() ? req->initial_state() : SysCDCStreamEntryPB::ACTIVE);
2541
2542
      // Add the stream to the in-memory map.
2543
157
      cdc_stream_map_[stream->id()] = stream;
2544
157
      if (!create_namespace) {
2545
4
        cdc_stream_tables_count_map_[req->table_id()]++;
2546
4
      }
2547
157
      resp->set_stream_id(stream->id());
2548
157
    }
2549
157
    TRACE("Inserted new CDC stream into CatalogManager maps");
2550
2551
    // Update the on-disk system catalog.
2552
157
    RETURN_NOT_OK(CheckLeaderStatusAndSetupError(
2553
157
        sys_catalog_->Upsert(leader_ready_term(), stream),
2554
157
        "inserting CDC stream into sys-catalog", resp));
2555
157
    TRACE("Wrote CDC stream to sys-catalog");
2556
2557
    // Commit the in-memory state.
2558
157
    stream->mutable_metadata()->CommitMutation();
2559
157
    LOG(INFO) << "Created CDC stream " << stream->ToString();
2560
2561
157
    RETURN_NOT_OK(CreateCdcStateTableIfNeeded(rpc));
2562
157
    if (!PREDICT_FALSE(FLAGS_TEST_disable_cdc_state_insert_on_setup) &&
2563
153
        (!req->has_initial_state() ||
2564
153
         (req->initial_state() == master::SysCDCStreamEntryPB::ACTIVE)) &&
2565
153
        (id_type_option_value != cdc::kNamespaceId)) {
2566
      // Create the cdc state entries for the tablets in this table from scratch since we have no
2567
      // data to bootstrap. If we have data to bootstrap, let the BootstrapProducer logic take care of
2568
      // populating entries in cdc_state.
2569
0
      auto ybclient = master_->cdc_state_client_initializer().client();
2570
0
      if (!ybclient) {
2571
0
        return STATUS(IllegalState, "Client not initialized or shutting down");
2572
0
      }
2573
0
      client::TableHandle cdc_table;
2574
0
      const client::YBTableName cdc_state_table_name(
2575
0
          YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
2576
0
      RETURN_NOT_OK(ybclient->WaitForCreateTableToFinish(cdc_state_table_name));
2577
0
      RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient));
2578
0
      std::shared_ptr<client::YBSession> session = ybclient->NewSession();
2579
0
      scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTableById(req->table_id()));
2580
0
      auto tablets = table->GetTablets();
2581
0
      for (const auto& tablet : tablets) {
2582
0
        const auto op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
2583
0
        auto* const req = op->mutable_request();
2584
0
        QLAddStringHashValue(req, tablet->id());
2585
0
        QLAddStringRangeValue(req, stream->id());
2586
0
        cdc_table.AddStringColumnValue(req, master::kCdcCheckpoint, OpId().ToString());
2587
0
        cdc_table.AddTimestampColumnValue(
2588
0
            req, master::kCdcLastReplicationTime, GetCurrentTimeMicros());
2589
0
        session->Apply(op);
2590
0
      }
2591
0
      RETURN_NOT_OK(session->Flush());
2592
0
    }
2593
2.53k
  } else {
2594
    // A db_stream_id was provided: update the existing stream by adding this table_id.
2595
2.53k
    {
2596
2.53k
      SharedLock lock(mutex_);
2597
2.53k
      stream = FindPtrOrNull(cdc_stream_map_, req->db_stream_id());
2598
2.53k
    }
2599
2600
2.53k
    if (stream == nullptr) {
2601
0
      return STATUS(
2602
0
          NotFound, "Could not find CDC stream", req->ShortDebugString(),
2603
0
          MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2604
0
    }
2605
2606
2.53k
    auto stream_lock = stream->LockForWrite();
2607
2.53k
    if (stream_lock->is_deleting()) {
2608
0
      return STATUS(
2609
0
          NotFound, "CDC stream has been deleted", req->ShortDebugString(),
2610
0
          MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2611
0
    }
2612
2.53k
    stream_lock.mutable_data()->pb.add_table_id(req->table_id());
2613
2614
    // Also need to persist changes in sys catalog.
2615
2.53k
    RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream));
2616
2.53k
    stream_lock.Commit();
2617
2.53k
  }
2618
2.69k
  return Status::OK();
2619
2.69k
}
2620
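Illustrative request shapes for this handler, with hypothetical ids; the field and option names are the ones read above:

  // (a) Table-level stream (the xCluster path): table_id holds a table id.
  CreateCDCStreamRequestPB table_req;
  table_req.set_table_id("<table-id>");  // hypothetical

  // (b) Namespace-level stream (the CDCSDK path): table_id carries a
  //     namespace id, signalled by the kIdType option.
  CreateCDCStreamRequestPB ns_req;
  ns_req.set_table_id("<namespace-id>");  // hypothetical
  auto* option = ns_req.add_options();
  option->set_key(cdc::kIdType);
  option->set_value(cdc::kNamespaceId);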
2621
Status CatalogManager::DeleteCDCStream(const DeleteCDCStreamRequestPB* req,
2622
                                       DeleteCDCStreamResponsePB* resp,
2623
2
                                       rpc::RpcContext* rpc) {
2624
2
  LOG(INFO) << "Servicing DeleteCDCStream request from " << RequestorString(rpc)
2625
2
            << ": " << req->ShortDebugString();
2626
2627
2
  if (req->stream_id_size() < 1) {
2628
0
    return STATUS(InvalidArgument, "No CDC Stream ID given", req->ShortDebugString(),
2629
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
2630
0
  }
2631
2632
2
  std::vector<scoped_refptr<CDCStreamInfo>> streams;
2633
2
  {
2634
2
    SharedLock lock(mutex_);
2635
2
    for (const auto& stream_id : req->stream_id()) {
2636
2
      auto stream = FindPtrOrNull(cdc_stream_map_, stream_id);
2637
2638
2
      if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2639
0
        resp->add_not_found_stream_ids(stream_id);
2640
0
        LOG(WARNING) << "CDC stream does not exist: " << stream_id;
2641
2
      } else {
2642
2
        auto ltm = stream->LockForRead();
2643
2
        bool active = (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE);
2644
2
        bool is_WAL = false;
2645
5
        for (const auto& option : ltm->pb.options()) {
2646
5
          if (option.key() == "record_format" && option.value() == "WAL") {
2647
0
            is_WAL = true;
2648
0
          }
2649
5
        }
2650
2
        if (!req->force_delete() && is_WAL && active) {
2651
0
          return STATUS(NotSupported,
2652
0
                        "Cannot delete an xCluster Stream in replication. "
2653
0
                        "Use 'force_delete' to override",
2654
0
                        req->ShortDebugString(),
2655
0
                        MasterError(MasterErrorPB::INVALID_REQUEST));
2656
0
        }
2657
2
        streams.push_back(stream);
2658
2
      }
2659
2
    }
2660
2
  }
2661
2662
2
  if (!resp->not_found_stream_ids().empty() && !req->ignore_errors()) {
2663
0
    string missing_streams = JoinElementsIterator(resp->not_found_stream_ids().begin(),
2664
0
                                                  resp->not_found_stream_ids().end(),
2665
0
                                                  ",");
2666
0
    return STATUS(
2667
0
        NotFound,
2668
0
        Substitute("Did not find all requested CDC streams. Missing streams: [$0]. Request: $1",
2669
0
                   missing_streams, req->ShortDebugString()),
2670
0
        MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2671
0
  }
2672
2673
  // Do not delete them here; just mark them as DELETING, and the catalog manager background thread
2674
  // will handle the deletion.
2675
2
  Status s = MarkCDCStreamsAsDeleting(streams);
2676
2
  if (!s.ok()) {
2677
0
    if (s.IsIllegalState()) {
2678
0
      PANIC_RPC(rpc, s.message().ToString());
2679
0
    }
2680
0
    return CheckIfNoLongerLeaderAndSetupError(s, resp);
2681
0
  }
2682
2683
2
  LOG(INFO) << "Successfully deleted CDC streams " << JoinStreamsCSVLine(streams)
2684
2
            << " per request from " << RequestorString(rpc);
2685
2686
2
  return Status::OK();
2687
2
}
2688
2689
Status CatalogManager::MarkCDCStreamsAsDeleting(
2690
2
    const std::vector<scoped_refptr<CDCStreamInfo>>& streams) {
2691
2
  if (streams.empty()) {
2692
0
    return Status::OK();
2693
0
  }
2694
2
  std::vector<CDCStreamInfo::WriteLock> locks;
2695
2
  std::vector<CDCStreamInfo*> streams_to_mark;
2696
2
  locks.reserve(streams.size());
2697
2
  for (auto& stream : streams) {
2698
2
    auto l = stream->LockForWrite();
2699
2
    l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING);
2700
2
    locks.push_back(std::move(l));
2701
2
    streams_to_mark.push_back(stream.get());
2702
2
  }
2703
  // The mutations will be aborted when the locks held in 'locks' exit the scope on early return.
2704
2
  RETURN_NOT_OK(CheckStatus(
2705
2
      sys_catalog_->Upsert(leader_ready_term(), streams_to_mark),
2706
2
      "updating CDC streams in sys-catalog"));
2707
2
  LOG(INFO) << "Successfully marked streams " << JoinStreamsCSVLine(streams_to_mark)
2708
2
            << " as DELETING in sys catalog";
2709
2
  for (auto& lock : locks) {
2710
2
    lock.Commit();
2711
2
  }
2712
2
  return Status::OK();
2713
2
}
2714
2715
Status CatalogManager::FindCDCStreamsMarkedAsDeleting(
2716
90.0k
    std::vector<scoped_refptr<CDCStreamInfo>>* streams) {
2717
90.0k
  TRACE("Acquired catalog manager lock");
2718
90.0k
  SharedLock lock(mutex_);
2719
558
  for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) {
2720
558
    auto ltm = entry.second->LockForRead();
2721
558
    if (ltm->is_deleting()) {
2722
0
      LOG(INFO) << "Stream " << entry.second->id() << " was marked as DELETING";
2723
0
      streams->push_back(entry.second);
2724
0
    }
2725
558
  }
2726
90.0k
  return Status::OK();
2727
90.0k
}
2728
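This is the discovery half of the deferred deletion promised by the "background thread" comments above; a sketch of the driver shape that pairs it with CleanUpDeletedCDCStreams (a hypothetical loop body, not the actual task):

  std::vector<scoped_refptr<CDCStreamInfo>> streams;
  WARN_NOT_OK(FindCDCStreamsMarkedAsDeleting(&streams),
              "Failed to find CDC streams marked as DELETING");
  if (!streams.empty()) {
    WARN_NOT_OK(CleanUpDeletedCDCStreams(streams),
                "Failed to clean up DELETING CDC streams");
  }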
2729
Status CatalogManager::CleanUpDeletedCDCStreams(
2730
0
    const std::vector<scoped_refptr<CDCStreamInfo>>& streams) {
2731
0
  auto ybclient = master_->cdc_state_client_initializer().client();
2732
0
  if (!ybclient) {
2733
0
    return STATUS(IllegalState, "Client not initialized or shutting down");
2734
0
  }
2735
2736
  // First, for each deleted stream, delete its cdc state rows.
2737
  // That is, remove every entry in the cdc_state table that belongs to one of the deleted streams.
2738
0
  client::TableHandle cdc_table;
2739
0
  const client::YBTableName cdc_state_table_name(
2740
0
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
2741
0
  Status s = cdc_table.Open(cdc_state_table_name, ybclient);
2742
0
  if (!s.ok()) {
2743
0
    LOG(WARNING) << "Unable to open table " << master::kCdcStateTableName
2744
0
                 << " to delete stream ids entries: " << s;
2745
0
    return s.CloneAndPrepend("Unable to open cdc_state table");
2746
0
  }
2747
2748
0
  std::shared_ptr<client::YBSession> session = ybclient->NewSession();
2749
0
  std::vector<std::pair<CDCStreamId, std::shared_ptr<client::YBqlWriteOp>>> stream_ops;
2750
0
  std::set<CDCStreamId> failed_streams;
2751
0
  for (const auto& stream : streams) {
2752
0
    LOG(INFO) << "Deleting rows for stream " << stream->id();
2753
0
    for (const auto& table_id : stream->table_id()) {
2754
0
      TabletInfos tablets;
2755
0
      scoped_refptr<TableInfo> table;
2756
0
      {
2757
0
        TRACE("Acquired catalog manager lock");
2758
0
        SharedLock lock(mutex_);
2759
0
        table = FindPtrOrNull(*table_ids_map_, table_id);
2760
0
      }
2761
      // GetTablets locks lock_ in shared mode.
2762
0
      if (table) {
2763
0
        tablets = table->GetTablets();
2764
0
      }
2765
2766
0
      for (const auto& tablet : tablets) {
2767
0
        const auto delete_op = cdc_table.NewDeleteOp();
2768
0
        auto* delete_req = delete_op->mutable_request();
2769
2770
0
        QLAddStringHashValue(delete_req, tablet->tablet_id());
2771
0
        QLAddStringRangeValue(delete_req, stream->id());
2772
0
        session->Apply(delete_op);
2773
0
        stream_ops.push_back(std::make_pair(stream->id(), delete_op));
2774
0
        LOG(INFO) << "Deleting stream " << stream->id() << " for tablet " << tablet->tablet_id()
2775
0
                  << " with request " << delete_req->ShortDebugString();
2776
0
      }
2777
0
    }
2778
0
  }
2779
  // Flush all the delete operations.
2780
0
  s = session->Flush();
2781
0
  if (!s.ok()) {
2782
0
    LOG(ERROR) << "Unable to flush operations to delete cdc streams: " << s;
2783
0
    return s.CloneAndPrepend("Error deleting cdc stream rows from cdc_state table");
2784
0
  }
2785
2786
0
  for (const auto& e : stream_ops) {
2787
0
    if (!e.second->succeeded()) {
2788
0
      LOG(WARNING) << "Error deleting cdc_state row with tablet id "
2789
0
                   << e.second->request().hashed_column_values(0).value().string_value()
2790
0
                   << " and stream id "
2791
0
                   << e.second->request().range_column_values(0).value().string_value()
2792
0
                   << ": " << e.second->response().status();
2793
0
      failed_streams.insert(e.first);
2794
0
    }
2795
0
  }
2796
2797
  // TODO: Read the cdc_state table and verify that no rows remain for the specified cdc streams;
2798
  // keep any stream that still has rows in the map in the DELETED state to retry later.
2799
2800
0
  std::vector<CDCStreamInfo::WriteLock> locks;
2801
0
  locks.reserve(streams.size() - failed_streams.size());
2802
0
  std::vector<CDCStreamInfo*> streams_to_delete;
2803
0
  streams_to_delete.reserve(streams.size() - failed_streams.size());
2804
2805
  // Delete from sys catalog only those streams that were successfully deleted from cdc_state.
2806
0
  for (auto& stream : streams) {
2807
0
    if (failed_streams.find(stream->id()) == failed_streams.end()) {
2808
0
      locks.push_back(stream->LockForWrite());
2809
0
      streams_to_delete.push_back(stream.get());
2810
0
    }
2811
0
  }
2812
2813
  // The mutations will be aborted when the locks held in 'locks' exit the scope on early return.
2814
0
  RETURN_NOT_OK(CheckStatus(
2815
0
      sys_catalog_->Delete(leader_ready_term(), streams_to_delete),
2816
0
      "deleting CDC streams from sys-catalog"));
2817
0
  LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete)
2818
0
            << " from sys catalog";
2819
2820
  // Remove the deleted streams from the in-memory maps.
2821
0
  TRACE("Removing from CDC stream maps");
2822
0
  {
2823
0
    LockGuard lock(mutex_);
2824
0
    for (const auto& stream : streams_to_delete) {
2825
0
      if (cdc_stream_map_.erase(stream->id()) < 1) {
2826
0
        return STATUS(IllegalState, "Could not remove CDC stream from map", stream->id());
2827
0
      }
2828
0
      for (auto& id : stream->table_id()) {
2829
0
        cdc_stream_tables_count_map_[id]--;
2830
0
      }
2831
0
    }
2832
0
  }
2833
0
  LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete)
2834
0
            << " from stream map";
2835
2836
0
  for (auto& lock : locks) {
2837
0
    lock.Commit();
2838
0
  }
2839
0
  return Status::OK();
2840
0
}
2841
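In outline, the cleanup above proceeds in two phases:

  // 1. Buffer one cdc_state delete per (tablet, stream) pair and flush them;
  //    each op is roughly DELETE FROM cdc_state WHERE tablet_id = ? AND stream_id = ?.
  // 2. Collect streams with any failed delete into failed_streams, then erase
  //    only the fully cleaned streams from the sys catalog and the in-memory
  //    maps; failed streams stay DELETING and are retried on a later pass.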
2842
Status CatalogManager::GetCDCStream(const GetCDCStreamRequestPB* req,
2843
                                    GetCDCStreamResponsePB* resp,
2844
6
                                    rpc::RpcContext* rpc) {
2845
6
  LOG(INFO) << "GetCDCStream from " << RequestorString(rpc)
2846
6
            << ": " << req->DebugString();
2847
2848
6
  if (!req->has_stream_id()) {
2849
0
    return STATUS(InvalidArgument, "CDC Stream ID must be provided", req->ShortDebugString(),
2850
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
2851
0
  }
2852
2853
6
  scoped_refptr<CDCStreamInfo> stream;
2854
6
  {
2855
6
    SharedLock lock(mutex_);
2856
6
    stream = FindPtrOrNull(cdc_stream_map_, req->stream_id());
2857
6
  }
2858
2859
6
  if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2860
2
    return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(),
2861
2
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2862
2
  }
2863
2864
4
  auto stream_lock = stream->LockForRead();
2865
2866
4
  CDCStreamInfoPB* stream_info = resp->mutable_stream();
2867
2868
4
  stream_info->set_stream_id(stream->id());
2869
4
  std::string id_type_option_value(cdc::kTableId);
2870
2871
0
  for (auto option : stream_lock->options()) {
2872
0
    if (option.has_key() && option.key() == cdc::kIdType)
2873
0
      id_type_option_value = option.value();
2874
0
  }
2875
4
  if (id_type_option_value == cdc::kNamespaceId) {
2876
0
    stream_info->set_namespace_id(stream_lock->namespace_id());
2877
0
  }
2878
2879
4
  for (auto& table_id : stream_lock->table_id()) {
2880
4
    stream_info->add_table_id(table_id);
2881
4
  }
2882
2883
4
  stream_info->mutable_options()->CopyFrom(stream_lock->options());
2884
2885
4
  return Status::OK();
2886
4
}
2887
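One convention that recurs in these handlers: the cdc::kIdType stream option distinguishes the two stream flavors.

  // kIdType == cdc::kNamespaceId  => namespace-level (CDCSDK) stream; the id
  //                                  field carries a namespace id.
  // absent, or cdc::kTableId      => table-level stream (the default assumed
  //                                  above), as used by xCluster.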
2888
Status CatalogManager::GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req,
2889
11
                                          GetCDCDBStreamInfoResponsePB* resp) {
2890
2891
11
  if (!req->has_db_stream_id()) {
2892
0
    return STATUS(InvalidArgument, "CDC DB Stream ID must be provided", req->ShortDebugString(),
2893
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
2894
0
  }
2895
2896
11
  scoped_refptr<CDCStreamInfo> stream;
2897
11
  {
2898
11
    SharedLock lock(mutex_);
2899
11
    stream = FindPtrOrNull(cdc_stream_map_, req->db_stream_id());
2900
11
  }
2901
2902
11
  if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2903
0
    return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(),
2904
0
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
2905
0
  }
2906
2907
11
  auto stream_lock = stream->LockForRead();
2908
2909
11
  if (!stream->namespace_id().empty()) {
2910
11
    resp->set_namespace_id(stream->namespace_id());
2911
11
  }
2912
2913
11
  for (const auto& table_id : stream_lock->table_id()) {
2914
11
    const auto table_info = resp->add_table_info();
2915
11
    table_info->set_stream_id(req->db_stream_id());
2916
11
    table_info->set_table_id(table_id);
2917
11
  }
2918
2919
11
  return Status::OK();
2920
11
}
2921
2922
Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req,
2923
1
                                      ListCDCStreamsResponsePB* resp) {
2924
2925
1
  scoped_refptr<TableInfo> table;
2926
1
  bool filter_table = req->has_table_id();
2927
1
  if (filter_table) {
2928
0
    table = VERIFY_RESULT(FindTableById(req->table_id()));
2929
0
  }
2930
2931
1
  SharedLock lock(mutex_);
2932
2933
1
  for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) {
2934
1
    bool skip_stream = false;
2935
1
    bool id_type_option_present = false;
2936
2937
    // If the request is to list the DB streams of a specific namespace, then the other namespaces
2938
    // should not be considered.
2939
1
    if (req->has_namespace_id() && (req->namespace_id() != entry.second->namespace_id())) {
2940
0
      continue;
2941
0
    }
2942
2943
1
    if (filter_table && table->id() != entry.second->table_id().Get(0)) {
2944
0
      continue; // Skip deleting/deleted streams and streams from other tables.
2945
0
    }
2946
2947
1
    auto ltm = entry.second->LockForRead();
2948
2949
1
    if (ltm->is_deleting()) {
2950
0
      continue;
2951
0
    }
2952
2953
1
    for (const auto& option : ltm->options()) {
2954
0
      if (option.key() == cdc::kIdType) {
2955
0
        id_type_option_present = true;
2956
0
        if (req->has_id_type()) {
2957
0
          if (req->id_type() == IdTypePB::NAMESPACE_ID &&
2958
0
              option.value() != cdc::kNamespaceId) {
2959
0
            skip_stream = true;
2960
0
            break;
2961
0
          }
2962
0
          if (req->id_type() == IdTypePB::TABLE_ID &&
2963
0
              option.value() == cdc::kNamespaceId) {
2964
0
            skip_stream = true;
2965
0
            break;
2966
0
          }
2967
0
        }
2968
0
      }
2969
0
    }
2970
2971
1
    if ((!id_type_option_present && req->id_type() == IdTypePB::NAMESPACE_ID) ||
2972
1
        skip_stream)
2973
0
      continue;
2974
2975
1
    CDCStreamInfoPB* stream = resp->add_streams();
2976
1
    stream->set_stream_id(entry.second->id());
2977
1
    for (const auto& table_id : ltm->table_id()) {
2978
1
      stream->add_table_id(table_id);
2979
1
    }
2980
1
    stream->mutable_options()->CopyFrom(ltm->options());
2981
    // Also add an option for the current state.
2982
1
    if (ltm->pb.has_state()) {
2983
1
      auto state_option = stream->add_options();
2984
1
      state_option->set_key("state");
2985
1
      state_option->set_value(master::SysCDCStreamEntryPB::State_Name(ltm->pb.state()));
2986
1
    }
2987
1
  }
2988
1
  return Status::OK();
2989
1
}
2990
2991
157
bool CatalogManager::CDCStreamExistsUnlocked(const CDCStreamId& stream_id) {
2992
157
  scoped_refptr<CDCStreamInfo> stream = FindPtrOrNull(cdc_stream_map_, stream_id);
2993
157
  if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2994
157
    return false;
2995
157
  }
2996
0
  return true;
2997
0
}
2998
2999
Status CatalogManager::UpdateCDCStream(const UpdateCDCStreamRequestPB *req,
3000
                                       UpdateCDCStreamResponsePB* resp,
3001
0
                                       rpc::RpcContext* rpc) {
3002
0
  LOG(INFO) << "UpdateCDCStream from " << RequestorString(rpc)
3003
0
            << ": " << req->DebugString();
3004
3005
  // Check fields.
3006
0
  if (!req->has_stream_id()) {
3007
0
    return STATUS(InvalidArgument, "Stream ID must be provided",
3008
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3009
0
  }
3010
0
  if (!req->has_entry()) {
3011
0
    return STATUS(InvalidArgument, "CDC Stream Entry must be provided",
3012
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3013
0
  }
3014
3015
0
  scoped_refptr<CDCStreamInfo> stream;
3016
0
  {
3017
0
    SharedLock lock(mutex_);
3018
0
    stream = FindPtrOrNull(cdc_stream_map_, req->stream_id());
3019
0
  }
3020
0
  if (stream == nullptr) {
3021
0
    return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(),
3022
0
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
3023
0
  }
3024
3025
0
  auto stream_lock = stream->LockForWrite();
3026
0
  if (stream_lock->is_deleting()) {
3027
0
    return STATUS(NotFound, "CDC stream has been deleted", req->ShortDebugString(),
3028
0
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
3029
0
  }
3030
3031
0
  stream_lock.mutable_data()->pb.CopyFrom(req->entry());
3032
  // Also need to persist changes in sys catalog.
3033
0
  RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream));
3034
3035
0
  stream_lock.Commit();
3036
3037
0
  return Status::OK();
3038
0
}
3039
3040
/*
3041
 * UniverseReplication is set up in 4 stages within the Catalog Manager:
3042
 * 1. SetupUniverseReplication: Validates user input & requests Producer schema.
3043
 * 2. GetTableSchemaCallback:   Validates Schema compatibility & requests Producer CDC init.
3044
 * 3. AddCDCStreamToUniverseAndInitConsumer: Sets up RPC connections for CDC streaming.
3045
 * 4. InitCDCConsumer:          Initializes the Consumer settings to begin tailing data.
3046
 */
3047
Status CatalogManager::SetupUniverseReplication(const SetupUniverseReplicationRequestPB* req,
3048
                                                SetupUniverseReplicationResponsePB* resp,
3049
                                                rpc::RpcContext* rpc) {
3050
  LOG(INFO) << "SetupUniverseReplication from " << RequestorString(rpc)
3051
            << ": " << req->DebugString();
3052
3053
  // Sanity checking section.
3054
  if (!req->has_producer_id()) {
3055
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
3056
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3057
  }
3058
3059
  if (req->producer_master_addresses_size() <= 0) {
3060
    return STATUS(InvalidArgument, "Producer master address must be provided",
3061
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3062
  }
3063
3064
  if (req->producer_bootstrap_ids().size() > 0 &&
3065
      req->producer_bootstrap_ids().size() != req->producer_table_ids().size()) {
3066
    return STATUS(InvalidArgument, "Number of bootstrap ids must be equal to number of tables",
3067
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3068
  }
3069
3070
  {
3071
    auto l = cluster_config_->LockForRead();
3072
    if (l->pb.cluster_uuid() == req->producer_id()) {
3073
      return STATUS(InvalidArgument, "The request UUID and cluster UUID are identical.",
3074
                    req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3075
    }
3076
  }
3077
3078
  {
3079
    auto request_master_addresses = req->producer_master_addresses();
3080
    std::vector<ServerEntryPB> cluster_master_addresses;
3081
    RETURN_NOT_OK(master_->ListMasters(&cluster_master_addresses));
3082
    for (const auto &req_elem : request_master_addresses) {
3083
      for (const auto &cluster_elem : cluster_master_addresses) {
3084
        if (cluster_elem.has_registration()) {
3085
          auto p_rpc_addresses = cluster_elem.registration().private_rpc_addresses();
3086
          for (const auto &p_rpc_elem : p_rpc_addresses) {
3087
            if (req_elem.host() == p_rpc_elem.host() && req_elem.port() == p_rpc_elem.port()) {
3088
              return STATUS(InvalidArgument,
3089
                "Duplicate between request master addresses and private RPC addresses",
3090
                req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3091
            }
3092
          }
3093
3094
          auto broadcast_addresses = cluster_elem.registration().broadcast_addresses();
3095
          for (const auto &bc_elem : broadcast_addresses) {
3096
            if (req_elem.host() == bc_elem.host() && req_elem.port() == bc_elem.port()) {
3097
              return STATUS(InvalidArgument,
3098
                "Duplicate between request master addresses and broadcast addresses",
3099
                req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
3100
            }
3101
          }
3102
        }
3103
      }
3104
    }
3105
  }
3106
3107
  std::unordered_map<TableId, std::string> table_id_to_bootstrap_id;
3108
3109
  if (req->producer_bootstrap_ids_size() > 0) {
3110
    for (int i = 0; i < req->producer_table_ids().size(); i++) {
3111
      table_id_to_bootstrap_id[req->producer_table_ids(i)] = req->producer_bootstrap_ids(i);
3112
    }
3113
  }
3114
3115
  // We assume that the list of table ids is unique.
3116
  if (req->producer_bootstrap_ids().size() > 0 &&
3117
      implicit_cast<size_t>(req->producer_table_ids().size()) != table_id_to_bootstrap_id.size()) {
3118
    return STATUS(InvalidArgument, "When providing bootstrap ids, "
3119
                  "the list of tables must be unique", req->ShortDebugString(),
3120
                  MasterError(MasterErrorPB::INVALID_REQUEST));
3121
  }
3122
3123
  scoped_refptr<UniverseReplicationInfo> ri;
3124
  {
3125
    TRACE("Acquired catalog manager lock");
3126
    SharedLock lock(mutex_);
3127
3128
    if (FindPtrOrNull(universe_replication_map_, req->producer_id()) != nullptr) {
3129
      return STATUS(InvalidArgument, "Producer already present", req->producer_id(),
3130
                    MasterError(MasterErrorPB::INVALID_REQUEST));
3131
    }
3132
  }
3133
3134
  // Create an entry in the system catalog DocDB for this new universe replication.
3135
  ri = new UniverseReplicationInfo(req->producer_id());
3136
  ri->mutable_metadata()->StartMutation();
3137
  SysUniverseReplicationEntryPB *metadata = &ri->mutable_metadata()->mutable_dirty()->pb;
3138
  metadata->set_producer_id(req->producer_id());
3139
  metadata->mutable_producer_master_addresses()->CopyFrom(req->producer_master_addresses());
3140
  metadata->mutable_tables()->CopyFrom(req->producer_table_ids());
3141
  metadata->set_state(SysUniverseReplicationEntryPB::INITIALIZING);
3142
3143
  RETURN_NOT_OK(CheckLeaderStatusAndSetupError(
3144
      sys_catalog_->Upsert(leader_ready_term(), ri),
3145
      "inserting universe replication info into sys-catalog", resp));
3146
  TRACE("Wrote universe replication info to sys-catalog");
3147
3148
  // Commit the in-memory state now that it's added to the persistent catalog.
3149
  ri->mutable_metadata()->CommitMutation();
3150
  LOG(INFO) << "Setup universe replication from producer " << ri->ToString();
3151
3152
  {
3153
    LockGuard lock(mutex_);
3154
    universe_replication_map_[ri->id()] = ri;
3155
  }
3156
3157
  // Initialize the CDC Stream by querying the Producer server for RPC sanity checks.
3158
  auto result = ri->GetOrCreateCDCRpcTasks(req->producer_master_addresses());
3159
  LOG(INFO) << "GetOrCreateCDCRpcTasks: " << result.ok();
3160
  if (!result.ok()) {
3161
    MarkUniverseReplicationFailed(ri, ResultToStatus(result));
3162
    return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, result.status());
3163
  }
3164
  std::shared_ptr<CDCRpcTasks> cdc_rpc = *result;
3165
3166
  // For each table, run an async RPC task to verify a sufficient Producer:Consumer schema match.
3167
  for (int i = 0; i < req->producer_table_ids_size(); i++) {
3168
3169
    // SETUP CONTINUES after this async call.
3170
    Status s;
3171
    if (IsColocatedParentTableId(req->producer_table_ids(i))) {
3172
      auto tables_info = std::make_shared<std::vector<client::YBTableInfo>>();
3173
      s = cdc_rpc->client()->GetColocatedTabletSchemaById(
3174
          req->producer_table_ids(i), tables_info,
3175
          Bind(&enterprise::CatalogManager::GetColocatedTabletSchemaCallback, Unretained(this),
3176
               ri->id(), tables_info, table_id_to_bootstrap_id));
3177
    } else if (IsTablegroupParentTableId(req->producer_table_ids(i))) {
3178
      auto tablegroup_info = std::make_shared<std::vector<client::YBTableInfo>>();
3179
      s = cdc_rpc->client()->GetTablegroupSchemaById(
3180
          req->producer_table_ids(i), tablegroup_info,
3181
          Bind(&enterprise::CatalogManager::GetTablegroupSchemaCallback, Unretained(this),
3182
               ri->id(), tablegroup_info, req->producer_table_ids(i), table_id_to_bootstrap_id));
3183
    } else {
3184
      auto table_info = std::make_shared<client::YBTableInfo>();
3185
      s = cdc_rpc->client()->GetTableSchemaById(
3186
          req->producer_table_ids(i), table_info,
3187
          Bind(&enterprise::CatalogManager::GetTableSchemaCallback, Unretained(this),
3188
               ri->id(), table_info, table_id_to_bootstrap_id));
3189
    }
3190
3191
    if (!s.ok()) {
3192
      MarkUniverseReplicationFailed(ri, s);
3193
      return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
3194
    }
3195
  }
3196
3197
  LOG(INFO) << "Started schema validation for universe replication " << ri->ToString();
3198
  return Status::OK();
3199
}
3200
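Illustrative request for this handler, with hypothetical addresses and ids; when bootstrap ids are supplied they must pair one-to-one with the table ids:

  SetupUniverseReplicationRequestPB req;
  req.set_producer_id("<producer-universe-uuid>");    // hypothetical
  auto* addr = req.add_producer_master_addresses();
  addr->set_host("10.0.0.1");                         // hypothetical
  addr->set_port(7100);
  req.add_producer_table_ids("<producer-table-id>");  // hypothetical
  req.add_producer_bootstrap_ids("<bootstrap-id>");   // optional, 1:1 with tables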
3201
void CatalogManager::MarkUniverseReplicationFailed(
3202
0
    scoped_refptr<UniverseReplicationInfo> universe, const Status& failure_status) {
3203
0
  auto l = universe->LockForWrite();
3204
0
  if (l->pb.state() == SysUniverseReplicationEntryPB::DELETED) {
3205
0
    l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED_ERROR);
3206
0
  } else {
3207
0
    l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
3208
0
  }
3209
3210
0
  universe->SetSetupUniverseReplicationErrorStatus(failure_status);
3211
3212
  // Update sys_catalog.
3213
0
  const Status s = sys_catalog_->Upsert(leader_ready_term(), universe);
3214
3215
0
  l.CommitOrWarn(s, "updating universe replication info in sys-catalog");
3216
0
}
3217
3218
Status CatalogManager::ValidateTableSchema(
3219
    const std::shared_ptr<client::YBTableInfo>& info,
3220
    const std::unordered_map<TableId, std::string>& table_bootstrap_ids,
3221
0
    GetTableSchemaResponsePB* resp) {
3222
  // Get corresponding table schema on local universe.
3223
0
  GetTableSchemaRequestPB req;
3224
3225
0
  auto* table = req.mutable_table();
3226
0
  table->set_table_name(info->table_name.table_name());
3227
0
  table->mutable_namespace_()->set_name(info->table_name.namespace_name());
3228
0
  table->mutable_namespace_()->set_database_type(
3229
0
      GetDatabaseTypeForTable(client::ClientToPBTableType(info->table_type)));
3230
3231
  // Since YSQL tables are not present in the table map, we first need to list tables to get the
3232
  // table ID and then fetch the table schema.
3233
  // Remove this once table maps are fixed for YSQL.
3234
0
  ListTablesRequestPB list_req;
3235
0
  ListTablesResponsePB list_resp;
3236
3237
0
  list_req.set_name_filter(info->table_name.table_name());
3238
0
  Status status = ListTables(&list_req, &list_resp);
3239
0
  if (!status.ok() || list_resp.has_error()) {
3240
0
    return STATUS(NotFound, Substitute("Error while listing table: $0", status.ToString()));
3241
0
  }
3242
3243
  // TODO: This does not work for situations where tables in different YSQL schemas have the same
3244
  // name. This will be fixed as part of #1476.
3245
0
  for (const auto& t : list_resp.tables()) {
3246
0
    if (t.name() == info->table_name.table_name() &&
3247
0
        t.namespace_().name() == info->table_name.namespace_name()) {
3248
0
      table->set_table_id(t.id());
3249
0
      break;
3250
0
    }
3251
0
  }
3252
3253
0
  if (!table->has_table_id()) {
3254
0
    return STATUS(NotFound,
3255
0
        Substitute("Could not find matching table for $0", info->table_name.ToString()));
3256
0
  }
3257
3258
  // We have a table match.  Now get the table schema and validate.
3259
0
  status = GetTableSchema(&req, resp);
3260
0
  if (!status.ok() || resp->has_error()) {
3261
0
    return STATUS(NotFound, Substitute("Error while getting table schema: $0", status.ToString()));
3262
0
  }
3263
3264
0
  auto result = info->schema.EquivalentForDataCopy(resp->schema());
3265
0
  if (!result.ok() || !*result) {
3266
0
    return STATUS(IllegalState,
3267
0
        Substitute("Source and target schemas don't match: "
3268
0
                   "Source: $0, Target: $1, Source schema: $2, Target schema: $3",
3269
0
                   info->table_id, resp->identifier().table_id(),
3270
0
                   info->schema.ToString(), resp->schema().DebugString()));
3271
0
  }
3272
3273
  // We still need to map each source table id to its response table id (to add to the validated map).
3274
  // For colocated tables, only add the parent table since we only added the parent table to the
3275
  // original pb (we use the number of tables in the pb to determine when validation is done).
3276
0
  if (info->colocated) {
3277
    // For now we require that colocated tables have the same table oid.
3278
0
    auto source_oid = CHECK_RESULT(GetPgsqlTableOid(info->table_id));
3279
0
    auto target_oid = CHECK_RESULT(GetPgsqlTableOid(resp->identifier().table_id()));
3280
0
    if (source_oid != target_oid) {
3281
0
      return STATUS(IllegalState,
3282
0
          Substitute("Source and target table oids don't match for colocated table: "
3283
0
                    "Source: $0, Target: $1, Source table oid: $2, Target table oid: $3",
3284
0
                    info->table_id, resp->identifier().table_id(), source_oid, target_oid));
3285
0
    }
3286
0
  }
3287
3288
0
  return Status::OK();
3289
0
}
3290
3291
Status CatalogManager::AddValidatedTableAndCreateCdcStreams(
3292
      scoped_refptr<UniverseReplicationInfo> universe,
3293
      const std::unordered_map<TableId, std::string>& table_bootstrap_ids,
3294
      const TableId& producer_table,
3295
0
      const TableId& consumer_table) {
3296
0
  auto l = universe->LockForWrite();
3297
0
  auto master_addresses = l->pb.producer_master_addresses();
3298
3299
0
  auto res = universe->GetOrCreateCDCRpcTasks(master_addresses);
3300
0
  if (!res.ok()) {
3301
0
    l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
3302
0
    const Status s = sys_catalog_->Upsert(leader_ready_term(), universe);
3303
0
    if (!s.ok()) {
3304
0
      return CheckStatus(s, "updating universe replication info in sys-catalog");
3305
0
    }
3306
0
    l.Commit();
3307
0
    return STATUS(InternalError,
3308
0
        Substitute("Error while setting up client for producer $0", universe->id()));
3309
0
  }
3310
0
  std::shared_ptr<CDCRpcTasks> cdc_rpc = *res;
3311
0
  vector<TableId> validated_tables;
3312
3313
0
  if (l->is_deleted_or_failed()) {
3314
    // Nothing to do since universe is being deleted.
3315
0
    return STATUS(Aborted, "Universe is being deleted");
3316
0
  }
3317
3318
0
  auto map = l.mutable_data()->pb.mutable_validated_tables();
3319
0
  (*map)[producer_table] = consumer_table;
3320
3321
  // If all tables have now been validated, move the universe to the VALIDATED state.
3322
0
  if (l.mutable_data()->pb.validated_tables_size() == l.mutable_data()->pb.tables_size()) {
3323
0
    l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::VALIDATED);
3324
0
    auto tbl_iter = l->pb.tables();
3325
0
    validated_tables.insert(validated_tables.begin(), tbl_iter.begin(), tbl_iter.end());
3326
0
  }
3327
3328
  // TODO: The end of config validation should be where SetupUniverseReplication returns to the user.
3329
0
  LOG(INFO) << "UpdateItem in AddValidatedTable";
3330
3331
  // Update sys_catalog.
3332
0
  Status status = sys_catalog_->Upsert(leader_ready_term(), universe);
3333
0
  if (!status.ok()) {
3334
0
    LOG(ERROR) << "Error during UpdateItem: " << status;
3335
0
    return CheckStatus(status, "updating universe replication info in sys-catalog");
3336
0
  }
3337
0
  l.Commit();
3338
3339
  // Create a CDC stream for each validated table, after persisting the replication state change.
3340
0
  if (!validated_tables.empty()) {
3341
0
    std::unordered_map<std::string, std::string> options;
3342
0
    options.reserve(4);
3343
0
    options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE));
3344
0
    options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL));
3345
0
    options.emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));
3346
0
    options.emplace(cdc::kCheckpointType, CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT));
3347
3348
0
    for (const auto& table : validated_tables) {
3349
0
      string producer_bootstrap_id;
3350
0
      auto it = table_bootstrap_ids.find(table);
3351
0
      if (it != table_bootstrap_ids.end()) {
3352
0
        producer_bootstrap_id = it->second;
3353
0
      }
3354
0
      if (!producer_bootstrap_id.empty()) {
3355
0
        auto table_id = std::make_shared<TableId>();
3356
0
        auto stream_options = std::make_shared<std::unordered_map<std::string, std::string>>();
3357
0
        cdc_rpc->client()->GetCDCStream(producer_bootstrap_id, table_id, stream_options,
3358
0
            std::bind(&enterprise::CatalogManager::GetCDCStreamCallback, this,
3359
0
                producer_bootstrap_id, table_id, stream_options, universe->id(), table, cdc_rpc,
3360
0
                std::placeholders::_1));
3361
0
      } else {
3362
0
        cdc_rpc->client()->CreateCDCStream(
3363
0
            table, options,
3364
0
            std::bind(&enterprise::CatalogManager::AddCDCStreamToUniverseAndInitConsumer, this,
3365
0
                universe->id(), table, std::placeholders::_1, nullptr /* on_success_cb */));
3366
0
      }
3367
0
    }
3368
0
  }
3369
0
  return Status::OK();
3370
0
}
3371
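The per-table branch at the end of the function above fans out along two paths; in outline:

  // With a bootstrap id:    GetCDCStream(bootstrap_id, ...) resolves the
  //                         producer's pre-created stream, then
  //                         GetCDCStreamCallback validates and reuses it.
  // Without a bootstrap id: CreateCDCStream(table, options, ...) creates a
  //                         fresh stream, then
  //                         AddCDCStreamToUniverseAndInitConsumer registers it.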
3372
void CatalogManager::GetTableSchemaCallback(
3373
    const std::string& universe_id, const std::shared_ptr<client::YBTableInfo>& info,
3374
0
    const std::unordered_map<TableId, std::string>& table_bootstrap_ids, const Status& s) {
3375
  // First get the universe.
3376
0
  scoped_refptr<UniverseReplicationInfo> universe;
3377
0
  {
3378
0
    SharedLock lock(mutex_);
3379
0
    TRACE("Acquired catalog manager lock");
3380
3381
0
    universe = FindPtrOrNull(universe_replication_map_, universe_id);
3382
0
    if (universe == nullptr) {
3383
0
      LOG(ERROR) << "Universe not found: " << universe_id;
3384
0
      return;
3385
0
    }
3386
0
  }
3387
3388
0
  if (!s.ok()) {
3389
0
    MarkUniverseReplicationFailed(universe, s);
3390
0
    LOG(ERROR) << "Error getting schema for table " << info->table_id << ": " << s;
3391
0
    return;
3392
0
  }
3393
3394
  // Validate the table schema.
3395
0
  GetTableSchemaResponsePB resp;
3396
0
  Status status = ValidateTableSchema(info, table_bootstrap_ids, &resp);
3397
0
  if (!status.ok()) {
3398
0
    MarkUniverseReplicationFailed(universe, status);
3399
0
    LOG(ERROR) << "Found error while validating table schema for table " << info->table_id
3400
0
               << ": " << status;
3401
0
    return;
3402
0
  }
3403
3404
0
  status = AddValidatedTableAndCreateCdcStreams(universe,
3405
0
                                                table_bootstrap_ids,
3406
0
                                                info->table_id,
3407
0
                                                resp.identifier().table_id());
3408
0
  if (!status.ok()) {
3409
0
    LOG(ERROR) << "Found error while adding validated table to system catalog: " << info->table_id
3410
0
               << ": " << status;
3411
0
    return;
3412
0
  }
3413
0
}
3414
3415
void CatalogManager::GetTablegroupSchemaCallback(
3416
    const std::string& universe_id,
3417
    const std::shared_ptr<std::vector<client::YBTableInfo>>& infos,
3418
    const TablegroupId& producer_tablegroup_id,
3419
    const std::unordered_map<TableId, std::string>& table_bootstrap_ids,
3420
0
    const Status& s) {
3421
  // First get the universe.
3422
0
  scoped_refptr<UniverseReplicationInfo> universe;
3423
0
  {
3424
0
    SharedLock lock(mutex_);
3425
0
    TRACE("Acquired catalog manager lock");
3426
3427
0
    universe = FindPtrOrNull(universe_replication_map_, universe_id);
3428
0
    if (universe == nullptr) {
3429
0
      LOG(ERROR) << "Universe not found: " << universe_id;
3430
0
      return;
3431
0
    }
3432
0
  }
3433
3434
0
  if (!s.ok()) {
3435
0
    MarkUniverseReplicationFailed(universe, s);
3436
0
    std::ostringstream oss;
3437
0
    for (size_t i = 0; i < infos->size(); ++i) {
3438
0
      oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id;
3439
0
    }
3440
0
    LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s;
3441
0
    return;
3442
0
  }
3443
3444
0
  if (infos->empty()) {
3445
0
    LOG(WARNING) << "Received empty list of tables to validate: " << s;
3446
0
    return;
3447
0
  }
3448
3449
  // validated_consumer_tables contains the consumer table IDs that correspond
3451
  // to the producer tables.
3451
0
  std::unordered_set<TableId> validated_consumer_tables;
3452
0
  for (const auto& info : *infos) {
3453
    // Validate each of the member table in the tablegroup.
3454
0
    GetTableSchemaResponsePB resp;
3455
0
    Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info),
3456
0
                                              table_bootstrap_ids,
3457
0
                                              &resp);
3458
3459
0
    if (!table_status.ok()) {
3460
0
      MarkUniverseReplicationFailed(universe, table_status);
3461
0
      LOG(ERROR) << "Found error while validating table schema for table " << info.table_id
3462
0
                 << ": " << table_status;
3463
0
      return;
3464
0
    }
3465
3466
0
    validated_consumer_tables.insert(resp.identifier().table_id());
3467
0
  }
3468
3469
  // Get the consumer tablegroup ID. Since this call is expensive (one needs to reverse lookup
3470
  // the tablegroup ID from the table ID), we only do this call once and do validation afterward.
3471
0
  TablegroupId consumer_tablegroup_id;
3472
0
  {
3473
0
    const auto& result = FindTablegroupByTableId(*validated_consumer_tables.begin());
3474
0
    if (!result.has_value()) {
3475
0
      std::string message =
3476
0
          Format("No consumer tablegroup found for producer tablegroup: $0",
3477
0
                 producer_tablegroup_id);
3478
0
      MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message));
3479
0
      LOG(ERROR) << message;
3480
0
      return;
3481
0
    }
3482
0
    consumer_tablegroup_id = result.value();
3483
0
  }
3484
3485
  // tables_in_consumer_tablegroup are the tables listed within the consumer_tablegroup_id.
3486
  // We need validated_consumer_tables and tables_in_consumer_tablegroup to be identical.
3487
0
  std::unordered_set<TableId> tables_in_consumer_tablegroup;
3488
0
  {
3489
0
    GetTablegroupSchemaRequestPB req;
3490
0
    GetTablegroupSchemaResponsePB resp;
3491
0
    req.mutable_parent_tablegroup()->set_id(consumer_tablegroup_id);
3492
0
    Status status = GetTablegroupSchema(&req, &resp);
3493
0
    if (!status.ok() || resp.has_error()) {
3494
0
      std::string message = Format("Error when getting consumer tablegroup schema: $0",
3495
0
                                   consumer_tablegroup_id);
3496
0
      MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message));
3497
0
      LOG(ERROR) << message;
3498
0
      return;
3499
0
    }
3500
3501
0
    for (const auto& info : resp.get_table_schema_response_pbs()) {
3502
0
      tables_in_consumer_tablegroup.insert(info.identifier().table_id());
3503
0
    }
3504
0
  }
3505
3506
0
  if (validated_consumer_tables != tables_in_consumer_tablegroup) {
3507
0
    std::ostringstream validated_tables_oss;
3508
0
    for (auto it = validated_consumer_tables.begin();
3509
0
        it != validated_consumer_tables.end(); it++) {
3510
0
      validated_tables_oss << (it == validated_consumer_tables.begin() ? "" : ",") << *it;
3511
0
    }
3512
0
    std::ostringstream consumer_tables_oss;
3513
0
    for (auto it = tables_in_consumer_tablegroup.begin();
3514
0
        it != tables_in_consumer_tablegroup.end(); it++) {
3515
0
      consumer_tables_oss << (it == tables_in_consumer_tablegroup.begin() ? "" : ",") << *it;
3516
0
    }
3517
3518
0
    std::string message =
3519
0
        Format("Mismatch between tables associated with producer tablegroup $0 and "
3520
0
               "tables in consumer tablegroup $1: ($2) vs ($3).",
3521
0
               producer_tablegroup_id, consumer_tablegroup_id,
3522
0
               validated_tables_oss.str(), consumer_tables_oss.str());
3523
0
    MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message));
3524
0
    LOG(ERROR) << message;
3525
0
    return;
3526
0
  }
3527
3528
0
  Status status = AddValidatedTableAndCreateCdcStreams(universe,
3529
0
                                                       table_bootstrap_ids,
3530
0
                                                       producer_tablegroup_id,
3531
0
                                                       consumer_tablegroup_id);
3532
0
  if (!status.ok()) {
3533
0
    LOG(ERROR) << "Found error while adding validated table to system catalog: "
3534
0
               << producer_tablegroup_id << ": " << status;
3535
0
    return;
3536
0
  }
3537
0
}
3538
3539
void CatalogManager::GetColocatedTabletSchemaCallback(
3540
    const std::string& universe_id, const std::shared_ptr<std::vector<client::YBTableInfo>>& infos,
3541
0
    const std::unordered_map<TableId, std::string>& table_bootstrap_ids, const Status& s) {
3542
  // First get the universe.
3543
0
  scoped_refptr<UniverseReplicationInfo> universe;
3544
0
  {
3545
0
    SharedLock lock(mutex_);
3546
0
    TRACE("Acquired catalog manager lock");
3547
3548
0
    universe = FindPtrOrNull(universe_replication_map_, universe_id);
3549
0
    if (universe == nullptr) {
3550
0
      LOG(ERROR) << "Universe not found: " << universe_id;
3551
0
      return;
3552
0
    }
3553
0
  }
3554
3555
0
  if (!s.ok()) {
3556
0
    MarkUniverseReplicationFailed(universe, s);
3557
0
    std::ostringstream oss;
3558
0
    for (size_t i = 0; i < infos->size(); ++i) {
3559
0
      oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id;
3560
0
    }
3561
0
    LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s;
3562
0
    return;
3563
0
  }
3564
3565
0
  if (infos->empty()) {
3566
0
    LOG(WARNING) << "Received empty list of tables to validate: " << s;
3567
0
    return;
3568
0
  }
3569
3570
  // Validate table schemas.
3571
0
  std::unordered_set<TableId> producer_parent_table_ids;
3572
0
  std::unordered_set<TableId> consumer_parent_table_ids;
3573
0
  for (const auto& info : *infos) {
3574
    // Verify that we have a colocated table.
3575
0
    if (!info.colocated) {
3576
0
      MarkUniverseReplicationFailed(universe,
3577
0
          STATUS(InvalidArgument, Substitute("Received non-colocated table: $0", info.table_id)));
3578
0
      LOG(ERROR) << "Received non-colocated table: " << info.table_id;
3579
0
      return;
3580
0
    }
3581
    // Validate each table, and get the parent colocated table id for the consumer.
3582
0
    GetTableSchemaResponsePB resp;
3583
0
    Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info),
3584
0
                                              table_bootstrap_ids,
3585
0
                                              &resp);
3586
0
    if (!table_status.ok()) {
3587
0
      MarkUniverseReplicationFailed(universe, table_status);
3588
0
      LOG(ERROR) << "Found error while validating table schema for table " << info.table_id
3589
0
                 << ": " << table_status;
3590
0
      return;
3591
0
    }
3592
    // Store the parent table ids.
3593
0
    producer_parent_table_ids.insert(
3594
0
        info.table_name.namespace_id() + kColocatedParentTableIdSuffix);
3595
0
    consumer_parent_table_ids.insert(
3596
0
        resp.identifier().namespace_().id() + kColocatedParentTableIdSuffix);
3597
0
  }
3598
3599
  // Verify that we only found one producer and one consumer colocated parent table id.
3600
0
  if (producer_parent_table_ids.size() != 1) {
3601
0
    std::ostringstream oss;
3602
0
    for (auto it = producer_parent_table_ids.begin(); it != producer_parent_table_ids.end(); ++it) {
3603
0
      oss << ((it == producer_parent_table_ids.begin()) ? "" : ", ") << *it;
3604
0
    }
3605
0
    MarkUniverseReplicationFailed(universe, STATUS(InvalidArgument,
3606
0
        Substitute("Found incorrect number of producer colocated parent table ids. "
3607
0
                   "Expected 1, but found: [ $0 ]", oss.str())));
3608
0
    LOG(ERROR) << "Found incorrect number of producer colocated parent table ids. "
3609
0
               << "Expected 1, but found: [ " << oss.str() << " ]";
3610
0
    return;
3611
0
  }
3612
0
  if (consumer_parent_table_ids.size() != 1) {
3613
0
    std::ostringstream oss;
3614
0
    for (auto it = consumer_parent_table_ids.begin(); it != consumer_parent_table_ids.end(); ++it) {
3615
0
      oss << ((it == consumer_parent_table_ids.begin()) ? "" : ", ") << *it;
3616
0
    }
3617
0
    MarkUniverseReplicationFailed(universe, STATUS(InvalidArgument,
3618
0
        Substitute("Found incorrect number of consumer colocated parent table ids. "
3619
0
                   "Expected 1, but found: [ $0 ]", oss.str())));
3620
0
    LOG(ERROR) << "Found incorrect number of consumer colocated parent table ids. "
3621
0
               << "Expected 1, but found: [ " << oss.str() << " ]";
3622
0
    return;
3623
0
  }
3624
3625
0
  Status status = AddValidatedTableAndCreateCdcStreams(universe,
3626
0
                                                       table_bootstrap_ids,
3627
0
                                                       *producer_parent_table_ids.begin(),
3628
0
                                                       *consumer_parent_table_ids.begin());
3629
0
  if (!status.ok()) {
3630
0
    LOG(ERROR) << "Found error while adding validated table to system catalog: "
3631
0
               << *producer_parent_table_ids.begin() << ": " << status;
3632
0
    return;
3633
0
  }
3634
0
}
3635
3636
void CatalogManager::GetCDCStreamCallback(
3637
    const CDCStreamId& bootstrap_id,
3638
    std::shared_ptr<TableId> table_id,
3639
    std::shared_ptr<std::unordered_map<std::string, std::string>> options,
3640
    const std::string& universe_id,
3641
    const TableId& table,
3642
    std::shared_ptr<CDCRpcTasks> cdc_rpc,
3643
0
    const Status& s) {
3644
0
  if (!s.ok()) {
3645
0
    LOG(ERROR) << "Unable to find bootstrap id " << bootstrap_id;
3646
0
    AddCDCStreamToUniverseAndInitConsumer(universe_id, table, s);
3647
0
  } else {
3648
0
    if (*table_id != table) {
3649
0
      const Status invalid_bootstrap_id_status = STATUS_FORMAT(
3650
0
          InvalidArgument, "Invalid bootstrap id for table $0. Bootstrap id $1 belongs to table $2",
3651
0
          table, bootstrap_id, *table_id);
3652
0
      LOG(ERROR) << invalid_bootstrap_id_status;
3653
0
      AddCDCStreamToUniverseAndInitConsumer(universe_id, table, invalid_bootstrap_id_status);
3654
0
      return;
3655
0
    }
3656
    // TODO: Check the stream options.
3657
0
    AddCDCStreamToUniverseAndInitConsumer(universe_id, table, bootstrap_id, [&] () {
3658
        // Extra callback on universe setup success - update the producer to let it know that
3659
        // the bootstrapping is complete.
3660
0
        SysCDCStreamEntryPB new_entry;
3661
0
        new_entry.add_table_id(*table_id);
3662
0
        new_entry.mutable_options()->Reserve(narrow_cast<int>(options->size()));
3663
0
        for (const auto& option : *options) {
3664
0
          auto new_option = new_entry.add_options();
3665
0
          new_option->set_key(option.first);
3666
0
          new_option->set_value(option.second);
3667
0
        }
3668
0
        new_entry.set_state(master::SysCDCStreamEntryPB::ACTIVE);
3669
3670
0
        WARN_NOT_OK(cdc_rpc->client()->UpdateCDCStream(bootstrap_id, new_entry),
3671
0
                  "Unable to update CDC stream options");
3672
0
      });
3673
0
  }
3674
0
}
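
The success callback above rebuilds the stream's options into a fresh SysCDCStreamEntryPB before marking it ACTIVE on the producer. A minimal standalone sketch of that copy-then-activate pattern, with plain structs standing in for the protobuf types (all names here are illustrative):

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Plain stand-ins for SysCDCStreamEntryPB and its repeated options field.
struct Option { std::string key, value; };
struct StreamEntry { std::vector<Option> options; bool active = false; };

int main() {
  std::unordered_map<std::string, std::string> options = {
      {"record_type", "CHANGE"}, {"record_format", "WAL"}};

  StreamEntry new_entry;
  new_entry.options.reserve(options.size());  // mirrors Reserve(narrow_cast<int>(...))
  for (const auto& option : options) {
    new_entry.options.push_back({option.first, option.second});
  }
  new_entry.active = true;  // corresponds to set_state(... ACTIVE)

  std::cout << "copied " << new_entry.options.size() << " options\n";  // 2
  return 0;
}
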
3675
3676
void CatalogManager::AddCDCStreamToUniverseAndInitConsumer(
3677
    const std::string& universe_id, const TableId& table_id, const Result<CDCStreamId>& stream_id,
3678
0
    std::function<void()> on_success_cb) {
3679
0
  scoped_refptr<UniverseReplicationInfo> universe;
3680
0
  {
3681
0
    SharedLock lock(mutex_);
3682
0
    TRACE("Acquired catalog manager lock");
3683
3684
0
    universe = FindPtrOrNull(universe_replication_map_, universe_id);
3685
0
    if (universe == nullptr) {
3686
0
      LOG(ERROR) << "Universe not found: " << universe_id;
3687
0
      return;
3688
0
    }
3689
0
  }
3690
3691
0
  if (!stream_id.ok()) {
3692
0
    LOG(ERROR) << "Error setting up CDC stream for table " << table_id;
3693
0
    MarkUniverseReplicationFailed(universe, ResultToStatus(stream_id));
3694
0
    return;
3695
0
  }
3696
3697
0
  bool merge_alter = false;
3698
0
  bool validated_all_tables = false;
3699
0
  {
3700
0
    auto l = universe->LockForWrite();
3701
0
    if (l->is_deleted_or_failed()) {
3702
      // Nothing to do if universe is being deleted.
3703
0
      return;
3704
0
    }
3705
3706
0
    auto map = l.mutable_data()->pb.mutable_table_streams();
3707
0
    (*map)[table_id] = *stream_id;
3708
3709
    // This functions as a barrier: waiting for the last RPC call from GetTableSchemaCallback.
3710
0
    if (l.mutable_data()->pb.table_streams_size() == l->pb.tables_size()) {
3711
      // All tables successfully validated! Register CDC consumers & start replication.
3712
0
      validated_all_tables = true;
3713
0
      LOG(INFO) << "Registering CDC consumers for universe " << universe->id();
3714
3715
0
      auto& validated_tables = l->pb.validated_tables();
3716
3717
0
      std::vector<CDCConsumerStreamInfo> consumer_info;
3718
0
      consumer_info.reserve(l->pb.tables_size());
3719
0
      for (const auto& table : validated_tables) {
3720
0
        CDCConsumerStreamInfo info;
3721
0
        info.producer_table_id = table.first;
3722
0
        info.consumer_table_id = table.second;
3723
0
        info.stream_id = (*map)[info.producer_table_id];
3724
0
        consumer_info.push_back(info);
3725
0
      }
3726
3727
0
      std::vector<HostPort> hp;
3728
0
      HostPortsFromPBs(l->pb.producer_master_addresses(), &hp);
3729
3730
0
      auto cdc_rpc_tasks_result =
3731
0
          universe->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses());
3732
0
      if (!cdc_rpc_tasks_result.ok()) {
3733
0
        LOG(WARNING) << "CDC streams won't be created: " << cdc_rpc_tasks_result;
3734
0
        l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
3735
0
      } else {
3736
0
        auto cdc_rpc_tasks = *cdc_rpc_tasks_result;
3737
0
        Status s = InitCDCConsumer(
3738
0
            consumer_info, HostPort::ToCommaSeparatedString(hp), l->pb.producer_id(),
3739
0
            cdc_rpc_tasks);
3740
0
        if (!s.ok()) {
3741
0
          LOG(ERROR) << "Error registering subscriber: " << s;
3742
0
          l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
3743
0
        } else {
3744
0
          GStringPiece original_producer_id(universe->id());
3745
0
          if (original_producer_id.ends_with(".ALTER")) {
3746
            // Don't enable ALTER universes, merge them into the main universe instead.
3747
0
            merge_alter = true;
3748
0
          } else {
3749
0
            l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE);
3750
0
          }
3751
0
        }
3752
0
      }
3753
0
    }
3754
3755
    // Update sys_catalog with new producer table id info.
3756
0
    Status status = sys_catalog_->Upsert(leader_ready_term(), universe);
3757
3758
    // Before committing, run any callbacks on success.
3759
0
    if (status.ok() && on_success_cb &&
3760
0
        (l.mutable_data()->pb.state() == SysUniverseReplicationEntryPB::ACTIVE || merge_alter)) {
3761
0
      on_success_cb();
3762
0
    }
3763
3764
0
    l.CommitOrWarn(status, "updating universe replication info in sys-catalog");
3765
0
  }
3766
3767
0
  if (validated_all_tables) {
3768
    // Also update the set of consumer tables.
3769
0
    LockGuard lock(mutex_);
3770
0
    auto l = universe->LockForRead();
3771
0
    for (const auto& table : l->pb.validated_tables()) {
3772
0
      xcluster_consumer_tables_to_stream_map_[table.second].emplace(universe->id(), *stream_id);
3773
0
    }
3774
0
  }
3775
3776
  // If this is an 'alter', merge it back into the primary universe now that setup is a success.
3777
0
  if (merge_alter) {
3778
0
    MergeUniverseReplication(universe);
3779
0
  }
3780
0
}
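
The size comparison above (table_streams_size() == tables_size()) is the only barrier between concurrent validation callbacks: whichever callback records the final stream under the lock takes the branch that registers consumers, exactly once. A minimal standalone sketch of that counting barrier (names illustrative):

#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

std::mutex mu;
std::map<std::string, std::string> table_streams;  // table id -> stream id
const size_t kTableCount = 4;

void OnTableValidated(const std::string& table_id, const std::string& stream_id) {
  std::lock_guard<std::mutex> lock(mu);
  table_streams[table_id] = stream_id;
  // Exactly one caller observes the map reaching full size.
  if (table_streams.size() == kTableCount) {
    std::cout << "all tables validated; registering consumers\n";
  }
}

int main() {
  std::vector<std::thread> threads;
  for (size_t i = 0; i < kTableCount; ++i) {
    threads.emplace_back(OnTableValidated, "table-" + std::to_string(i),
                         "stream-" + std::to_string(i));
  }
  for (auto& t : threads) t.join();
  return 0;
}
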
3781
3782
/*
3783
 * UpdateXClusterConsumerOnTabletSplit updates the consumer -> producer tablet mapping after a local
3784
 * tablet split.
3785
 */
3786
Status CatalogManager::UpdateXClusterConsumerOnTabletSplit(
3787
    const TableId& consumer_table_id,
3788
42
    const SplitTabletIds& split_tablet_ids) {
3789
  // Check if this table is consuming a stream.
3790
42
  XClusterConsumerTableStreamInfoMap stream_infos =
3791
42
      GetXClusterStreamInfoForConsumerTable(consumer_table_id);
3792
3793
42
  if (stream_infos.empty()) {
3794
42
    return Status::OK();
3795
42
  }
3796
3797
0
  auto l = cluster_config_->LockForWrite();
3798
0
  for (const auto& stream_info : stream_infos) {
3799
0
    std::string universe_id = stream_info.first;
3800
0
    CDCStreamId stream_id = stream_info.second;
3801
    // Fetch the stream entry so we can update the mappings.
3802
0
    auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
3803
0
    auto producer_entry = FindOrNull(*producer_map, universe_id);
3804
0
    if (!producer_entry) {
3805
0
      return STATUS_FORMAT(NotFound,
3806
0
                           "Unable to find the producer entry for universe $0",
3807
0
                           universe_id);
3808
0
    }
3809
0
    auto stream_entry = FindOrNull(*producer_entry->mutable_stream_map(), stream_id);
3810
0
    if (!stream_entry) {
3811
0
      return STATUS_FORMAT(NotFound,
3812
0
                           "Unable to find the stream entry for universe $0, stream $1",
3813
0
                           universe_id,
3814
0
                           stream_id);
3815
0
    }
3816
3817
0
    RETURN_NOT_OK(UpdateTableMappingOnTabletSplit(stream_entry, split_tablet_ids));
3818
3819
    // We also need to mark this stream as no longer being able to perform 1-1 mappings.
3820
0
    stream_entry->set_same_num_producer_consumer_tablets(false);
3821
0
  }
3822
3823
  // Also bump the cluster_config_ version so that changes are propagated to tservers.
3824
0
  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
3825
3826
0
  RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
3827
0
                            "Updating cluster config in sys-catalog"));
3828
0
  l.Commit();
3829
3830
0
  return Status::OK();
3831
0
}
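
FindOrNull above returns a pointer into the map (nullptr on a miss) so the caller can mutate the entry in place without a second lookup. An illustrative re-implementation of the helper's contract (the real helper lives in gutil's map utilities):

#include <iostream>
#include <map>
#include <string>

// Illustrative re-implementation; only the contract matters here.
template <class Map>
typename Map::mapped_type* FindOrNull(Map& m, const typename Map::key_type& key) {
  auto it = m.find(key);
  return it == m.end() ? nullptr : &it->second;
}

int main() {
  std::map<std::string, int> producer_map{{"universe-a", 1}};
  if (auto* entry = FindOrNull(producer_map, "universe-a")) {
    *entry += 1;  // mutate in place, as the tablet-split path does with the stream entry
  }
  std::cout << producer_map["universe-a"] << "\n";  // 2
  return 0;
}
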
3832
3833
Status CatalogManager::UpdateXClusterProducerOnTabletSplit(
3834
    const TableId& producer_table_id,
3835
42
    const SplitTabletIds& split_tablet_ids) {
3836
  // First check if this table has any streams associated with it.
3837
42
  auto streams = FindCDCStreamsForTable(producer_table_id);
3838
3839
42
  if (!streams.empty()) {
3840
    // For each stream, we need to add the child entries to the cdc_state table.
3841
0
    client::TableHandle cdc_table;
3842
0
    const client::YBTableName cdc_state_table_name(
3843
0
        YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
3844
0
    auto ybclient = master_->cdc_state_client_initializer().client();
3845
0
    if (!ybclient) {
3846
0
      return STATUS(IllegalState, "Client not initialized or shutting down");
3847
0
    }
3848
0
    RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient));
3849
0
    std::shared_ptr<client::YBSession> session = ybclient->NewSession();
3850
3851
0
    for (const auto& stream : streams) {
3852
0
      for (const auto& child_tablet_id :
3853
0
           {split_tablet_ids.children.first, split_tablet_ids.children.second}) {
3854
0
        const auto insert_op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
3855
0
        auto* insert_req = insert_op->mutable_request();
3856
0
        QLAddStringHashValue(insert_req, child_tablet_id);
3857
0
        QLAddStringRangeValue(insert_req, stream->id());
3858
        // TODO(JHE) set the checkpoint to a different value? OpId of the SPLIT_OP itself perhaps?
3859
0
        cdc_table.AddStringColumnValue(insert_req, master::kCdcCheckpoint, OpId().ToString());
3860
        // TODO(JHE) what to set the time to?
3861
        // cdc_table.AddTimestampColumnValue(
3862
        //     insert_req, master::kCdcLastReplicationTime, GetCurrentTimeMicros());
3863
0
        session->Apply(insert_op);
3864
0
      }
3865
0
    }
3866
0
    RETURN_NOT_OK(session->Flush());
3867
0
  }
3868
3869
42
  return Status::OK();
3870
42
}
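
Each active stream gets one cdc_state row per split child, keyed by (tablet id, stream id) with the checkpoint reset so the children replicate from the beginning. A minimal standalone sketch of that fan-out (the types and the "0.0" checkpoint literal are illustrative assumptions):

#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct CdcStateRow { std::string tablet_id, stream_id, checkpoint; };

int main() {
  std::vector<std::string> streams = {"stream-1", "stream-2"};
  std::pair<std::string, std::string> children = {"child-a", "child-b"};

  std::vector<CdcStateRow> batch;
  for (const auto& stream : streams) {
    for (const auto& child_tablet_id : {children.first, children.second}) {
      // The children start from a zero checkpoint, mirroring OpId().ToString() above.
      batch.push_back({child_tablet_id, stream, "0.0"});
    }
  }
  std::cout << "rows to flush: " << batch.size() << "\n";  // 2 streams x 2 children = 4
  return 0;
}
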
3871
3872
Status CatalogManager::InitCDCConsumer(
3873
    const std::vector<CDCConsumerStreamInfo>& consumer_info,
3874
    const std::string& master_addrs,
3875
    const std::string& producer_universe_uuid,
3876
0
    std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks) {
3877
3878
0
  std::unordered_set<HostPort, HostPortHash> tserver_addrs;
3879
  // Get the tablets in the consumer table.
3880
0
  cdc::ProducerEntryPB producer_entry;
3881
0
  for (const auto& stream_info : consumer_info) {
3882
0
    GetTableLocationsRequestPB consumer_table_req;
3883
0
    consumer_table_req.set_max_returned_locations(std::numeric_limits<int32_t>::max());
3884
0
    GetTableLocationsResponsePB consumer_table_resp;
3885
0
    TableIdentifierPB table_identifier;
3886
0
    table_identifier.set_table_id(stream_info.consumer_table_id);
3887
0
    *(consumer_table_req.mutable_table()) = table_identifier;
3888
0
    RETURN_NOT_OK(GetTableLocations(&consumer_table_req, &consumer_table_resp));
3889
3890
0
    cdc::StreamEntryPB stream_entry;
3891
    // Get producer tablets and map them to the consumer tablets
3892
0
    RETURN_NOT_OK(CreateTabletMapping(
3893
0
        stream_info.producer_table_id, stream_info.consumer_table_id, producer_universe_uuid,
3894
0
        master_addrs, consumer_table_resp, &tserver_addrs, &stream_entry, cdc_rpc_tasks));
3895
0
    (*producer_entry.mutable_stream_map())[stream_info.stream_id] = std::move(stream_entry);
3896
0
  }
3897
3898
  // Record the master addresses (network topology) of the producer cluster.
3899
0
  auto master_addrs_list = StringSplit(master_addrs, ',');
3900
0
  producer_entry.mutable_master_addrs()->Reserve(narrow_cast<int>(master_addrs_list.size()));
3901
0
  for (const auto& addr : master_addrs_list) {
3902
0
    auto hp = VERIFY_RESULT(HostPort::FromString(addr, 0));
3903
0
    HostPortToPB(hp, producer_entry.add_master_addrs());
3904
0
  }
3905
3906
0
  producer_entry.mutable_tserver_addrs()->Reserve(narrow_cast<int>(tserver_addrs.size()));
3907
0
  for (const auto& addr : tserver_addrs) {
3908
0
    HostPortToPB(addr, producer_entry.add_tserver_addrs());
3909
0
  }
3910
3911
0
  auto l = cluster_config_->LockForWrite();
3912
0
  auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
3913
0
  auto it = producer_map->find(producer_universe_uuid);
3914
0
  if (it != producer_map->end()) {
3915
0
    return STATUS(InvalidArgument, "Already created a consumer for this universe");
3916
0
  }
3917
3918
  // TServers will use the ClusterConfig to create CDC Consumers for applicable local tablets.
3919
0
  (*producer_map)[producer_universe_uuid] = std::move(producer_entry);
3920
0
  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
3921
0
  RETURN_NOT_OK(CheckStatus(
3922
0
      sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
3923
0
      "updating cluster config in sys-catalog"));
3924
0
  l.Commit();
3925
3926
0
  return Status::OK();
3927
0
}
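
The producer's master addresses travel through this function as a single comma-separated string and are split back into host:port entries before being stored in the registry. A standalone sketch of that split using only the standard library (HostPort parsing reduced to plain strings):

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

std::vector<std::string> SplitAddresses(const std::string& csv) {
  std::vector<std::string> out;
  std::stringstream ss(csv);
  std::string addr;
  while (std::getline(ss, addr, ',')) {
    if (!addr.empty()) out.push_back(addr);
  }
  return out;
}

int main() {
  const std::string master_addrs = "10.0.0.1:7100,10.0.0.2:7100,10.0.0.3:7100";
  for (const auto& addr : SplitAddresses(master_addrs)) {
    std::cout << addr << "\n";
  }
  return 0;
}
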
3928
3929
0
void CatalogManager::MergeUniverseReplication(scoped_refptr<UniverseReplicationInfo> universe) {
3930
  // Merge back into the primary universe now that setup is a success.
3931
0
  GStringPiece original_producer_id(universe->id());
3932
0
  if (!original_producer_id.ends_with(".ALTER")) {
3933
0
    return;
3934
0
  }
3935
0
  original_producer_id.remove_suffix(sizeof(".ALTER")-1 /* exclude \0 ending */);
3936
0
  LOG(INFO) << "Merging CDC universe: " << universe->id()
3937
0
            << " into " << original_producer_id.ToString();
3938
3939
0
  scoped_refptr<UniverseReplicationInfo> original_universe;
3940
0
  {
3941
0
    SharedLock lock(mutex_);
3942
0
    TRACE("Acquired catalog manager lock");
3943
3944
0
    original_universe = FindPtrOrNull(universe_replication_map_, original_producer_id.ToString());
3945
0
    if (original_universe == nullptr) {
3946
0
      LOG(ERROR) << "Universe not found: " << original_producer_id.ToString();
3947
0
      return;
3948
0
    }
3949
0
  }
3950
  // Merge Cluster Config for TServers.
3951
0
  {
3952
0
    auto cl = cluster_config_->LockForWrite();
3953
0
    auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
3954
0
    auto original_producer_entry = pm->find(original_universe->id());
3955
0
    auto alter_producer_entry = pm->find(universe->id());
3956
0
    if (original_producer_entry != pm->end() && alter_producer_entry != pm->end()) {
3957
      // Merge the Tables from the Alter into the original.
3958
0
      auto as = alter_producer_entry->second.stream_map();
3959
0
      original_producer_entry->second.mutable_stream_map()->insert(as.begin(), as.end());
3960
      // Delete the Alter
3961
0
      pm->erase(alter_producer_entry);
3962
0
    } else {
3963
0
      LOG(WARNING) << "Could not find both universes in Cluster Config: " << universe->id();
3964
0
    }
3965
0
    cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
3966
0
    const Status s = sys_catalog_->Upsert(leader_ready_term(), cluster_config_);
3967
0
    cl.CommitOrWarn(s, "updating cluster config in sys-catalog");
3968
0
  }
3969
  // Merge Master Config on Consumer. (no need for Producer changes, since it uses stream_id)
3970
0
  {
3971
0
    auto original_lock = original_universe->LockForWrite();
3972
0
    auto alter_lock = universe->LockForWrite();
3973
    // Merge Table->StreamID mapping.
3974
0
    auto at = alter_lock.mutable_data()->pb.mutable_tables();
3975
0
    original_lock.mutable_data()->pb.mutable_tables()->MergeFrom(*at);
3976
0
    at->Clear();
3977
0
    auto as = alter_lock.mutable_data()->pb.mutable_table_streams();
3978
0
    original_lock.mutable_data()->pb.mutable_table_streams()->insert(as->begin(), as->end());
3979
0
    as->clear();
3980
0
    auto av = alter_lock.mutable_data()->pb.mutable_validated_tables();
3981
0
    original_lock.mutable_data()->pb.mutable_validated_tables()->insert(av->begin(), av->end());
3982
0
    av->clear();
3983
0
    alter_lock.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED);
3984
3985
0
    vector<UniverseReplicationInfo*> universes{original_universe.get(), universe.get()};
3986
0
    const Status s = sys_catalog_->Upsert(leader_ready_term(), universes);
3987
0
    alter_lock.CommitOrWarn(s, "updating universe replication entries in sys-catalog");
3988
0
    if (s.ok()) {
3989
0
      original_lock.Commit();
3990
0
    }
3991
0
  }
3992
  // TODO: universe_replication_map_.erase(universe->id()) at a later time.
3993
  //       TwoDCTest.AlterUniverseReplicationTables crashes due to undiagnosed race right now.
3994
0
  LOG(INFO) << "Done with Merging " << universe->id() << " into " << original_universe->id();
3995
0
}
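
Two idioms above are easy to miss: sizeof(".ALTER") - 1 strips the suffix without counting the string literal's trailing NUL, and map insert(begin, end) keeps existing keys on collision, so the original universe's entries win if a key ever appeared on both sides. A standalone sketch of both (ids illustrative):

#include <iostream>
#include <map>
#include <string>

int main() {
  // sizeof on a string literal includes the trailing '\0', hence the -1.
  std::string universe_id = "universe-a.ALTER";
  universe_id.erase(universe_id.size() - (sizeof(".ALTER") - 1));
  std::cout << universe_id << "\n";  // universe-a

  // insert(range) never overwrites: existing keys keep their original values.
  std::map<std::string, std::string> original = {{"stream-1", "table-x"}};
  std::map<std::string, std::string> alter = {{"stream-1", "table-y"},
                                              {"stream-2", "table-z"}};
  original.insert(alter.begin(), alter.end());
  std::cout << original["stream-1"] << " " << original.size() << "\n";  // table-x 2
  return 0;
}
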
3996
3997
Status ReturnErrorOrAddWarning(const Status& s,
3998
                               const DeleteUniverseReplicationRequestPB* req,
3999
0
                               DeleteUniverseReplicationResponsePB* resp) {
4000
0
  if (!s.ok()) {
4001
0
    if (req->ignore_errors()) {
4002
      // Continue executing, save the status as a warning.
4003
0
      AppStatusPB* warning = resp->add_warnings();
4004
0
      StatusToPB(s, warning);
4005
0
      return Status::OK();
4006
0
    }
4007
0
    return s.CloneAndAppend("\nUse 'ignore-errors' to ignore this error.");
4008
0
  }
4009
0
  return s;
4010
0
}
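
ReturnErrorOrAddWarning downgrades failures to response warnings when the caller set ignore-errors, which is what lets the delete RPC below finish cleanup even when individual steps fail. A minimal standalone sketch of the same pattern, with Status reduced to a string for illustration:

#include <iostream>
#include <string>
#include <vector>

struct Response { std::vector<std::string> warnings; };

// Returns true to keep going, false to surface the error to the caller.
bool ReturnErrorOrAddWarning(const std::string& error, bool ignore_errors, Response* resp) {
  if (error.empty()) return true;     // success: nothing to record
  if (ignore_errors) {
    resp->warnings.push_back(error);  // downgrade the failure to a warning
    return true;
  }
  return false;
}

int main() {
  Response resp;
  bool ok = ReturnErrorOrAddWarning("stream not found", /* ignore_errors */ true, &resp);
  std::cout << ok << " warnings=" << resp.warnings.size() << "\n";  // 1 warnings=1
  return 0;
}
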
4011
4012
Status CatalogManager::DeleteUniverseReplication(const DeleteUniverseReplicationRequestPB* req,
4013
                                                 DeleteUniverseReplicationResponsePB* resp,
4014
0
                                                 rpc::RpcContext* rpc) {
4015
0
  LOG(INFO) << "Servicing DeleteUniverseReplication request from " << RequestorString(rpc)
4016
0
            << ": " << req->ShortDebugString();
4017
4018
0
  if (!req->has_producer_id()) {
4019
0
    return STATUS(InvalidArgument, "Producer universe ID required", req->ShortDebugString(),
4020
0
                  MasterError(MasterErrorPB::INVALID_REQUEST));
4021
0
  }
4022
4023
0
  scoped_refptr<UniverseReplicationInfo> ri;
4024
0
  {
4025
0
    SharedLock lock(mutex_);
4026
0
    TRACE("Acquired catalog manager lock");
4027
4028
0
    ri = FindPtrOrNull(universe_replication_map_, req->producer_id());
4029
0
    if (ri == nullptr) {
4030
0
      return STATUS(NotFound, "Universe replication info does not exist",
4031
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4032
0
    }
4033
0
  }
4034
4035
0
  auto l = ri->LockForWrite();
4036
0
  l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED);
4037
4038
  // Delete subscribers on the Consumer Registry (removes from TServers).
4039
0
  LOG(INFO) << "Deleting subscribers for producer " << req->producer_id();
4040
0
  {
4041
0
    auto cl = cluster_config_->LockForWrite();
4042
0
    auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4043
0
    auto it = producer_map->find(req->producer_id());
4044
0
    if (it != producer_map->end()) {
4045
0
      producer_map->erase(it);
4046
0
      cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
4047
0
      RETURN_NOT_OK(CheckStatus(
4048
0
          sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
4049
0
          "updating cluster config in sys-catalog"));
4050
0
      cl.Commit();
4051
0
    }
4052
0
  }
4053
4054
  // Delete CDC stream config on the Producer.
4055
0
  if (!l->pb.table_streams().empty()) {
4056
0
    auto result = ri->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses());
4057
0
    if (!result.ok()) {
4058
0
      LOG(WARNING) << "Unable to create cdc rpc task. CDC streams won't be deleted: " << result;
4059
0
    } else {
4060
0
      auto cdc_rpc = *result;
4061
0
      vector<CDCStreamId> streams;
4062
0
      std::unordered_map<CDCStreamId, TableId> stream_to_producer_table_id;
4063
0
      for (const auto& table : l->pb.table_streams()) {
4064
0
        streams.push_back(table.second);
4065
0
        stream_to_producer_table_id.emplace(table.second, table.first);
4066
0
      }
4067
4068
0
      DeleteCDCStreamResponsePB delete_cdc_stream_resp;
4069
      // Set force_delete=true since we are deleting active xCluster streams.
4070
0
      auto s = cdc_rpc->client()->DeleteCDCStream(streams,
4071
0
                                                  true, /* force_delete */
4072
0
                                                  req->ignore_errors(),
4073
0
                                                  &delete_cdc_stream_resp);
4074
4075
0
      if (delete_cdc_stream_resp.not_found_stream_ids().size() > 0) {
4076
0
        std::ostringstream missing_streams;
4077
0
        for (auto it = delete_cdc_stream_resp.not_found_stream_ids().begin();
4078
0
               it != delete_cdc_stream_resp.not_found_stream_ids().end();
4079
0
               ++it) {
4080
0
          if (it != delete_cdc_stream_resp.not_found_stream_ids().begin()) {
4081
0
            missing_streams << ",";
4082
0
          }
4083
0
          missing_streams << *it << " (table_id: " << stream_to_producer_table_id[*it] << ")";
4084
0
        }
4085
0
        if (s.ok()) {
4086
          // Returned but did not find some streams, so still need to warn the user about those.
4087
0
          s = STATUS(NotFound,
4088
0
                     "Could not find the following streams: [" + missing_streams.str() + "].");
4089
0
        } else {
4090
0
          s = s.CloneAndPrepend(
4091
0
              "Could not find the following streams: [" + missing_streams.str() + "].");
4092
0
        }
4093
0
      }
4094
4095
0
      RETURN_NOT_OK(ReturnErrorOrAddWarning(s, req, resp));
4096
0
    }
4097
0
  }
4098
4099
  // Delete universe in the Universe Config.
4100
0
  RETURN_NOT_OK(ReturnErrorOrAddWarning(DeleteUniverseReplicationUnlocked(ri), req, resp));
4101
0
  l.Commit();
4102
4103
0
  LOG(INFO) << "Processed delete universe replication " << ri->ToString()
4104
0
            << " per request from " << RequestorString(rpc);
4105
4106
0
  return Status::OK();
4107
0
}
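
The missing-streams message above joins ids with the same first-iteration separator check used for the parent-table-id messages earlier in this file. A standalone sketch of that join idiom:

#include <iostream>
#include <set>
#include <sstream>
#include <string>

int main() {
  std::set<std::string> ids = {"stream-1", "stream-2", "stream-3"};
  std::ostringstream oss;
  for (auto it = ids.begin(); it != ids.end(); ++it) {
    // Emit the separator before every element except the first.
    oss << ((it == ids.begin()) ? "" : ", ") << *it;
  }
  std::cout << "[ " << oss.str() << " ]\n";  // [ stream-1, stream-2, stream-3 ]
  return 0;
}
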
4108
4109
Status CatalogManager::DeleteUniverseReplicationUnlocked(
4110
0
    scoped_refptr<UniverseReplicationInfo> universe) {
4111
  // Assumes that caller has locked universe.
4112
0
  RETURN_NOT_OK_PREPEND(
4113
0
      sys_catalog_->Delete(leader_ready_term(), universe),
4114
0
      Substitute("An error occurred while updating sys-catalog, universe_id: $0", universe->id()));
4115
4116
  // Remove it from the map.
4117
0
  LockGuard lock(mutex_);
4118
0
  if (universe_replication_map_.erase(universe->id()) < 1) {
4119
0
    LOG(WARNING) << "Failed to remove replication info from map: universe_id: " << universe->id();
4120
0
  }
4121
  // Also update the mapping of consumer tables.
4122
0
  for (const auto& table : universe->metadata().state().pb.validated_tables()) {
4123
0
    if (xcluster_consumer_tables_to_stream_map_[table.second].erase(universe->id()) < 1) {
4124
0
      LOG(WARNING) << "Failed to remove consumer table from mapping. "
4125
0
                   << "table_id: " << table.second << ": universe_id: " << universe->id();
4126
0
    }
4127
0
    if (xcluster_consumer_tables_to_stream_map_[table.second].empty()) {
4128
0
      xcluster_consumer_tables_to_stream_map_.erase(table.second);
4129
0
    }
4130
0
  }
4131
0
  return Status::OK();
4132
0
}
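
The cleanup above treats xcluster_consumer_tables_to_stream_map_ as a map of sets: remove the universe from each table's set, then drop the table key once its set is empty so membership checks stay accurate. A standalone sketch of that erase-then-prune pattern (a simplified variant that scans the whole map rather than the universe's validated tables):

#include <iostream>
#include <iterator>
#include <map>
#include <set>
#include <string>

int main() {
  std::map<std::string, std::set<std::string>> tables_to_streams = {
      {"table-1", {"universe-a"}}, {"table-2", {"universe-a", "universe-b"}}};

  const std::string universe_id = "universe-a";
  for (auto it = tables_to_streams.begin(); it != tables_to_streams.end();) {
    it->second.erase(universe_id);
    // Prune empty buckets so lookups like IsTableCdcConsumer stay correct.
    it = it->second.empty() ? tables_to_streams.erase(it) : std::next(it);
  }
  std::cout << tables_to_streams.size() << "\n";  // 1 (only table-2 remains)
  return 0;
}
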
4133
4134
Status CatalogManager::SetUniverseReplicationEnabled(
4135
    const SetUniverseReplicationEnabledRequestPB* req,
4136
    SetUniverseReplicationEnabledResponsePB* resp,
4137
0
    rpc::RpcContext* rpc) {
4138
0
  LOG(INFO) << "Servicing SetUniverseReplicationEnabled request from " << RequestorString(rpc)
4139
0
            << ": " << req->ShortDebugString();
4140
4141
  // Sanity Checking Cluster State and Input.
4142
0
  if (!req->has_producer_id()) {
4143
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4144
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4145
0
  }
4146
0
  if (!req->has_is_enabled()) {
4147
0
    return STATUS(InvalidArgument, "Must explicitly set whether to enable",
4148
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4149
0
  }
4150
4151
0
  scoped_refptr<UniverseReplicationInfo> universe;
4152
0
  {
4153
0
    SharedLock lock(mutex_);
4154
4155
0
    universe = FindPtrOrNull(universe_replication_map_, req->producer_id());
4156
0
    if (universe == nullptr) {
4157
0
      return STATUS(NotFound, "Could not find CDC producer universe",
4158
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4159
0
    }
4160
0
  }
4161
4162
  // Update the Master's Universe Config with the new state.
4163
0
  {
4164
0
    auto l = universe->LockForWrite();
4165
0
    if (l->pb.state() != SysUniverseReplicationEntryPB::DISABLED &&
4166
0
        l->pb.state() != SysUniverseReplicationEntryPB::ACTIVE) {
4167
0
      return STATUS(
4168
0
          InvalidArgument,
4169
0
          Format("Universe Replication in invalid state: $0.  Retry or Delete.",
4170
0
              SysUniverseReplicationEntryPB::State_Name(l->pb.state())),
4171
0
          req->ShortDebugString(),
4172
0
          MasterError(MasterErrorPB::INVALID_REQUEST));
4173
0
    }
4174
0
    if (req->is_enabled()) {
4175
0
        l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE);
4176
0
    } else { // DISABLE.
4177
0
        l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DISABLED);
4178
0
    }
4179
0
    RETURN_NOT_OK(CheckStatus(
4180
0
        sys_catalog_->Upsert(leader_ready_term(), universe),
4181
0
        "updating universe replication info in sys-catalog"));
4182
0
    l.Commit();
4183
0
  }
4184
4185
  // Modify the Consumer Registry, which will fan out this info to all TServers on heartbeat.
4186
0
  {
4187
0
    auto l = cluster_config_->LockForWrite();
4188
0
    auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4189
0
    auto it = producer_map->find(req->producer_id());
4190
0
    if (it == producer_map->end()) {
4191
0
      LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id();
4192
0
      return STATUS(NotFound, "Could not find CDC producer universe",
4193
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4194
0
    }
4195
0
    (*it).second.set_disable_stream(!req->is_enabled());
4196
0
    l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
4197
0
    RETURN_NOT_OK(CheckStatus(
4198
0
        sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
4199
0
        "updating cluster config in sys-catalog"));
4200
0
    l.Commit();
4201
0
  }
4202
4203
0
  return Status::OK();
4204
0
}
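
Note that the toggle above is only legal from the two steady states (ACTIVE and DISABLED), which keeps half-set-up, failed, or deleted universes from being flipped. A standalone sketch of that state guard, with illustrative enum values:

#include <iostream>

enum class ReplicationState { SETTING_UP, ACTIVE, DISABLED, FAILED, DELETED };

// Mirrors the guard above: only steady-state universes may be enabled or disabled.
bool CanToggle(ReplicationState s) {
  return s == ReplicationState::ACTIVE || s == ReplicationState::DISABLED;
}

int main() {
  std::cout << CanToggle(ReplicationState::ACTIVE) << "\n";  // 1
  std::cout << CanToggle(ReplicationState::FAILED) << "\n";  // 0
  return 0;
}
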
4205
4206
Status CatalogManager::AlterUniverseReplication(const AlterUniverseReplicationRequestPB* req,
4207
                                                AlterUniverseReplicationResponsePB* resp,
4208
0
                                                rpc::RpcContext* rpc) {
4209
0
  LOG(INFO) << "Servicing AlterUniverseReplication request from " << RequestorString(rpc)
4210
0
            << ": " << req->ShortDebugString();
4211
4212
  // Sanity Checking Cluster State and Input.
4213
0
  if (!req->has_producer_id()) {
4214
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4215
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4216
0
  }
4217
4218
  // Verify that there is an existing Universe config
4219
0
  scoped_refptr<UniverseReplicationInfo> original_ri;
4220
0
  {
4221
0
    SharedLock lock(mutex_);
4222
4223
0
    original_ri = FindPtrOrNull(universe_replication_map_, req->producer_id());
4224
0
    if (original_ri == nullptr) {
4225
0
      return STATUS(NotFound, "Could not find CDC producer universe",
4226
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4227
0
    }
4228
0
  }
4229
4230
  // Currently, config options are mutually exclusive to simplify transactionality.
4231
0
  int config_count = (req->producer_master_addresses_size() > 0 ? 1 : 0) +
4232
0
                     (req->producer_table_ids_to_remove_size() > 0 ? 1 : 0) +
4233
0
                     (req->producer_table_ids_to_add_size() > 0 ? 1 : 0) +
4234
0
                     (req->has_new_producer_universe_id() ? 1 : 0);
4235
0
  if (config_count != 1) {
4236
0
    return STATUS(InvalidArgument, "Only 1 Alter operation per request currently supported",
4237
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4238
0
  }
4239
4240
  // Config logic...
4241
0
  if (req->producer_master_addresses_size() > 0) {
4242
    // 'set_master_addresses'
4243
    // TODO: Verify the input. Setup an RPC Task, ListTables, ensure same.
4244
4245
    // 1a. Persistent Config: Update the Universe Config for Master.
4246
0
    {
4247
0
      auto l = original_ri->LockForWrite();
4248
0
      l.mutable_data()->pb.mutable_producer_master_addresses()->CopyFrom(
4249
0
          req->producer_master_addresses());
4250
0
      RETURN_NOT_OK(CheckStatus(
4251
0
          sys_catalog_->Upsert(leader_ready_term(), original_ri),
4252
0
          "updating universe replication info in sys-catalog"));
4253
0
      l.Commit();
4254
0
    }
4255
    // 1b. Persistent Config: Update the Consumer Registry (updates TServers)
4256
0
    {
4257
0
      auto l = cluster_config_->LockForWrite();
4258
0
      auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4259
0
      auto it = producer_map->find(req->producer_id());
4260
0
      if (it == producer_map->end()) {
4261
0
        LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id();
4262
0
        return STATUS(NotFound, "Could not find CDC producer universe",
4263
0
                      req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4264
0
      }
4265
0
      (*it).second.mutable_master_addrs()->CopyFrom(req->producer_master_addresses());
4266
0
      l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
4267
0
      RETURN_NOT_OK(CheckStatus(
4268
0
          sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
4269
0
          "updating cluster config in sys-catalog"));
4270
0
      l.Commit();
4271
0
    }
4272
    // 2. Memory Update: Change cdc_rpc_tasks (Master cache)
4273
0
    {
4274
0
      auto result = original_ri->GetOrCreateCDCRpcTasks(req->producer_master_addresses());
4275
0
      if (!result.ok()) {
4276
0
        return SetupError(resp->mutable_error(), MasterErrorPB::INTERNAL_ERROR, result.status());
4277
0
      }
4278
0
    }
4279
0
  } else if (req->producer_table_ids_to_remove_size() > 0) {
4280
    // 'remove_table'
4281
0
    auto it = req->producer_table_ids_to_remove();
4282
0
    std::set<string> table_ids_to_remove(it.begin(), it.end());
4283
    // Filter out any tables that aren't in the existing replication config.
4284
0
    {
4285
0
      auto l = original_ri->LockForRead();
4286
0
      auto tbl_iter = l->pb.tables();
4287
0
      std::set<string> existing_tables(tbl_iter.begin(), tbl_iter.end()), filtered_list;
4288
0
      set_intersection(table_ids_to_remove.begin(), table_ids_to_remove.end(),
4289
0
                       existing_tables.begin(), existing_tables.end(),
4290
0
                       std::inserter(filtered_list, filtered_list.begin()));
4291
0
      filtered_list.swap(table_ids_to_remove);
4292
0
    }
4293
4294
0
    vector<CDCStreamId> streams_to_remove;
4295
    // 1. Update the Consumer Registry (removes from TServers).
4296
0
    {
4297
0
      auto cl = cluster_config_->LockForWrite();
4298
0
      auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4299
0
      auto producer_entry = pm->find(req->producer_id());
4300
0
      if (producer_entry != pm->end()) {
4301
        // Remove the Tables Specified (not part of the key).
4302
0
        auto stream_map = producer_entry->second.mutable_stream_map();
4303
0
        for (auto& p : *stream_map) {
4304
0
          if (table_ids_to_remove.count(p.second.producer_table_id()) > 0) {
4305
0
            streams_to_remove.push_back(p.first);
4306
0
          }
4307
0
        }
4308
0
        if (streams_to_remove.size() == stream_map->size()) {
4309
          // If this ends with an empty Map, disallow and force user to delete.
4310
0
          LOG(WARNING) << "CDC 'remove_table' tried to remove all tables." << req->producer_id();
4311
0
          return STATUS(
4312
0
              InvalidArgument,
4313
0
              "Cannot remove all tables with alter. Use delete_universe_replication instead.",
4314
0
              req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4315
0
        } else if (streams_to_remove.empty()) {
4316
          // If this doesn't delete anything, notify the user.
4317
0
          return STATUS(InvalidArgument, "Removal matched no entries.",
4318
0
                        req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4319
0
        }
4320
0
        for (auto& key : streams_to_remove) {
4321
0
          stream_map->erase(stream_map->find(key));
4322
0
        }
4323
0
      }
4324
0
      cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1);
4325
0
      RETURN_NOT_OK(CheckStatus(
4326
0
          sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
4327
0
          "updating cluster config in sys-catalog"));
4328
0
      cl.Commit();
4329
0
    }
4330
    // 2. Remove from Master Configs on Producer and Consumer.
4331
0
    {
4332
0
      auto l = original_ri->LockForWrite();
4333
0
      if (!l->pb.table_streams().empty()) {
4334
        // Delete Relevant Table->StreamID mappings on Consumer.
4335
0
        auto table_streams = l.mutable_data()->pb.mutable_table_streams();
4336
0
        auto validated_tables = l.mutable_data()->pb.mutable_validated_tables();
4337
0
        for (auto& key : table_ids_to_remove) {
4338
0
          table_streams->erase(table_streams->find(key));
4339
0
          validated_tables->erase(validated_tables->find(key));
4340
0
        }
4341
0
        for (int i = 0; i < l.mutable_data()->pb.tables_size(); i++) {
4342
0
          if (table_ids_to_remove.count(l.mutable_data()->pb.tables(i)) > 0) {
4343
0
            l.mutable_data()->pb.mutable_tables()->DeleteSubrange(i, 1);
4344
0
            --i;
4345
0
          }
4346
0
        }
4347
        // Delete CDC stream config on the Producer.
4348
0
        auto result = original_ri->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses());
4349
0
        if (!result.ok()) {
4350
0
          LOG(WARNING) << "Unable to create cdc rpc task. CDC streams won't be deleted: " << result;
4351
0
        } else {
4352
0
          auto s = (*result)->client()->DeleteCDCStream(streams_to_remove,
4353
0
                                                        true /* force_delete */);
4354
0
          if (!s.ok()) {
4355
0
            std::stringstream os;
4356
0
            std::copy(streams_to_remove.begin(), streams_to_remove.end(),
4357
0
                      std::ostream_iterator<CDCStreamId>(os, ", "));
4358
0
            LOG(WARNING) << "Unable to delete CDC streams: " << os.str() << s;
4359
0
          }
4360
0
        }
4361
0
      }
4362
0
      RETURN_NOT_OK(CheckStatus(
4363
0
          sys_catalog_->Upsert(leader_ready_term(), original_ri),
4364
0
          "updating universe replication info in sys-catalog"));
4365
0
      l.Commit();
4366
0
    }
4367
0
  } else if (req->producer_table_ids_to_add_size() > 0) {
4368
    // 'add_table'
4369
0
    string alter_producer_id = req->producer_id() + ".ALTER";
4370
4371
    // If user passed in bootstrap ids, check that there is a bootstrap id for every table.
4372
0
    if (req->producer_bootstrap_ids_to_add().size() > 0 &&
4373
0
      req->producer_table_ids_to_add().size() != req->producer_bootstrap_ids_to_add().size()) {
4374
0
      return STATUS(InvalidArgument, "Number of bootstrap ids must be equal to number of tables",
4375
0
        req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4376
0
    }
4377
4378
    // Verify no 'alter' command running.
4379
0
    scoped_refptr<UniverseReplicationInfo> alter_ri;
4380
0
    {
4381
0
      SharedLock lock(mutex_);
4382
0
      alter_ri = FindPtrOrNull(universe_replication_map_, alter_producer_id);
4383
0
    }
4384
0
    {
4385
0
      if (alter_ri != nullptr) {
4386
0
        LOG(INFO) << "Found " << alter_producer_id << "... Removing";
4387
0
        if (alter_ri->LockForRead()->is_deleted_or_failed()) {
4388
          // Delete previous Alter if it's completed but failed.
4389
0
          master::DeleteUniverseReplicationRequestPB delete_req;
4390
0
          delete_req.set_producer_id(alter_ri->id());
4391
0
          master::DeleteUniverseReplicationResponsePB delete_resp;
4392
0
          Status s = DeleteUniverseReplication(&delete_req, &delete_resp, rpc);
4393
0
          if (!s.ok()) {
4394
0
            if (delete_resp.has_error()) {
4395
0
              resp->mutable_error()->Swap(delete_resp.mutable_error());
4396
0
              return s;
4397
0
            }
4398
0
            return SetupError(resp->mutable_error(), s);
4399
0
          }
4400
0
        } else {
4401
0
          return STATUS(InvalidArgument, "Alter for CDC producer currently running",
4402
0
                        req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4403
0
        }
4404
0
      }
4405
0
    }
4406
4407
    // Map each table id to its corresponding bootstrap id.
4408
0
    std::unordered_map<TableId, std::string> table_id_to_bootstrap_id;
4409
0
    if (req->producer_bootstrap_ids_to_add().size() > 0) {
4410
0
      for (int i = 0; i < req->producer_table_ids_to_add().size(); i++) {
4411
0
        table_id_to_bootstrap_id[req->producer_table_ids_to_add(i)]
4412
0
          = req->producer_bootstrap_ids_to_add(i);
4413
0
      }
4414
4415
      // Ensure that table ids are unique. We need to do this here even though
4416
      // the same check is performed by SetupUniverseReplication because
4417
      // duplicate table ids can cause a bootstrap id entry in table_id_to_bootstrap_id
4418
      // to be overwritten.
4419
0
      if (table_id_to_bootstrap_id.size() !=
4420
0
              implicit_cast<size_t>(req->producer_table_ids_to_add().size())) {
4421
0
        return STATUS(InvalidArgument, "When providing bootstrap ids, "
4422
0
                      "the list of tables must be unique", req->ShortDebugString(),
4423
0
                      MasterError(MasterErrorPB::INVALID_REQUEST));
4424
0
      }
4425
0
    }
4426
4427
    // Only add new tables.  Ignore tables that are currently being replicated.
4428
0
    auto tid_iter = req->producer_table_ids_to_add();
4429
0
    std::unordered_set<string> new_tables(tid_iter.begin(), tid_iter.end());
4430
0
    {
4431
0
      auto l = original_ri->LockForRead();
4432
0
      for (const auto& t : l->pb.tables()) {
4433
0
        auto pos = new_tables.find(t);
4434
0
        if (pos != new_tables.end()) {
4435
0
          new_tables.erase(pos);
4436
0
        }
4437
0
      }
4438
0
    }
4439
0
    if (new_tables.empty()) {
4440
0
      return STATUS(InvalidArgument, "CDC producer already contains all requested tables",
4441
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4442
0
    }
4443
4444
    // 1. create an ALTER table request that mirrors the original 'setup_replication'.
4445
0
    master::SetupUniverseReplicationRequestPB setup_req;
4446
0
    master::SetupUniverseReplicationResponsePB setup_resp;
4447
0
    setup_req.set_producer_id(alter_producer_id);
4448
0
    setup_req.mutable_producer_master_addresses()->CopyFrom(
4449
0
        original_ri->LockForRead()->pb.producer_master_addresses());
4450
0
    for (auto t : new_tables) {
4451
0
      setup_req.add_producer_table_ids(t);
4452
4453
      // Add bootstrap id to request if it exists.
4454
0
      auto bootstrap_id_lookup_result = table_id_to_bootstrap_id.find(t);
4455
0
      if (bootstrap_id_lookup_result != table_id_to_bootstrap_id.end()) {
4456
0
        setup_req.add_producer_bootstrap_ids(bootstrap_id_lookup_result->second);
4457
0
      }
4458
0
    }
4459
4460
    // 2. run the 'setup_replication' pipeline on the ALTER Table
4461
0
    Status s = SetupUniverseReplication(&setup_req, &setup_resp, rpc);
4462
0
    if (!s.ok()) {
4463
0
      if (setup_resp.has_error()) {
4464
0
        resp->mutable_error()->Swap(setup_resp.mutable_error());
4465
0
        return s;
4466
0
      }
4467
0
      return SetupError(resp->mutable_error(), s);
4468
0
    }
4469
    // NOTE: ALTER merges back into original after completion.
4470
0
  } else if (req->has_new_producer_universe_id()) {
4471
0
    Status s = RenameUniverseReplication(original_ri, req, resp, rpc);
4472
0
    if (!s.ok()) {
4473
0
      return SetupError(resp->mutable_error(), s);
4474
0
    }
4475
0
  }
4476
4477
0
  return Status::OK();
4478
0
}
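
The config_count sum near the top of AlterUniverseReplication is a compact exactly-one check over the mutually exclusive alter options. A standalone sketch of the same counting trick:

#include <iostream>

int main() {
  bool set_master_addresses = false;
  bool remove_tables = true;
  bool add_tables = false;
  bool rename_universe = false;

  // Each supplied option contributes 1; anything other than exactly 1 is rejected.
  int config_count = (set_master_addresses ? 1 : 0) + (remove_tables ? 1 : 0) +
                     (add_tables ? 1 : 0) + (rename_universe ? 1 : 0);
  if (config_count != 1) {
    std::cout << "reject: exactly one alter operation per request\n";
    return 1;
  }
  std::cout << "proceed with the single requested operation\n";
  return 0;
}
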
4479
4480
Status CatalogManager::RenameUniverseReplication(
4481
    scoped_refptr<UniverseReplicationInfo> universe,
4482
    const AlterUniverseReplicationRequestPB* req,
4483
    AlterUniverseReplicationResponsePB* resp,
4484
0
    rpc::RpcContext* rpc) {
4485
0
  const string old_universe_replication_id = universe->id();
4486
0
  const string new_producer_universe_id = req->new_producer_universe_id();
4487
0
  if (old_universe_replication_id == new_producer_universe_id) {
4488
0
    return STATUS(InvalidArgument, "Old and new replication ids must be different",
4489
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4490
0
  }
4491
4492
0
  {
4493
0
    LockGuard lock(mutex_);
4494
0
    auto l = universe->LockForWrite();
4495
0
    scoped_refptr<UniverseReplicationInfo> new_ri;
4496
4497
    // Check that the new replication id isn't already in use.
4498
0
    if (FindPtrOrNull(universe_replication_map_, new_producer_universe_id) != nullptr) {
4499
0
      return STATUS(InvalidArgument, "New replication id is already in use",
4500
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4501
0
    }
4502
4503
    // Since the producer_id is used as the key, we need to create a new UniverseReplicationInfo.
4504
0
    new_ri = new UniverseReplicationInfo(new_producer_universe_id);
4505
0
    new_ri->mutable_metadata()->StartMutation();
4506
0
    SysUniverseReplicationEntryPB *metadata = &new_ri->mutable_metadata()->mutable_dirty()->pb;
4507
0
    metadata->CopyFrom(l->pb);
4508
0
    metadata->set_producer_id(new_producer_universe_id);
4509
4510
    // Also need to update internal maps.
4511
0
    auto cl = cluster_config_->LockForWrite();
4512
0
    auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4513
0
    (*producer_map)[new_producer_universe_id] =
4514
0
        std::move((*producer_map)[old_universe_replication_id]);
4515
0
    producer_map->erase(old_universe_replication_id);
4516
4517
0
    {
4518
      // Need both these updates to be atomic.
4519
0
      auto w = sys_catalog_->NewWriter(leader_ready_term());
4520
0
      RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_DELETE, universe.get()));
4521
0
      RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_UPDATE,
4522
0
                              new_ri.get(),
4523
0
                              cluster_config_.get()));
4524
0
      RETURN_NOT_OK(CheckStatus(
4525
0
          sys_catalog_->SyncWrite(w.get()),
4526
0
          "Updating universe replication info and cluster config in sys-catalog"));
4527
0
    }
4528
0
    new_ri->mutable_metadata()->CommitMutation();
4529
0
    cl.Commit();
4530
4531
    // Update universe_replication_map after persistent data is saved.
4532
0
    universe_replication_map_[new_producer_universe_id] = new_ri;
4533
0
    universe_replication_map_.erase(old_universe_replication_id);
4534
0
  }
4535
4536
0
  return Status::OK();
4537
0
}
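
Because the producer id is the map key itself, the rename has to re-key both the replication map and the consumer registry rather than edit an entry. A standalone sketch of the re-key step, with std::map standing in for the protobuf map:

#include <iostream>
#include <map>
#include <string>
#include <utility>

int main() {
  std::map<std::string, std::string> producer_map = {{"old-universe", "registry-entry"}};

  const std::string old_id = "old-universe", new_id = "new-universe";
  // Move the value under the new key, then drop the old key (same order as above).
  producer_map[new_id] = std::move(producer_map[old_id]);
  producer_map.erase(old_id);

  std::cout << producer_map.count(new_id) << " " << producer_map.count(old_id) << "\n";  // 1 0
  return 0;
}
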
4538
4539
Status CatalogManager::GetUniverseReplication(const GetUniverseReplicationRequestPB* req,
4540
                                              GetUniverseReplicationResponsePB* resp,
4541
0
                                              rpc::RpcContext* rpc) {
4542
0
  LOG(INFO) << "GetUniverseReplication from " << RequestorString(rpc)
4543
0
            << ": " << req->DebugString();
4544
4545
0
  if (!req->has_producer_id()) {
4546
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4547
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4548
0
  }
4549
4550
0
  scoped_refptr<UniverseReplicationInfo> universe;
4551
0
  {
4552
0
    SharedLock lock(mutex_);
4553
4554
0
    universe = FindPtrOrNull(universe_replication_map_, req->producer_id());
4555
0
    if (universe == nullptr) {
4556
0
      return STATUS(NotFound, "Could not find CDC producer universe",
4557
0
                    req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
4558
0
    }
4559
0
  }
4560
4561
0
  resp->mutable_entry()->CopyFrom(universe->LockForRead()->pb);
4562
0
  return Status::OK();
4563
0
}
4564
4565
/*
4566
 * Checks if the universe replication setup has completed.
4567
 * Returns Status::OK() if this call succeeds, and uses resp->done() to determine if the setup has
4568
 * completed (either failed or succeeded). If the setup has failed, then resp->replication_error()
4569
 * is also set.
4570
 */
4571
Status CatalogManager::IsSetupUniverseReplicationDone(
4572
    const IsSetupUniverseReplicationDoneRequestPB* req,
4573
    IsSetupUniverseReplicationDoneResponsePB* resp,
4574
0
    rpc::RpcContext* rpc) {
4575
0
  LOG(INFO) << "IsSetupUniverseReplicationDone from " << RequestorString(rpc)
4576
0
            << ": " << req->DebugString();
4577
4578
0
  if (!req->has_producer_id()) {
4579
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4580
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4581
0
  }
4582
4583
0
  GetUniverseReplicationRequestPB universe_req;
4584
0
  GetUniverseReplicationResponsePB universe_resp;
4585
0
  universe_req.set_producer_id(req->producer_id());
4586
4587
0
  RETURN_NOT_OK(GetUniverseReplication(&universe_req, &universe_resp, /* RpcContext */ nullptr));
4588
0
  if (universe_resp.has_error()) {
4589
0
    RETURN_NOT_OK(StatusFromPB(universe_resp.error().status()));
4590
0
  }
4591
4592
  // Two cases for completion:
4593
  //  - For a regular SetupUniverseReplication, we want to wait for the universe to become ACTIVE.
4594
  //  - For an AlterUniverseReplication, we need to wait until the .ALTER universe gets merged with
4595
  //    the main universe - at which point the .ALTER universe is deleted.
4596
0
  bool is_alter_request = GStringPiece(req->producer_id()).ends_with(".ALTER");
4597
0
  if ((!is_alter_request && universe_resp.entry().state() == SysUniverseReplicationEntryPB::ACTIVE) ||
4598
0
      (is_alter_request && universe_resp.entry().state() == SysUniverseReplicationEntryPB::DELETED)) {
4599
0
    resp->set_done(true);
4600
0
    StatusToPB(Status::OK(), resp->mutable_replication_error());
4601
0
    return Status::OK();
4602
0
  }
4603
4604
  // Otherwise we have either failed (see MarkUniverseReplicationFailed), or are still working.
4605
0
  if (universe_resp.entry().state() == SysUniverseReplicationEntryPB::DELETED_ERROR ||
4606
0
      universe_resp.entry().state() == SysUniverseReplicationEntryPB::FAILED) {
4607
0
    resp->set_done(true);
4608
4609
    // Get the more detailed error.
4610
0
    scoped_refptr<UniverseReplicationInfo> universe;
4611
0
    {
4612
0
      SharedLock lock(mutex_);
4613
0
      universe = FindPtrOrNull(universe_replication_map_, req->producer_id());
4614
0
      if (universe == nullptr) {
4615
0
        StatusToPB(
4616
0
            STATUS(InternalError, "Could not find CDC producer universe after having failed."),
4617
0
            resp->mutable_replication_error());
4618
0
        return Status::OK();
4619
0
      }
4620
0
    }
4621
0
    if (!universe->GetSetupUniverseReplicationErrorStatus().ok()) {
4622
0
      StatusToPB(universe->GetSetupUniverseReplicationErrorStatus(),
4623
0
                 resp->mutable_replication_error());
4624
0
    } else {
4625
0
      LOG(WARNING) << "Did not find setup universe replication error status.";
4626
0
      StatusToPB(STATUS(InternalError, "unknown error"), resp->mutable_replication_error());
4627
0
    }
4628
0
    return Status::OK();
4629
0
  }
4630
4631
  // Not done yet.
4632
0
  resp->set_done(false);
4633
0
  return Status::OK();
4634
0
}
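
The .ALTER suffix check above is what distinguishes a plain setup (done at ACTIVE) from an alter (done once the temporary universe is merged and DELETED). GStringPiece::ends_with can be approximated in portable C++ as follows; std::string::ends_with exists only from C++20:

#include <iostream>
#include <string>

// Portable stand-in for GStringPiece::ends_with (or C++20 std::string::ends_with).
bool EndsWith(const std::string& s, const std::string& suffix) {
  return s.size() >= suffix.size() &&
         s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

int main() {
  std::cout << EndsWith("universe-a.ALTER", ".ALTER") << "\n";  // 1
  std::cout << EndsWith("universe-a", ".ALTER") << "\n";        // 0
  return 0;
}
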
4635
4636
Status CatalogManager::UpdateConsumerOnProducerSplit(
4637
    const UpdateConsumerOnProducerSplitRequestPB* req,
4638
    UpdateConsumerOnProducerSplitResponsePB* resp,
4639
0
    rpc::RpcContext* rpc) {
4640
0
  LOG(INFO) << "UpdateConsumerOnProducerSplit from " << RequestorString(rpc)
4641
0
            << ": " << req->DebugString();
4642
4643
0
  if (!req->has_producer_id()) {
4644
0
    return STATUS(InvalidArgument, "Producer universe ID must be provided",
4645
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4646
0
  }
4647
0
  if (!req->has_stream_id()) {
4648
0
    return STATUS(InvalidArgument, "Stream ID must be provided",
4649
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4650
0
  }
4651
0
  if (!req->has_producer_split_tablet_info()) {
4652
0
    return STATUS(InvalidArgument, "Producer split tablet info must be provided",
4653
0
                  req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
4654
0
  }
4655
4656
0
  auto l = cluster_config_->LockForWrite();
4657
0
  auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map();
4658
0
  auto producer_entry = FindOrNull(*producer_map, req->producer_id());
4659
0
  if (!producer_entry) {
4660
0
    return STATUS_FORMAT(
4661
0
        NotFound, "Unable to find the producer entry for universe $0", req->producer_id());
4662
0
  }
4663
0
  auto stream_entry = FindOrNull(*producer_entry->mutable_stream_map(), req->stream_id());
4664
0
  if (!stream_entry) {
4665
0
    return STATUS_FORMAT(
4666
0
        NotFound, "Unable to find the stream entry for universe $0, stream $1",
4667
0
        req->producer_id(), req->stream_id());
4668
0
  }
4669
4670
  // Find the parent tablet in the tablet mapping.
4671
0
  bool found = false;
4672
0
  auto mutable_map = stream_entry->mutable_consumer_producer_tablet_map();
4673
  // Also keep track of whether we see the split children tablets.
4674
0
  vector<string> split_child_tablet_ids{req->producer_split_tablet_info().new_tablet1_id(),
4675
0
                                        req->producer_split_tablet_info().new_tablet2_id()};
4676
0
  for (auto& consumer_tablet_to_producer_tablets : *mutable_map) {
4677
0
    auto& producer_tablets_list = consumer_tablet_to_producer_tablets.second;
4678
0
    auto producer_tablets = producer_tablets_list.mutable_tablets();
4679
0
    for (auto tablet = producer_tablets->begin(); tablet < producer_tablets->end(); ++tablet) {
4680
0
      if (*tablet == req->producer_split_tablet_info().tablet_id()) {
4681
        // Remove the parent tablet id.
4682
0
        producer_tablets->erase(tablet);
4683
        // For now we add the children tablets to the same consumer tablet.
4684
        // See github issue #10186 for further improvements.
4685
0
        producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet1_id());
4686
0
        producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet2_id());
4687
        // There should only be one copy of each producer tablet per stream, so we can exit early.
4688
0
        found = true;
4689
0
        break;
4690
0
      }
4691
      // Check if this is one of the child split tablets.
4692
0
      auto it = std::find(split_child_tablet_ids.begin(), split_child_tablet_ids.end(), *tablet);
4693
0
      if (it != split_child_tablet_ids.end()) {
4694
0
        split_child_tablet_ids.erase(it);
4695
0
      }
4696
0
    }
4697
0
    if (found) {
4698
0
      break;
4699
0
    }
4700
0
  }
4701
4702
0
  if (!found) {
4703
    // Did not find the source tablet, but did find the children, which means we have already
4704
    // processed this SPLIT_OP, so for idempotency, we can return OK.
4705
0
    if (split_child_tablet_ids.empty()) {
4706
0
      LOG(INFO) << "Already processed this tablet split: " << req->DebugString();
4707
0
      return Status::OK();
4708
0
    }
4709
4710
    // When there are sequential SPLIT_OPs, we may try to reprocess an older SPLIT_OP. However, if
4711
    // one or both of those children have also already been split and processed, then we'll end up
4712
    // here (!found && !split_child_tablet_ids.empty()).
4713
    // This is alright, we can log a warning, and then continue (to not block later records).
4714
0
    LOG(WARNING)
4715
0
        << "Unable to find matching source tablet " << req->producer_split_tablet_info().tablet_id()
4716
0
        << " for universe " << req->producer_id() << " stream " << req->stream_id();
4717
4718
0
    return Status::OK();
4719
0
  }
4720
4721
  // Also make sure that we switch off the 1-1 mapping optimizations.
4722
0
  stream_entry->set_same_num_producer_consumer_tablets(false);
4723
4724
  // Also bump the cluster_config_ version so that changes are propagated to tservers (and new
4725
  // pollers are created for the new tablets).
4726
0
  l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
4727
4728
0
  RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config_),
4729
0
                            "Updating cluster config in sys-catalog"));
4730
0
  l.Commit();
4731
4732
0
  return Status::OK();
4733
0
}
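
The search loop above makes the RPC idempotent: if the parent tablet is gone but both children are present, the split was already applied and the call can succeed again. A standalone sketch of that find-replace-or-detect logic on plain vectors (ids illustrative):

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

int main() {
  std::vector<std::string> producer_tablets = {"child-a", "child-b", "tablet-z"};
  const std::string parent = "tablet-p";
  const std::vector<std::string> children = {"child-a", "child-b"};

  auto it = std::find(producer_tablets.begin(), producer_tablets.end(), parent);
  if (it != producer_tablets.end()) {
    // First time: swap the parent out for its two children.
    producer_tablets.erase(it);
    producer_tablets.push_back(children[0]);
    producer_tablets.push_back(children[1]);
  } else {
    // Parent absent: if both children are present, this split was already applied.
    bool already_done = std::all_of(children.begin(), children.end(),
        [&](const std::string& c) {
          return std::find(producer_tablets.begin(), producer_tablets.end(), c) !=
                 producer_tablets.end();
        });
    std::cout << (already_done ? "already processed; return OK\n"
                               : "stale split op; log a warning and continue\n");
  }
  return 0;
}
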
4734
4735
11.3M
bool CatalogManager::IsTableCdcProducer(const TableInfo& table_info) const {
4736
11.3M
  auto it = cdc_stream_tables_count_map_.find(table_info.id());
4737
11.3M
  if (it == cdc_stream_tables_count_map_.end()) {
4738
11.3M
    return false;
4739
7
  } else {
4740
7
    auto tid = table_info.id();
4741
7
    std::vector<scoped_refptr<CDCStreamInfo>> streams;
4742
5
    for (const auto& entry : cdc_stream_map_) {
4743
5
      auto s = entry.second->LockForRead();
4744
      // For xCluster, the first entry will be the table_id.
4745
5
      if (s->table_id().Get(0) == tid) {
4746
5
        if (!(s->is_deleting() || s->is_deleted())) {
4747
5
          return it->second > 0;
4748
5
        }
4749
5
      }
4750
5
    }
4751
7
  }
4752
2
  return false;
4753
11.3M
}
4754
4755
11.3M
bool CatalogManager::IsTableCdcConsumer(const TableInfo& table_info) const {
4756
11.3M
  auto it = xcluster_consumer_tables_to_stream_map_.find(table_info.id());
4757
11.3M
  if (it == xcluster_consumer_tables_to_stream_map_.end()) {
4758
11.3M
    return false;
4759
11.3M
  }
4760
0
  return !it->second.empty();
4761
0
}
4762
4763
CatalogManager::XClusterConsumerTableStreamInfoMap
4764
42
    CatalogManager::GetXClusterStreamInfoForConsumerTable(const TableId& table_id) const {
4765
42
  SharedLock lock(mutex_);
4766
42
  auto it = xcluster_consumer_tables_to_stream_map_.find(table_id);
4767
42
  if (it == xcluster_consumer_tables_to_stream_map_.end()) {
4768
42
    return {};
4769
42
  }
4770
4771
0
  return it->second;
4772
0
}
4773
4774
bool CatalogManager::IsCdcEnabled(
4775
11.3M
    const TableInfo& table_info) const {
4776
11.3M
  SharedLock lock(mutex_);
4777
11.3M
  return IsTableCdcProducer(table_info) || IsTableCdcConsumer(table_info);
4778
11.3M
}
4779
4780
5.35k
void CatalogManager::Started() {
4781
5.35k
  snapshot_coordinator_.Start();
4782
5.35k
}
4783
4784
Result<SnapshotSchedulesToObjectIdsMap> CatalogManager::MakeSnapshotSchedulesToObjectIdsMap(
4785
7.99k
    SysRowEntryType type) {
4786
7.99k
  return snapshot_coordinator_.MakeSnapshotSchedulesToObjectIdsMap(type);
4787
7.99k
}
4788
4789
0
Result<bool> CatalogManager::IsTablePartOfSomeSnapshotSchedule(const TableInfo& table_info) {
4790
0
  return snapshot_coordinator_.IsTableCoveredBySomeSnapshotSchedule(table_info);
4791
0
}
4792
4793
2.00k
void CatalogManager::SysCatalogLoaded(int64_t term) {
4794
2.00k
  return snapshot_coordinator_.SysCatalogLoaded(term);
4795
2.00k
}
4796
4797
0
size_t CatalogManager::GetNumLiveTServersForActiveCluster() {
4798
0
  BlacklistSet blacklist = BlacklistSetFromPB();
4799
0
  TSDescriptorVector ts_descs;
4800
0
  master_->ts_manager()->GetAllLiveDescriptorsInCluster(&ts_descs, placement_uuid(), blacklist);
4801
0
  return ts_descs.size();
4802
0
}
4803
4804
}  // namespace enterprise
4805
}  // namespace master
4806
}  // namespace yb