YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/tools/yb-admin_cli_ent.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/tools/yb-admin_cli.h"
14
15
#include <iostream>
16
#include <regex>
17
18
#include <boost/algorithm/string.hpp>
19
20
#include "yb/common/hybrid_time.h"
21
#include "yb/common/snapshot.h"
22
#include "yb/tools/yb-admin_client.h"
23
#include "yb/util/date_time.h"
24
#include "yb/util/format.h"
25
#include "yb/util/result.h"
26
#include "yb/util/status_format.h"
27
#include "yb/util/stol_utils.h"
28
#include "yb/util/string_case.h"
29
30
namespace yb {
31
namespace tools {
32
namespace enterprise {
33
34
using std::cerr;
35
using std::endl;
36
using std::string;
37
using std::vector;
38
39
using client::YBTableName;
40
using strings::Substitute;
41
42
namespace {
43
44
const string kMinus = "minus";
45
46
template <class T, class Args>
47
13
Result<T> GetOptionalArg(const Args& args, size_t idx) {
48
13
  if (args.size() <= idx) {
49
1
    return T::Nil();
50
1
  }
51
12
  if (args.size() > idx + 1) {
52
0
    return STATUS_FORMAT(InvalidArgument,
53
0
                         "Too many arguments for command, at most $0 expected, but $1 found",
54
0
                         idx + 1, args.size());
55
0
  }
56
12
  return VERIFY_RESULT(T::FromString(args[idx]));
57
12
}
yb-admin_cli_ent.cc:yb::Result<yb::StronglyTypedUuid<yb::TxnSnapshotRestorationId_Tag> > yb::tools::enterprise::(anonymous namespace)::GetOptionalArg<yb::StronglyTypedUuid<yb::TxnSnapshotRestorationId_Tag>, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > >(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, unsigned long)
Line
Count
Source
47
5
Result<T> GetOptionalArg(const Args& args, size_t idx) {
48
5
  if (args.size() <= idx) {
49
0
    return T::Nil();
50
0
  }
51
5
  if (args.size() > idx + 1) {
52
0
    return STATUS_FORMAT(InvalidArgument,
53
0
                         "Too many arguments for command, at most $0 expected, but $1 found",
54
0
                         idx + 1, args.size());
55
0
  }
56
5
  return VERIFY_RESULT
0
(T::FromString(args[idx]));
57
5
}
yb-admin_cli_ent.cc:yb::Result<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > yb::tools::enterprise::(anonymous namespace)::GetOptionalArg<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > >(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, unsigned long)
Line
Count
Source
47
8
Result<T> GetOptionalArg(const Args& args, size_t idx) {
48
8
  if (args.size() <= idx) {
49
1
    return T::Nil();
50
1
  }
51
7
  if (args.size() > idx + 1) {
52
0
    return STATUS_FORMAT(InvalidArgument,
53
0
                         "Too many arguments for command, at most $0 expected, but $1 found",
54
0
                         idx + 1, args.size());
55
0
  }
56
7
  return VERIFY_RESULT
0
(T::FromString(args[idx]));
57
7
}
58
59
} // namespace
60
61
77
void ClusterAdminCli::RegisterCommandHandlers(ClusterAdminClientClass* client) {
62
77
  super::RegisterCommandHandlers(client);
63
64
77
  std::string options = "";
65
308
  for (auto flag : kListSnapshotsFlagList) {
66
308
    options += Format(" [$0]", flag);
67
308
  }
68
77
  Register(
69
77
      "list_snapshots", std::move(options),
70
77
      [client](const CLIArguments& args) -> Status {
71
1
        EnumBitSet<ListSnapshotsFlag> flags;
72
73
2
        for (size_t i = 0; i < args.size(); 
++i1
) {
74
1
          std::string uppercase_flag;
75
1
          ToUpperCase(args[i], &uppercase_flag);
76
77
1
          bool found = false;
78
1
          for (auto flag : kListSnapshotsFlagList) {
79
1
            if (uppercase_flag == ToString(flag)) {
80
1
              flags.Set(flag);
81
1
              found = true;
82
1
              break;
83
1
            }
84
1
          }
85
1
          if (!found) {
86
0
            return STATUS_FORMAT(InvalidArgument, "Wrong flag: $0", args[i]);
87
0
          }
88
1
        }
89
90
1
        RETURN_NOT_OK_PREPEND(client->ListSnapshots(flags), "Unable to list snapshots");
91
1
        return Status::OK();
92
1
      });
93
94
77
  Register(
95
77
      "create_snapshot",
96
77
      " <table>"
97
77
      " [<table>]..."
98
77
      " [flush_timeout_in_seconds] (default 60, set 0 to skip flushing)",
99
77
      [client](const CLIArguments& args) -> Status {
100
3
        int timeout_secs = 60;
101
3
        const auto tables = VERIFY_RESULT(ResolveTableNames(
102
3
            client, args.begin(), args.end(),
103
3
            [&timeout_secs](auto i, const auto& end) -> Status {
104
3
              if (std::next(i) == end) {
105
3
                timeout_secs = VERIFY_RESULT(CheckedStoi(*i));
106
3
                return Status::OK();
107
3
              }
108
3
              return ClusterAdminCli::kInvalidArguments;
109
3
            }));
110
111
4
        for (auto table : tables) {
112
4
          if (table.is_cql_namespace() && 
table.is_system()3
) {
113
1
            return STATUS(InvalidArgument,
114
1
                          "Cannot create snapshot of YCQL system table",
115
1
                          table.table_name());
116
1
          }
117
4
        }
118
119
2
        RETURN_NOT_OK_PREPEND(client->CreateSnapshot(tables, true, timeout_secs),
120
2
                              Substitute("Unable to create snapshot of tables: $0",
121
2
                                         yb::ToString(tables)));
122
2
        return Status::OK();
123
2
      });
124
125
77
  RegisterJson(
126
77
      "list_snapshot_restorations",
127
77
      " [<restoration_id>]",
128
77
      [client](const CLIArguments& args) -> Result<rapidjson::Document> {
129
5
        auto restoration_id = VERIFY_RESULT(GetOptionalArg<TxnSnapshotRestorationId>(args, 0));
130
0
        return client->ListSnapshotRestorations(restoration_id);
131
5
      });
132
133
77
  RegisterJson(
134
77
      "create_snapshot_schedule",
135
77
      " <snapshot_interval_in_minutes>"
136
77
      " <snapshot_retention_in_minutes>"
137
77
      " <keyspace>",
138
77
      [client](const CLIArguments& args) -> Result<rapidjson::Document> {
139
9
        RETURN_NOT_OK(CheckArgumentsCount(args.size(), 3, 3));
140
8
        auto interval = MonoDelta::FromMinutes(VERIFY_RESULT(CheckedStold(args[0])));
141
8
        auto retention = MonoDelta::FromMinutes(VERIFY_RESULT(CheckedStold(args[1])));
142
8
        const auto tables = VERIFY_RESULT(ResolveTableNames(
143
8
            client, args.begin() + 2, args.end(), TailArgumentsProcessor(), true));
144
        // This is just a paranoid check, should never happen.
145
8
        if (tables.size() != 1 || !tables[0].has_namespace()) {
146
0
          return STATUS(InvalidArgument, "Expecting exactly one keyspace argument");
147
0
        }
148
8
        return client->CreateSnapshotSchedule(tables[0], interval, retention);
149
8
      });
150
151
77
  RegisterJson(
152
77
      "list_snapshot_schedules",
153
77
      " [<schedule_id>]",
154
77
      [client](const CLIArguments& args) -> Result<rapidjson::Document> {
155
8
        RETURN_NOT_OK(CheckArgumentsCount(args.size(), 0, 1));
156
8
        auto schedule_id = VERIFY_RESULT(GetOptionalArg<SnapshotScheduleId>(args, 0));
157
0
        return client->ListSnapshotSchedules(schedule_id);
158
8
      });
159
160
77
  RegisterJson(
161
77
      "delete_snapshot_schedule",
162
77
      " <schedule_id>",
163
77
      [client](const CLIArguments& args) -> Result<rapidjson::Document> {
164
0
        RETURN_NOT_OK(CheckArgumentsCount(args.size(), 1, 1));
165
0
        auto schedule_id = VERIFY_RESULT(SnapshotScheduleId::FromString(args[0]));
166
0
        return client->DeleteSnapshotSchedule(schedule_id);
167
0
      });
168
169
77
  RegisterJson(
170
77
      "restore_snapshot_schedule",
171
77
      Format(" <schedule_id> (<timestamp> | $0 <interval>)", kMinus),
172
77
      [client](const CLIArguments& args) -> Result<rapidjson::Document> {
173
3
        RETURN_NOT_OK(CheckArgumentsCount(args.size(), 2, 3));
174
3
        auto schedule_id = VERIFY_RESULT(SnapshotScheduleId::FromString(args[0]));
175
0
        HybridTime restore_at;
176
3
        if (args.size() == 2) {
177
3
          restore_at = VERIFY_RESULT(HybridTime::ParseHybridTime(args[1]));
178
3
        } else {
179
0
          if (args[1] != kMinus) {
180
0
            return ClusterAdminCli::kInvalidArguments;
181
0
          }
182
0
          restore_at = VERIFY_RESULT(HybridTime::ParseHybridTime("-" + args[2]));
183
0
        }
184
185
3
        return client->RestoreSnapshotSchedule(schedule_id, restore_at);
186
3
      });
187
188
77
  Register(
189
77
      "create_keyspace_snapshot", " [ycql.]<keyspace_name>",
190
77
      [client](const CLIArguments& args) -> Status {
191
0
        if (args.size() != 1) {
192
0
          return ClusterAdminCli::kInvalidArguments;
193
0
        }
194
195
0
        const TypedNamespaceName keyspace = VERIFY_RESULT(ParseNamespaceName(args[0]));
196
0
        SCHECK_NE(
197
0
            keyspace.db_type, YQL_DATABASE_PGSQL, InvalidArgument,
198
0
            Format("Wrong keyspace type: $0", YQLDatabase_Name(keyspace.db_type)));
199
200
0
        RETURN_NOT_OK_PREPEND(client->CreateNamespaceSnapshot(keyspace),
201
0
                              Substitute("Unable to create snapshot of keyspace: $0",
202
0
                                         keyspace.name));
203
0
        return Status::OK();
204
0
      });
205
206
77
  Register(
207
77
      "create_database_snapshot", " [ysql.]<database_name>",
208
77
      [client](const CLIArguments& args) -> Status {
209
0
        if (args.size() != 1) {
210
0
          return ClusterAdminCli::kInvalidArguments;
211
0
        }
212
213
0
        const TypedNamespaceName database =
214
0
            VERIFY_RESULT(ParseNamespaceName(args[0], YQL_DATABASE_PGSQL));
215
0
        SCHECK_EQ(
216
0
            database.db_type, YQL_DATABASE_PGSQL, InvalidArgument,
217
0
            Format("Wrong database type: $0", YQLDatabase_Name(database.db_type)));
218
219
0
        RETURN_NOT_OK_PREPEND(client->CreateNamespaceSnapshot(database),
220
0
                              Substitute("Unable to create snapshot of database: $0",
221
0
                                         database.name));
222
0
        return Status::OK();
223
0
      });
224
225
77
  Register(
226
77
      "restore_snapshot", Format(" <snapshot_id> [{<timestamp> | $0 {interval}]", kMinus),
227
77
      [client](const CLIArguments& args) -> Status {
228
0
        if (args.size() < 1 || 3 < args.size()) {
229
0
          return ClusterAdminCli::kInvalidArguments;
230
0
        } else if (args.size() == 3 && args[1] != kMinus) {
231
0
          return ClusterAdminCli::kInvalidArguments;
232
0
        }
233
0
        const string snapshot_id = args[0];
234
0
        HybridTime timestamp;
235
0
        if (args.size() == 2) {
236
0
          timestamp = VERIFY_RESULT(HybridTime::ParseHybridTime(args[1]));
237
0
        } else if (args.size() == 3) {
238
0
          timestamp = VERIFY_RESULT(HybridTime::ParseHybridTime("-" + args[2]));
239
0
        }
240
241
0
        RETURN_NOT_OK_PREPEND(client->RestoreSnapshot(snapshot_id, timestamp),
242
0
                              Substitute("Unable to restore snapshot $0", snapshot_id));
243
0
        return Status::OK();
244
0
      });
245
246
77
  Register(
247
77
      "export_snapshot", " <snapshot_id> <file_name>",
248
77
      [client](const CLIArguments& args) -> Status {
249
0
        if (args.size() != 2) {
250
0
          return ClusterAdminCli::kInvalidArguments;
251
0
        }
252
253
0
        const string snapshot_id = args[0];
254
0
        const string file_name = args[1];
255
0
        RETURN_NOT_OK_PREPEND(client->CreateSnapshotMetaFile(snapshot_id, file_name),
256
0
                              Substitute("Unable to export snapshot $0 to file $1",
257
0
                                         snapshot_id,
258
0
                                         file_name));
259
0
        return Status::OK();
260
0
      });
261
262
77
  Register(
263
77
      "import_snapshot", " <file_name> [<namespace> <table_name> [<table_name>]...]",
264
77
      [client](const CLIArguments& args) -> Status {
265
0
        if (args.size() < 1) {
266
0
          return ClusterAdminCli::kInvalidArguments;
267
0
        }
268
269
0
        const string file_name = args[0];
270
0
        TypedNamespaceName keyspace;
271
0
        size_t num_tables = 0;
272
0
        vector<YBTableName> tables;
273
274
0
        if (args.size() >= 2) {
275
0
          keyspace = VERIFY_RESULT(ParseNamespaceName(args[1]));
276
0
          num_tables = args.size() - 2;
277
278
0
          if (num_tables > 0) {
279
0
            LOG_IF(DFATAL, keyspace.name.empty()) << "Uninitialized keyspace: " << keyspace.name;
280
0
            tables.reserve(num_tables);
281
282
0
            for (size_t i = 0; i < num_tables; ++i) {
283
0
              tables.push_back(YBTableName(keyspace.db_type, keyspace.name, args[2 + i]));
284
0
            }
285
0
          }
286
0
        }
287
288
0
        const string msg = num_tables > 0 ?
289
0
            Substitute("Unable to import tables $0 from snapshot meta file $1",
290
0
                       yb::ToString(tables), file_name) :
291
0
            Substitute("Unable to import snapshot meta file $0", file_name);
292
293
0
        RETURN_NOT_OK_PREPEND(client->ImportSnapshotMetaFile(file_name, keyspace, tables), msg);
294
0
        return Status::OK();
295
0
      });
296
297
77
  Register(
298
77
      "delete_snapshot", " <snapshot_id>",
299
77
      [client](const CLIArguments& args) -> Status {
300
0
        if (args.size() != 1) {
301
0
          return ClusterAdminCli::kInvalidArguments;
302
0
        }
303
304
0
        const string snapshot_id = args[0];
305
0
        RETURN_NOT_OK_PREPEND(client->DeleteSnapshot(snapshot_id),
306
0
                              Substitute("Unable to delete snapshot $0", snapshot_id));
307
0
        return Status::OK();
308
0
      });
309
310
77
  Register(
311
77
      "list_replica_type_counts",
312
77
      " <table>",
313
77
      [client](const CLIArguments& args) -> Status {
314
0
        const auto table_name = VERIFY_RESULT(
315
0
            ResolveSingleTableName(client, args.begin(), args.end()));
316
0
        RETURN_NOT_OK_PREPEND(client->ListReplicaTypeCounts(table_name),
317
0
                              "Unable to list live and read-only replica counts");
318
0
        return Status::OK();
319
0
      });
320
321
77
  Register(
322
77
      "set_preferred_zones", " <cloud.region.zone> [<cloud.region.zone>]...",
323
77
      [client](const CLIArguments& args) -> Status {
324
0
        if (args.size() < 1) {
325
0
          return ClusterAdminCli::kInvalidArguments;
326
0
        }
327
0
        RETURN_NOT_OK_PREPEND(client->SetPreferredZones(args), "Unable to set preferred zones");
328
0
        return Status::OK();
329
0
      });
330
331
77
  Register(
332
77
      "rotate_universe_key", " key_path",
333
77
      [client](const CLIArguments& args) -> Status {
334
0
        if (args.size() < 1) {
335
0
          return ClusterAdminCli::kInvalidArguments;
336
0
        }
337
0
        RETURN_NOT_OK_PREPEND(
338
0
            client->RotateUniverseKey(args[0]), "Unable to rotate universe key.");
339
0
        return Status::OK();
340
0
      });
341
342
77
  Register(
343
77
      "disable_encryption", "",
344
77
      [client](const CLIArguments& args) -> Status {
345
0
        RETURN_NOT_OK_PREPEND(client->DisableEncryption(), "Unable to disable encryption.");
346
0
        return Status::OK();
347
0
      });
348
349
77
  Register(
350
77
      "is_encryption_enabled", "",
351
77
      [client](const CLIArguments& args) -> Status {
352
0
        RETURN_NOT_OK_PREPEND(client->IsEncryptionEnabled(), "Unable to get encryption status.");
353
0
        return Status::OK();
354
0
      });
355
356
77
  Register(
357
77
      "add_universe_key_to_all_masters", " key_id key_path",
358
77
      [client](const CLIArguments& args) -> Status {
359
0
        if (args.size() != 2) {
360
0
          return ClusterAdminCli::kInvalidArguments;
361
0
        }
362
0
        string key_id = args[0];
363
0
        faststring contents;
364
0
        RETURN_NOT_OK(ReadFileToString(Env::Default(), args[1], &contents));
365
0
        string universe_key = contents.ToString();
366
367
0
        RETURN_NOT_OK_PREPEND(client->AddUniverseKeyToAllMasters(key_id, universe_key),
368
0
                              "Unable to add universe key to all masters.");
369
0
        return Status::OK();
370
0
      });
371
372
77
  Register(
373
77
      "all_masters_have_universe_key_in_memory", " key_id",
374
77
      [client](const CLIArguments& args) -> Status {
375
0
        if (args.size() != 1) {
376
0
          return ClusterAdminCli::kInvalidArguments;
377
0
        }
378
0
        RETURN_NOT_OK_PREPEND(client->AllMastersHaveUniverseKeyInMemory(args[0]),
379
0
                              "Unable to check whether master has universe key in memory.");
380
0
        return Status::OK();
381
0
      });
382
383
77
  Register(
384
77
      "rotate_universe_key_in_memory", " key_id",
385
77
      [client](const CLIArguments& args) -> Status {
386
0
        if (args.size() != 1) {
387
0
          return ClusterAdminCli::kInvalidArguments;
388
0
        }
389
0
        string key_id = args[0];
390
391
0
        RETURN_NOT_OK_PREPEND(client->RotateUniverseKeyInMemory(key_id),
392
0
                              "Unable rotate universe key in memory.");
393
0
        return Status::OK();
394
0
      });
395
396
77
  Register(
397
77
      "disable_encryption_in_memory", "",
398
77
      [client](const CLIArguments& args) -> Status {
399
0
        if (args.size() != 0) {
400
0
          return ClusterAdminCli::kInvalidArguments;
401
0
        }
402
0
        RETURN_NOT_OK_PREPEND(client->DisableEncryptionInMemory(), "Unable to disable encryption.");
403
0
        return Status::OK();
404
0
      });
405
406
77
  Register(
407
77
      "write_universe_key_to_file", " <key_id> <file_name>",
408
77
      [client](const CLIArguments& args) -> Status {
409
0
        if (args.size() != 2) {
410
0
          return ClusterAdminCli::kInvalidArguments;
411
0
        }
412
0
        RETURN_NOT_OK_PREPEND(client->WriteUniverseKeyToFile(args[0], args[1]),
413
0
                              "Unable to write key to file");
414
0
        return Status::OK();
415
0
      });
416
417
77
  Register(
418
77
      "create_cdc_stream", " <table_id>",
419
77
      [client](const CLIArguments& args) -> Status {
420
0
        if (args.size() < 1) {
421
0
          return ClusterAdminCli::kInvalidArguments;
422
0
        }
423
0
        const string table_id = args[0];
424
0
        RETURN_NOT_OK_PREPEND(client->CreateCDCStream(table_id),
425
0
                              Substitute("Unable to create CDC stream for table $0", table_id));
426
0
        return Status::OK();
427
0
      });
428
429
77
  Register(
430
77
    "create_change_data_stream", " <namespace> [checkpoint_type]",
431
77
    [client](const CLIArguments& args) -> Status {
432
0
      if (args.size() < 1) {
433
0
        return ClusterAdminCli::kInvalidArguments;
434
0
      }
435
436
0
      std::string checkpoint_type = yb::ToString("IMPLICIT");
437
0
      std::string uppercase_checkpoint_type;
438
439
0
      if (args.size() > 1) {
440
0
         ToUpperCase(args[1], &uppercase_checkpoint_type);
441
0
         if (uppercase_checkpoint_type != yb::ToString("EXPLICIT")
442
0
            && uppercase_checkpoint_type != yb::ToString("IMPLICIT")) {
443
0
            return ClusterAdminCli::kInvalidArguments;
444
0
         }
445
0
         checkpoint_type = uppercase_checkpoint_type;
446
0
      }
447
448
0
      const string namespace_name = args[0];
449
450
0
      const TypedNamespaceName database =
451
0
        VERIFY_RESULT(ParseNamespaceName(args[0], YQL_DATABASE_PGSQL));
452
0
      SCHECK_EQ(
453
0
        database.db_type, YQL_DATABASE_PGSQL, InvalidArgument,
454
0
        Format("Wrong database type: $0", YQLDatabase_Name(database.db_type)));
455
456
0
      RETURN_NOT_OK_PREPEND(client->CreateCDCSDKDBStream(database, checkpoint_type),
457
0
                            Substitute("Unable to create CDC stream for database $0",
458
0
                                       namespace_name));
459
0
      return Status::OK();
460
0
    });
461
462
77
  Register(
463
77
      "delete_cdc_stream", " <stream_id> [force_delete]",
464
77
      [client](const CLIArguments& args) -> Status {
465
0
        if (args.size() < 1) {
466
0
          return ClusterAdminCli::kInvalidArguments;
467
0
        }
468
0
        const string stream_id = args[0];
469
0
        bool force_delete = false;
470
0
        if (args.size() >= 2 && args[1] == "force_delete") {
471
0
          force_delete = true;
472
0
        }
473
0
        RETURN_NOT_OK_PREPEND(client->DeleteCDCStream(stream_id, force_delete),
474
0
            Substitute("Unable to delete CDC stream id $0", stream_id));
475
0
        return Status::OK();
476
0
      });
477
478
77
  Register(
479
77
    "delete_change_data_stream", " <db_stream_id>",
480
77
    [client](const CLIArguments& args) -> Status {
481
0
      if (args.size() < 1) {
482
0
        return ClusterAdminCli::kInvalidArguments;
483
0
      }
484
485
0
      const std::string db_stream_id = args[0];
486
0
      RETURN_NOT_OK_PREPEND(client->DeleteCDCSDKDBStream(db_stream_id),
487
0
                            Substitute("Unable to delete CDC database stream id $0",
488
0
                                       db_stream_id));
489
0
      return Status::OK();
490
0
    });
491
492
77
  Register(
493
77
      "list_cdc_streams", " [table_id]",
494
77
      [client](const CLIArguments& args) -> Status {
495
0
        if (args.size() != 0 && args.size() != 1) {
496
0
          return ClusterAdminCli::kInvalidArguments;
497
0
        }
498
0
        const string table_id = (args.size() == 1 ? args[0] : "");
499
0
        RETURN_NOT_OK_PREPEND(client->ListCDCStreams(table_id),
500
0
            Substitute("Unable to list CDC streams for table $0", table_id));
501
0
        return Status::OK();
502
0
      });
503
504
77
  Register(
505
77
    "list_change_data_streams", " [namespace]",
506
77
    [client](const CLIArguments& args) -> Status {
507
0
      if (args.size() != 0 && args.size() != 1) {
508
0
        return ClusterAdminCli::kInvalidArguments;
509
0
      }
510
0
      const string namespace_name = args.size() == 1 ? args[0] : "";
511
0
      string msg = (args.size() == 1)
512
0
                       ? Substitute("Unable to list CDC streams for namespace $0", namespace_name)
513
0
                       : "Unable to list CDC streams";
514
515
0
      RETURN_NOT_OK_PREPEND(client->ListCDCSDKStreams(namespace_name), msg);
516
0
      return Status::OK();
517
0
    });
518
519
77
  Register(
520
77
    "get_change_data_stream_info", " <db_stream_id>",
521
77
    [client](const CLIArguments& args) -> Status {
522
0
      if (args.size() != 0 && args.size() != 1) {
523
0
        return ClusterAdminCli::kInvalidArguments;
524
0
      }
525
0
      const string db_stream_id = args.size() == 1 ? args[0] : "";
526
0
      RETURN_NOT_OK_PREPEND(client->GetCDCDBStreamInfo(db_stream_id),
527
0
                            Substitute("Unable to list CDC stream info for database stream $0",
528
0
                                       db_stream_id));
529
0
      return Status::OK();
530
0
    });
531
532
77
  Register(
533
77
      "setup_universe_replication",
534
77
      " <producer_universe_uuid> <producer_master_addresses> <comma_separated_list_of_table_ids>"
535
77
          " [comma_separated_list_of_producer_bootstrap_ids]"  ,
536
77
      [client](const CLIArguments& args) -> Status {
537
0
        if (args.size() < 3) {
538
0
          return ClusterAdminCli::kInvalidArguments;
539
0
        }
540
0
        const string producer_uuid = args[0];
541
542
0
        vector<string> producer_addresses;
543
0
        boost::split(producer_addresses, args[1], boost::is_any_of(","));
544
545
0
        vector<string> table_uuids;
546
0
        boost::split(table_uuids, args[2], boost::is_any_of(","));
547
548
0
        vector<string> producer_bootstrap_ids;
549
0
        if (args.size() == 4) {
550
0
          boost::split(producer_bootstrap_ids, args[3], boost::is_any_of(","));
551
0
        }
552
553
0
        RETURN_NOT_OK_PREPEND(client->SetupUniverseReplication(producer_uuid,
554
0
                                                               producer_addresses,
555
0
                                                               table_uuids,
556
0
                                                               producer_bootstrap_ids),
557
0
                              Substitute("Unable to setup replication from universe $0",
558
0
                                         producer_uuid));
559
0
        return Status::OK();
560
0
      });
561
562
77
  Register(
563
77
      "delete_universe_replication", " <producer_universe_uuid> [ignore-errors]",
564
77
      [client](const CLIArguments& args) -> Status {
565
0
        if (args.size() < 1) {
566
0
          return ClusterAdminCli::kInvalidArguments;
567
0
        }
568
0
        const string producer_id = args[0];
569
0
        bool ignore_errors = false;
570
0
        if (args.size() >= 2 && args[1] == "ignore-errors") {
571
0
          ignore_errors = true;
572
0
        }
573
0
        RETURN_NOT_OK_PREPEND(client->DeleteUniverseReplication(producer_id, ignore_errors),
574
0
                              Substitute("Unable to delete replication for universe $0",
575
0
                              producer_id));
576
0
        return Status::OK();
577
0
      });
578
579
77
  Register(
580
77
      "alter_universe_replication",
581
77
      " <producer_universe_id>"
582
77
      " {set_master_addresses [comma_separated_list_of_producer_master_addresses] |"
583
77
      "  add_table [comma_separated_list_of_table_ids]"
584
77
      "            [comma_separated_list_of_producer_bootstrap_ids] |"
585
77
      "  remove_table [comma_separated_list_of_table_ids] |"
586
77
      "  rename_id <new_producer_universe_id>}",
587
77
      [client](const CLIArguments& args) -> Status {
588
0
        if (args.size() < 3 || args.size() > 4) {
589
0
          return ClusterAdminCli::kInvalidArguments;
590
0
        }
591
0
        if (args.size() == 4 && args[1] != "add_table") {
592
0
          return ClusterAdminCli::kInvalidArguments;
593
0
        }
594
595
0
        const string producer_uuid = args[0];
596
0
        vector<string> master_addresses;
597
0
        vector<string> add_tables;
598
0
        vector<string> remove_tables;
599
0
        vector<string> bootstrap_ids_to_add;
600
0
        string new_producer_universe_id = "";
601
602
0
        vector<string> newElem, *lst;
603
0
        if (args[1] == "set_master_addresses") {
604
0
          lst = &master_addresses;
605
0
        } else if (args[1] == "add_table") {
606
0
          lst = &add_tables;
607
0
        } else if (args[1] == "remove_table") {
608
0
          lst = &remove_tables;
609
0
        } else if (args[1] == "rename_id") {
610
0
          lst = nullptr;
611
0
          new_producer_universe_id = args[2];
612
0
        } else {
613
0
          return ClusterAdminCli::kInvalidArguments;
614
0
        }
615
616
0
        if (lst) {
617
0
          boost::split(newElem, args[2], boost::is_any_of(","));
618
0
          lst->insert(lst->end(), newElem.begin(), newElem.end());
619
620
0
          if (args[1] == "add_table" && args.size() == 4) {
621
0
            boost::split(bootstrap_ids_to_add, args[3], boost::is_any_of(","));
622
0
          }
623
0
        }
624
625
0
        RETURN_NOT_OK_PREPEND(client->AlterUniverseReplication(producer_uuid,
626
0
                                                               master_addresses,
627
0
                                                               add_tables,
628
0
                                                               remove_tables,
629
0
                                                               bootstrap_ids_to_add,
630
0
                                                               new_producer_universe_id),
631
0
            Substitute("Unable to alter replication for universe $0", producer_uuid));
632
633
0
        return Status::OK();
634
0
      });
635
636
77
  Register(
637
77
      "set_universe_replication_enabled", " <producer_universe_uuid> <0|1>",
638
77
      [client](const CLIArguments& args) -> Status {
639
0
        if (args.size() < 2) {
640
0
          return ClusterAdminCli::kInvalidArguments;
641
0
        }
642
0
        const string producer_id = args[0];
643
0
        const bool is_enabled = VERIFY_RESULT(CheckedStoi(args[1])) != 0;
644
0
        RETURN_NOT_OK_PREPEND(client->SetUniverseReplicationEnabled(producer_id, is_enabled),
645
0
            Substitute("Unable to $0 replication for universe $1",
646
0
                is_enabled ? "enable" : "disable",
647
0
                producer_id));
648
0
        return Status::OK();
649
0
      });
650
651
77
  Register(
652
77
      "bootstrap_cdc_producer", " <comma_separated_list_of_table_ids>",
653
77
      [client](const CLIArguments& args) -> Status {
654
0
        if (args.size() < 1) {
655
0
          return ClusterAdminCli::kInvalidArguments;
656
0
        }
657
658
0
        vector<string> table_ids;
659
0
        boost::split(table_ids, args[0], boost::is_any_of(","));
660
661
0
        RETURN_NOT_OK_PREPEND(client->BootstrapProducer(table_ids),
662
0
                              "Unable to bootstrap CDC producer");
663
0
        return Status::OK();
664
0
      });
665
77
}
666
667
}  // namespace enterprise
668
}  // namespace tools
669
}  // namespace yb