/Users/deen/code/yugabyte-db/ent/src/yb/tools/yb-admin_cli_ent.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/tools/yb-admin_cli.h" |
14 | | |
15 | | #include <iostream> |
16 | | #include <regex> |
17 | | |
18 | | #include <boost/algorithm/string.hpp> |
19 | | |
20 | | #include "yb/common/hybrid_time.h" |
21 | | #include "yb/common/snapshot.h" |
22 | | #include "yb/tools/yb-admin_client.h" |
23 | | #include "yb/util/date_time.h" |
24 | | #include "yb/util/format.h" |
25 | | #include "yb/util/result.h" |
26 | | #include "yb/util/status_format.h" |
27 | | #include "yb/util/stol_utils.h" |
28 | | #include "yb/util/string_case.h" |
29 | | |
30 | | namespace yb { |
31 | | namespace tools { |
32 | | namespace enterprise { |
33 | | |
34 | | using std::cerr; |
35 | | using std::endl; |
36 | | using std::string; |
37 | | using std::vector; |
38 | | |
39 | | using client::YBTableName; |
40 | | using strings::Substitute; |
41 | | |
42 | | namespace { |
43 | | |
44 | | const string kMinus = "minus"; |
45 | | |
46 | | template <class T, class Args> |
47 | 13 | Result<T> GetOptionalArg(const Args& args, size_t idx) { |
48 | 13 | if (args.size() <= idx) { |
49 | 1 | return T::Nil(); |
50 | 1 | } |
51 | 12 | if (args.size() > idx + 1) { |
52 | 0 | return STATUS_FORMAT(InvalidArgument, |
53 | 0 | "Too many arguments for command, at most $0 expected, but $1 found", |
54 | 0 | idx + 1, args.size()); |
55 | 0 | } |
56 | 12 | return VERIFY_RESULT(T::FromString(args[idx])); |
57 | 12 | } yb-admin_cli_ent.cc:yb::Result<yb::StronglyTypedUuid<yb::TxnSnapshotRestorationId_Tag> > yb::tools::enterprise::(anonymous namespace)::GetOptionalArg<yb::StronglyTypedUuid<yb::TxnSnapshotRestorationId_Tag>, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > >(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, unsigned long) Line | Count | Source | 47 | 5 | Result<T> GetOptionalArg(const Args& args, size_t idx) { | 48 | 5 | if (args.size() <= idx) { | 49 | 0 | return T::Nil(); | 50 | 0 | } | 51 | 5 | if (args.size() > idx + 1) { | 52 | 0 | return STATUS_FORMAT(InvalidArgument, | 53 | 0 | "Too many arguments for command, at most $0 expected, but $1 found", | 54 | 0 | idx + 1, args.size()); | 55 | 0 | } | 56 | 5 | return VERIFY_RESULT0 (T::FromString(args[idx])); | 57 | 5 | } |
yb-admin_cli_ent.cc:yb::Result<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > yb::tools::enterprise::(anonymous namespace)::GetOptionalArg<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > >(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, unsigned long) Line | Count | Source | 47 | 8 | Result<T> GetOptionalArg(const Args& args, size_t idx) { | 48 | 8 | if (args.size() <= idx) { | 49 | 1 | return T::Nil(); | 50 | 1 | } | 51 | 7 | if (args.size() > idx + 1) { | 52 | 0 | return STATUS_FORMAT(InvalidArgument, | 53 | 0 | "Too many arguments for command, at most $0 expected, but $1 found", | 54 | 0 | idx + 1, args.size()); | 55 | 0 | } | 56 | 7 | return VERIFY_RESULT0 (T::FromString(args[idx])); | 57 | 7 | } |
|
58 | | |
59 | | } // namespace |
60 | | |
61 | 77 | void ClusterAdminCli::RegisterCommandHandlers(ClusterAdminClientClass* client) { |
62 | 77 | super::RegisterCommandHandlers(client); |
63 | | |
64 | 77 | std::string options = ""; |
65 | 308 | for (auto flag : kListSnapshotsFlagList) { |
66 | 308 | options += Format(" [$0]", flag); |
67 | 308 | } |
68 | 77 | Register( |
69 | 77 | "list_snapshots", std::move(options), |
70 | 77 | [client](const CLIArguments& args) -> Status { |
71 | 1 | EnumBitSet<ListSnapshotsFlag> flags; |
72 | | |
73 | 2 | for (size_t i = 0; i < args.size(); ++i1 ) { |
74 | 1 | std::string uppercase_flag; |
75 | 1 | ToUpperCase(args[i], &uppercase_flag); |
76 | | |
77 | 1 | bool found = false; |
78 | 1 | for (auto flag : kListSnapshotsFlagList) { |
79 | 1 | if (uppercase_flag == ToString(flag)) { |
80 | 1 | flags.Set(flag); |
81 | 1 | found = true; |
82 | 1 | break; |
83 | 1 | } |
84 | 1 | } |
85 | 1 | if (!found) { |
86 | 0 | return STATUS_FORMAT(InvalidArgument, "Wrong flag: $0", args[i]); |
87 | 0 | } |
88 | 1 | } |
89 | | |
90 | 1 | RETURN_NOT_OK_PREPEND(client->ListSnapshots(flags), "Unable to list snapshots"); |
91 | 1 | return Status::OK(); |
92 | 1 | }); |
93 | | |
94 | 77 | Register( |
95 | 77 | "create_snapshot", |
96 | 77 | " <table>" |
97 | 77 | " [<table>]..." |
98 | 77 | " [flush_timeout_in_seconds] (default 60, set 0 to skip flushing)", |
99 | 77 | [client](const CLIArguments& args) -> Status { |
100 | 3 | int timeout_secs = 60; |
101 | 3 | const auto tables = VERIFY_RESULT(ResolveTableNames( |
102 | 3 | client, args.begin(), args.end(), |
103 | 3 | [&timeout_secs](auto i, const auto& end) -> Status { |
104 | 3 | if (std::next(i) == end) { |
105 | 3 | timeout_secs = VERIFY_RESULT(CheckedStoi(*i)); |
106 | 3 | return Status::OK(); |
107 | 3 | } |
108 | 3 | return ClusterAdminCli::kInvalidArguments; |
109 | 3 | })); |
110 | | |
111 | 4 | for (auto table : tables) { |
112 | 4 | if (table.is_cql_namespace() && table.is_system()3 ) { |
113 | 1 | return STATUS(InvalidArgument, |
114 | 1 | "Cannot create snapshot of YCQL system table", |
115 | 1 | table.table_name()); |
116 | 1 | } |
117 | 4 | } |
118 | | |
119 | 2 | RETURN_NOT_OK_PREPEND(client->CreateSnapshot(tables, true, timeout_secs), |
120 | 2 | Substitute("Unable to create snapshot of tables: $0", |
121 | 2 | yb::ToString(tables))); |
122 | 2 | return Status::OK(); |
123 | 2 | }); |
124 | | |
125 | 77 | RegisterJson( |
126 | 77 | "list_snapshot_restorations", |
127 | 77 | " [<restoration_id>]", |
128 | 77 | [client](const CLIArguments& args) -> Result<rapidjson::Document> { |
129 | 5 | auto restoration_id = VERIFY_RESULT(GetOptionalArg<TxnSnapshotRestorationId>(args, 0)); |
130 | 0 | return client->ListSnapshotRestorations(restoration_id); |
131 | 5 | }); |
132 | | |
133 | 77 | RegisterJson( |
134 | 77 | "create_snapshot_schedule", |
135 | 77 | " <snapshot_interval_in_minutes>" |
136 | 77 | " <snapshot_retention_in_minutes>" |
137 | 77 | " <keyspace>", |
138 | 77 | [client](const CLIArguments& args) -> Result<rapidjson::Document> { |
139 | 9 | RETURN_NOT_OK(CheckArgumentsCount(args.size(), 3, 3)); |
140 | 8 | auto interval = MonoDelta::FromMinutes(VERIFY_RESULT(CheckedStold(args[0]))); |
141 | 8 | auto retention = MonoDelta::FromMinutes(VERIFY_RESULT(CheckedStold(args[1]))); |
142 | 8 | const auto tables = VERIFY_RESULT(ResolveTableNames( |
143 | 8 | client, args.begin() + 2, args.end(), TailArgumentsProcessor(), true)); |
144 | | // This is just a paranoid check, should never happen. |
145 | 8 | if (tables.size() != 1 || !tables[0].has_namespace()) { |
146 | 0 | return STATUS(InvalidArgument, "Expecting exactly one keyspace argument"); |
147 | 0 | } |
148 | 8 | return client->CreateSnapshotSchedule(tables[0], interval, retention); |
149 | 8 | }); |
150 | | |
151 | 77 | RegisterJson( |
152 | 77 | "list_snapshot_schedules", |
153 | 77 | " [<schedule_id>]", |
154 | 77 | [client](const CLIArguments& args) -> Result<rapidjson::Document> { |
155 | 8 | RETURN_NOT_OK(CheckArgumentsCount(args.size(), 0, 1)); |
156 | 8 | auto schedule_id = VERIFY_RESULT(GetOptionalArg<SnapshotScheduleId>(args, 0)); |
157 | 0 | return client->ListSnapshotSchedules(schedule_id); |
158 | 8 | }); |
159 | | |
160 | 77 | RegisterJson( |
161 | 77 | "delete_snapshot_schedule", |
162 | 77 | " <schedule_id>", |
163 | 77 | [client](const CLIArguments& args) -> Result<rapidjson::Document> { |
164 | 0 | RETURN_NOT_OK(CheckArgumentsCount(args.size(), 1, 1)); |
165 | 0 | auto schedule_id = VERIFY_RESULT(SnapshotScheduleId::FromString(args[0])); |
166 | 0 | return client->DeleteSnapshotSchedule(schedule_id); |
167 | 0 | }); |
168 | | |
169 | 77 | RegisterJson( |
170 | 77 | "restore_snapshot_schedule", |
171 | 77 | Format(" <schedule_id> (<timestamp> | $0 <interval>)", kMinus), |
172 | 77 | [client](const CLIArguments& args) -> Result<rapidjson::Document> { |
173 | 3 | RETURN_NOT_OK(CheckArgumentsCount(args.size(), 2, 3)); |
174 | 3 | auto schedule_id = VERIFY_RESULT(SnapshotScheduleId::FromString(args[0])); |
175 | 0 | HybridTime restore_at; |
176 | 3 | if (args.size() == 2) { |
177 | 3 | restore_at = VERIFY_RESULT(HybridTime::ParseHybridTime(args[1])); |
178 | 3 | } else { |
179 | 0 | if (args[1] != kMinus) { |
180 | 0 | return ClusterAdminCli::kInvalidArguments; |
181 | 0 | } |
182 | 0 | restore_at = VERIFY_RESULT(HybridTime::ParseHybridTime("-" + args[2])); |
183 | 0 | } |
184 | | |
185 | 3 | return client->RestoreSnapshotSchedule(schedule_id, restore_at); |
186 | 3 | }); |
187 | | |
188 | 77 | Register( |
189 | 77 | "create_keyspace_snapshot", " [ycql.]<keyspace_name>", |
190 | 77 | [client](const CLIArguments& args) -> Status { |
191 | 0 | if (args.size() != 1) { |
192 | 0 | return ClusterAdminCli::kInvalidArguments; |
193 | 0 | } |
194 | | |
195 | 0 | const TypedNamespaceName keyspace = VERIFY_RESULT(ParseNamespaceName(args[0])); |
196 | 0 | SCHECK_NE( |
197 | 0 | keyspace.db_type, YQL_DATABASE_PGSQL, InvalidArgument, |
198 | 0 | Format("Wrong keyspace type: $0", YQLDatabase_Name(keyspace.db_type))); |
199 | | |
200 | 0 | RETURN_NOT_OK_PREPEND(client->CreateNamespaceSnapshot(keyspace), |
201 | 0 | Substitute("Unable to create snapshot of keyspace: $0", |
202 | 0 | keyspace.name)); |
203 | 0 | return Status::OK(); |
204 | 0 | }); |
205 | | |
206 | 77 | Register( |
207 | 77 | "create_database_snapshot", " [ysql.]<database_name>", |
208 | 77 | [client](const CLIArguments& args) -> Status { |
209 | 0 | if (args.size() != 1) { |
210 | 0 | return ClusterAdminCli::kInvalidArguments; |
211 | 0 | } |
212 | | |
213 | 0 | const TypedNamespaceName database = |
214 | 0 | VERIFY_RESULT(ParseNamespaceName(args[0], YQL_DATABASE_PGSQL)); |
215 | 0 | SCHECK_EQ( |
216 | 0 | database.db_type, YQL_DATABASE_PGSQL, InvalidArgument, |
217 | 0 | Format("Wrong database type: $0", YQLDatabase_Name(database.db_type))); |
218 | | |
219 | 0 | RETURN_NOT_OK_PREPEND(client->CreateNamespaceSnapshot(database), |
220 | 0 | Substitute("Unable to create snapshot of database: $0", |
221 | 0 | database.name)); |
222 | 0 | return Status::OK(); |
223 | 0 | }); |
224 | | |
225 | 77 | Register( |
226 | 77 | "restore_snapshot", Format(" <snapshot_id> [{<timestamp> | $0 {interval}]", kMinus), |
227 | 77 | [client](const CLIArguments& args) -> Status { |
228 | 0 | if (args.size() < 1 || 3 < args.size()) { |
229 | 0 | return ClusterAdminCli::kInvalidArguments; |
230 | 0 | } else if (args.size() == 3 && args[1] != kMinus) { |
231 | 0 | return ClusterAdminCli::kInvalidArguments; |
232 | 0 | } |
233 | 0 | const string snapshot_id = args[0]; |
234 | 0 | HybridTime timestamp; |
235 | 0 | if (args.size() == 2) { |
236 | 0 | timestamp = VERIFY_RESULT(HybridTime::ParseHybridTime(args[1])); |
237 | 0 | } else if (args.size() == 3) { |
238 | 0 | timestamp = VERIFY_RESULT(HybridTime::ParseHybridTime("-" + args[2])); |
239 | 0 | } |
240 | | |
241 | 0 | RETURN_NOT_OK_PREPEND(client->RestoreSnapshot(snapshot_id, timestamp), |
242 | 0 | Substitute("Unable to restore snapshot $0", snapshot_id)); |
243 | 0 | return Status::OK(); |
244 | 0 | }); |
245 | | |
246 | 77 | Register( |
247 | 77 | "export_snapshot", " <snapshot_id> <file_name>", |
248 | 77 | [client](const CLIArguments& args) -> Status { |
249 | 0 | if (args.size() != 2) { |
250 | 0 | return ClusterAdminCli::kInvalidArguments; |
251 | 0 | } |
252 | | |
253 | 0 | const string snapshot_id = args[0]; |
254 | 0 | const string file_name = args[1]; |
255 | 0 | RETURN_NOT_OK_PREPEND(client->CreateSnapshotMetaFile(snapshot_id, file_name), |
256 | 0 | Substitute("Unable to export snapshot $0 to file $1", |
257 | 0 | snapshot_id, |
258 | 0 | file_name)); |
259 | 0 | return Status::OK(); |
260 | 0 | }); |
261 | | |
262 | 77 | Register( |
263 | 77 | "import_snapshot", " <file_name> [<namespace> <table_name> [<table_name>]...]", |
264 | 77 | [client](const CLIArguments& args) -> Status { |
265 | 0 | if (args.size() < 1) { |
266 | 0 | return ClusterAdminCli::kInvalidArguments; |
267 | 0 | } |
268 | | |
269 | 0 | const string file_name = args[0]; |
270 | 0 | TypedNamespaceName keyspace; |
271 | 0 | size_t num_tables = 0; |
272 | 0 | vector<YBTableName> tables; |
273 | |
|
274 | 0 | if (args.size() >= 2) { |
275 | 0 | keyspace = VERIFY_RESULT(ParseNamespaceName(args[1])); |
276 | 0 | num_tables = args.size() - 2; |
277 | |
|
278 | 0 | if (num_tables > 0) { |
279 | 0 | LOG_IF(DFATAL, keyspace.name.empty()) << "Uninitialized keyspace: " << keyspace.name; |
280 | 0 | tables.reserve(num_tables); |
281 | |
|
282 | 0 | for (size_t i = 0; i < num_tables; ++i) { |
283 | 0 | tables.push_back(YBTableName(keyspace.db_type, keyspace.name, args[2 + i])); |
284 | 0 | } |
285 | 0 | } |
286 | 0 | } |
287 | | |
288 | 0 | const string msg = num_tables > 0 ? |
289 | 0 | Substitute("Unable to import tables $0 from snapshot meta file $1", |
290 | 0 | yb::ToString(tables), file_name) : |
291 | 0 | Substitute("Unable to import snapshot meta file $0", file_name); |
292 | |
|
293 | 0 | RETURN_NOT_OK_PREPEND(client->ImportSnapshotMetaFile(file_name, keyspace, tables), msg); |
294 | 0 | return Status::OK(); |
295 | 0 | }); |
296 | | |
297 | 77 | Register( |
298 | 77 | "delete_snapshot", " <snapshot_id>", |
299 | 77 | [client](const CLIArguments& args) -> Status { |
300 | 0 | if (args.size() != 1) { |
301 | 0 | return ClusterAdminCli::kInvalidArguments; |
302 | 0 | } |
303 | | |
304 | 0 | const string snapshot_id = args[0]; |
305 | 0 | RETURN_NOT_OK_PREPEND(client->DeleteSnapshot(snapshot_id), |
306 | 0 | Substitute("Unable to delete snapshot $0", snapshot_id)); |
307 | 0 | return Status::OK(); |
308 | 0 | }); |
309 | | |
310 | 77 | Register( |
311 | 77 | "list_replica_type_counts", |
312 | 77 | " <table>", |
313 | 77 | [client](const CLIArguments& args) -> Status { |
314 | 0 | const auto table_name = VERIFY_RESULT( |
315 | 0 | ResolveSingleTableName(client, args.begin(), args.end())); |
316 | 0 | RETURN_NOT_OK_PREPEND(client->ListReplicaTypeCounts(table_name), |
317 | 0 | "Unable to list live and read-only replica counts"); |
318 | 0 | return Status::OK(); |
319 | 0 | }); |
320 | | |
321 | 77 | Register( |
322 | 77 | "set_preferred_zones", " <cloud.region.zone> [<cloud.region.zone>]...", |
323 | 77 | [client](const CLIArguments& args) -> Status { |
324 | 0 | if (args.size() < 1) { |
325 | 0 | return ClusterAdminCli::kInvalidArguments; |
326 | 0 | } |
327 | 0 | RETURN_NOT_OK_PREPEND(client->SetPreferredZones(args), "Unable to set preferred zones"); |
328 | 0 | return Status::OK(); |
329 | 0 | }); |
330 | | |
331 | 77 | Register( |
332 | 77 | "rotate_universe_key", " key_path", |
333 | 77 | [client](const CLIArguments& args) -> Status { |
334 | 0 | if (args.size() < 1) { |
335 | 0 | return ClusterAdminCli::kInvalidArguments; |
336 | 0 | } |
337 | 0 | RETURN_NOT_OK_PREPEND( |
338 | 0 | client->RotateUniverseKey(args[0]), "Unable to rotate universe key."); |
339 | 0 | return Status::OK(); |
340 | 0 | }); |
341 | | |
342 | 77 | Register( |
343 | 77 | "disable_encryption", "", |
344 | 77 | [client](const CLIArguments& args) -> Status { |
345 | 0 | RETURN_NOT_OK_PREPEND(client->DisableEncryption(), "Unable to disable encryption."); |
346 | 0 | return Status::OK(); |
347 | 0 | }); |
348 | | |
349 | 77 | Register( |
350 | 77 | "is_encryption_enabled", "", |
351 | 77 | [client](const CLIArguments& args) -> Status { |
352 | 0 | RETURN_NOT_OK_PREPEND(client->IsEncryptionEnabled(), "Unable to get encryption status."); |
353 | 0 | return Status::OK(); |
354 | 0 | }); |
355 | | |
356 | 77 | Register( |
357 | 77 | "add_universe_key_to_all_masters", " key_id key_path", |
358 | 77 | [client](const CLIArguments& args) -> Status { |
359 | 0 | if (args.size() != 2) { |
360 | 0 | return ClusterAdminCli::kInvalidArguments; |
361 | 0 | } |
362 | 0 | string key_id = args[0]; |
363 | 0 | faststring contents; |
364 | 0 | RETURN_NOT_OK(ReadFileToString(Env::Default(), args[1], &contents)); |
365 | 0 | string universe_key = contents.ToString(); |
366 | |
|
367 | 0 | RETURN_NOT_OK_PREPEND(client->AddUniverseKeyToAllMasters(key_id, universe_key), |
368 | 0 | "Unable to add universe key to all masters."); |
369 | 0 | return Status::OK(); |
370 | 0 | }); |
371 | | |
372 | 77 | Register( |
373 | 77 | "all_masters_have_universe_key_in_memory", " key_id", |
374 | 77 | [client](const CLIArguments& args) -> Status { |
375 | 0 | if (args.size() != 1) { |
376 | 0 | return ClusterAdminCli::kInvalidArguments; |
377 | 0 | } |
378 | 0 | RETURN_NOT_OK_PREPEND(client->AllMastersHaveUniverseKeyInMemory(args[0]), |
379 | 0 | "Unable to check whether master has universe key in memory."); |
380 | 0 | return Status::OK(); |
381 | 0 | }); |
382 | | |
383 | 77 | Register( |
384 | 77 | "rotate_universe_key_in_memory", " key_id", |
385 | 77 | [client](const CLIArguments& args) -> Status { |
386 | 0 | if (args.size() != 1) { |
387 | 0 | return ClusterAdminCli::kInvalidArguments; |
388 | 0 | } |
389 | 0 | string key_id = args[0]; |
390 | |
|
391 | 0 | RETURN_NOT_OK_PREPEND(client->RotateUniverseKeyInMemory(key_id), |
392 | 0 | "Unable rotate universe key in memory."); |
393 | 0 | return Status::OK(); |
394 | 0 | }); |
395 | | |
396 | 77 | Register( |
397 | 77 | "disable_encryption_in_memory", "", |
398 | 77 | [client](const CLIArguments& args) -> Status { |
399 | 0 | if (args.size() != 0) { |
400 | 0 | return ClusterAdminCli::kInvalidArguments; |
401 | 0 | } |
402 | 0 | RETURN_NOT_OK_PREPEND(client->DisableEncryptionInMemory(), "Unable to disable encryption."); |
403 | 0 | return Status::OK(); |
404 | 0 | }); |
405 | | |
406 | 77 | Register( |
407 | 77 | "write_universe_key_to_file", " <key_id> <file_name>", |
408 | 77 | [client](const CLIArguments& args) -> Status { |
409 | 0 | if (args.size() != 2) { |
410 | 0 | return ClusterAdminCli::kInvalidArguments; |
411 | 0 | } |
412 | 0 | RETURN_NOT_OK_PREPEND(client->WriteUniverseKeyToFile(args[0], args[1]), |
413 | 0 | "Unable to write key to file"); |
414 | 0 | return Status::OK(); |
415 | 0 | }); |
416 | | |
417 | 77 | Register( |
418 | 77 | "create_cdc_stream", " <table_id>", |
419 | 77 | [client](const CLIArguments& args) -> Status { |
420 | 0 | if (args.size() < 1) { |
421 | 0 | return ClusterAdminCli::kInvalidArguments; |
422 | 0 | } |
423 | 0 | const string table_id = args[0]; |
424 | 0 | RETURN_NOT_OK_PREPEND(client->CreateCDCStream(table_id), |
425 | 0 | Substitute("Unable to create CDC stream for table $0", table_id)); |
426 | 0 | return Status::OK(); |
427 | 0 | }); |
428 | | |
429 | 77 | Register( |
430 | 77 | "create_change_data_stream", " <namespace> [checkpoint_type]", |
431 | 77 | [client](const CLIArguments& args) -> Status { |
432 | 0 | if (args.size() < 1) { |
433 | 0 | return ClusterAdminCli::kInvalidArguments; |
434 | 0 | } |
435 | | |
436 | 0 | std::string checkpoint_type = yb::ToString("IMPLICIT"); |
437 | 0 | std::string uppercase_checkpoint_type; |
438 | |
|
439 | 0 | if (args.size() > 1) { |
440 | 0 | ToUpperCase(args[1], &uppercase_checkpoint_type); |
441 | 0 | if (uppercase_checkpoint_type != yb::ToString("EXPLICIT") |
442 | 0 | && uppercase_checkpoint_type != yb::ToString("IMPLICIT")) { |
443 | 0 | return ClusterAdminCli::kInvalidArguments; |
444 | 0 | } |
445 | 0 | checkpoint_type = uppercase_checkpoint_type; |
446 | 0 | } |
447 | | |
448 | 0 | const string namespace_name = args[0]; |
449 | |
|
450 | 0 | const TypedNamespaceName database = |
451 | 0 | VERIFY_RESULT(ParseNamespaceName(args[0], YQL_DATABASE_PGSQL)); |
452 | 0 | SCHECK_EQ( |
453 | 0 | database.db_type, YQL_DATABASE_PGSQL, InvalidArgument, |
454 | 0 | Format("Wrong database type: $0", YQLDatabase_Name(database.db_type))); |
455 | | |
456 | 0 | RETURN_NOT_OK_PREPEND(client->CreateCDCSDKDBStream(database, checkpoint_type), |
457 | 0 | Substitute("Unable to create CDC stream for database $0", |
458 | 0 | namespace_name)); |
459 | 0 | return Status::OK(); |
460 | 0 | }); |
461 | | |
462 | 77 | Register( |
463 | 77 | "delete_cdc_stream", " <stream_id> [force_delete]", |
464 | 77 | [client](const CLIArguments& args) -> Status { |
465 | 0 | if (args.size() < 1) { |
466 | 0 | return ClusterAdminCli::kInvalidArguments; |
467 | 0 | } |
468 | 0 | const string stream_id = args[0]; |
469 | 0 | bool force_delete = false; |
470 | 0 | if (args.size() >= 2 && args[1] == "force_delete") { |
471 | 0 | force_delete = true; |
472 | 0 | } |
473 | 0 | RETURN_NOT_OK_PREPEND(client->DeleteCDCStream(stream_id, force_delete), |
474 | 0 | Substitute("Unable to delete CDC stream id $0", stream_id)); |
475 | 0 | return Status::OK(); |
476 | 0 | }); |
477 | | |
478 | 77 | Register( |
479 | 77 | "delete_change_data_stream", " <db_stream_id>", |
480 | 77 | [client](const CLIArguments& args) -> Status { |
481 | 0 | if (args.size() < 1) { |
482 | 0 | return ClusterAdminCli::kInvalidArguments; |
483 | 0 | } |
484 | | |
485 | 0 | const std::string db_stream_id = args[0]; |
486 | 0 | RETURN_NOT_OK_PREPEND(client->DeleteCDCSDKDBStream(db_stream_id), |
487 | 0 | Substitute("Unable to delete CDC database stream id $0", |
488 | 0 | db_stream_id)); |
489 | 0 | return Status::OK(); |
490 | 0 | }); |
491 | | |
492 | 77 | Register( |
493 | 77 | "list_cdc_streams", " [table_id]", |
494 | 77 | [client](const CLIArguments& args) -> Status { |
495 | 0 | if (args.size() != 0 && args.size() != 1) { |
496 | 0 | return ClusterAdminCli::kInvalidArguments; |
497 | 0 | } |
498 | 0 | const string table_id = (args.size() == 1 ? args[0] : ""); |
499 | 0 | RETURN_NOT_OK_PREPEND(client->ListCDCStreams(table_id), |
500 | 0 | Substitute("Unable to list CDC streams for table $0", table_id)); |
501 | 0 | return Status::OK(); |
502 | 0 | }); |
503 | | |
504 | 77 | Register( |
505 | 77 | "list_change_data_streams", " [namespace]", |
506 | 77 | [client](const CLIArguments& args) -> Status { |
507 | 0 | if (args.size() != 0 && args.size() != 1) { |
508 | 0 | return ClusterAdminCli::kInvalidArguments; |
509 | 0 | } |
510 | 0 | const string namespace_name = args.size() == 1 ? args[0] : ""; |
511 | 0 | string msg = (args.size() == 1) |
512 | 0 | ? Substitute("Unable to list CDC streams for namespace $0", namespace_name) |
513 | 0 | : "Unable to list CDC streams"; |
514 | |
|
515 | 0 | RETURN_NOT_OK_PREPEND(client->ListCDCSDKStreams(namespace_name), msg); |
516 | 0 | return Status::OK(); |
517 | 0 | }); |
518 | | |
519 | 77 | Register( |
520 | 77 | "get_change_data_stream_info", " <db_stream_id>", |
521 | 77 | [client](const CLIArguments& args) -> Status { |
522 | 0 | if (args.size() != 0 && args.size() != 1) { |
523 | 0 | return ClusterAdminCli::kInvalidArguments; |
524 | 0 | } |
525 | 0 | const string db_stream_id = args.size() == 1 ? args[0] : ""; |
526 | 0 | RETURN_NOT_OK_PREPEND(client->GetCDCDBStreamInfo(db_stream_id), |
527 | 0 | Substitute("Unable to list CDC stream info for database stream $0", |
528 | 0 | db_stream_id)); |
529 | 0 | return Status::OK(); |
530 | 0 | }); |
531 | | |
532 | 77 | Register( |
533 | 77 | "setup_universe_replication", |
534 | 77 | " <producer_universe_uuid> <producer_master_addresses> <comma_separated_list_of_table_ids>" |
535 | 77 | " [comma_separated_list_of_producer_bootstrap_ids]" , |
536 | 77 | [client](const CLIArguments& args) -> Status { |
537 | 0 | if (args.size() < 3) { |
538 | 0 | return ClusterAdminCli::kInvalidArguments; |
539 | 0 | } |
540 | 0 | const string producer_uuid = args[0]; |
541 | |
|
542 | 0 | vector<string> producer_addresses; |
543 | 0 | boost::split(producer_addresses, args[1], boost::is_any_of(",")); |
544 | |
|
545 | 0 | vector<string> table_uuids; |
546 | 0 | boost::split(table_uuids, args[2], boost::is_any_of(",")); |
547 | |
|
548 | 0 | vector<string> producer_bootstrap_ids; |
549 | 0 | if (args.size() == 4) { |
550 | 0 | boost::split(producer_bootstrap_ids, args[3], boost::is_any_of(",")); |
551 | 0 | } |
552 | |
|
553 | 0 | RETURN_NOT_OK_PREPEND(client->SetupUniverseReplication(producer_uuid, |
554 | 0 | producer_addresses, |
555 | 0 | table_uuids, |
556 | 0 | producer_bootstrap_ids), |
557 | 0 | Substitute("Unable to setup replication from universe $0", |
558 | 0 | producer_uuid)); |
559 | 0 | return Status::OK(); |
560 | 0 | }); |
561 | | |
562 | 77 | Register( |
563 | 77 | "delete_universe_replication", " <producer_universe_uuid> [ignore-errors]", |
564 | 77 | [client](const CLIArguments& args) -> Status { |
565 | 0 | if (args.size() < 1) { |
566 | 0 | return ClusterAdminCli::kInvalidArguments; |
567 | 0 | } |
568 | 0 | const string producer_id = args[0]; |
569 | 0 | bool ignore_errors = false; |
570 | 0 | if (args.size() >= 2 && args[1] == "ignore-errors") { |
571 | 0 | ignore_errors = true; |
572 | 0 | } |
573 | 0 | RETURN_NOT_OK_PREPEND(client->DeleteUniverseReplication(producer_id, ignore_errors), |
574 | 0 | Substitute("Unable to delete replication for universe $0", |
575 | 0 | producer_id)); |
576 | 0 | return Status::OK(); |
577 | 0 | }); |
578 | | |
579 | 77 | Register( |
580 | 77 | "alter_universe_replication", |
581 | 77 | " <producer_universe_id>" |
582 | 77 | " {set_master_addresses [comma_separated_list_of_producer_master_addresses] |" |
583 | 77 | " add_table [comma_separated_list_of_table_ids]" |
584 | 77 | " [comma_separated_list_of_producer_bootstrap_ids] |" |
585 | 77 | " remove_table [comma_separated_list_of_table_ids] |" |
586 | 77 | " rename_id <new_producer_universe_id>}", |
587 | 77 | [client](const CLIArguments& args) -> Status { |
588 | 0 | if (args.size() < 3 || args.size() > 4) { |
589 | 0 | return ClusterAdminCli::kInvalidArguments; |
590 | 0 | } |
591 | 0 | if (args.size() == 4 && args[1] != "add_table") { |
592 | 0 | return ClusterAdminCli::kInvalidArguments; |
593 | 0 | } |
594 | | |
595 | 0 | const string producer_uuid = args[0]; |
596 | 0 | vector<string> master_addresses; |
597 | 0 | vector<string> add_tables; |
598 | 0 | vector<string> remove_tables; |
599 | 0 | vector<string> bootstrap_ids_to_add; |
600 | 0 | string new_producer_universe_id = ""; |
601 | |
|
602 | 0 | vector<string> newElem, *lst; |
603 | 0 | if (args[1] == "set_master_addresses") { |
604 | 0 | lst = &master_addresses; |
605 | 0 | } else if (args[1] == "add_table") { |
606 | 0 | lst = &add_tables; |
607 | 0 | } else if (args[1] == "remove_table") { |
608 | 0 | lst = &remove_tables; |
609 | 0 | } else if (args[1] == "rename_id") { |
610 | 0 | lst = nullptr; |
611 | 0 | new_producer_universe_id = args[2]; |
612 | 0 | } else { |
613 | 0 | return ClusterAdminCli::kInvalidArguments; |
614 | 0 | } |
615 | | |
616 | 0 | if (lst) { |
617 | 0 | boost::split(newElem, args[2], boost::is_any_of(",")); |
618 | 0 | lst->insert(lst->end(), newElem.begin(), newElem.end()); |
619 | |
|
620 | 0 | if (args[1] == "add_table" && args.size() == 4) { |
621 | 0 | boost::split(bootstrap_ids_to_add, args[3], boost::is_any_of(",")); |
622 | 0 | } |
623 | 0 | } |
624 | |
|
625 | 0 | RETURN_NOT_OK_PREPEND(client->AlterUniverseReplication(producer_uuid, |
626 | 0 | master_addresses, |
627 | 0 | add_tables, |
628 | 0 | remove_tables, |
629 | 0 | bootstrap_ids_to_add, |
630 | 0 | new_producer_universe_id), |
631 | 0 | Substitute("Unable to alter replication for universe $0", producer_uuid)); |
632 | |
|
633 | 0 | return Status::OK(); |
634 | 0 | }); |
635 | | |
636 | 77 | Register( |
637 | 77 | "set_universe_replication_enabled", " <producer_universe_uuid> <0|1>", |
638 | 77 | [client](const CLIArguments& args) -> Status { |
639 | 0 | if (args.size() < 2) { |
640 | 0 | return ClusterAdminCli::kInvalidArguments; |
641 | 0 | } |
642 | 0 | const string producer_id = args[0]; |
643 | 0 | const bool is_enabled = VERIFY_RESULT(CheckedStoi(args[1])) != 0; |
644 | 0 | RETURN_NOT_OK_PREPEND(client->SetUniverseReplicationEnabled(producer_id, is_enabled), |
645 | 0 | Substitute("Unable to $0 replication for universe $1", |
646 | 0 | is_enabled ? "enable" : "disable", |
647 | 0 | producer_id)); |
648 | 0 | return Status::OK(); |
649 | 0 | }); |
650 | | |
651 | 77 | Register( |
652 | 77 | "bootstrap_cdc_producer", " <comma_separated_list_of_table_ids>", |
653 | 77 | [client](const CLIArguments& args) -> Status { |
654 | 0 | if (args.size() < 1) { |
655 | 0 | return ClusterAdminCli::kInvalidArguments; |
656 | 0 | } |
657 | | |
658 | 0 | vector<string> table_ids; |
659 | 0 | boost::split(table_ids, args[0], boost::is_any_of(",")); |
660 | |
|
661 | 0 | RETURN_NOT_OK_PREPEND(client->BootstrapProducer(table_ids), |
662 | 0 | "Unable to bootstrap CDC producer"); |
663 | 0 | return Status::OK(); |
664 | 0 | }); |
665 | 77 | } |
666 | | |
667 | | } // namespace enterprise |
668 | | } // namespace tools |
669 | | } // namespace yb |