YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/tools/yb-admin_client_ent.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/tools/yb-admin_cli.h"
14
#include "yb/tools/yb-admin_client.h"
15
16
#include <iostream>
17
18
#include <boost/algorithm/string.hpp>
19
20
#include <rapidjson/document.h>
21
22
#include "yb/cdc/cdc_service.h"
23
#include "yb/cdc/cdc_service.proxy.h"
24
#include "yb/client/client.h"
25
26
#include "yb/common/entity_ids.h"
27
#include "yb/common/json_util.h"
28
#include "yb/common/wire_protocol.h"
29
30
#include "yb/encryption/encryption_util.h"
31
32
#include "yb/gutil/casts.h"
33
#include "yb/gutil/strings/util.h"
34
35
#include "yb/master/master_defaults.h"
36
#include "yb/master/master_error.h"
37
#include "yb/master/master_backup.proxy.h"
38
#include "yb/master/master_client.proxy.h"
39
#include "yb/master/master_cluster.proxy.h"
40
#include "yb/master/master_ddl.proxy.h"
41
#include "yb/master/master_encryption.proxy.h"
42
#include "yb/master/master_replication.proxy.h"
43
44
#include "yb/rpc/messenger.h"
45
#include "yb/rpc/rpc_controller.h"
46
#include "yb/tools/yb-admin_util.h"
47
#include "yb/util/cast.h"
48
#include "yb/util/env.h"
49
#include "yb/util/flag_tags.h"
50
#include "yb/util/jsonwriter.h"
51
#include "yb/util/monotime.h"
52
#include "yb/util/pb_util.h"
53
#include "yb/util/physical_time.h"
54
#include "yb/util/protobuf_util.h"
55
#include "yb/util/string_case.h"
56
#include "yb/util/string_trim.h"
57
#include "yb/util/string_util.h"
58
#include "yb/util/timestamp.h"
59
#include "yb/util/format.h"
60
#include "yb/util/status_format.h"
61
62
DEFINE_test_flag(int32, metadata_file_format_version, 0,
63
                 "Used in 'export_snapshot' metadata file format (0 means using latest format).");
64
65
DECLARE_bool(use_client_to_server_encryption);
66
DECLARE_int32(yb_client_admin_operation_timeout_sec);
67
68
namespace yb {
69
namespace tools {
70
namespace enterprise {
71
72
using namespace std::literals;
73
74
using std::cout;
75
using std::endl;
76
using std::string;
77
using std::vector;
78
79
using google::protobuf::RepeatedPtrField;
80
81
using client::YBTableName;
82
using pb_util::ParseFromSlice;
83
using rpc::RpcController;
84
85
using master::ChangeEncryptionInfoRequestPB;
86
using master::ChangeEncryptionInfoResponsePB;
87
using master::CreateSnapshotRequestPB;
88
using master::CreateSnapshotResponsePB;
89
using master::DeleteSnapshotRequestPB;
90
using master::DeleteSnapshotResponsePB;
91
using master::IdPairPB;
92
using master::ImportSnapshotMetaRequestPB;
93
using master::ImportSnapshotMetaResponsePB;
94
using master::ImportSnapshotMetaResponsePB_TableMetaPB;
95
using master::IsCreateTableDoneRequestPB;
96
using master::IsCreateTableDoneResponsePB;
97
using master::ListSnapshotRestorationsRequestPB;
98
using master::ListSnapshotRestorationsResponsePB;
99
using master::ListSnapshotsRequestPB;
100
using master::ListSnapshotsResponsePB;
101
using master::ListTablesRequestPB;
102
using master::ListTablesResponsePB;
103
using master::ListTabletServersRequestPB;
104
using master::ListTabletServersResponsePB;
105
using master::RestoreSnapshotRequestPB;
106
using master::RestoreSnapshotResponsePB;
107
using master::SnapshotInfoPB;
108
using master::SysNamespaceEntryPB;
109
using master::SysRowEntry;
110
using master::SysRowEntryType;
111
using master::BackupRowEntryPB;
112
using master::SysTablesEntryPB;
113
using master::SysSnapshotEntryPB;
114
115
PB_ENUM_FORMATTERS(yb::master::SysSnapshotEntryPB::State);
116
117
1
// Lists the snapshots reported by the master leader and, in text mode, the snapshot
// restorations as well.
//
// Flags read from `flags`:
//   SHOW_DELETED      - forwarded to the master as list_deleted_snapshots.
//   JSON              - print a single JSON document to stdout instead of the text table.
//   SHOW_DETAILS      - text mode only: also print the NAMESPACE and TABLE entries of every
//                       snapshot, with their payload decoded to compact JSON.
//   NOT_SHOW_RESTORED - text mode only: hide restorations whose state is RESTORED.
//
// Returns the first RPC or entry-parsing error encountered, Status::OK() otherwise.
Status ClusterAdminClient::ListSnapshots(const ListSnapshotsFlags& flags) {
  ListSnapshotsResponsePB resp;
  RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) {
    ListSnapshotsRequestPB req;
    req.set_list_deleted_snapshots(flags.Test(ListSnapshotsFlag::SHOW_DELETED));
    return master_backup_proxy_->ListSnapshots(req, &resp, rpc);
  }));

  rapidjson::Document document(rapidjson::kObjectType);
  const bool json = flags.Test(ListSnapshotsFlag::JSON);
  // Details are not implemented in json mode. They are written to stdout as plain text, so
  // emitting them together with the JSON document would make the output unparseable.
  const bool show_details = !json && flags.Test(ListSnapshotsFlag::SHOW_DETAILS);

  if (resp.has_current_snapshot_id()) {
    if (json) {
      AddStringField("current_snapshot_id",
                     SnapshotIdToString(resp.current_snapshot_id()),
                     &document, &document.GetAllocator());
    } else {
      cout << "Current snapshot id: " << SnapshotIdToString(resp.current_snapshot_id()) << endl;
    }
  }

  rapidjson::Value json_snapshots(rapidjson::kArrayType);
  if (!json) {
    if (resp.snapshots_size()) {
      // Using 2 tabs so that the header can be aligned to the time.
      cout << RightPadToUuidWidth("Snapshot UUID") << kColumnSep
           << "State" << kColumnSep << kColumnSep << "Creation Time" << endl;
    } else {
      cout << "No snapshots" << endl;
    }
  }

  for (SnapshotInfoPB& snapshot : *resp.mutable_snapshots()) {
    rapidjson::Value json_snapshot(rapidjson::kObjectType);
    if (json) {
      AddStringField(
          "id", SnapshotIdToString(snapshot.id()), &json_snapshot, &document.GetAllocator());
      const auto& entry = snapshot.entry();
      AddStringField(
          "state", SysSnapshotEntryPB::State_Name(entry.state()), &json_snapshot,
          &document.GetAllocator());
      AddStringField(
          "snapshot_time", HybridTimeToString(HybridTime::FromPB(entry.snapshot_hybrid_time())),
          &json_snapshot, &document.GetAllocator());
      AddStringField(
          "previous_snapshot_time",
          HybridTimeToString(HybridTime::FromPB(entry.previous_snapshot_hybrid_time())),
          &json_snapshot, &document.GetAllocator());
    } else {
      cout << SnapshotIdToString(snapshot.id()) << kColumnSep
           << snapshot.entry().state() << kColumnSep
           << HybridTimeToString(HybridTime::FromPB(snapshot.entry().snapshot_hybrid_time()))
           << endl;
    }

    if (show_details) {
      for (SysRowEntry& entry : *snapshot.mutable_entry()->mutable_entries()) {
        string decoded_data;
        switch (entry.type()) {
          case SysRowEntryType::NAMESPACE: {
            auto meta = VERIFY_RESULT(ParseFromSlice<SysNamespaceEntryPB>(entry.data()));
            meta.clear_transaction();
            decoded_data = JsonWriter::ToJson(meta, JsonWriter::COMPACT);
            break;
          }
          case SysRowEntryType::TABLE: {
            auto meta = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data()));
            // Drop the bulky fields before printing.
            meta.clear_schema();
            meta.clear_partition_schema();
            meta.clear_index_info();
            meta.clear_indexes();
            meta.clear_transaction();
            decoded_data = JsonWriter::ToJson(meta, JsonWriter::COMPACT);
            break;
          }
          default:
            // Other entry types are not printed.
            break;
        }

        if (!decoded_data.empty()) {
          // The payload is replaced with a "DATA" placeholder, the entry is rendered to JSON,
          // and the quoted placeholder is then substituted with the decoded JSON.
          entry.set_data("DATA");
          cout << kColumnSep << StringReplace(JsonWriter::ToJson(entry, JsonWriter::COMPACT),
                                              "\"DATA\"", decoded_data, false) << endl;
        }
      }
    }
    if (json) {
      json_snapshots.PushBack(json_snapshot, document.GetAllocator());
    }
  }

  if (json) {
    // The JSON document does not include restorations, so it is emitted before (and instead
    // of) the restorations RPC below.
    document.AddMember("snapshots", json_snapshots, document.GetAllocator());
    std::cout << common::PrettyWriteRapidJsonToString(document) << std::endl;
    return Status::OK();
  }

  ListSnapshotRestorationsResponsePB rest_resp;
  RETURN_NOT_OK(RequestMasterLeader(&rest_resp, [&](RpcController* rpc) {
    ListSnapshotRestorationsRequestPB rest_req;
    return master_backup_proxy_->ListSnapshotRestorations(rest_req, &rest_resp, rpc);
  }));

  if (rest_resp.restorations_size() == 0) {
    cout << "No snapshot restorations" << endl;
  } else if (flags.Test(ListSnapshotsFlag::NOT_SHOW_RESTORED)) {
    cout << "Not show fully RESTORED entries" << endl;
  }

  // The table header is printed lazily, only once a restoration passes the filter.
  bool title_printed = false;
  for (const auto& restoration : rest_resp.restorations()) {
    if (!flags.Test(ListSnapshotsFlag::NOT_SHOW_RESTORED) ||
        restoration.entry().state() != SysSnapshotEntryPB::RESTORED) {
      if (!title_printed) {
        cout << RightPadToUuidWidth("Restoration UUID") << kColumnSep << "State" << endl;
        title_printed = true;
      }
      cout << TryFullyDecodeTxnSnapshotRestorationId(restoration.id()) << kColumnSep
           << restoration.entry().state() << endl;
    }
  }

  return Status::OK();
}
242
243
// Starts creation of a transaction-aware snapshot of `tables` on the master leader.
//
// When `flush_timeout_secs` is positive the tables are flushed first. A flush that times out
// is reported on stdout and ignored, a NotFound flush status is ignored silently, and any
// other flush failure is returned to the caller. `add_indexes` is forwarded both to the flush
// and to the snapshot request. On success the id of the started snapshot is printed.
Status ClusterAdminClient::CreateSnapshot(
    const vector<YBTableName>& tables,
    const bool add_indexes,
    const int flush_timeout_secs) {
  if (flush_timeout_secs > 0) {
    const auto flush_status = FlushTables(tables, add_indexes, flush_timeout_secs, false);
    if (flush_status.IsTimedOut()) {
      cout << flush_status.ToString(false) << " (ignored)" << endl;
    } else if (!(flush_status.ok() || flush_status.IsNotFound())) {
      return flush_status;
    }
  }

  CreateSnapshotResponsePB response;
  RETURN_NOT_OK(RequestMasterLeader(&response, [&](RpcController* rpc) {
    CreateSnapshotRequestPB request;
    request.set_add_indexes(add_indexes);
    request.set_transaction_aware(true);
    for (const auto& table : tables) {
      table.SetIntoTableIdentifierPB(request.add_tables());
    }
    return master_backup_proxy_->CreateSnapshot(request, &response, rpc);
  }));

  cout << "Started snapshot creation: " << SnapshotIdToString(response.snapshot_id()) << endl;
  return Status::OK();
}
271
272
0
// Snapshots every user table and index of the namespace `ns`.
//
// The tables are listed via the master (system tables excluded, only user-table and
// index-table relations requested) and handed to CreateSnapshot() with add_indexes=false,
// since the indexes are already part of the listing. Returns InvalidArgument when the
// namespace contains no tables.
Status ClusterAdminClient::CreateNamespaceSnapshot(const TypedNamespaceName& ns) {
  ListTablesResponsePB list_resp;
  RETURN_NOT_OK(RequestMasterLeader(&list_resp, [&](RpcController* rpc) {
    ListTablesRequestPB list_req;

    auto* ns_filter = list_req.mutable_namespace_();
    ns_filter->set_name(ns.name);
    ns_filter->set_database_type(ns.db_type);
    list_req.set_exclude_system_tables(true);
    list_req.add_relation_type_filter(master::USER_TABLE_RELATION);
    list_req.add_relation_type_filter(master::INDEX_TABLE_RELATION);
    return master_ddl_proxy_->ListTables(list_req, &list_resp, rpc);
  }));

  if (list_resp.tables().empty()) {
    return STATUS_FORMAT(InvalidArgument, "No tables found in namespace: $0", ns.name);
  }

  vector<YBTableName> tables(list_resp.tables_size());
  auto out = tables.begin();
  for (const auto& table : list_resp.tables()) {
    // Sanity checks: the master must have honoured the filters sent above.
    RSTATUS_DCHECK(table.relation_type() == master::USER_TABLE_RELATION ||
            table.relation_type() == master::INDEX_TABLE_RELATION, InternalError,
            Format("Invalid relation type: $0", table.relation_type()));
    RSTATUS_DCHECK_EQ(table.namespace_().name(), ns.name, InternalError,
               Format("Invalid namespace name: $0", table.namespace_().name()));
    RSTATUS_DCHECK_EQ(table.namespace_().database_type(), ns.db_type, InternalError,
               Format("Invalid namespace type: $0",
                      YQLDatabase_Name(table.namespace_().database_type())));

    out->set_table_id(table.id());
    out->set_namespace_id(table.namespace_().id());
    out->set_pgschema_name(table.pgschema_name());
    ++out;
  }

  return CreateSnapshot(tables, /* add_indexes */ false);
}
308
309
// Fetches snapshot restorations from the master leader and renders them as JSON.
//
// `restoration_id` is sent as a filter only when it tests true. The returned document has
// the shape {"restorations": [{"id", "snapshot_id", "state"}, ...]}. An RPC failure or an
// id that cannot be decoded is returned as an error.
Result<rapidjson::Document> ClusterAdminClient::ListSnapshotRestorations(
    const TxnSnapshotRestorationId& restoration_id) {
  master::ListSnapshotRestorationsResponsePB resp;
  RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) {
    master::ListSnapshotRestorationsRequestPB req;
    if (restoration_id) {
      req.set_restoration_id(restoration_id.data(), restoration_id.size());
    }
    return master_backup_proxy_->ListSnapshotRestorations(req, &resp, rpc);
  }));

  rapidjson::Document result(rapidjson::kObjectType);
  auto& allocator = result.GetAllocator();

  rapidjson::Value restorations_json(rapidjson::kArrayType);
  for (const auto& restoration : resp.restorations()) {
    const auto& entry = restoration.entry();
    const auto decoded_id = VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId(restoration.id()));
    const auto decoded_snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(entry.snapshot_id()));

    rapidjson::Value item(rapidjson::kObjectType);
    AddStringField("id", decoded_id.ToString(), &item, &allocator);
    AddStringField("snapshot_id", decoded_snapshot_id.ToString(), &item, &allocator);
    AddStringField(
        "state", master::SysSnapshotEntryPB_State_Name(entry.state()), &item, &allocator);
    restorations_json.PushBack(item, allocator);
  }
  result.AddMember("restorations", restorations_json, allocator);
  return result;
}
341
342
// Creates a snapshot schedule whose table filter holds the single identifier `keyspace`.
//
// `interval` and `retention` are sent to the master in seconds. On success returns the JSON
// document {"schedule_id": "<decoded id>"}; an RPC failure or an undecodable schedule id is
// returned as an error.
Result<rapidjson::Document> ClusterAdminClient::CreateSnapshotSchedule(
    const client::YBTableName& keyspace, MonoDelta interval, MonoDelta retention) {
  master::CreateSnapshotScheduleResponsePB resp;
  RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) {
    master::CreateSnapshotScheduleRequestPB req;

    auto* schedule_options = req.mutable_options();
    keyspace.SetIntoTableIdentifierPB(
        schedule_options->mutable_filter()->mutable_tables()->mutable_tables()->Add());
    schedule_options->set_interval_sec(interval.ToSeconds());
    schedule_options->set_retention_duration_sec(retention.ToSeconds());
    return master_backup_proxy_->CreateSnapshotSchedule(req, &resp, rpc);
  }));

  const auto schedule_id =
      VERIFY_RESULT(FullyDecodeSnapshotScheduleId(resp.snapshot_schedule_id()));

  rapidjson::Document document(rapidjson::kObjectType);
  AddStringField("schedule_id", schedule_id.ToString(), &document, &document.GetAllocator());
  return document;
}
366
367
// Fetches snapshot schedules from the master leader and renders them as JSON.
//
// `schedule_id` is sent as a filter only when it tests true. The returned document has the
// shape:
//   {"schedules": [{"id": ...,
//                   "options": {"filter", "interval", "retention", ["delete_time"]},
//                   "snapshots": [{"id", "snapshot_time", ["previous_snapshot_time"]}]}]}
// Bracketed members are emitted only when the corresponding hybrid time tests true.
// Interval and retention are printed in whole minutes.
Result<rapidjson::Document> ClusterAdminClient::ListSnapshotSchedules(
    const SnapshotScheduleId& schedule_id) {
  master::ListSnapshotSchedulesResponsePB resp;
  RETURN_NOT_OK(RequestMasterLeader(&resp, [this, &resp, &schedule_id](RpcController* rpc) {
    master::ListSnapshotSchedulesRequestPB req;
    if (schedule_id) {
      req.set_snapshot_schedule_id(schedule_id.data(), schedule_id.size());
    }
    return master_backup_proxy_->ListSnapshotSchedules(req, &resp, rpc);
  }));

  rapidjson::Document result(rapidjson::kObjectType);
  auto& allocator = result.GetAllocator();

  rapidjson::Value schedules_json(rapidjson::kArrayType);
  for (const auto& schedule : resp.schedules()) {
    const auto& schedule_options = schedule.options();

    rapidjson::Value schedule_json(rapidjson::kObjectType);
    const auto decoded_schedule_id = VERIFY_RESULT(FullyDecodeSnapshotScheduleId(schedule.id()));
    AddStringField("id", decoded_schedule_id.ToString(), &schedule_json, &allocator);

    const auto& filter = schedule_options.filter();
    string filter_text;
    // The user input should only have 1 entry, at namespace level.
    if (filter.tables().tables_size() == 1 && filter.tables().tables(0).has_namespace_()) {
      const auto& ns = filter.tables().tables(0).namespace_();
      const char* api_name = nullptr;
      switch (ns.database_type()) {
        case YQL_DATABASE_PGSQL:
          api_name = "ysql";
          break;
        case YQL_DATABASE_CQL:
          api_name = "ycql";
          break;
        default:
          break;
      }
      if (api_name != nullptr) {
        filter_text = Format("$0.$1", api_name, ns.name());
      }
    }
    // If the user input was non standard, just display the whole debug PB.
    if (filter_text.empty()) {
      filter_text = filter.ShortDebugString();
      DCHECK(false) << "Non standard filter " << filter_text;
    }

    rapidjson::Value options_json(rapidjson::kObjectType);
    AddStringField("filter", filter_text, &options_json, &allocator);

    const auto interval_min = schedule_options.interval_sec() / MonoTime::kSecondsPerMinute;
    AddStringField("interval", Format("$0 min", interval_min), &options_json, &allocator);

    const auto retention_min =
        schedule_options.retention_duration_sec() / MonoTime::kSecondsPerMinute;
    AddStringField("retention", Format("$0 min", retention_min), &options_json, &allocator);

    const auto delete_time = HybridTime::FromPB(schedule_options.delete_time());
    if (delete_time) {
      AddStringField("delete_time", HybridTimeToString(delete_time), &options_json, &allocator);
    }
    schedule_json.AddMember("options", options_json, allocator);

    rapidjson::Value snapshots_json(rapidjson::kArrayType);
    for (const auto& snapshot : schedule.snapshots()) {
      const auto& entry = snapshot.entry();

      rapidjson::Value snapshot_json(rapidjson::kObjectType);
      const auto decoded_snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(snapshot.id()));
      AddStringField("id", decoded_snapshot_id.ToString(), &snapshot_json, &allocator);

      const auto snapshot_time = HybridTime::FromPB(entry.snapshot_hybrid_time());
      AddStringField(
          "snapshot_time", HybridTimeToString(snapshot_time), &snapshot_json, &allocator);

      const auto previous_time = HybridTime::FromPB(entry.previous_snapshot_hybrid_time());
      if (previous_time) {
        AddStringField(
            "previous_snapshot_time", HybridTimeToString(previous_time), &snapshot_json,
            &allocator);
      }
      snapshots_json.PushBack(snapshot_json, allocator);
    }
    schedule_json.AddMember("snapshots", snapshots_json, allocator);
    schedules_json.PushBack(schedule_json, allocator);
  }
  result.AddMember("schedules", schedules_json, allocator);
  return result;
}
450
451
// Asks the master leader to delete the snapshot schedule `schedule_id`.
//
// On success returns the JSON document {"schedule_id": "<schedule_id>"}; an RPC failure is
// returned as an error.
Result<rapidjson::Document> ClusterAdminClient::DeleteSnapshotSchedule(
    const SnapshotScheduleId& schedule_id) {
  master::DeleteSnapshotScheduleResponsePB response;
  RETURN_NOT_OK(RequestMasterLeader(
      &response, [this, &response, &schedule_id](RpcController* rpc) {
    master::DeleteSnapshotScheduleRequestPB request;
    request.set_snapshot_schedule_id(schedule_id.data(), schedule_id.size());

    return master_backup_proxy_->DeleteSnapshotSchedule(request, &response, rpc);
  }));

  rapidjson::Document document(rapidjson::kObjectType);
  auto& allocator = document.GetAllocator();
  AddStringField("schedule_id", schedule_id.ToString(), &document, &allocator);
  return document;
}
466
467
10
bool SnapshotSuitableForRestoreAt(const SysSnapshotEntryPB& entry, HybridTime restore_at) {
468
10
  return (entry.state() == master::SysSnapshotEntryPB::COMPLETE ||
469
10
          
entry.state() == master::SysSnapshotEntryPB::CREATING0
) &&
470
10
         HybridTime::FromPB(entry.snapshot_hybrid_time()) >= restore_at &&
471
10
         
HybridTime::FromPB(entry.previous_snapshot_hybrid_time()) < restore_at5
;
472
10
}
473
474
// Finds, or creates, a snapshot of schedule `schedule_id` usable for a restore to
// `restore_at`. Every RPC issued here uses `deadline`.
//
// Each iteration lists the first returned schedule's snapshots and returns the first one
// accepted by SnapshotSuitableForRestoreAt(). If none is suitable but the newest snapshot is
// already later than `restore_at`, IllegalState is returned. Otherwise a new snapshot is
// requested for the schedule and its id is returned. When the master rejects that request
// with PARALLEL_SNAPSHOT_OPERATION, the loop sleeps for up to one second (bounded by
// `deadline`) and starts over.
Result<TxnSnapshotId> ClusterAdminClient::SuitableSnapshotId(
    const SnapshotScheduleId& schedule_id, HybridTime restore_at, CoarseTimePoint deadline) {
  while (true) {
    auto newest_snapshot_time = HybridTime::kMin;
    {
      RpcController list_rpc;
      list_rpc.set_deadline(deadline);
      master::ListSnapshotSchedulesRequestPB list_req;
      master::ListSnapshotSchedulesResponsePB list_resp;
      if (schedule_id) {
        list_req.set_snapshot_schedule_id(schedule_id.data(), schedule_id.size());
      }

      RETURN_NOT_OK_PREPEND(
          master_backup_proxy_->ListSnapshotSchedules(list_req, &list_resp, &list_rpc),
          "Failed to list snapshot schedules");

      if (list_resp.has_error()) {
        return StatusFromPB(list_resp.error().status());
      }

      if (list_resp.schedules().empty()) {
        return STATUS_FORMAT(InvalidArgument, "Unknown schedule: $0", schedule_id);
      }

      const auto& snapshots = list_resp.schedules()[0].snapshots();
      for (const auto& snapshot : snapshots) {
        const auto& entry = snapshot.entry();
        newest_snapshot_time =
            std::max(newest_snapshot_time, HybridTime::FromPB(entry.snapshot_hybrid_time()));
        if (SnapshotSuitableForRestoreAt(entry, restore_at)) {
          return VERIFY_RESULT(FullyDecodeTxnSnapshotId(snapshot.id()));
        }
      }
      if (newest_snapshot_time > restore_at) {
        return STATUS_FORMAT(
            IllegalState, "Cannot restore at $0, last snapshot: $1, snapshots: $2",
            restore_at, newest_snapshot_time, snapshots);
      }
    }

    // No existing snapshot reaches restore_at yet: request a fresh one for this schedule.
    RpcController create_rpc;
    create_rpc.set_deadline(deadline);
    master::CreateSnapshotRequestPB create_req;
    master::CreateSnapshotResponsePB create_resp;
    create_req.set_schedule_id(schedule_id.data(), schedule_id.size());
    RETURN_NOT_OK_PREPEND(
        master_backup_proxy_->CreateSnapshot(create_req, &create_resp, &create_rpc),
        "Failed to create snapshot");
    if (!create_resp.has_error()) {
      return FullyDecodeTxnSnapshotId(create_resp.snapshot_id());
    }

    auto status = StatusFromPB(create_resp.error().status());
    if (master::MasterError(status) != master::MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION) {
      return status;
    }
    std::this_thread::sleep_until(std::min(deadline, CoarseMonoClock::now() + 1s));
  }
}
529
530
// Restores the schedule `schedule_id` to the hybrid time `restore_at`.
//
// Steps, all sharing one deadline of now + timeout_:
//   1. Pick (or create) a snapshot via SuitableSnapshotId().
//   2. Poll the snapshot every 100ms until it is COMPLETE. A COMPLETE snapshot that
//      SnapshotSuitableForRestoreAt() rejects yields IllegalState; reaching the deadline
//      while still polling yields TimedOut.
//   3. Send RestoreSnapshot with restore_ht set to `restore_at`.
// On success returns the JSON document {"snapshot_id": ..., "restoration_id": ...}.
Result<rapidjson::Document> ClusterAdminClient::RestoreSnapshotSchedule(
    const SnapshotScheduleId& schedule_id, HybridTime restore_at) {
  auto deadline = CoarseMonoClock::now() + timeout_;

  auto snapshot_id = VERIFY_RESULT(SuitableSnapshotId(schedule_id, restore_at, deadline));

  while (true) {
    RpcController list_rpc;
    list_rpc.set_deadline(deadline);
    master::ListSnapshotsRequestPB list_req;
    list_req.set_snapshot_id(snapshot_id.data(), snapshot_id.size());
    master::ListSnapshotsResponsePB list_resp;
    RETURN_NOT_OK_PREPEND(master_backup_proxy_->ListSnapshots(list_req, &list_resp, &list_rpc),
                          "Failed to list snapshots");
    if (list_resp.has_error()) {
      return StatusFromPB(list_resp.error().status());
    }
    if (list_resp.snapshots().size() != 1) {
      return STATUS_FORMAT(
          IllegalState, "Wrong number of snapshots received $0", list_resp.snapshots().size());
    }

    const auto& entry = list_resp.snapshots()[0].entry();
    if (entry.state() == master::SysSnapshotEntryPB::COMPLETE) {
      if (!SnapshotSuitableForRestoreAt(entry, restore_at)) {
        return STATUS_FORMAT(
            IllegalState, "Snapshot is not suitable for restore at $0", restore_at);
      }
      break;
    }

    // Not complete yet: wait a little and poll again, unless the deadline has passed.
    auto now = CoarseMonoClock::now();
    if (now >= deadline) {
      return STATUS_FORMAT(
          TimedOut, "Timed out to complete a snapshot $0", snapshot_id);
    }
    std::this_thread::sleep_until(std::min(deadline, now + 100ms));
  }

  RpcController restore_rpc;
  restore_rpc.set_deadline(deadline);
  RestoreSnapshotRequestPB restore_req;
  RestoreSnapshotResponsePB restore_resp;
  restore_req.set_snapshot_id(snapshot_id.data(), snapshot_id.size());
  restore_req.set_restore_ht(restore_at.ToUint64());
  RETURN_NOT_OK_PREPEND(
      master_backup_proxy_->RestoreSnapshot(restore_req, &restore_resp, &restore_rpc),
      "Failed to restore snapshot");

  if (restore_resp.has_error()) {
    return StatusFromPB(restore_resp.error().status());
  }

  auto restoration_id =
      VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId(restore_resp.restoration_id()));

  rapidjson::Document document(rapidjson::kObjectType);
  auto& allocator = document.GetAllocator();
  AddStringField("snapshot_id", snapshot_id.ToString(), &document, &allocator);
  AddStringField("restoration_id", restoration_id.ToString(), &document, &allocator);

  return document;
}
589
590
// Starts restoring the snapshot `snapshot_id` through the master leader.
//
// `timestamp` is sent as restore_ht only when it tests true. On success the snapshot id,
// the restoration id and (when set) the restore time are printed to stdout.
Status ClusterAdminClient::RestoreSnapshot(const string& snapshot_id,
                                           HybridTime timestamp) {
  const bool has_timestamp = static_cast<bool>(timestamp);

  RestoreSnapshotResponsePB response;
  RETURN_NOT_OK(RequestMasterLeader(&response, [&](RpcController* rpc) {
    RestoreSnapshotRequestPB request;
    request.set_snapshot_id(StringToSnapshotId(snapshot_id));
    if (has_timestamp) {
      request.set_restore_ht(timestamp.ToUint64());
    }
    return master_backup_proxy_->RestoreSnapshot(request, &response, rpc);
  }));

  cout << "Started restoring snapshot: " << snapshot_id << endl;
  cout << "Restoration id: " << FullyDecodeTxnSnapshotRestorationId(response.restoration_id())
       << endl;
  if (has_timestamp) {
    cout << "Restore at: " << timestamp << endl;
  }
  return Status::OK();
}
609
610
0
// Asks the master leader to delete the snapshot `snapshot_id` and prints a confirmation to
// stdout once the request has succeeded.
Status ClusterAdminClient::DeleteSnapshot(const std::string& snapshot_id) {
  DeleteSnapshotResponsePB response;
  const auto send_delete = [this, &response, &snapshot_id](RpcController* rpc) {
    DeleteSnapshotRequestPB request;
    request.set_snapshot_id(StringToSnapshotId(snapshot_id));
    return master_backup_proxy_->DeleteSnapshot(request, &response, rpc);
  };
  RETURN_NOT_OK(RequestMasterLeader(&response, send_delete));

  cout << "Deleted snapshot: " << snapshot_id << endl;
  return Status::OK();
}
621
622
// Exports the metadata of snapshot `snapshot_id` into the file `file_name`.
//
// The snapshot is fetched from the master leader and written as a protobuf container
// (overwriting an existing file, with sync). The test flag metadata_file_format_version
// selects the layout:
//   0 or >= 2 - the master is asked to prepare the snapshot for backup;
//   -1        - additionally strips 'namespace_name' from every TABLE entry.
// Returns InternalError when the response holds no entry for the requested snapshot.
Status ClusterAdminClient::CreateSnapshotMetaFile(const string& snapshot_id,
                                                  const string& file_name) {
  ListSnapshotsResponsePB resp;
  RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) {
    ListSnapshotsRequestPB req;
    req.set_snapshot_id(StringToSnapshotId(snapshot_id));

    // Format 0 - latest format (== Format 2 at the moment).
    // Format -1 - old format (no 'namespace_name' in the Table entry).
    // Format 1 - old format.
    // Format 2 - new format.
    const auto format_version = FLAGS_TEST_metadata_file_format_version;
    const bool use_new_format = format_version == 0 || format_version >= 2;
    if (use_new_format) {
      req.set_prepare_for_backup(true);
    }
    return master_backup_proxy_->ListSnapshots(req, &resp, rpc);
  }));

  const auto snapshot_count = resp.snapshots_size();
  if (snapshot_count > 1) {
    LOG(WARNING) << "Requested snapshot metadata for snapshot '" << snapshot_id << "', but got "
                 << snapshot_count << " snapshots in the response";
  }

  // Pick the first response entry whose id matches the request.
  SnapshotInfoPB* matching_snapshot = nullptr;
  for (SnapshotInfoPB& candidate : *resp.mutable_snapshots()) {
    if (SnapshotIdToString(candidate.id()) != snapshot_id) {
      continue;
    }
    matching_snapshot = &candidate;
    break;
  }
  if (matching_snapshot == nullptr) {
    return STATUS_FORMAT(
        InternalError, "Response contained $0 entries but no entry for snapshot '$1'",
        snapshot_count, snapshot_id);
  }

  if (FLAGS_TEST_metadata_file_format_version == -1) {
    // Remove 'namespace_name' from SysTablesEntryPB.
    for (SysRowEntry& row : *matching_snapshot->mutable_entry()->mutable_entries()) {
      if (row.type() != SysRowEntryType::TABLE) {
        continue;
      }
      auto table_meta = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(row.data()));
      table_meta.clear_namespace_name();
      row.set_data(table_meta.SerializeAsString());
    }
  }

  cout << "Exporting snapshot " << snapshot_id << " ("
       << matching_snapshot->entry().state() << ") to file " << file_name << endl;

  // Serialize snapshot protobuf to given path.
  RETURN_NOT_OK(pb_util::WritePBContainerToPath(
      Env::Default(), file_name, *matching_snapshot, pb_util::OVERWRITE, pb_util::SYNC));

  cout << "Snapshot metadata was saved into file: " << file_name << endl;
  return Status::OK();
}
680
681
// Imports snapshot metadata from a file and registers a new snapshot for the imported tables.
//
// Steps performed:
//   1. Reads a SnapshotInfoPB container from `file_name`.
//   2. If the file has no format version, repacks its entries into backup entries and marks
//      the snapshot as format version 2.
//   3. Optionally renames the namespace (`keyspace.name`) and the tables (`tables`, matched to
//      TABLE entries by their order in the metadata).
//   4. Sends ImportSnapshotMeta to the master leader, prints the old/new id mapping, waits for
//      every new table to finish being created and finally issues CreateSnapshot for them.
//
// Returns InvalidArgument when only some of the tables were given a new name, IllegalState when
// a table entry has no name, InternalError on an unknown table type in the response, or any
// error produced by the file read / RPC calls.
Status ClusterAdminClient::ImportSnapshotMetaFile(const string& file_name,
                                                  const TypedNamespaceName& keyspace,
                                                  const vector<YBTableName>& tables) {
  cout << "Read snapshot meta file " << file_name << endl;

  ImportSnapshotMetaRequestPB req;

  SnapshotInfoPB* const snapshot_info = req.mutable_snapshot();

  // Read snapshot protobuf from given path.
  RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(Env::Default(), file_name, snapshot_info));

  if (!snapshot_info->has_format_version()) {
    SCHECK_EQ(
        0, snapshot_info->backup_entries_size(), InvalidArgument,
        Format("Metadata file in Format 1 has backup entries from Format 2: $0",
        snapshot_info->backup_entries_size()));

    // Repack PB data loaded in the old format.
    // Init BackupSnapshotPB based on loaded SnapshotInfoPB.
    SysSnapshotEntryPB& sys_entry = *snapshot_info->mutable_entry();
    snapshot_info->mutable_backup_entries()->Reserve(sys_entry.entries_size());
    for (SysRowEntry& entry : *sys_entry.mutable_entries()) {
      // Swap moves the payload into the new backup entry without copying it.
      snapshot_info->add_backup_entries()->mutable_entry()->Swap(&entry);
    }

    sys_entry.clear_entries();
    snapshot_info->set_format_version(2);
  }

  cout << "Importing snapshot " << SnapshotIdToString(snapshot_info->id())
       << " (" << snapshot_info->entry().state() << ")" << endl;

  // Index of the current TABLE entry; used to pick the matching name from `tables`.
  size_t table_index = 0;
  bool was_table_renamed = false;
  for (BackupRowEntryPB& backup_entry : *snapshot_info->mutable_backup_entries()) {
    SysRowEntry& entry = *backup_entry.mutable_entry();
    const YBTableName table_name = table_index < tables.size()
        ? tables[table_index] : YBTableName();

    switch (entry.type()) {
      case SysRowEntryType::NAMESPACE: {
        auto meta = VERIFY_RESULT(ParseFromSlice<SysNamespaceEntryPB>(entry.data()));

        // Rename the namespace when a different target keyspace was requested.
        if (!keyspace.name.empty() && keyspace.name != meta.name()) {
          meta.set_name(keyspace.name);
          entry.set_data(meta.SerializeAsString());
        }
        break;
      }
      case SysRowEntryType::TABLE: {
        if (was_table_renamed && table_name.empty()) {
          // Renaming is allowed for all tables OR for no one table.
          return STATUS_FORMAT(InvalidArgument,
                               "There is no name for table (including indexes) number: $0",
                               table_index);
        }

        auto meta = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data()));

        bool update_meta = false;
        if (!table_name.empty() && table_name.table_name() != meta.name()) {
          meta.set_name(table_name.table_name());
          update_meta = true;
          was_table_renamed = true;
        }
        if (!keyspace.name.empty() && keyspace.name != meta.namespace_name()) {
          meta.set_namespace_name(keyspace.name);
          update_meta = true;
        }

        if (meta.name().empty()) {
          return STATUS(IllegalState, "Could not find table name from snapshot metadata");
        }

        // Update the table name if needed.
        if (update_meta) {
          entry.set_data(meta.SerializeAsString());
        }

        const auto colocated_prefix = meta.colocated() ? "colocated " : "";

        if (meta.indexed_table_id().empty()) {
          cout << "Table type: " << colocated_prefix << "table" << endl;
        } else {
          cout << "Table type: " << colocated_prefix << "index (attaching to the old table id "
               << meta.indexed_table_id() << ")" << endl;
        }

        if (!table_name.empty()) {
          DCHECK(table_name.has_namespace());
          DCHECK(table_name.has_table());
          cout << "Target imported " << colocated_prefix << "table name: "
               << table_name.ToString() << endl;
        } else if (!keyspace.name.empty()) {
          cout << "Target imported " << colocated_prefix << "table name: "
               << keyspace.name << "." << meta.name() << endl;
        }

        // Fall back to the bracketed namespace id when the metadata carries no namespace name.
        cout << (meta.colocated() ? "Colocated t" : "T") << "able being imported: "
             << (meta.namespace_name().empty() ?
                 "[" + meta.namespace_id() + "]" : meta.namespace_name())
             << "." << meta.name() << endl;
        ++table_index;
        break;
      }
      default:
        break;
    }
  }

  ImportSnapshotMetaResponsePB resp;
  RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) {
    return master_backup_proxy_->ImportSnapshotMeta(req, &resp, rpc);
  }));

  const int kObjectColumnWidth = 16;
  const auto pad_object_type = [](const string& s) {
    return RightPadToWidth(s, kObjectColumnWidth);
  };

  cout << "Successfully applied snapshot." << endl
       << pad_object_type("Object") << kColumnSep
       << RightPadToUuidWidth("Old ID") << kColumnSep
       << RightPadToUuidWidth("New ID") << endl;

  const RepeatedPtrField<ImportSnapshotMetaResponsePB_TableMetaPB>& tables_meta =
      resp.tables_meta();
  CreateSnapshotRequestPB snapshot_req;
  CreateSnapshotResponsePB snapshot_resp;

  for (int i = 0; i < tables_meta.size(); ++i) {
    const ImportSnapshotMetaResponsePB_TableMetaPB& table_meta = tables_meta.Get(i);
    const string& new_table_id = table_meta.table_ids().new_id();

    cout << pad_object_type("Keyspace") << kColumnSep
         << table_meta.namespace_ids().old_id() << kColumnSep
         << table_meta.namespace_ids().new_id() << endl;

    if (!ImportSnapshotMetaResponsePB_TableType_IsValid(table_meta.table_type())) {
      // The "$0" placeholder is required, otherwise the offending value is never printed.
      return STATUS_FORMAT(InternalError, "Found unknown table type: $0", table_meta.table_type());
    }

    const string table_type =
        AllCapsToCamelCase(ImportSnapshotMetaResponsePB_TableType_Name(table_meta.table_type()));
    cout << pad_object_type(table_type) << kColumnSep
         << table_meta.table_ids().old_id() << kColumnSep
         << new_table_id << endl;

    const RepeatedPtrField<IdPairPB>& tablets_map = table_meta.tablets_ids();
    for (int j = 0; j < tablets_map.size(); ++j) {
      const IdPairPB& pair = tablets_map.Get(j);
      cout << pad_object_type(Format("Tablet $0", j)) << kColumnSep
           << pair.old_id() << kColumnSep
           << pair.new_id() << endl;
    }

    // The new table must be fully created before it can be added to the snapshot request.
    RETURN_NOT_OK(yb_client_->WaitForCreateTableToFinish(
        new_table_id,
        CoarseMonoClock::Now() + MonoDelta::FromSeconds(FLAGS_yb_client_admin_operation_timeout_sec)
    ));

    snapshot_req.mutable_tables()->Add()->set_table_id(new_table_id);
  }

  // All indexes already are in the request. Do not add them twice.
  snapshot_req.set_add_indexes(false);
  snapshot_req.set_transaction_aware(true);
  snapshot_req.set_imported(true);
  // Create new snapshot.
  RETURN_NOT_OK(RequestMasterLeader(&snapshot_resp, [&](RpcController* rpc) {
    return master_backup_proxy_->CreateSnapshot(snapshot_req, &snapshot_resp, rpc);
  }));

  cout << pad_object_type("Snapshot") << kColumnSep
       << SnapshotIdToString(snapshot_info->id()) << kColumnSep
       << SnapshotIdToString(snapshot_resp.snapshot_id()) << endl;

  return Status::OK();
}
861
862
0
// Prints, for every tablet server hosting a replica of `table_name`, its placement id and the
// number of live and read-only replicas it holds.
//
// A replica is counted as read-only when its member type is OBSERVER or PRE_OBSERVER; every
// other replica is counted as live. Rows are printed in tablet server id order.
Status ClusterAdminClient::ListReplicaTypeCounts(const YBTableName& table_name) {
  vector<string> tablet_ids, ranges;
  RETURN_NOT_OK(yb_client_->GetTablets(table_name, 0, &tablet_ids, &ranges));

  master::GetTabletLocationsResponsePB resp;
  RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) {
    master::GetTabletLocationsRequestPB req;
    for (const auto& id : tablet_ids) {
      req.add_tablet_ids(id);
    }
    return master_client_proxy_->GetTabletLocations(req, &resp, rpc);
  }));

  struct ReplicaCounts {
    int live_count;
    int read_only_count;
    string placement_uuid;
  };
  // Keyed by tablet server uuid.
  std::map<TabletServerId, ReplicaCounts> per_tserver;

  std::cout << "Tserver ID\t\tPlacement ID\t\tLive count\t\tRead only count\n";

  for (const master::TabletLocationsPB& locations : resp.tablet_locations()) {
    for (const auto& replica : locations.replicas()) {
      const string& ts_uuid = replica.ts_info().permanent_uuid();

      const auto member_type = replica.member_type();
      const bool read_only = member_type == consensus::PeerMemberType::PRE_OBSERVER ||
                             member_type == consensus::PeerMemberType::OBSERVER;
      const int live_delta = read_only ? 0 : 1;
      const int read_only_delta = read_only ? 1 : 0;

      auto found = per_tserver.find(ts_uuid);
      if (found != per_tserver.end()) {
        found->second.live_count += live_delta;
        found->second.read_only_count += read_only_delta;
        continue;
      }

      // First replica seen on this tablet server: the placement id is recorded only here.
      string placement_uuid;
      if (replica.ts_info().has_placement_uuid()) {
        placement_uuid = replica.ts_info().placement_uuid();
      }
      per_tserver.emplace(ts_uuid, ReplicaCounts{live_delta, read_only_delta, placement_uuid});
    }
  }

  for (const auto& row : per_tserver) {
    const ReplicaCounts& counts = row.second;
    std::cout << row.first << "\t\t" << counts.placement_uuid << "\t\t"
              << counts.live_count << "\t\t" << counts.read_only_count << std::endl;
  }

  return Status::OK();
}
914
915
3
// Sends the given preferred zones to the master via the SetPreferredZones RPC.
//
// Each entry of `preferred_zones` must have the form "cloud.region.zone" (exactly three
// dot-separated tokens). Repeated entries are added to the request only once.
//
// Returns InvalidArgument for a malformed entry, the RPC status if the call itself fails, or
// ServiceUnavailable carrying the message of the error reported in the response.
Status ClusterAdminClient::SetPreferredZones(const std::vector<string>& preferred_zones) {
  rpc::RpcController rpc;
  master::SetPreferredZonesRequestPB req;
  master::SetPreferredZonesResponsePB resp;
  rpc.set_timeout(timeout_);

  // Zones already added to the request.
  std::set<string> zones;
  for (const string& zone : preferred_zones) {
    // Use the set's own lookup instead of a linear std::find over its elements.
    if (zones.count(zone) != 0) {
      continue;
    }

    // Split the zone on '.'; empty tokens are kept so that "a..b" still yields three tokens.
    size_t last_pos = 0;
    size_t next_pos;
    std::vector<string> tokens;
    while ((next_pos = zone.find('.', last_pos)) != string::npos) {
      tokens.push_back(zone.substr(last_pos, next_pos - last_pos));
      last_pos = next_pos + 1;
    }
    tokens.push_back(zone.substr(last_pos));
    if (tokens.size() != 3) {
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid argument for preferred zone $0, should "
          "have format cloud.region.zone", zone);
    }

    CloudInfoPB* cloud_info = req.add_preferred_zones();
    cloud_info->set_placement_cloud(tokens[0]);
    cloud_info->set_placement_region(tokens[1]);
    cloud_info->set_placement_zone(tokens[2]);

    zones.emplace(zone);
  }

  RETURN_NOT_OK(master_cluster_proxy_->SetPreferredZones(req, &resp, &rpc));

  if (resp.has_error()) {
    return STATUS(ServiceUnavailable, resp.error().status().message());
  }

  return Status::OK();
}
955
956
0
// Requests encryption to be enabled using the universe key stored at `key_path`.
Status ClusterAdminClient::RotateUniverseKey(const std::string& key_path) {
  const bool enable_encryption = true;
  return SendEncryptionRequest(key_path, enable_encryption);
}
959
960
0
// Requests encryption to be disabled; no key path is sent with the request.
Status ClusterAdminClient::DisableEncryption() {
  const bool enable_encryption = false;
  return SendEncryptionRequest("", enable_encryption);
}
963
964
// Sends a ChangeEncryptionInfo request through the master encryption proxy.
//
// `enable_encryption` is copied into the request; `key_path` is set on the request only when it
// is non-empty. Waits for the master leader to be ready before issuing the call.
//
// Returns the wait/RPC failure (with a descriptive prefix) or the error carried in the response.
Status ClusterAdminClient::SendEncryptionRequest(
    const std::string& key_path, bool enable_encryption) {
  RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!");

  // Build the encryption change request.
  master::ChangeEncryptionInfoRequestPB request;
  request.set_encryption_enabled(enable_encryption);
  if (!key_path.empty()) {
    request.set_key_path(key_path);
  }

  rpc::RpcController controller;
  controller.set_timeout(timeout_);

  master::ChangeEncryptionInfoResponsePB response;
  RETURN_NOT_OK_PREPEND(
      master_encryption_proxy_->ChangeEncryptionInfo(request, &response, &controller),
      "MasterServiceImpl::ChangeEncryptionInfo call fails.");

  return response.has_error() ? StatusFromPB(response.error().status()) : Status::OK();
}
986
987
11
// Queries the master for the encryption state and prints it to stdout.
//
// Prints "ENABLED with key id <id>" or "DISABLED". Returns the wait/RPC failure (with a
// descriptive prefix) or the error carried in the response.
Status ClusterAdminClient::IsEncryptionEnabled() {
  RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!");

  master::IsEncryptionEnabledRequestPB request;
  master::IsEncryptionEnabledResponsePB response;

  rpc::RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK_PREPEND(
      master_encryption_proxy_->IsEncryptionEnabled(request, &response, &controller),
      "MasterServiceImpl::IsEncryptionEnabled call fails.");
  if (response.has_error()) {
    return StatusFromPB(response.error().status());
  }

  std::string state_text = "DISABLED";
  if (response.encryption_enabled()) {
    state_text = Format("ENABLED with key id $0", response.key_id());
  }
  std::cout << "Encryption status: " << state_text << std::endl;
  return Status::OK();
}
1005
1006
// Sends the universe key `universe_key` under id `key_id` to every master in
// `master_addr_list_` via the AddUniverseKeys RPC.
//
// The key size (after subtracting EncryptionParams::kBlockSize) is validated first. Masters are
// contacted one by one, each through its own proxy; the function stops at the first failure.
// A confirmation line is printed for every master that accepted the key.
Status ClusterAdminClient::AddUniverseKeyToAllMasters(
    const std::string& key_id, const std::string& universe_key) {

  RETURN_NOT_OK(encryption::EncryptionParams::IsValidKeySize(
      universe_key.size() - encryption::EncryptionParams::kBlockSize));

  master::AddUniverseKeysRequestPB req;
  master::AddUniverseKeysResponsePB resp;
  auto* universe_keys = req.mutable_universe_keys();
  (*universe_keys->mutable_map())[key_id] = universe_key;

  // Addresses without an explicit port fall back to the default master port.
  for (const auto& hp :
       VERIFY_RESULT(HostPort::ParseStrings(master_addr_list_, master::kMasterDefaultPort))) {
    rpc::RpcController rpc;
    rpc.set_timeout(timeout_);
    master::MasterEncryptionProxy proxy(proxy_cache_.get(), hp);
    RETURN_NOT_OK_PREPEND(proxy.AddUniverseKeys(req, &resp, &rpc),
                          Format("MasterServiceImpl::AddUniverseKeys call fails on host $0.",
                                 hp.ToString()));
    if (resp.has_error()) {
      return StatusFromPB(resp.error().status());
    }
    std::cout << Format("Successfully added key to the node $0.\n", hp.ToString());
  }

  return Status::OK();
}
1032
1033
11
// Checks that every master in `master_addr_list_` reports having universe key `key_id`.
//
// Masters are queried one by one with the HasUniverseKeyInMemory RPC. Returns TryAgain for the
// first master whose response does not have the key field set, the RPC failure (with a
// descriptive prefix) or the error carried in a response. A status line is printed for every
// master that passed the check.
Status ClusterAdminClient::AllMastersHaveUniverseKeyInMemory(const std::string& key_id) {
  master::HasUniverseKeyInMemoryRequestPB req;
  master::HasUniverseKeyInMemoryResponsePB resp;
  req.set_version_id(key_id);

  // Addresses without an explicit port fall back to the default master port.
  for (const auto& hp :
       VERIFY_RESULT(HostPort::ParseStrings(master_addr_list_, master::kMasterDefaultPort))) {
    rpc::RpcController rpc;
    rpc.set_timeout(timeout_);
    master::MasterEncryptionProxy proxy(proxy_cache_.get(), hp);
    // Name the RPC that is actually issued so failures are not misattributed.
    RETURN_NOT_OK_PREPEND(proxy.HasUniverseKeyInMemory(req, &resp, &rpc),
                          "MasterServiceImpl::HasUniverseKeyInMemory call fails.");

    if (resp.has_error()) {
      return StatusFromPB(resp.error().status());
    }
    if (!resp.has_key()) {
      return STATUS_FORMAT(TryAgain, "Node $0 does not have universe key in memory", hp);
    }

    std::cout << Format("Node $0 has universe key in memory: $1\n", hp.ToString(), resp.has_key());
  }

  return Status::OK();
}
1057
1058
11
// Sends a ChangeEncryptionInfo request that enables encryption with the `in_memory` flag set
// and `key_id` as the version id, then prints a confirmation line.
//
// Returns the wait/RPC failure (with a descriptive prefix) or the error carried in the response.
Status ClusterAdminClient::RotateUniverseKeyInMemory(const std::string& key_id) {
  RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!");

  master::ChangeEncryptionInfoRequestPB request;
  request.set_encryption_enabled(true);
  request.set_in_memory(true);
  request.set_version_id(key_id);

  rpc::RpcController controller;
  controller.set_timeout(timeout_);

  master::ChangeEncryptionInfoResponsePB response;
  RETURN_NOT_OK_PREPEND(
      master_encryption_proxy_->ChangeEncryptionInfo(request, &response, &controller),
      "MasterServiceImpl::ChangeEncryptionInfo call fails.");
  if (response.has_error()) {
    return StatusFromPB(response.error().status());
  }

  std::cout << "Rotated universe key in memory\n";
  return Status::OK();
}
1077
1078
0
// Sends a ChangeEncryptionInfo request with encryption disabled and prints a confirmation line.
//
// Returns the wait/RPC failure (with a descriptive prefix) or the error carried in the response.
Status ClusterAdminClient::DisableEncryptionInMemory() {
  RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!");

  master::ChangeEncryptionInfoRequestPB request;
  request.set_encryption_enabled(false);

  rpc::RpcController controller;
  controller.set_timeout(timeout_);

  master::ChangeEncryptionInfoResponsePB response;
  RETURN_NOT_OK_PREPEND(
      master_encryption_proxy_->ChangeEncryptionInfo(request, &response, &controller),
      "MasterServiceImpl::ChangeEncryptionInfo call fails.");
  if (response.has_error()) {
    return StatusFromPB(response.error().status());
  }

  std::cout << "Encryption disabled\n";
  return Status::OK();
}
1095
1096
// Fetches the universe key registry from the master and writes the key stored under `key_id`
// to the file `file_name`.
//
// Returns NotFound when the registry has no entry for `key_id`, the wait/RPC failure (with a
// descriptive prefix), the error carried in the response, or the file write error.
Status ClusterAdminClient::WriteUniverseKeyToFile(
    const std::string& key_id, const std::string& file_name) {
  RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!");
  rpc::RpcController rpc;
  rpc.set_timeout(timeout_);

  master::GetUniverseKeyRegistryRequestPB req;
  master::GetUniverseKeyRegistryResponsePB resp;
  // Name the RPC that is actually issued so failures are not misattributed.
  RETURN_NOT_OK_PREPEND(master_encryption_proxy_->GetUniverseKeyRegistry(req, &resp, &rpc),
                        "MasterServiceImpl::GetUniverseKeyRegistry call fails.");
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }

  // Borrow the registry from the response instead of copying every key it contains.
  const auto& universe_keys = resp.universe_keys();
  const auto it = universe_keys.map().find(key_id);
  if (it == universe_keys.map().end()) {
    return STATUS_FORMAT(NotFound, "Could not find key with id $0", key_id);
  }

  RETURN_NOT_OK(WriteStringToFile(Env::Default(), Slice(it->second), file_name));

  std::cout << "Finished writing to file\n";
  return Status::OK();
}
1121
1122
// Creates a CDCSDK stream for namespace `ns` by calling CreateCDCStream on the CDC service of
// the tablet server returned by GetFirstRpcAddressForTS(), and prints the resulting stream id.
//
// The stream uses CHANGE records in PROTO format. The checkpoint type is EXPLICIT when
// `checkpoint_type` equals "EXPLICIT"; any other value selects IMPLICIT.
Status ClusterAdminClient::CreateCDCSDKDBStream(
  const TypedNamespaceName& ns, const std::string& checkpoint_type) {
  HostPort ts_addr = VERIFY_RESULT(GetFirstRpcAddressForTS());
  auto cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(proxy_cache_.get(), ts_addr);

  const bool explicit_checkpoint = checkpoint_type == yb::ToString("EXPLICIT");

  cdc::CreateCDCStreamRequestPB request;
  request.set_namespace_name(ns.name);
  request.set_record_type(cdc::CDCRecordType::CHANGE);
  request.set_record_format(cdc::CDCRecordFormat::PROTO);
  request.set_source_type(cdc::CDCRequestSource::CDCSDK);
  request.set_checkpoint_type(
      explicit_checkpoint ? cdc::CDCCheckpointType::EXPLICIT : cdc::CDCCheckpointType::IMPLICIT);

  cdc::CreateCDCStreamResponsePB response;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(cdc_proxy->CreateCDCStream(request, &response, &controller));

  if (response.has_error()) {
    cout << "Error creating stream: " << response.error().status().message() << endl;
    return StatusFromPB(response.error().status());
  }

  cout << "CDC Stream ID: " << response.db_stream_id() << endl;
  return Status::OK();
}
1152
1153
0
// Creates a CDC stream for table `table_id` through the master replication proxy and prints
// the resulting stream id.
//
// Three options are attached to the request, in this order: record type CHANGE, record format
// JSON and source type XCLUSTER.
Status ClusterAdminClient::CreateCDCStream(const TableId& table_id) {
  master::CreateCDCStreamRequestPB request;
  request.set_table_id(table_id);
  request.mutable_options()->Reserve(3);

  // Appends one key/value option to the request.
  const auto append_option = [&request](const std::string& key, const std::string& value) {
    auto* option = request.add_options();
    option->set_key(key);
    option->set_value(value);
  };
  append_option(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE));
  append_option(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::JSON));
  append_option(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));

  master::CreateCDCStreamResponsePB response;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(master_replication_proxy_->CreateCDCStream(request, &response, &controller));

  if (response.has_error()) {
    cout << "Error creating stream: " << response.error().status().message() << endl;
    return StatusFromPB(response.error().status());
  }

  cout << "CDC Stream ID: " << response.stream_id() << endl;
  return Status::OK();
}
1183
1184
0
// Deletes the stream `db_stream_id` through the master replication proxy and prints the
// outcome. Returns the RPC failure or the error carried in the response.
Status ClusterAdminClient::DeleteCDCSDKDBStream(const std::string& db_stream_id) {
  master::DeleteCDCStreamRequestPB request;
  request.add_stream_id(db_stream_id);

  master::DeleteCDCStreamResponsePB response;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(master_replication_proxy_->DeleteCDCStream(request, &response, &controller));

  if (response.has_error()) {
    cout << "Error deleting stream: " << response.error().status().message() << endl;
    return StatusFromPB(response.error().status());
  }

  cout << "Successfully deleted Change Data Stream ID: " << db_stream_id << endl;
  return Status::OK();
}
1201
1202
0
// Deletes the CDC stream `stream_id` through the master replication proxy, forwarding
// `force_delete` in the request, and prints the outcome. Returns the RPC failure or the error
// carried in the response.
Status ClusterAdminClient::DeleteCDCStream(const std::string& stream_id, bool force_delete) {
  master::DeleteCDCStreamRequestPB request;
  request.add_stream_id(stream_id);
  request.set_force_delete(force_delete);

  master::DeleteCDCStreamResponsePB response;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(master_replication_proxy_->DeleteCDCStream(request, &response, &controller));

  if (response.has_error()) {
    cout << "Error deleting stream: " << response.error().status().message() << endl;
    return StatusFromPB(response.error().status());
  }

  cout << "Successfully deleted CDC Stream ID: " << stream_id << endl;
  return Status::OK();
}
1220
1221
0
// Lists CDC streams by table id and prints the response in its debug string form.
//
// The table id filter is set on the request only when `table_id` is non-empty. Returns the RPC
// failure or the error carried in the response.
Status ClusterAdminClient::ListCDCStreams(const TableId& table_id) {
  master::ListCDCStreamsRequestPB request;
  request.set_id_type(yb::master::IdTypePB::TABLE_ID);
  if (!table_id.empty()) {
    request.set_table_id(table_id);
  }

  master::ListCDCStreamsResponsePB response;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(master_replication_proxy_->ListCDCStreams(request, &response, &controller));

  if (response.has_error()) {
    cout << "Error getting CDC stream list: " << response.error().status().message() << endl;
    return StatusFromPB(response.error().status());
  }

  cout << "CDC Streams: \r\n" << response.DebugString();
  return Status::OK();
}
1241
1242
0
// Lists CDC streams by namespace id and prints the response in its debug string form.
//
// When `namespace_name` is non-empty it is resolved to a namespace id (looked up as a
// YQL_DATABASE_PGSQL namespace) and used as a filter. Returns the lookup/RPC failure or the
// error carried in the response.
Status ClusterAdminClient::ListCDCSDKStreams(const std::string& namespace_name) {
  master::ListCDCStreamsRequestPB request;
  request.set_id_type(yb::master::IdTypePB::NAMESPACE_ID);

  const bool filter_by_namespace = !namespace_name.empty();
  if (filter_by_namespace) {
    cout << "Filtering out DB streams for the namespace: " << namespace_name << "\n\n";
    master::GetNamespaceInfoResponsePB namespace_info;
    RETURN_NOT_OK(yb_client_->GetNamespaceInfo(
        "", namespace_name, YQL_DATABASE_PGSQL, &namespace_info));
    request.set_namespace_id(namespace_info.namespace_().id());
  }

  master::ListCDCStreamsResponsePB response;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(master_replication_proxy_->ListCDCStreams(request, &response, &controller));

  if (response.has_error()) {
    cout << "Error getting CDC stream list: " << response.error().status().message() << endl;
    return StatusFromPB(response.error().status());
  }

  cout << "CDC Streams: \r\n" << response.DebugString();
  return Status::OK();
}
1267
1268
0
// Fetches the information stored for stream `db_stream_id` through the master replication
// proxy and prints the response in its debug string form. Returns the RPC failure or the error
// carried in the response.
Status ClusterAdminClient::GetCDCDBStreamInfo(const std::string& db_stream_id) {
  master::GetCDCDBStreamInfoRequestPB request;
  request.set_db_stream_id(db_stream_id);

  master::GetCDCDBStreamInfoResponsePB response;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(master_replication_proxy_->GetCDCDBStreamInfo(request, &response, &controller));

  if (response.has_error()) {
    const auto& error_status = response.error().status();
    cout << "Error getting info corresponding to CDC db stream : "
         << error_status.message() << endl;
    return StatusFromPB(error_status);
  }

  cout << "CDC DB Stream Info: \r\n" << response.DebugString();
  return Status::OK();
}
1286
1287
0
// Polls the master every 100ms until setup of universe replication for `producer_uuid` is
// reported as done, then returns the replication status carried in that response.
//
// RPC failures and error responses are only logged as warnings and polling continues; the loop
// itself has no overall deadline.
Status ClusterAdminClient::WaitForSetupUniverseReplicationToFinish(const string& producer_uuid) {
  master::IsSetupUniverseReplicationDoneRequestPB request;
  request.set_producer_id(producer_uuid);

  while (true) {
    master::IsSetupUniverseReplicationDoneResponsePB response;
    RpcController controller;
    controller.set_timeout(timeout_);
    const Status rpc_status =
        master_replication_proxy_->IsSetupUniverseReplicationDone(request, &response, &controller);

    const bool poll_failed = !rpc_status.ok() || response.has_error();
    if (poll_failed) {
      LOG(WARNING) << "Encountered error while waiting for setup_universe_replication to complete"
                   << " : "
                   << (rpc_status.ok() ? response.error().status().message()
                                       : rpc_status.ToString());
    } else if (response.has_done() && response.done()) {
      return StatusFromPB(response.replication_error());
    }

    // Still processing, wait and then loop again.
    std::this_thread::sleep_for(100ms);
  }
}
1307
1308
// Sets up universe replication from producer universe `producer_uuid` and waits for the setup
// to complete.
//
// `producer_addresses` are the producer master addresses (a default master port is applied
// when parsing), `tables` the producer table ids and `producer_bootstrap_ids` the bootstrap ids
// forwarded in the request. On any failure after the request has been built the cleanup helper
// is invoked before the failure status is returned.
Status ClusterAdminClient::SetupUniverseReplication(
    const string& producer_uuid, const vector<string>& producer_addresses,
    const vector<TableId>& tables,
    const vector<string>& producer_bootstrap_ids) {
  master::SetupUniverseReplicationRequestPB request;
  master::SetupUniverseReplicationResponsePB response;
  request.set_producer_id(producer_uuid);

  request.mutable_producer_master_addresses()->Reserve(
      narrow_cast<int>(producer_addresses.size()));
  for (const auto& address : producer_addresses) {
    // HostPort::FromString() expects a default port.
    auto host_port = VERIFY_RESULT(HostPort::FromString(address, master::kMasterDefaultPort));
    HostPortToPB(host_port, request.add_producer_master_addresses());
  }

  request.mutable_producer_table_ids()->Reserve(narrow_cast<int>(tables.size()));
  for (const auto& table_id : tables) {
    request.add_producer_table_ids(table_id);
  }

  for (const auto& bootstrap_id : producer_bootstrap_ids) {
    request.add_producer_bootstrap_ids(bootstrap_id);
  }

  RpcController controller;
  controller.set_timeout(timeout_);
  auto setup_status =
      master_replication_proxy_->SetupUniverseReplication(request, &response, &controller);

  // Clean up config files if setup fails.
  if (!setup_status.ok()) {
    CleanupEnvironmentOnSetupUniverseReplicationFailure(producer_uuid, setup_status);
    return setup_status;
  }

  if (response.has_error()) {
    cout << "Error setting up universe replication: " << response.error().status().message()
         << endl;
    Status response_status = StatusFromPB(response.error().status());
    CleanupEnvironmentOnSetupUniverseReplicationFailure(producer_uuid, response_status);

    return response_status;
  }

  setup_status = WaitForSetupUniverseReplicationToFinish(producer_uuid);

  // Clean up config files if setup fails to complete.
  if (!setup_status.ok()) {
    cout << "Error waiting for universe replication setup to complete: "
         << setup_status.message().ToBuffer() << endl;
    CleanupEnvironmentOnSetupUniverseReplicationFailure(producer_uuid, setup_status);
    return setup_status;
  }

  cout << "Replication setup successfully" << endl;
  return Status::OK();
}
1363
1364
// Helper function for deleting the universe if SetupUniverseReplicaion fails.
1365
void ClusterAdminClient::CleanupEnvironmentOnSetupUniverseReplicationFailure(
1366
0
  const std::string& producer_uuid, const Status& failure_status) {
1367
  // We don't need to delete the universe if the call to SetupUniverseReplication
1368
  // failed due to one of the sanity checks.
1369
0
  if (failure_status.IsInvalidArgument()) {
1370
0
    return;
1371
0
  }
1372
1373
0
  cout << "Replication setup failed, cleaning up environment" << endl;
1374
1375
0
  Status delete_result_status = DeleteUniverseReplication(producer_uuid, false);
1376
0
  if (!delete_result_status.ok()) {
1377
0
    cout << "Could not clean up environment: " << delete_result_status.message() << endl;
1378
0
  } else {
1379
0
    cout << "Successfully cleaned up environment" << endl;
1380
0
  }
1381
0
}
1382
1383
// Issues a DeleteUniverseReplication RPC through the master replication proxy.
//
// producer_id:   id of the replication to delete.
// ignore_errors: forwarded verbatim in the request.
//
// Returns the RPC status if the call itself fails, the status carried in the response if the
// response reports an error, and OK otherwise. Any warnings in the response are printed to
// stdout before the success message.
Status ClusterAdminClient::DeleteUniverseReplication(const std::string& producer_id,
                                                     bool ignore_errors) {
  master::DeleteUniverseReplicationRequestPB delete_req;
  delete_req.set_producer_id(producer_id);
  delete_req.set_ignore_errors(ignore_errors);

  master::DeleteUniverseReplicationResponsePB delete_resp;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(
      master_replication_proxy_->DeleteUniverseReplication(delete_req, &delete_resp, &controller));

  if (delete_resp.has_error()) {
    const auto& error_status_pb = delete_resp.error().status();
    cout << "Error deleting universe replication: " << error_status_pb.message() << endl;
    return StatusFromPB(error_status_pb);
  }

  // Warnings do not fail the command; they are listed one per line.
  const auto& warnings = delete_resp.warnings();
  if (!warnings.empty()) {
    cout << "Encountered the following warnings while running delete_universe_replication:" << endl;
    for (const auto& warning_entry : warnings) {
      cout << " - " << warning_entry.message() << endl;
    }
  }

  cout << "Replication deleted successfully" << endl;
  return Status::OK();
}
1409
1410
// Builds and sends an AlterUniverseReplication request for the replication identified by
// producer_uuid. Every optional argument is copied into the request only when it is non-empty.
//
// producer_addresses:            master addresses; entries without a port get
//                                master::kMasterDefaultPort.
// add_tables / remove_tables:    producer table ids to add to / remove from the replication.
// producer_bootstrap_ids_to_add: only consulted when add_tables is non-empty; when supplied,
//                                there must be exactly one id per added table.
// new_producer_universe_id:      new id to set on the request.
//
// Returns an error if an address cannot be parsed, if the bootstrap id count does not match,
// if the RPC fails, if the response carries an error, or if waiting for the alter to finish
// fails. Progress and error messages are printed to stdout.
Status ClusterAdminClient::AlterUniverseReplication(const std::string& producer_uuid,
    const std::vector<std::string>& producer_addresses,
    const std::vector<TableId>& add_tables,
    const std::vector<TableId>& remove_tables,
    const std::vector<std::string>& producer_bootstrap_ids_to_add,
    const std::string& new_producer_universe_id) {
  master::AlterUniverseReplicationRequestPB alter_req;
  master::AlterUniverseReplicationResponsePB alter_resp;
  alter_req.set_producer_id(producer_uuid);

  if (!producer_addresses.empty()) {
    alter_req.mutable_producer_master_addresses()->Reserve(
        narrow_cast<int>(producer_addresses.size()));
    for (const auto& address : producer_addresses) {
      // HostPort::FromString() needs a default port for addresses given without one.
      auto host_port = VERIFY_RESULT(HostPort::FromString(address, master::kMasterDefaultPort));
      HostPortToPB(host_port, alter_req.add_producer_master_addresses());
    }
  }

  const bool adding_tables = !add_tables.empty();
  if (adding_tables) {
    alter_req.mutable_producer_table_ids_to_add()->Reserve(narrow_cast<int>(add_tables.size()));
    for (const auto& table_id : add_tables) {
      alter_req.add_producer_table_ids_to_add(table_id);
    }

    const bool has_bootstrap_ids = !producer_bootstrap_ids_to_add.empty();
    // There must be a bootstrap id for every table id.
    if (has_bootstrap_ids && producer_bootstrap_ids_to_add.size() != add_tables.size()) {
      cout << "The number of bootstrap ids must equal the number of table ids. "
           << "Use separate alter commands if only some tables are being bootstrapped." << endl;
      return STATUS(InternalError, "Invalid number of bootstrap ids");
    }

    if (has_bootstrap_ids) {
      alter_req.mutable_producer_bootstrap_ids_to_add()->Reserve(
          narrow_cast<int>(producer_bootstrap_ids_to_add.size()));
      for (const auto& id_to_add : producer_bootstrap_ids_to_add) {
        alter_req.add_producer_bootstrap_ids_to_add(id_to_add);
      }
    }
  }

  if (!remove_tables.empty()) {
    alter_req.mutable_producer_table_ids_to_remove()->Reserve(
        narrow_cast<int>(remove_tables.size()));
    for (const auto& table_id : remove_tables) {
      alter_req.add_producer_table_ids_to_remove(table_id);
    }
  }

  if (!new_producer_universe_id.empty()) {
    alter_req.set_new_producer_universe_id(new_producer_universe_id);
  }

  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(
      master_replication_proxy_->AlterUniverseReplication(alter_req, &alter_resp, &controller));

  if (alter_resp.has_error()) {
    const auto& error_status_pb = alter_resp.error().status();
    cout << "Error altering universe replication: " << error_status_pb.message() << endl;
    return StatusFromPB(error_status_pb);
  }

  if (adding_tables) {
    // Adding tables goes through a separate "<producer_uuid>.ALTER" producer; wait for it to be
    // deleted, which happens once it has been merged with the original.
    RETURN_NOT_OK(WaitForSetupUniverseReplicationToFinish(producer_uuid + ".ALTER"));
  }

  cout << "Replication altered successfully" << endl;
  return Status::OK();
}
1480
1481
// Sends a SetUniverseReplicationEnabled RPC for the replication identified by producer_id,
// carrying is_enabled as the requested state.
//
// Returns the RPC status if the call fails, the status carried in the response if the response
// reports an error, and OK otherwise. The outcome is printed to stdout.
CHECKED_STATUS ClusterAdminClient::SetUniverseReplicationEnabled(const std::string& producer_id,
                                                                 bool is_enabled) {
  // Word stem completed with "ing" / "ed" in the messages below ("enabling", "disabled", ...).
  const string verb_stem = is_enabled ? "enabl" : "disabl";

  master::SetUniverseReplicationEnabledRequestPB enable_req;
  enable_req.set_producer_id(producer_id);
  enable_req.set_is_enabled(is_enabled);

  master::SetUniverseReplicationEnabledResponsePB enable_resp;
  RpcController controller;
  controller.set_timeout(timeout_);
  RETURN_NOT_OK(master_replication_proxy_->SetUniverseReplicationEnabled(
      enable_req, &enable_resp, &controller));

  if (enable_resp.has_error()) {
    const auto& error_status_pb = enable_resp.error().status();
    cout << "Error " << verb_stem << "ing "
         << "universe replication: " << error_status_pb.message() << endl;
    return StatusFromPB(error_status_pb);
  }

  cout << "Replication " << verb_stem << "ed successfully" << endl;
  return Status::OK();
}
1502
1503
0
// Lists the tablet servers and returns the first private RPC address of the first entry that
// has a registration with at least one such address.
//
// Returns the ListTabletServers error if the listing fails, or NotFound when no entry
// qualifies.
Result<HostPort> ClusterAdminClient::GetFirstRpcAddressForTS() {
  RepeatedPtrField<ListTabletServersResponsePB::Entry> ts_entries;
  RETURN_NOT_OK(ListTabletServers(&ts_entries));

  for (const auto& ts_entry : ts_entries) {
    if (!ts_entry.has_registration()) {
      continue;
    }
    const auto& private_addrs = ts_entry.registration().common().private_rpc_addresses();
    if (private_addrs.empty()) {
      continue;
    }
    return HostPortFromPB(private_addrs.Get(0));
  }

  return STATUS(NotFound, "Didn't find a server registered with the Master");
}
1515
1516
0
// Sends a BootstrapProducer RPC for the given tables to the CDC service of the tablet server
// returned by GetFirstRpcAddressForTS(), then prints one "table id / CDC bootstrap id" line per
// table to stdout.
//
// table_ids: ids of the tables to bootstrap. The response is expected to contain exactly one
//            bootstrap id per table; ids are paired with tables by position.
//
// Returns an error if no tablet server address is found, if the RPC fails, if the response
// carries an error, or InternalError if the number of returned bootstrap ids differs from the
// number of requested tables.
Status ClusterAdminClient::BootstrapProducer(const vector<TableId>& table_ids) {
  HostPort ts_addr = VERIFY_RESULT(GetFirstRpcAddressForTS());
  auto cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(proxy_cache_.get(), ts_addr);

  cdc::BootstrapProducerRequestPB bootstrap_req;
  cdc::BootstrapProducerResponsePB bootstrap_resp;
  bootstrap_req.mutable_table_ids()->Reserve(narrow_cast<int>(table_ids.size()));
  for (const auto& table_id : table_ids) {
    bootstrap_req.add_table_ids(table_id);
  }

  RpcController rpc;
  // Use the configured timeout, but never less than 120 seconds for this call.
  rpc.set_timeout(MonoDelta::FromSeconds(std::max(timeout_.ToSeconds(), 120.0)));
  RETURN_NOT_OK(cdc_proxy->BootstrapProducer(bootstrap_req, &bootstrap_resp, &rpc));

  if (bootstrap_resp.has_error()) {
    // NOTE(review): the message says "consumer" although this call bootstraps the producer.
    // Left unchanged in case scripts match on the text; confirm before rewording.
    cout << "Error bootstrapping consumer: " << bootstrap_resp.error().status().message() << endl;
    return StatusFromPB(bootstrap_resp.error().status());
  }

  if (implicit_cast<size_t>(bootstrap_resp.cdc_bootstrap_ids().size()) != table_ids.size()) {
    // Terminate and flush the diagnostic like every other message printed by this tool;
    // previously it was emitted without a trailing newline.
    cout << "Received invalid number of bootstrap ids: " << bootstrap_resp.ShortDebugString()
         << endl;
    return STATUS(InternalError, "Invalid number of bootstrap ids");
  }

  // Sizes were verified equal above, so positional pairing cannot run past table_ids.
  size_t table_idx = 0;
  for (const auto& bootstrap_id : bootstrap_resp.cdc_bootstrap_ids()) {
    cout << "table id: " << table_ids[table_idx++] << ", CDC bootstrap id: " << bootstrap_id
         << endl;
  }
  return Status::OK();
}
1546
1547
}  // namespace enterprise
1548
}  // namespace tools
1549
}  // namespace yb