/Users/deen/code/yugabyte-db/ent/src/yb/tools/yb-admin_client_ent.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/tools/yb-admin_cli.h" |
14 | | #include "yb/tools/yb-admin_client.h" |
15 | | |
16 | | #include <iostream> |
17 | | |
18 | | #include <boost/algorithm/string.hpp> |
19 | | |
20 | | #include <rapidjson/document.h> |
21 | | |
22 | | #include "yb/cdc/cdc_service.h" |
23 | | #include "yb/cdc/cdc_service.proxy.h" |
24 | | #include "yb/client/client.h" |
25 | | |
26 | | #include "yb/common/entity_ids.h" |
27 | | #include "yb/common/json_util.h" |
28 | | #include "yb/common/wire_protocol.h" |
29 | | |
30 | | #include "yb/encryption/encryption_util.h" |
31 | | |
32 | | #include "yb/gutil/casts.h" |
33 | | #include "yb/gutil/strings/util.h" |
34 | | |
35 | | #include "yb/master/master_defaults.h" |
36 | | #include "yb/master/master_error.h" |
37 | | #include "yb/master/master_backup.proxy.h" |
38 | | #include "yb/master/master_client.proxy.h" |
39 | | #include "yb/master/master_cluster.proxy.h" |
40 | | #include "yb/master/master_ddl.proxy.h" |
41 | | #include "yb/master/master_encryption.proxy.h" |
42 | | #include "yb/master/master_replication.proxy.h" |
43 | | |
44 | | #include "yb/rpc/messenger.h" |
45 | | #include "yb/rpc/rpc_controller.h" |
46 | | #include "yb/tools/yb-admin_util.h" |
47 | | #include "yb/util/cast.h" |
48 | | #include "yb/util/env.h" |
49 | | #include "yb/util/flag_tags.h" |
50 | | #include "yb/util/jsonwriter.h" |
51 | | #include "yb/util/monotime.h" |
52 | | #include "yb/util/pb_util.h" |
53 | | #include "yb/util/physical_time.h" |
54 | | #include "yb/util/protobuf_util.h" |
55 | | #include "yb/util/string_case.h" |
56 | | #include "yb/util/string_trim.h" |
57 | | #include "yb/util/string_util.h" |
58 | | #include "yb/util/timestamp.h" |
59 | | #include "yb/util/format.h" |
60 | | #include "yb/util/status_format.h" |
61 | | |
62 | | DEFINE_test_flag(int32, metadata_file_format_version, 0, |
63 | | "Used in 'export_snapshot' metadata file format (0 means using latest format)."); |
64 | | |
65 | | DECLARE_bool(use_client_to_server_encryption); |
66 | | DECLARE_int32(yb_client_admin_operation_timeout_sec); |
67 | | |
68 | | namespace yb { |
69 | | namespace tools { |
70 | | namespace enterprise { |
71 | | |
72 | | using namespace std::literals; |
73 | | |
74 | | using std::cout; |
75 | | using std::endl; |
76 | | using std::string; |
77 | | using std::vector; |
78 | | |
79 | | using google::protobuf::RepeatedPtrField; |
80 | | |
81 | | using client::YBTableName; |
82 | | using pb_util::ParseFromSlice; |
83 | | using rpc::RpcController; |
84 | | |
85 | | using master::ChangeEncryptionInfoRequestPB; |
86 | | using master::ChangeEncryptionInfoResponsePB; |
87 | | using master::CreateSnapshotRequestPB; |
88 | | using master::CreateSnapshotResponsePB; |
89 | | using master::DeleteSnapshotRequestPB; |
90 | | using master::DeleteSnapshotResponsePB; |
91 | | using master::IdPairPB; |
92 | | using master::ImportSnapshotMetaRequestPB; |
93 | | using master::ImportSnapshotMetaResponsePB; |
94 | | using master::ImportSnapshotMetaResponsePB_TableMetaPB; |
95 | | using master::IsCreateTableDoneRequestPB; |
96 | | using master::IsCreateTableDoneResponsePB; |
97 | | using master::ListSnapshotRestorationsRequestPB; |
98 | | using master::ListSnapshotRestorationsResponsePB; |
99 | | using master::ListSnapshotsRequestPB; |
100 | | using master::ListSnapshotsResponsePB; |
101 | | using master::ListTablesRequestPB; |
102 | | using master::ListTablesResponsePB; |
103 | | using master::ListTabletServersRequestPB; |
104 | | using master::ListTabletServersResponsePB; |
105 | | using master::RestoreSnapshotRequestPB; |
106 | | using master::RestoreSnapshotResponsePB; |
107 | | using master::SnapshotInfoPB; |
108 | | using master::SysNamespaceEntryPB; |
109 | | using master::SysRowEntry; |
110 | | using master::SysRowEntryType; |
111 | | using master::BackupRowEntryPB; |
112 | | using master::SysTablesEntryPB; |
113 | | using master::SysSnapshotEntryPB; |
114 | | |
115 | | PB_ENUM_FORMATTERS(yb::master::SysSnapshotEntryPB::State); |
116 | | |
117 | 1 | Status ClusterAdminClient::ListSnapshots(const ListSnapshotsFlags& flags) { |
118 | 1 | ListSnapshotsResponsePB resp; |
119 | 1 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
120 | 1 | ListSnapshotsRequestPB req; |
121 | 1 | req.set_list_deleted_snapshots(flags.Test(ListSnapshotsFlag::SHOW_DELETED)); |
122 | 1 | return master_backup_proxy_->ListSnapshots(req, &resp, rpc); |
123 | 1 | })); |
124 | | |
125 | 1 | rapidjson::Document document(rapidjson::kObjectType); |
126 | 1 | bool json = flags.Test(ListSnapshotsFlag::JSON); |
127 | | |
128 | 1 | if (resp.has_current_snapshot_id()) { |
129 | 0 | if (json) { |
130 | 0 | AddStringField("current_snapshot_id", |
131 | 0 | SnapshotIdToString(resp.current_snapshot_id()), |
132 | 0 | &document, &document.GetAllocator()); |
133 | 0 | } else { |
134 | 0 | cout << "Current snapshot id: " << SnapshotIdToString(resp.current_snapshot_id()) << endl; |
135 | 0 | } |
136 | 0 | } |
137 | | |
138 | 1 | rapidjson::Value json_snapshots(rapidjson::kArrayType); |
139 | 1 | if (!json) { |
140 | 1 | if (resp.snapshots_size()) { |
141 | | // Using 2 tabs so that the header can be aligned to the time. |
142 | 1 | cout << RightPadToUuidWidth("Snapshot UUID") << kColumnSep |
143 | 1 | << "State" << kColumnSep << kColumnSep << "Creation Time" << endl; |
144 | 1 | } else { |
145 | 0 | cout << "No snapshots" << endl; |
146 | 0 | } |
147 | 1 | } |
148 | | |
149 | 1 | for (SnapshotInfoPB& snapshot : *resp.mutable_snapshots()) { |
150 | 1 | rapidjson::Value json_snapshot(rapidjson::kObjectType); |
151 | 1 | if (json) { |
152 | 0 | AddStringField( |
153 | 0 | "id", SnapshotIdToString(snapshot.id()), &json_snapshot, &document.GetAllocator()); |
154 | 0 | const auto& entry = snapshot.entry(); |
155 | 0 | AddStringField( |
156 | 0 | "state", SysSnapshotEntryPB::State_Name(entry.state()), &json_snapshot, |
157 | 0 | &document.GetAllocator()); |
158 | 0 | AddStringField( |
159 | 0 | "snapshot_time", HybridTimeToString(HybridTime::FromPB(entry.snapshot_hybrid_time())), |
160 | 0 | &json_snapshot, &document.GetAllocator()); |
161 | 0 | AddStringField( |
162 | 0 | "previous_snapshot_time", |
163 | 0 | HybridTimeToString(HybridTime::FromPB(entry.previous_snapshot_hybrid_time())), |
164 | 0 | &json_snapshot, &document.GetAllocator()); |
165 | 1 | } else { |
166 | 1 | cout << SnapshotIdToString(snapshot.id()) << kColumnSep |
167 | 1 | << snapshot.entry().state() << kColumnSep |
168 | 1 | << HybridTimeToString(HybridTime::FromPB(snapshot.entry().snapshot_hybrid_time())) |
169 | 1 | << endl; |
170 | 1 | } |
171 | | |
172 | | // Not implemented in json mode. |
173 | 1 | if (flags.Test(ListSnapshotsFlag::SHOW_DETAILS)) { |
174 | 7 | for (SysRowEntry& entry : *snapshot.mutable_entry()->mutable_entries()) { |
175 | 7 | string decoded_data; |
176 | 7 | switch (entry.type()) { |
177 | 1 | case SysRowEntryType::NAMESPACE: { |
178 | 1 | auto meta = VERIFY_RESULT(ParseFromSlice<SysNamespaceEntryPB>(entry.data())); |
179 | 0 | meta.clear_transaction(); |
180 | 1 | decoded_data = JsonWriter::ToJson(meta, JsonWriter::COMPACT); |
181 | 1 | break; |
182 | 1 | } |
183 | 2 | case SysRowEntryType::TABLE: { |
184 | 2 | auto meta = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data())); |
185 | 0 | meta.clear_schema(); |
186 | 2 | meta.clear_partition_schema(); |
187 | 2 | meta.clear_index_info(); |
188 | 2 | meta.clear_indexes(); |
189 | 2 | meta.clear_transaction(); |
190 | 2 | decoded_data = JsonWriter::ToJson(meta, JsonWriter::COMPACT); |
191 | 2 | break; |
192 | 2 | } |
193 | 4 | default: |
194 | 4 | break; |
195 | 7 | } |
196 | | |
197 | 7 | if (!decoded_data.empty()) { |
198 | 3 | entry.set_data("DATA"); |
199 | 3 | cout << kColumnSep << StringReplace(JsonWriter::ToJson(entry, JsonWriter::COMPACT), |
200 | 3 | "\"DATA\"", decoded_data, false) << endl; |
201 | 3 | } |
202 | 7 | } |
203 | 1 | } |
204 | 1 | if (json) { |
205 | 0 | json_snapshots.PushBack(json_snapshot, document.GetAllocator()); |
206 | 0 | } |
207 | 1 | } |
208 | | |
209 | 1 | ListSnapshotRestorationsResponsePB rest_resp; |
210 | 1 | RETURN_NOT_OK(RequestMasterLeader(&rest_resp, [&](RpcController* rpc) { |
211 | 1 | ListSnapshotRestorationsRequestPB rest_req; |
212 | 1 | return master_backup_proxy_->ListSnapshotRestorations(rest_req, &rest_resp, rpc); |
213 | 1 | })); |
214 | | |
215 | 1 | if (json) { |
216 | 0 | document.AddMember("snapshots", json_snapshots, document.GetAllocator()); |
217 | 0 | std::cout << common::PrettyWriteRapidJsonToString(document) << std::endl; |
218 | 0 | return Status::OK(); |
219 | 0 | } |
220 | | |
221 | 1 | if (rest_resp.restorations_size() == 0) { |
222 | 1 | cout << "No snapshot restorations" << endl; |
223 | 1 | } else if (flags.Test(ListSnapshotsFlag::NOT_SHOW_RESTORED)) { |
224 | 0 | cout << "Not showing fully RESTORED entries" << endl; |
225 | 0 | } |
226 | | |
227 | 1 | bool title_printed = false; |
228 | 1 | for (const auto& restoration : rest_resp.restorations()) { |
229 | 0 | if (!flags.Test(ListSnapshotsFlag::NOT_SHOW_RESTORED) || |
230 | 0 | restoration.entry().state() != SysSnapshotEntryPB::RESTORED) { |
231 | 0 | if (!title_printed) { |
232 | 0 | cout << RightPadToUuidWidth("Restoration UUID") << kColumnSep << "State" << endl; |
233 | 0 | title_printed = true; |
234 | 0 | } |
235 | 0 | cout << TryFullyDecodeTxnSnapshotRestorationId(restoration.id()) << kColumnSep |
236 | 0 | << restoration.entry().state() << endl; |
237 | 0 | } |
238 | 0 | } |
239 | | |
240 | 1 | return Status::OK(); |
241 | 1 | } |
242 | | |
243 | | Status ClusterAdminClient::CreateSnapshot( |
244 | | const vector<YBTableName>& tables, |
245 | | const bool add_indexes, |
246 | 2 | const int flush_timeout_secs) { |
247 | 2 | if (flush_timeout_secs > 0) { |
248 | 2 | const auto status = FlushTables(tables, add_indexes, flush_timeout_secs, false); |
249 | 2 | if (status.IsTimedOut()) { |
250 | 0 | cout << status.ToString(false) << " (ignored)" << endl; |
251 | 2 | } else if (!status.ok() && !status.IsNotFound()) { |
252 | 0 | return status; |
253 | 0 | } |
254 | 2 | } |
255 | | |
256 | 2 | CreateSnapshotResponsePB resp; |
257 | 2 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
258 | 2 | CreateSnapshotRequestPB req; |
259 | 2 | for (const YBTableName& table_name : tables) { |
260 | 2 | table_name.SetIntoTableIdentifierPB(req.add_tables()); |
261 | 2 | } |
262 | | |
263 | 2 | req.set_add_indexes(add_indexes); |
264 | 2 | req.set_transaction_aware(true); |
265 | 2 | return master_backup_proxy_->CreateSnapshot(req, &resp, rpc); |
266 | 2 | })); |
267 | | |
268 | 2 | cout << "Started snapshot creation: " << SnapshotIdToString(resp.snapshot_id()) << endl; |
269 | 2 | return Status::OK(); |
270 | 2 | } |
271 | | |
272 | 0 | Status ClusterAdminClient::CreateNamespaceSnapshot(const TypedNamespaceName& ns) { |
273 | 0 | ListTablesResponsePB resp; |
274 | 0 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
275 | 0 | ListTablesRequestPB req; |
276 | |
277 | 0 | req.mutable_namespace_()->set_name(ns.name); |
278 | 0 | req.mutable_namespace_()->set_database_type(ns.db_type); |
279 | 0 | req.set_exclude_system_tables(true); |
280 | 0 | req.add_relation_type_filter(master::USER_TABLE_RELATION); |
281 | 0 | req.add_relation_type_filter(master::INDEX_TABLE_RELATION); |
282 | 0 | return master_ddl_proxy_->ListTables(req, &resp, rpc); |
283 | 0 | })); |
284 | | |
285 | 0 | if (resp.tables_size() == 0) { |
286 | 0 | return STATUS_FORMAT(InvalidArgument, "No tables found in namespace: $0", ns.name); |
287 | 0 | } |
288 | | |
289 | 0 | vector<YBTableName> tables(resp.tables_size()); |
290 | 0 | for (int i = 0; i < resp.tables_size(); ++i) { |
291 | 0 | const auto& table = resp.tables(i); |
292 | 0 | tables[i].set_table_id(table.id()); |
293 | 0 | tables[i].set_namespace_id(table.namespace_().id()); |
294 | 0 | tables[i].set_pgschema_name(table.pgschema_name()); |
295 | |
296 | 0 | RSTATUS_DCHECK(table.relation_type() == master::USER_TABLE_RELATION || |
297 | 0 | table.relation_type() == master::INDEX_TABLE_RELATION, InternalError, |
298 | 0 | Format("Invalid relation type: $0", table.relation_type())); |
299 | 0 | RSTATUS_DCHECK_EQ(table.namespace_().name(), ns.name, InternalError, |
300 | 0 | Format("Invalid namespace name: $0", table.namespace_().name())); |
301 | 0 | RSTATUS_DCHECK_EQ(table.namespace_().database_type(), ns.db_type, InternalError, |
302 | 0 | Format("Invalid namespace type: $0", |
303 | 0 | YQLDatabase_Name(table.namespace_().database_type()))); |
304 | 0 | } |
305 | |
306 | 0 | return CreateSnapshot(tables, /* add_indexes */ false); |
307 | 0 | } |
308 | | |
309 | | Result<rapidjson::Document> ClusterAdminClient::ListSnapshotRestorations( |
310 | 5 | const TxnSnapshotRestorationId& restoration_id) { |
311 | 5 | master::ListSnapshotRestorationsResponsePB resp; |
312 | 5 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
313 | 5 | master::ListSnapshotRestorationsRequestPB req; |
314 | 5 | if (restoration_id) { |
315 | 5 | req.set_restoration_id(restoration_id.data(), restoration_id.size()); |
316 | 5 | } |
317 | 5 | return master_backup_proxy_->ListSnapshotRestorations(req, &resp, rpc); |
318 | 5 | })); |
319 | | |
320 | 5 | rapidjson::Document result; |
321 | 5 | result.SetObject(); |
322 | 5 | rapidjson::Value json_restorations(rapidjson::kArrayType); |
323 | 5 | for (const auto& restoration : resp.restorations()) { |
324 | 5 | rapidjson::Value json_restoration(rapidjson::kObjectType); |
325 | 5 | AddStringField("id", |
326 | 5 | VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId(restoration.id())).ToString(), |
327 | 0 | &json_restoration, &result.GetAllocator()); |
328 | 0 | AddStringField( |
329 | 5 | "snapshot_id", |
330 | 5 | VERIFY_RESULT(FullyDecodeTxnSnapshotId(restoration.entry().snapshot_id())).ToString(), |
331 | 0 | &json_restoration, &result.GetAllocator()); |
332 | 0 | AddStringField( |
333 | 5 | "state", |
334 | 5 | master::SysSnapshotEntryPB_State_Name(restoration.entry().state()), |
335 | 5 | &json_restoration, &result.GetAllocator()); |
336 | 5 | json_restorations.PushBack(json_restoration, result.GetAllocator()); |
337 | 5 | } |
338 | 5 | result.AddMember("restorations", json_restorations, result.GetAllocator()); |
339 | 5 | return result; |
340 | 5 | } |
341 | | |
342 | | Result<rapidjson::Document> ClusterAdminClient::CreateSnapshotSchedule( |
343 | 8 | const client::YBTableName& keyspace, MonoDelta interval, MonoDelta retention) { |
344 | 8 | master::CreateSnapshotScheduleResponsePB resp; |
345 | 8 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
346 | 8 | master::CreateSnapshotScheduleRequestPB req; |
347 | | |
348 | 8 | auto& options = *req.mutable_options(); |
349 | 8 | auto& filter_tables = *options.mutable_filter()->mutable_tables()->mutable_tables(); |
350 | 8 | keyspace.SetIntoTableIdentifierPB(filter_tables.Add()); |
351 | | |
352 | 8 | options.set_interval_sec(interval.ToSeconds()); |
353 | 8 | options.set_retention_duration_sec(retention.ToSeconds()); |
354 | 8 | return master_backup_proxy_->CreateSnapshotSchedule(req, &resp, rpc); |
355 | 8 | })); |
356 | | |
357 | 8 | rapidjson::Document document; |
358 | 8 | document.SetObject(); |
359 | | |
360 | 8 | AddStringField( |
361 | 8 | "schedule_id", |
362 | 8 | VERIFY_RESULT(FullyDecodeSnapshotScheduleId(resp.snapshot_schedule_id())).ToString(), |
363 | 0 | &document, &document.GetAllocator()); |
364 | 0 | return document; |
365 | 8 | } |
366 | | |
367 | | Result<rapidjson::Document> ClusterAdminClient::ListSnapshotSchedules( |
368 | 8 | const SnapshotScheduleId& schedule_id) { |
369 | 8 | master::ListSnapshotSchedulesResponsePB resp; |
370 | 8 | RETURN_NOT_OK(RequestMasterLeader(&resp, [this, &resp, &schedule_id](RpcController* rpc) { |
371 | 8 | master::ListSnapshotSchedulesRequestPB req; |
372 | 8 | if (schedule_id) { |
373 | 8 | req.set_snapshot_schedule_id(schedule_id.data(), schedule_id.size()); |
374 | 8 | } |
375 | 8 | return master_backup_proxy_->ListSnapshotSchedules(req, &resp, rpc); |
376 | 8 | })); |
377 | | |
378 | 8 | rapidjson::Document result; |
379 | 8 | result.SetObject(); |
380 | 8 | rapidjson::Value json_schedules(rapidjson::kArrayType); |
381 | 8 | for (const auto& schedule : resp.schedules()) { |
382 | 8 | rapidjson::Value json_schedule(rapidjson::kObjectType); |
383 | 8 | AddStringField("id", VERIFY_RESULT(FullyDecodeSnapshotScheduleId(schedule.id())).ToString(), |
384 | 0 | &json_schedule, &result.GetAllocator()); |
385 | | |
386 | 0 | const auto& filter = schedule.options().filter(); |
387 | 8 | string filter_output; |
388 | | // The user input should only have 1 entry, at namespace level. |
389 | 8 | if (filter.tables().tables_size() == 1) { |
390 | 8 | const auto& table_id = filter.tables().tables(0); |
391 | 8 | if (table_id.has_namespace_()) { |
392 | 8 | string database_type; |
393 | 8 | if (table_id.namespace_().database_type() == YQL_DATABASE_PGSQL) { |
394 | 2 | database_type = "ysql"; |
395 | 6 | } else if (table_id.namespace_().database_type() == YQL_DATABASE_CQL) { |
396 | 6 | database_type = "ycql"; |
397 | 6 | } |
398 | 8 | if (!database_type.empty()) { |
399 | 8 | filter_output = Format("$0.$1", database_type, table_id.namespace_().name()); |
400 | 8 | } |
401 | 8 | } |
402 | 8 | } |
403 | | // If the user input was non-standard, just display the whole debug PB. |
404 | 8 | if (filter_output.empty()) { |
405 | 0 | filter_output = filter.ShortDebugString(); |
406 | 0 | DCHECK(false) << "Non standard filter " << filter_output; |
407 | 0 | } |
408 | 8 | rapidjson::Value options(rapidjson::kObjectType); |
409 | 8 | AddStringField("filter", filter_output, &options, &result.GetAllocator()); |
410 | 8 | auto interval_min = schedule.options().interval_sec() / MonoTime::kSecondsPerMinute; |
411 | 8 | AddStringField("interval", |
412 | 8 | Format("$0 min", interval_min), |
413 | 8 | &options, &result.GetAllocator()); |
414 | 8 | auto retention_min = schedule.options().retention_duration_sec() / MonoTime::kSecondsPerMinute; |
415 | 8 | AddStringField("retention", |
416 | 8 | Format("$0 min", retention_min), |
417 | 8 | &options, &result.GetAllocator()); |
418 | 8 | auto delete_time = HybridTime::FromPB(schedule.options().delete_time()); |
419 | 8 | if (delete_time) { |
420 | 0 | AddStringField("delete_time", HybridTimeToString(delete_time), &options, |
421 | 0 | &result.GetAllocator()); |
422 | 0 | } |
423 | | |
424 | 8 | json_schedule.AddMember("options", options, result.GetAllocator()); |
425 | 8 | rapidjson::Value json_snapshots(rapidjson::kArrayType); |
426 | 9 | for (const auto& snapshot : schedule.snapshots()) { |
427 | 9 | rapidjson::Value json_snapshot(rapidjson::kObjectType); |
428 | 9 | AddStringField("id", VERIFY_RESULT(FullyDecodeTxnSnapshotId(snapshot.id())).ToString(), |
429 | 0 | &json_snapshot, &result.GetAllocator()); |
430 | 0 | auto snapshot_ht = HybridTime::FromPB(snapshot.entry().snapshot_hybrid_time()); |
431 | 9 | AddStringField("snapshot_time", |
432 | 9 | HybridTimeToString(snapshot_ht), |
433 | 9 | &json_snapshot, &result.GetAllocator()); |
434 | 9 | auto previous_snapshot_ht = HybridTime::FromPB( |
435 | 9 | snapshot.entry().previous_snapshot_hybrid_time()); |
436 | 9 | if (previous_snapshot_ht) { |
437 | 1 | AddStringField( |
438 | 1 | "previous_snapshot_time", |
439 | 1 | HybridTimeToString(previous_snapshot_ht), |
440 | 1 | &json_snapshot, &result.GetAllocator()); |
441 | 1 | } |
442 | 9 | json_snapshots.PushBack(json_snapshot, result.GetAllocator()); |
443 | 9 | } |
444 | 8 | json_schedule.AddMember("snapshots", json_snapshots, result.GetAllocator()); |
445 | 8 | json_schedules.PushBack(json_schedule, result.GetAllocator()); |
446 | 8 | } |
447 | 8 | result.AddMember("schedules", json_schedules, result.GetAllocator()); |
448 | 8 | return result; |
449 | 8 | } |
450 | | |
451 | | Result<rapidjson::Document> ClusterAdminClient::DeleteSnapshotSchedule( |
452 | 0 | const SnapshotScheduleId& schedule_id) { |
453 | 0 | master::DeleteSnapshotScheduleResponsePB resp; |
454 | 0 | RETURN_NOT_OK(RequestMasterLeader(&resp, [this, &resp, &schedule_id](RpcController* rpc) { |
455 | 0 | master::DeleteSnapshotScheduleRequestPB req; |
456 | 0 | req.set_snapshot_schedule_id(schedule_id.data(), schedule_id.size()); |
457 | |
458 | 0 | return master_backup_proxy_->DeleteSnapshotSchedule(req, &resp, rpc); |
459 | 0 | })); |
460 | | |
461 | 0 | rapidjson::Document document; |
462 | 0 | document.SetObject(); |
463 | 0 | AddStringField("schedule_id", schedule_id.ToString(), &document, &document.GetAllocator()); |
464 | 0 | return document; |
465 | 0 | } |
466 | | |
467 | 10 | bool SnapshotSuitableForRestoreAt(const SysSnapshotEntryPB& entry, HybridTime restore_at) { |
468 | 10 | return (entry.state() == master::SysSnapshotEntryPB::COMPLETE || |
469 | 10 | entry.state() == master::SysSnapshotEntryPB::CREATING) && |
470 | 10 | HybridTime::FromPB(entry.snapshot_hybrid_time()) >= restore_at && |
471 | 10 | HybridTime::FromPB(entry.previous_snapshot_hybrid_time()) < restore_at; |
472 | 10 | } |
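
A minimal, self-contained sketch (illustration only, with simplified stand-in types, not YugabyteDB code) of the restore-window rule that SnapshotSuitableForRestoreAt implements above: a snapshot can serve a restore point that falls in the half-open window (previous_snapshot_hybrid_time, snapshot_hybrid_time].

    // Plain integers stand in for HybridTime values.
    #include <cassert>
    #include <cstdint>

    struct FakeSnapshotEntry {      // stand-in for the SysSnapshotEntryPB fields used above
      bool complete_or_creating;    // state is COMPLETE (or CREATING)
      uint64_t snapshot_time;       // snapshot_hybrid_time
      uint64_t previous_time;       // previous_snapshot_hybrid_time
    };

    bool SuitableForRestoreAt(const FakeSnapshotEntry& e, uint64_t restore_at) {
      return e.complete_or_creating &&
             e.snapshot_time >= restore_at &&
             e.previous_time < restore_at;
    }

    int main() {
      FakeSnapshotEntry e{true, /*snapshot_time=*/150, /*previous_time=*/100};
      assert(SuitableForRestoreAt(e, 150));   // upper bound is inclusive
      assert(SuitableForRestoreAt(e, 101));
      assert(!SuitableForRestoreAt(e, 100));  // lower bound is exclusive
      assert(!SuitableForRestoreAt(e, 151));  // restore point is newer than the snapshot
      return 0;
    }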
473 | | |
474 | | Result<TxnSnapshotId> ClusterAdminClient::SuitableSnapshotId( |
475 | 3 | const SnapshotScheduleId& schedule_id, HybridTime restore_at, CoarseTimePoint deadline) { |
476 | 3 | for (;;) { |
477 | 3 | auto last_snapshot_time = HybridTime::kMin; |
478 | 3 | { |
479 | 3 | RpcController rpc; |
480 | 3 | rpc.set_deadline(deadline); |
481 | 3 | master::ListSnapshotSchedulesRequestPB req; |
482 | 3 | master::ListSnapshotSchedulesResponsePB resp; |
483 | 3 | if (schedule_id) { |
484 | 3 | req.set_snapshot_schedule_id(schedule_id.data(), schedule_id.size()); |
485 | 3 | } |
486 | | |
487 | 3 | RETURN_NOT_OK_PREPEND(master_backup_proxy_->ListSnapshotSchedules(req, &resp, &rpc), |
488 | 3 | "Failed to list snapshot schedules"); |
489 | | |
490 | 3 | if (resp.has_error()) { |
491 | 0 | return StatusFromPB(resp.error().status()); |
492 | 0 | } |
493 | | |
494 | 3 | if (resp.schedules().size() < 1) { |
495 | 0 | return STATUS_FORMAT(InvalidArgument, "Unknown schedule: $0", schedule_id); |
496 | 0 | } |
497 | | |
498 | 7 | for (const auto& snapshot : resp.schedules()[0].snapshots()) { |
499 | 7 | auto snapshot_hybrid_time = HybridTime::FromPB(snapshot.entry().snapshot_hybrid_time()); |
500 | 7 | last_snapshot_time = std::max(last_snapshot_time, snapshot_hybrid_time); |
501 | 7 | if (SnapshotSuitableForRestoreAt(snapshot.entry(), restore_at)) { |
502 | 2 | return VERIFY_RESULT(FullyDecodeTxnSnapshotId(snapshot.id())); |
503 | 2 | } |
504 | 7 | } |
505 | 1 | if (last_snapshot_time > restore_at) { |
506 | 0 | return STATUS_FORMAT( |
507 | 0 | IllegalState, "Cannot restore at $0, last snapshot: $1, snapshots: $2", |
508 | 0 | restore_at, last_snapshot_time, resp.schedules()[0].snapshots()); |
509 | 0 | } |
510 | 1 | } |
511 | 1 | RpcController rpc; |
512 | 1 | rpc.set_deadline(deadline); |
513 | 1 | master::CreateSnapshotRequestPB req; |
514 | 1 | master::CreateSnapshotResponsePB resp; |
515 | 1 | req.set_schedule_id(schedule_id.data(), schedule_id.size()); |
516 | 1 | RETURN_NOT_OK_PREPEND(master_backup_proxy_->CreateSnapshot(req, &resp, &rpc), |
517 | 1 | "Failed to create snapshot"); |
518 | 1 | if (resp.has_error()) { |
519 | 0 | auto status = StatusFromPB(resp.error().status()); |
520 | 0 | if (master::MasterError(status) == master::MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION) { |
521 | 0 | std::this_thread::sleep_until(std::min(deadline, CoarseMonoClock::now() + 1s)); |
522 | 0 | continue; |
523 | 0 | } |
524 | 0 | return status; |
525 | 0 | } |
526 | 1 | return FullyDecodeTxnSnapshotId(resp.snapshot_id()); |
527 | 1 | } |
528 | 3 | } |
529 | | |
530 | | Result<rapidjson::Document> ClusterAdminClient::RestoreSnapshotSchedule( |
531 | 3 | const SnapshotScheduleId& schedule_id, HybridTime restore_at) { |
532 | 3 | auto deadline = CoarseMonoClock::now() + timeout_; |
533 | | |
534 | 3 | auto snapshot_id = VERIFY_RESULT(SuitableSnapshotId(schedule_id, restore_at, deadline)); |
535 | | |
536 | 8 | for (;;) { |
537 | 8 | RpcController rpc; |
538 | 8 | rpc.set_deadline(deadline); |
539 | 8 | master::ListSnapshotsRequestPB req; |
540 | 8 | req.set_snapshot_id(snapshot_id.data(), snapshot_id.size()); |
541 | 8 | master::ListSnapshotsResponsePB resp; |
542 | 8 | RETURN_NOT_OK_PREPEND(master_backup_proxy_->ListSnapshots(req, &resp, &rpc), |
543 | 8 | "Failed to list snapshots"); |
544 | 8 | if (resp.has_error()) { |
545 | 0 | return StatusFromPB(resp.error().status()); |
546 | 0 | } |
547 | 8 | if (resp.snapshots().size() != 1) { |
548 | 0 | return STATUS_FORMAT( |
549 | 0 | IllegalState, "Wrong number of snapshots received $0", resp.snapshots().size()); |
550 | 0 | } |
551 | 8 | if (resp.snapshots()[0].entry().state() == master::SysSnapshotEntryPB::COMPLETE) { |
552 | 3 | if (SnapshotSuitableForRestoreAt(resp.snapshots()[0].entry(), restore_at)) { |
553 | 3 | break; |
554 | 3 | } |
555 | 0 | return STATUS_FORMAT( |
556 | 3 | IllegalState, "Snapshot is not suitable for restore at $0", restore_at); |
557 | 3 | } |
558 | 5 | auto now = CoarseMonoClock::now(); |
559 | 5 | if (now >= deadline) { |
560 | 0 | return STATUS_FORMAT( |
561 | 0 | TimedOut, "Timed out waiting for snapshot $0 to complete", snapshot_id); |
562 | 0 | } |
563 | 5 | std::this_thread::sleep_until(std::min(deadline, now + 100ms)); |
564 | 5 | } |
565 | | |
566 | 3 | RpcController rpc; |
567 | 3 | rpc.set_deadline(deadline); |
568 | 3 | RestoreSnapshotRequestPB req; |
569 | 3 | RestoreSnapshotResponsePB resp; |
570 | 3 | req.set_snapshot_id(snapshot_id.data(), snapshot_id.size()); |
571 | 3 | req.set_restore_ht(restore_at.ToUint64()); |
572 | 3 | RETURN_NOT_OK_PREPEND(master_backup_proxy_->RestoreSnapshot(req, &resp, &rpc), |
573 | 3 | "Failed to restore snapshot"); |
574 | | |
575 | 3 | if (resp.has_error()) { |
576 | 0 | return StatusFromPB(resp.error().status()); |
577 | 0 | } |
578 | | |
579 | 3 | auto restoration_id = VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId(resp.restoration_id())); |
580 | | |
581 | 0 | rapidjson::Document document; |
582 | 3 | document.SetObject(); |
583 | | |
584 | 3 | AddStringField("snapshot_id", snapshot_id.ToString(), &document, &document.GetAllocator()); |
585 | 3 | AddStringField("restoration_id", restoration_id.ToString(), &document, &document.GetAllocator()); |
586 | | |
587 | 3 | return document; |
588 | 3 | } |
589 | | |
590 | | Status ClusterAdminClient::RestoreSnapshot(const string& snapshot_id, |
591 | 0 | HybridTime timestamp) { |
592 | 0 | RestoreSnapshotResponsePB resp; |
593 | 0 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
594 | 0 | RestoreSnapshotRequestPB req; |
595 | 0 | req.set_snapshot_id(StringToSnapshotId(snapshot_id)); |
596 | 0 | if (timestamp) { |
597 | 0 | req.set_restore_ht(timestamp.ToUint64()); |
598 | 0 | } |
599 | 0 | return master_backup_proxy_->RestoreSnapshot(req, &resp, rpc); |
600 | 0 | })); |
601 | | |
602 | 0 | cout << "Started restoring snapshot: " << snapshot_id << endl |
603 | 0 | << "Restoration id: " << FullyDecodeTxnSnapshotRestorationId(resp.restoration_id()) << endl; |
604 | 0 | if (timestamp) { |
605 | 0 | cout << "Restore at: " << timestamp << endl; |
606 | 0 | } |
607 | 0 | return Status::OK(); |
608 | 0 | } |
609 | | |
610 | 0 | Status ClusterAdminClient::DeleteSnapshot(const std::string& snapshot_id) { |
611 | 0 | DeleteSnapshotResponsePB resp; |
612 | 0 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
613 | 0 | DeleteSnapshotRequestPB req; |
614 | 0 | req.set_snapshot_id(StringToSnapshotId(snapshot_id)); |
615 | 0 | return master_backup_proxy_->DeleteSnapshot(req, &resp, rpc); |
616 | 0 | })); |
617 | | |
618 | 0 | cout << "Deleted snapshot: " << snapshot_id << endl; |
619 | 0 | return Status::OK(); |
620 | 0 | } |
621 | | |
622 | | Status ClusterAdminClient::CreateSnapshotMetaFile(const string& snapshot_id, |
623 | 0 | const string& file_name) { |
624 | 0 | ListSnapshotsResponsePB resp; |
625 | 0 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
626 | 0 | ListSnapshotsRequestPB req; |
627 | 0 | req.set_snapshot_id(StringToSnapshotId(snapshot_id)); |
628 | | |
629 | | // Format 0 - latest format (== Format 2 at the moment). |
630 | | // Format -1 - old format (no 'namespace_name' in the Table entry). |
631 | | // Format 1 - old format. |
632 | | // Format 2 - new format. |
633 | 0 | if (FLAGS_TEST_metadata_file_format_version == 0 || |
634 | 0 | FLAGS_TEST_metadata_file_format_version >= 2) { |
635 | 0 | req.set_prepare_for_backup(true); |
636 | 0 | } |
637 | 0 | return master_backup_proxy_->ListSnapshots(req, &resp, rpc); |
638 | 0 | })); |
639 | | |
640 | 0 | if (resp.snapshots_size() > 1) { |
641 | 0 | LOG(WARNING) << "Requested snapshot metadata for snapshot '" << snapshot_id << "', but got " |
642 | 0 | << resp.snapshots_size() << " snapshots in the response"; |
643 | 0 | } |
644 | |
645 | 0 | SnapshotInfoPB* snapshot = nullptr; |
646 | 0 | for (SnapshotInfoPB& snapshot_entry : *resp.mutable_snapshots()) { |
647 | 0 | if (SnapshotIdToString(snapshot_entry.id()) == snapshot_id) { |
648 | 0 | snapshot = &snapshot_entry; |
649 | 0 | break; |
650 | 0 | } |
651 | 0 | } |
652 | 0 | if (!snapshot) { |
653 | 0 | return STATUS_FORMAT( |
654 | 0 | InternalError, "Response contained $0 entries but no entry for snapshot '$1'", |
655 | 0 | resp.snapshots_size(), snapshot_id); |
656 | 0 | } |
657 | | |
658 | 0 | if (FLAGS_TEST_metadata_file_format_version == -1) { |
659 | | // Remove 'namespace_name' from SysTablesEntryPB. |
660 | 0 | SysSnapshotEntryPB& sys_entry = *snapshot->mutable_entry(); |
661 | 0 | for (SysRowEntry& entry : *sys_entry.mutable_entries()) { |
662 | 0 | if (entry.type() == SysRowEntryType::TABLE) { |
663 | 0 | auto meta = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data())); |
664 | 0 | meta.clear_namespace_name(); |
665 | 0 | entry.set_data(meta.SerializeAsString()); |
666 | 0 | } |
667 | 0 | } |
668 | 0 | } |
669 | | |
670 | 0 | cout << "Exporting snapshot " << snapshot_id << " (" |
671 | 0 | << snapshot->entry().state() << ") to file " << file_name << endl; |
672 | | |
673 | | // Serialize snapshot protobuf to given path. |
674 | 0 | RETURN_NOT_OK(pb_util::WritePBContainerToPath( |
675 | 0 | Env::Default(), file_name, *snapshot, pb_util::OVERWRITE, pb_util::SYNC)); |
676 | | |
677 | 0 | cout << "Snapshot metadata was saved into file: " << file_name << endl; |
678 | 0 | return Status::OK(); |
679 | 0 | } |
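
As a compact summary of the metadata format-version comment inside CreateSnapshotMetaFile above, here is a hedged sketch (hypothetical helpers, not part of yb-admin) of how the TEST_metadata_file_format_version flag steers the export: 0 (meaning "latest") and any value >= 2 request backup-ready metadata via prepare_for_backup, while -1 additionally strips namespace_name from each TABLE entry before the file is written.

    #include <cassert>

    // Illustration only; mirrors the two flag checks in CreateSnapshotMetaFile.
    bool PrepareForBackup(int format_version) {
      return format_version == 0 || format_version >= 2;  // Format 0 == latest (currently Format 2)
    }

    bool StripNamespaceName(int format_version) {
      return format_version == -1;                         // oldest format, without namespace_name
    }

    int main() {
      assert(PrepareForBackup(0) && PrepareForBackup(2) && !PrepareForBackup(1));
      assert(StripNamespaceName(-1) && !StripNamespaceName(0));
      return 0;
    }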
680 | | |
681 | | Status ClusterAdminClient::ImportSnapshotMetaFile(const string& file_name, |
682 | | const TypedNamespaceName& keyspace, |
683 | 0 | const vector<YBTableName>& tables) { |
684 | 0 | cout << "Read snapshot meta file " << file_name << endl; |
685 | |
686 | 0 | ImportSnapshotMetaRequestPB req; |
687 | |
688 | 0 | SnapshotInfoPB* const snapshot_info = req.mutable_snapshot(); |
689 | | |
690 | | // Read snapshot protobuf from given path. |
691 | 0 | RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(Env::Default(), file_name, snapshot_info)); |
692 | | |
693 | 0 | if (!snapshot_info->has_format_version()) { |
694 | 0 | SCHECK_EQ( |
695 | 0 | 0, snapshot_info->backup_entries_size(), InvalidArgument, |
696 | 0 | Format("Metadata file in Format 1 has backup entries from Format 2: $0", |
697 | 0 | snapshot_info->backup_entries_size())); |
698 | | |
699 | | // Repack PB data loaded in the old format. |
700 | | // Init BackupSnapshotPB based on loaded SnapshotInfoPB. |
701 | 0 | SysSnapshotEntryPB& sys_entry = *snapshot_info->mutable_entry(); |
702 | 0 | snapshot_info->mutable_backup_entries()->Reserve(sys_entry.entries_size()); |
703 | 0 | for (SysRowEntry& entry : *sys_entry.mutable_entries()) { |
704 | 0 | snapshot_info->add_backup_entries()->mutable_entry()->Swap(&entry); |
705 | 0 | } |
706 | |
707 | 0 | sys_entry.clear_entries(); |
708 | 0 | snapshot_info->set_format_version(2); |
709 | 0 | } |
710 | | |
711 | 0 | cout << "Importing snapshot " << SnapshotIdToString(snapshot_info->id()) |
712 | 0 | << " (" << snapshot_info->entry().state() << ")" << endl; |
713 | |
714 | 0 | size_t table_index = 0; |
715 | 0 | bool was_table_renamed = false; |
716 | 0 | for (BackupRowEntryPB& backup_entry : *snapshot_info->mutable_backup_entries()) { |
717 | 0 | SysRowEntry& entry = *backup_entry.mutable_entry(); |
718 | 0 | const YBTableName table_name = table_index < tables.size() |
719 | 0 | ? tables[table_index] : YBTableName(); |
720 | |
721 | 0 | switch (entry.type()) { |
722 | 0 | case SysRowEntryType::NAMESPACE: { |
723 | 0 | auto meta = VERIFY_RESULT(ParseFromSlice<SysNamespaceEntryPB>(entry.data())); |
724 | | |
725 | 0 | if (!keyspace.name.empty() && keyspace.name != meta.name()) { |
726 | 0 | meta.set_name(keyspace.name); |
727 | 0 | entry.set_data(meta.SerializeAsString()); |
728 | 0 | } |
729 | 0 | break; |
730 | 0 | } |
731 | 0 | case SysRowEntryType::TABLE: { |
732 | 0 | if (was_table_renamed && table_name.empty()) { |
733 | | // Renaming is allowed either for all tables or for none. |
734 | 0 | return STATUS_FORMAT(InvalidArgument, |
735 | 0 | "There is no name for table (including indexes) number: $0", |
736 | 0 | table_index); |
737 | 0 | } |
738 | | |
739 | 0 | auto meta = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data())); |
740 | | |
741 | 0 | bool update_meta = false; |
742 | 0 | if (!table_name.empty() && table_name.table_name() != meta.name()) { |
743 | 0 | meta.set_name(table_name.table_name()); |
744 | 0 | update_meta = true; |
745 | 0 | was_table_renamed = true; |
746 | 0 | } |
747 | 0 | if (!keyspace.name.empty() && keyspace.name != meta.namespace_name()) { |
748 | 0 | meta.set_namespace_name(keyspace.name); |
749 | 0 | update_meta = true; |
750 | 0 | } |
751 | |
752 | 0 | if (meta.name().empty()) { |
753 | 0 | return STATUS(IllegalState, "Could not find table name from snapshot metadata"); |
754 | 0 | } |
755 | | |
756 | | // Update the table name if needed. |
757 | 0 | if (update_meta) { |
758 | 0 | entry.set_data(meta.SerializeAsString()); |
759 | 0 | } |
760 | |
761 | 0 | const auto colocated_prefix = meta.colocated() ? "colocated " : ""; |
762 | |
763 | 0 | if (meta.indexed_table_id().empty()) { |
764 | 0 | cout << "Table type: " << colocated_prefix << "table" << endl; |
765 | 0 | } else { |
766 | 0 | cout << "Table type: " << colocated_prefix << "index (attaching to the old table id " |
767 | 0 | << meta.indexed_table_id() << ")" << endl; |
768 | 0 | } |
769 | |
770 | 0 | if (!table_name.empty()) { |
771 | 0 | DCHECK(table_name.has_namespace()); |
772 | 0 | DCHECK(table_name.has_table()); |
773 | 0 | cout << "Target imported " << colocated_prefix << "table name: " |
774 | 0 | << table_name.ToString() << endl; |
775 | 0 | } else if (!keyspace.name.empty()) { |
776 | 0 | cout << "Target imported " << colocated_prefix << "table name: " |
777 | 0 | << keyspace.name << "." << meta.name() << endl; |
778 | 0 | } |
779 | |
780 | 0 | cout << (meta.colocated() ? "Colocated t" : "T") << "able being imported: " |
781 | 0 | << (meta.namespace_name().empty() ? |
782 | 0 | "[" + meta.namespace_id() + "]" : meta.namespace_name()) |
783 | 0 | << "." << meta.name() << endl; |
784 | 0 | ++table_index; |
785 | 0 | break; |
786 | 0 | } |
787 | 0 | default: |
788 | 0 | break; |
789 | 0 | } |
790 | 0 | } |
791 | | |
792 | 0 | ImportSnapshotMetaResponsePB resp; |
793 | 0 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
794 | 0 | return master_backup_proxy_->ImportSnapshotMeta(req, &resp, rpc); |
795 | 0 | })); |
796 | | |
797 | 0 | const int kObjectColumnWidth = 16; |
798 | 0 | const auto pad_object_type = [](const string& s) { |
799 | 0 | return RightPadToWidth(s, kObjectColumnWidth); |
800 | 0 | }; |
801 | |
802 | 0 | cout << "Successfully applied snapshot." << endl |
803 | 0 | << pad_object_type("Object") << kColumnSep |
804 | 0 | << RightPadToUuidWidth("Old ID") << kColumnSep |
805 | 0 | << RightPadToUuidWidth("New ID") << endl; |
806 | |
807 | 0 | const RepeatedPtrField<ImportSnapshotMetaResponsePB_TableMetaPB>& tables_meta = |
808 | 0 | resp.tables_meta(); |
809 | 0 | CreateSnapshotRequestPB snapshot_req; |
810 | 0 | CreateSnapshotResponsePB snapshot_resp; |
811 | |
812 | 0 | for (int i = 0; i < tables_meta.size(); ++i) { |
813 | 0 | const ImportSnapshotMetaResponsePB_TableMetaPB& table_meta = tables_meta.Get(i); |
814 | 0 | const string& new_table_id = table_meta.table_ids().new_id(); |
815 | |
816 | 0 | cout << pad_object_type("Keyspace") << kColumnSep |
817 | 0 | << table_meta.namespace_ids().old_id() << kColumnSep |
818 | 0 | << table_meta.namespace_ids().new_id() << endl; |
819 | |
820 | 0 | if (!ImportSnapshotMetaResponsePB_TableType_IsValid(table_meta.table_type())) { |
821 | 0 | return STATUS_FORMAT(InternalError, "Found unknown table type: $0", table_meta.table_type()); |
822 | 0 | } |
823 | | |
824 | 0 | const string table_type = |
825 | 0 | AllCapsToCamelCase(ImportSnapshotMetaResponsePB_TableType_Name(table_meta.table_type())); |
826 | 0 | cout << pad_object_type(table_type) << kColumnSep |
827 | 0 | << table_meta.table_ids().old_id() << kColumnSep |
828 | 0 | << new_table_id << endl; |
829 | |
830 | 0 | const RepeatedPtrField<IdPairPB>& tablets_map = table_meta.tablets_ids(); |
831 | 0 | for (int j = 0; j < tablets_map.size(); ++j) { |
832 | 0 | const IdPairPB& pair = tablets_map.Get(j); |
833 | 0 | cout << pad_object_type(Format("Tablet $0", j)) << kColumnSep |
834 | 0 | << pair.old_id() << kColumnSep |
835 | 0 | << pair.new_id() << endl; |
836 | 0 | } |
837 | |
838 | 0 | RETURN_NOT_OK(yb_client_->WaitForCreateTableToFinish( |
839 | 0 | new_table_id, |
840 | 0 | CoarseMonoClock::Now() + MonoDelta::FromSeconds(FLAGS_yb_client_admin_operation_timeout_sec) |
841 | 0 | )); |
842 | | |
843 | 0 | snapshot_req.mutable_tables()->Add()->set_table_id(new_table_id); |
844 | 0 | } |
845 | | |
846 | | // All indexes are already in the request. Do not add them twice. |
847 | 0 | snapshot_req.set_add_indexes(false); |
848 | 0 | snapshot_req.set_transaction_aware(true); |
849 | 0 | snapshot_req.set_imported(true); |
850 | | // Create new snapshot. |
851 | 0 | RETURN_NOT_OK(RequestMasterLeader(&snapshot_resp, [&](RpcController* rpc) { |
852 | 0 | return master_backup_proxy_->CreateSnapshot(snapshot_req, &snapshot_resp, rpc); |
853 | 0 | })); |
854 | | |
855 | 0 | cout << pad_object_type("Snapshot") << kColumnSep |
856 | 0 | << SnapshotIdToString(snapshot_info->id()) << kColumnSep |
857 | 0 | << SnapshotIdToString(snapshot_resp.snapshot_id()) << endl; |
858 | |
859 | 0 | return Status::OK(); |
860 | 0 | } |
861 | | |
862 | 0 | Status ClusterAdminClient::ListReplicaTypeCounts(const YBTableName& table_name) { |
863 | 0 | vector<string> tablet_ids, ranges; |
864 | 0 | RETURN_NOT_OK(yb_client_->GetTablets(table_name, 0, &tablet_ids, &ranges)); |
865 | 0 | master::GetTabletLocationsResponsePB resp; |
866 | 0 | RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) { |
867 | 0 | master::GetTabletLocationsRequestPB req; |
868 | 0 | for (const auto& tablet_id : tablet_ids) { |
869 | 0 | req.add_tablet_ids(tablet_id); |
870 | 0 | } |
871 | 0 | return master_client_proxy_->GetTabletLocations(req, &resp, rpc); |
872 | 0 | })); |
873 | | |
874 | 0 | struct ReplicaCounts { |
875 | 0 | int live_count; |
876 | 0 | int read_only_count; |
877 | 0 | string placement_uuid; |
878 | 0 | }; |
879 | 0 | std::map<TabletServerId, ReplicaCounts> replica_map; |
880 | |
881 | 0 | std::cout << "Tserver ID\t\tPlacement ID\t\tLive count\t\tRead only count\n"; |
882 | |
883 | 0 | for (int tablet_idx = 0; tablet_idx < resp.tablet_locations_size(); tablet_idx++) { |
884 | 0 | const master::TabletLocationsPB& locs = resp.tablet_locations(tablet_idx); |
885 | 0 | for (int replica_idx = 0; replica_idx < locs.replicas_size(); replica_idx++) { |
886 | 0 | const auto& replica = locs.replicas(replica_idx); |
887 | 0 | const string& ts_uuid = replica.ts_info().permanent_uuid(); |
888 | 0 | const string& placement_uuid = |
889 | 0 | replica.ts_info().has_placement_uuid() ? replica.ts_info().placement_uuid() : ""; |
890 | 0 | bool is_replica_read_only = |
891 | 0 | replica.member_type() == consensus::PeerMemberType::PRE_OBSERVER || |
892 | 0 | replica.member_type() == consensus::PeerMemberType::OBSERVER; |
893 | 0 | int live_count = is_replica_read_only ? 0 : 1; |
894 | 0 | int read_only_count = 1 - live_count; |
895 | 0 | if (replica_map.count(ts_uuid) == 0) { |
896 | 0 | replica_map[ts_uuid].live_count = live_count; |
897 | 0 | replica_map[ts_uuid].read_only_count = read_only_count; |
898 | 0 | replica_map[ts_uuid].placement_uuid = placement_uuid; |
899 | 0 | } else { |
900 | 0 | ReplicaCounts* counts = &replica_map[ts_uuid]; |
901 | 0 | counts->live_count += live_count; |
902 | 0 | counts->read_only_count += read_only_count; |
903 | 0 | } |
904 | 0 | } |
905 | 0 | } |
906 | |
907 | 0 | for (auto const& tserver : replica_map) { |
908 | 0 | std::cout << tserver.first << "\t\t" << tserver.second.placement_uuid << "\t\t" |
909 | 0 | << tserver.second.live_count << "\t\t" << tserver.second.read_only_count << std::endl; |
910 | 0 | } |
911 | |
912 | 0 | return Status::OK(); |
913 | 0 | } |
914 | | |
915 | 3 | Status ClusterAdminClient::SetPreferredZones(const std::vector<string>& preferred_zones) { |
916 | 3 | rpc::RpcController rpc; |
917 | 3 | master::SetPreferredZonesRequestPB req; |
918 | 3 | master::SetPreferredZonesResponsePB resp; |
919 | 3 | rpc.set_timeout(timeout_); |
920 | | |
921 | 3 | std::set<string> zones; |
922 | 5 | for (const string& zone : preferred_zones) { |
923 | 5 | if (std::find(zones.begin(), zones.end(), zone) != zones.end()) { |
924 | 0 | continue; |
925 | 0 | } |
926 | 5 | size_t last_pos = 0; |
927 | 5 | size_t next_pos; |
928 | 5 | std::vector<string> tokens; |
929 | 15 | while ((next_pos = zone.find(".", last_pos)) != string::npos) { |
930 | 10 | tokens.push_back(zone.substr(last_pos, next_pos - last_pos)); |
931 | 10 | last_pos = next_pos + 1; |
932 | 10 | } |
933 | 5 | tokens.push_back(zone.substr(last_pos, zone.size() - last_pos)); |
934 | 5 | if (tokens.size() != 3) { |
935 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Invalid argument for preferred zone $0, should " |
936 | 0 | "have format cloud.region.zone", zone); |
937 | 0 | } |
938 | | |
939 | 5 | CloudInfoPB* cloud_info = req.add_preferred_zones(); |
940 | 5 | cloud_info->set_placement_cloud(tokens[0]); |
941 | 5 | cloud_info->set_placement_region(tokens[1]); |
942 | 5 | cloud_info->set_placement_zone(tokens[2]); |
943 | | |
944 | 5 | zones.emplace(zone); |
945 | 5 | } |
946 | | |
947 | 3 | RETURN_NOT_OK(master_cluster_proxy_->SetPreferredZones(req, &resp, &rpc)); |
948 | | |
949 | 3 | if (resp.has_error()) { |
950 | 0 | return STATUS(ServiceUnavailable, resp.error().status().message()); |
951 | 0 | } |
952 | | |
953 | 3 | return Status::OK(); |
954 | 3 | } |
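
A standalone sketch (illustration only, not the yb-admin implementation) of the cloud.region.zone parsing performed by SetPreferredZones above: each preferred-zone argument must split into exactly three dot-separated tokens, otherwise the command is rejected with InvalidArgument.

    #include <cassert>
    #include <string>
    #include <vector>

    // Splits "cloud.region.zone" on '.'; SetPreferredZones requires exactly three tokens.
    std::vector<std::string> SplitZone(const std::string& zone) {
      std::vector<std::string> tokens;
      size_t last = 0, next;
      while ((next = zone.find('.', last)) != std::string::npos) {
        tokens.push_back(zone.substr(last, next - last));
        last = next + 1;
      }
      tokens.push_back(zone.substr(last));
      return tokens;
    }

    int main() {
      auto t = SplitZone("aws.us-west-2.us-west-2a");   // hypothetical placement string
      assert(t.size() == 3 && t[0] == "aws" && t[1] == "us-west-2" && t[2] == "us-west-2a");
      assert(SplitZone("badzone").size() != 3);          // would be rejected with InvalidArgument
      return 0;
    }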
955 | | |
956 | 0 | Status ClusterAdminClient::RotateUniverseKey(const std::string& key_path) { |
957 | 0 | return SendEncryptionRequest(key_path, true); |
958 | 0 | } |
959 | | |
960 | 0 | Status ClusterAdminClient::DisableEncryption() { |
961 | 0 | return SendEncryptionRequest("", false); |
962 | 0 | } |
963 | | |
964 | | Status ClusterAdminClient::SendEncryptionRequest( |
965 | 0 | const std::string& key_path, bool enable_encryption) { |
966 | 0 | RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!"); |
967 | 0 | rpc::RpcController rpc; |
968 | 0 | rpc.set_timeout(timeout_); |
969 | | |
970 | | // Get the cluster config from the master leader. |
971 | 0 | master::ChangeEncryptionInfoRequestPB encryption_info_req; |
972 | 0 | master::ChangeEncryptionInfoResponsePB encryption_info_resp; |
973 | 0 | encryption_info_req.set_encryption_enabled(enable_encryption); |
974 | 0 | if (key_path != "") { |
975 | 0 | encryption_info_req.set_key_path(key_path); |
976 | 0 | } |
977 | 0 | RETURN_NOT_OK_PREPEND(master_encryption_proxy_-> |
978 | 0 | ChangeEncryptionInfo(encryption_info_req, &encryption_info_resp, &rpc), |
979 | 0 | "MasterServiceImpl::ChangeEncryptionInfo call fails.") |
980 | | |
981 | 0 | if (encryption_info_resp.has_error()) { |
982 | 0 | return StatusFromPB(encryption_info_resp.error().status()); |
983 | 0 | } |
984 | 0 | return Status::OK(); |
985 | 0 | } |
986 | | |
987 | 11 | Status ClusterAdminClient::IsEncryptionEnabled() { |
988 | 11 | RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!"); |
989 | 11 | rpc::RpcController rpc; |
990 | 11 | rpc.set_timeout(timeout_); |
991 | | |
992 | 11 | master::IsEncryptionEnabledRequestPB req; |
993 | 11 | master::IsEncryptionEnabledResponsePB resp; |
994 | 11 | RETURN_NOT_OK_PREPEND(master_encryption_proxy_-> |
995 | 11 | IsEncryptionEnabled(req, &resp, &rpc), |
996 | 11 | "MasterServiceImpl::IsEncryptionEnabled call fails."); |
997 | 11 | if (resp.has_error()) { |
998 | 0 | return StatusFromPB(resp.error().status()); |
999 | 0 | } |
1000 | | |
1001 | 11 | std::cout << "Encryption status: " << (resp.encryption_enabled() ? |
1002 | 11 | Format("ENABLED with key id $0", resp.key_id()) : "DISABLED") << std::endl; |
1003 | 11 | return Status::OK(); |
1004 | 11 | } |
1005 | | |
1006 | | Status ClusterAdminClient::AddUniverseKeyToAllMasters( |
1007 | 11 | const std::string& key_id, const std::string& universe_key) { |
1008 | | |
1009 | 11 | RETURN_NOT_OK(encryption::EncryptionParams::IsValidKeySize( |
1010 | 11 | universe_key.size() - encryption::EncryptionParams::kBlockSize)); |
1011 | | |
1012 | 11 | master::AddUniverseKeysRequestPB req; |
1013 | 11 | master::AddUniverseKeysResponsePB resp; |
1014 | 11 | auto* universe_keys = req.mutable_universe_keys(); |
1015 | 11 | (*universe_keys->mutable_map())[key_id] = universe_key; |
1016 | | |
1017 | 33 | for (auto hp : VERIFY_RESULT(HostPort::ParseStrings(master_addr_list_, 7100))) { |
1018 | 33 | rpc::RpcController rpc; |
1019 | 33 | rpc.set_timeout(timeout_); |
1020 | 33 | master::MasterEncryptionProxy proxy(proxy_cache_.get(), hp); |
1021 | 33 | RETURN_NOT_OK_PREPEND(proxy.AddUniverseKeys(req, &resp, &rpc), |
1022 | 33 | Format("MasterServiceImpl::AddUniverseKeys call fails on host $0.", |
1023 | 33 | hp.ToString())); |
1024 | 33 | if (resp.has_error()) { |
1025 | 0 | return StatusFromPB(resp.error().status()); |
1026 | 0 | } |
1027 | 33 | std::cout << Format("Successfully added key to the node $0.\n", hp.ToString()); |
1028 | 33 | } |
1029 | | |
1030 | 11 | return Status::OK(); |
1031 | 11 | } |
1032 | | |
1033 | 11 | Status ClusterAdminClient::AllMastersHaveUniverseKeyInMemory(const std::string& key_id) { |
1034 | 11 | master::HasUniverseKeyInMemoryRequestPB req; |
1035 | 11 | master::HasUniverseKeyInMemoryResponsePB resp; |
1036 | 11 | req.set_version_id(key_id); |
1037 | | |
1038 | 33 | for (auto hp : VERIFY_RESULT(HostPort::ParseStrings(master_addr_list_, 7100))) { |
1039 | 33 | rpc::RpcController rpc; |
1040 | 33 | rpc.set_timeout(timeout_); |
1041 | 33 | master::MasterEncryptionProxy proxy(proxy_cache_.get(), hp); |
1042 | 33 | RETURN_NOT_OK_PREPEND(proxy.HasUniverseKeyInMemory(req, &resp, &rpc), |
1043 | 33 | "MasterServiceImpl::HasUniverseKeyInMemory call fails."); |
1044 | | |
1045 | 33 | if (resp.has_error()) { |
1046 | 0 | return StatusFromPB(resp.error().status()); |
1047 | 0 | } |
1048 | 33 | if (!resp.has_key()) { |
1049 | 0 | return STATUS_FORMAT(TryAgain, "Node $0 does not have universe key in memory", hp); |
1050 | 0 | } |
1051 | | |
1052 | 33 | std::cout << Format("Node $0 has universe key in memory: $1\n", hp.ToString(), resp.has_key()); |
1053 | 33 | } |
1054 | | |
1055 | 11 | return Status::OK(); |
1056 | 11 | } |
1057 | | |
1058 | 11 | Status ClusterAdminClient::RotateUniverseKeyInMemory(const std::string& key_id) { |
1059 | 11 | RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!"); |
1060 | 11 | rpc::RpcController rpc; |
1061 | 11 | rpc.set_timeout(timeout_); |
1062 | | |
1063 | 11 | master::ChangeEncryptionInfoRequestPB req; |
1064 | 11 | master::ChangeEncryptionInfoResponsePB resp; |
1065 | 11 | req.set_encryption_enabled(true); |
1066 | 11 | req.set_in_memory(true); |
1067 | 11 | req.set_version_id(key_id); |
1068 | 11 | RETURN_NOT_OK_PREPEND(master_encryption_proxy_->ChangeEncryptionInfo(req, &resp, &rpc), |
1069 | 11 | "MasterServiceImpl::ChangeEncryptionInfo call fails."); |
1070 | 11 | if (resp.has_error()) { |
1071 | 0 | return StatusFromPB(resp.error().status()); |
1072 | 0 | } |
1073 | | |
1074 | 11 | std::cout << "Rotated universe key in memory\n"; |
1075 | 11 | return Status::OK(); |
1076 | 11 | } |
1077 | | |
1078 | 0 | Status ClusterAdminClient::DisableEncryptionInMemory() { |
1079 | 0 | RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!"); |
1080 | 0 | rpc::RpcController rpc; |
1081 | 0 | rpc.set_timeout(timeout_); |
1082 | |
1083 | 0 | master::ChangeEncryptionInfoRequestPB req; |
1084 | 0 | master::ChangeEncryptionInfoResponsePB resp; |
1085 | 0 | req.set_encryption_enabled(false); |
1086 | 0 | RETURN_NOT_OK_PREPEND(master_encryption_proxy_->ChangeEncryptionInfo(req, &resp, &rpc), |
1087 | 0 | "MasterServiceImpl::ChangeEncryptionInfo call fails."); |
1088 | 0 | if (resp.has_error()) { |
1089 | 0 | return StatusFromPB(resp.error().status()); |
1090 | 0 | } |
1091 | | |
1092 | 0 | std::cout << "Encryption disabled\n"; |
1093 | 0 | return Status::OK(); |
1094 | 0 | } |
1095 | | |
1096 | | Status ClusterAdminClient::WriteUniverseKeyToFile( |
1097 | 0 | const std::string& key_id, const std::string& file_name) { |
1098 | 0 | RETURN_NOT_OK_PREPEND(WaitUntilMasterLeaderReady(), "Wait for master leader failed!"); |
1099 | 0 | rpc::RpcController rpc; |
1100 | 0 | rpc.set_timeout(timeout_); |
1101 | |
1102 | 0 | master::GetUniverseKeyRegistryRequestPB req; |
1103 | 0 | master::GetUniverseKeyRegistryResponsePB resp; |
1104 | 0 | RETURN_NOT_OK_PREPEND(master_encryption_proxy_->GetUniverseKeyRegistry(req, &resp, &rpc), |
1105 | 0 | "MasterServiceImpl::GetUniverseKeyRegistry call fails."); |
1106 | 0 | if (resp.has_error()) { |
1107 | 0 | return StatusFromPB(resp.error().status()); |
1108 | 0 | } |
1109 | | |
1110 | 0 | auto universe_keys = resp.universe_keys(); |
1111 | 0 | const auto& it = universe_keys.map().find(key_id); |
1112 | 0 | if (it == universe_keys.map().end()) { |
1113 | 0 | return STATUS_FORMAT(NotFound, "Could not find key with id $0", key_id); |
1114 | 0 | } |
1115 | | |
1116 | 0 | RETURN_NOT_OK(WriteStringToFile(Env::Default(), Slice(it->second), file_name)); |
1117 | | |
1118 | 0 | std::cout << "Finished writing to file\n"; |
1119 | 0 | return Status::OK(); |
1120 | 0 | } |
1121 | | |
1122 | | Status ClusterAdminClient::CreateCDCSDKDBStream( |
1123 | 0 | const TypedNamespaceName& ns, const std::string& checkpoint_type) { |
1124 | 0 | HostPort ts_addr = VERIFY_RESULT(GetFirstRpcAddressForTS()); |
1125 | 0 | auto cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(proxy_cache_.get(), ts_addr); |
1126 | |
1127 | 0 | cdc::CreateCDCStreamRequestPB req; |
1128 | 0 | cdc::CreateCDCStreamResponsePB resp; |
1129 | |
1130 | 0 | req.set_namespace_name(ns.name); |
1131 | 0 | req.set_record_type(cdc::CDCRecordType::CHANGE); |
1132 | 0 | req.set_record_format(cdc::CDCRecordFormat::PROTO); |
1133 | 0 | req.set_source_type(cdc::CDCRequestSource::CDCSDK); |
1134 | 0 | if (checkpoint_type == yb::ToString("EXPLICIT")) { |
1135 | 0 | req.set_checkpoint_type(cdc::CDCCheckpointType::EXPLICIT); |
1136 | 0 | } else { |
1137 | 0 | req.set_checkpoint_type(cdc::CDCCheckpointType::IMPLICIT); |
1138 | 0 | } |
1139 | |
1140 | 0 | RpcController rpc; |
1141 | 0 | rpc.set_timeout(timeout_); |
1142 | 0 | RETURN_NOT_OK(cdc_proxy->CreateCDCStream(req, &resp, &rpc)); |
1143 | | |
1144 | 0 | if (resp.has_error()) { |
1145 | 0 | cout << "Error creating stream: " << resp.error().status().message() << endl; |
1146 | 0 | return StatusFromPB(resp.error().status()); |
1147 | 0 | } |
1148 | | |
1149 | 0 | cout << "CDC Stream ID: " << resp.db_stream_id() << endl; |
1150 | 0 | return Status::OK(); |
1151 | 0 | } |
1152 | | |
1153 | 0 | Status ClusterAdminClient::CreateCDCStream(const TableId& table_id) { |
1154 | 0 | master::CreateCDCStreamRequestPB req; |
1155 | 0 | master::CreateCDCStreamResponsePB resp; |
1156 | 0 | req.set_table_id(table_id); |
1157 | 0 | req.mutable_options()->Reserve(3); |
1158 | |
1159 | 0 | auto record_type_option = req.add_options(); |
1160 | 0 | record_type_option->set_key(cdc::kRecordType); |
1161 | 0 | record_type_option->set_value(CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); |
1162 | |
1163 | 0 | auto record_format_option = req.add_options(); |
1164 | 0 | record_format_option->set_key(cdc::kRecordFormat); |
1165 | 0 | record_format_option->set_value(CDCRecordFormat_Name(cdc::CDCRecordFormat::JSON)); |
1166 | |
1167 | 0 | auto source_type_option = req.add_options(); |
1168 | 0 | source_type_option->set_key(cdc::kSourceType); |
1169 | 0 | source_type_option->set_value(CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); |
1170 | |
1171 | 0 | RpcController rpc; |
1172 | 0 | rpc.set_timeout(timeout_); |
1173 | 0 | RETURN_NOT_OK(master_replication_proxy_->CreateCDCStream(req, &resp, &rpc)); |
1174 | | |
1175 | 0 | if (resp.has_error()) { |
1176 | 0 | cout << "Error creating stream: " << resp.error().status().message() << endl; |
1177 | 0 | return StatusFromPB(resp.error().status()); |
1178 | 0 | } |
1179 | | |
1180 | 0 | cout << "CDC Stream ID: " << resp.stream_id() << endl; |
1181 | 0 | return Status::OK(); |
1182 | 0 | } |
1183 | | |
1184 | 0 | Status ClusterAdminClient::DeleteCDCSDKDBStream(const std::string& db_stream_id) { |
1185 | 0 | master::DeleteCDCStreamRequestPB req; |
1186 | 0 | master::DeleteCDCStreamResponsePB resp; |
1187 | 0 | req.add_stream_id(db_stream_id); |
1188 | |
1189 | 0 | RpcController rpc; |
1190 | 0 | rpc.set_timeout(timeout_); |
1191 | 0 | RETURN_NOT_OK(master_replication_proxy_->DeleteCDCStream(req, &resp, &rpc)); |
1192 | | |
1193 | 0 | if (resp.has_error()) { |
1194 | 0 | cout << "Error deleting stream: " << resp.error().status().message() << endl; |
1195 | 0 | return StatusFromPB(resp.error().status()); |
1196 | 0 | } |
1197 | | |
1198 | 0 | cout << "Successfully deleted Change Data Stream ID: " << db_stream_id << endl; |
1199 | 0 | return Status::OK(); |
1200 | 0 | } |
1201 | | |
1202 | 0 | Status ClusterAdminClient::DeleteCDCStream(const std::string& stream_id, bool force_delete) { |
1203 | 0 | master::DeleteCDCStreamRequestPB req; |
1204 | 0 | master::DeleteCDCStreamResponsePB resp; |
1205 | 0 | req.add_stream_id(stream_id); |
1206 | 0 | req.set_force_delete(force_delete); |
1207 | |
1208 | 0 | RpcController rpc; |
1209 | 0 | rpc.set_timeout(timeout_); |
1210 | 0 | RETURN_NOT_OK(master_replication_proxy_->DeleteCDCStream(req, &resp, &rpc)); |
1211 | | |
1212 | 0 | if (resp.has_error()) { |
1213 | 0 | cout << "Error deleting stream: " << resp.error().status().message() << endl; |
1214 | 0 | return StatusFromPB(resp.error().status()); |
1215 | 0 | } |
1216 | | |
1217 | 0 | cout << "Successfully deleted CDC Stream ID: " << stream_id << endl; |
1218 | 0 | return Status::OK(); |
1219 | 0 | } |
1220 | | |
1221 | 0 | Status ClusterAdminClient::ListCDCStreams(const TableId& table_id) { |
1222 | 0 | master::ListCDCStreamsRequestPB req; |
1223 | 0 | master::ListCDCStreamsResponsePB resp; |
1224 | 0 | req.set_id_type(yb::master::IdTypePB::TABLE_ID); |
1225 | 0 | if (!table_id.empty()) { |
1226 | 0 | req.set_table_id(table_id); |
1227 | 0 | } |
1228 | |
1229 | 0 | RpcController rpc; |
1230 | 0 | rpc.set_timeout(timeout_); |
1231 | 0 | RETURN_NOT_OK(master_replication_proxy_->ListCDCStreams(req, &resp, &rpc)); |
1232 | | |
1233 | 0 | if (resp.has_error()) { |
1234 | 0 | cout << "Error getting CDC stream list: " << resp.error().status().message() << endl; |
1235 | 0 | return StatusFromPB(resp.error().status()); |
1236 | 0 | } |
1237 | | |
1238 | 0 | cout << "CDC Streams: \r\n" << resp.DebugString(); |
1239 | 0 | return Status::OK(); |
1240 | 0 | } |
1241 | | |
1242 | 0 | Status ClusterAdminClient::ListCDCSDKStreams(const std::string& namespace_name) { |
1243 | 0 | master::ListCDCStreamsRequestPB req; |
1244 | 0 | master::ListCDCStreamsResponsePB resp; |
1245 | 0 | req.set_id_type(yb::master::IdTypePB::NAMESPACE_ID); |
1246 | |
1247 | 0 | if (!namespace_name.empty()) { |
1248 | 0 | cout << "Filtering out DB streams for the namespace: " << namespace_name << "\n\n"; |
1249 | 0 | master::GetNamespaceInfoResponsePB namespace_info_resp; |
1250 | 0 | RETURN_NOT_OK(yb_client_->GetNamespaceInfo("", namespace_name, YQL_DATABASE_PGSQL, |
1251 | 0 | &namespace_info_resp)); |
1252 | 0 | req.set_namespace_id(namespace_info_resp.namespace_().id()); |
1253 | 0 | } |
1254 | | |
1255 | 0 | RpcController rpc; |
1256 | 0 | rpc.set_timeout(timeout_); |
1257 | 0 | RETURN_NOT_OK(master_replication_proxy_->ListCDCStreams(req, &resp, &rpc)); |
1258 | | |
1259 | 0 | if (resp.has_error()) { |
1260 | 0 | cout << "Error getting CDC stream list: " << resp.error().status().message() << endl; |
1261 | 0 | return StatusFromPB(resp.error().status()); |
1262 | 0 | } |
1263 | | |
1264 | 0 | cout << "CDC Streams: \r\n" << resp.DebugString(); |
1265 | 0 | return Status::OK(); |
1266 | 0 | } |
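ListCDCSDKStreams() filters by namespace id, so a user-supplied namespace name is first resolved through yb_client_->GetNamespaceInfo(). A minimal sketch that pulls that lookup into a reusable helper; the helper itself is hypothetical, while the call and response accessors mirror the ones above:

// Editor's sketch (hypothetical helper, not in the original source): resolve a
// YSQL namespace name to its id, as done inline in ListCDCSDKStreams().
Result<std::string> GetNamespaceId(client::YBClient* yb_client, const std::string& namespace_name) {
  master::GetNamespaceInfoResponsePB namespace_info_resp;
  RETURN_NOT_OK(yb_client->GetNamespaceInfo(
      "" /* namespace_id */, namespace_name, YQL_DATABASE_PGSQL, &namespace_info_resp));
  return namespace_info_resp.namespace_().id();
}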
1267 | | |
1268 | 0 | Status ClusterAdminClient::GetCDCDBStreamInfo(const std::string& db_stream_id) { |
1269 | 0 | master::GetCDCDBStreamInfoRequestPB req; |
1270 | 0 | master::GetCDCDBStreamInfoResponsePB resp; |
1271 | 0 | req.set_db_stream_id(db_stream_id); |
1272 | |
1273 | 0 | RpcController rpc; |
1274 | 0 | rpc.set_timeout(timeout_); |
1275 | 0 | RETURN_NOT_OK(master_replication_proxy_->GetCDCDBStreamInfo(req, &resp, &rpc)); |
1276 | | |
1277 | 0 | if (resp.has_error()) { |
1278 | 0 | cout << "Error getting info corresponding to CDC db stream: " << |
1279 | 0 | resp.error().status().message() << endl; |
1280 | 0 | return StatusFromPB(resp.error().status()); |
1281 | 0 | } |
1282 | | |
1283 | 0 | cout << "CDC DB Stream Info: \r\n" << resp.DebugString(); |
1284 | 0 | return Status::OK(); |
1285 | 0 | } |
1286 | | |
1287 | 0 | Status ClusterAdminClient::WaitForSetupUniverseReplicationToFinish(const string& producer_uuid) { |
1288 | 0 | master::IsSetupUniverseReplicationDoneRequestPB req; |
1289 | 0 | req.set_producer_id(producer_uuid); |
1290 | 0 | for (;;) { |
1291 | 0 | master::IsSetupUniverseReplicationDoneResponsePB resp; |
1292 | 0 | RpcController rpc; |
1293 | 0 | rpc.set_timeout(timeout_); |
1294 | 0 | Status s = master_replication_proxy_->IsSetupUniverseReplicationDone(req, &resp, &rpc); |
1295 | |
1296 | 0 | if (!s.ok() || resp.has_error()) { |
1297 | 0 | LOG(WARNING) << "Encountered error while waiting for setup_universe_replication to complete" |
1298 | 0 | << " : " << (!s.ok() ? s.ToString() : resp.error().status().message()); |
1299 | 0 | } else if (resp.has_done() && resp.done()) { |
1300 | 0 | return StatusFromPB(resp.replication_error()); |
1301 | 0 | } |
1302 | | |
1303 | | // Still processing, wait and then loop again. |
1304 | 0 | std::this_thread::sleep_for(100ms); |
1305 | 0 | } |
1306 | 0 | } |
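The loop above polls IsSetupUniverseReplicationDone every 100 ms and only returns once the master reports the setup as done; there is no overall deadline. A minimal sketch of the same pattern bounded by a caller-supplied wait budget, using MonoTime/MonoDelta from the already-included yb/util/monotime.h; the function name and the TimedOut status are hypothetical additions:

// Editor's sketch (hypothetical variant, not in the original source): same
// polling pattern as above, but gives up once an overall deadline passes.
// Errors are treated as "not done yet", as in the loop above.
Status ClusterAdminClient::WaitForSetupUniverseReplicationWithDeadline(
    const std::string& producer_uuid, MonoDelta max_wait) {
  const MonoTime deadline = MonoTime::Now() + max_wait;
  master::IsSetupUniverseReplicationDoneRequestPB req;
  req.set_producer_id(producer_uuid);
  while (MonoTime::Now() < deadline) {
    master::IsSetupUniverseReplicationDoneResponsePB resp;
    RpcController rpc;
    rpc.set_timeout(timeout_);
    Status s = master_replication_proxy_->IsSetupUniverseReplicationDone(req, &resp, &rpc);
    if (s.ok() && !resp.has_error() && resp.has_done() && resp.done()) {
      return StatusFromPB(resp.replication_error());
    }
    std::this_thread::sleep_for(100ms);
  }
  return STATUS(TimedOut, "setup_universe_replication did not complete before the deadline");
}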
1307 | | |
1308 | | Status ClusterAdminClient::SetupUniverseReplication( |
1309 | | const string& producer_uuid, const vector<string>& producer_addresses, |
1310 | | const vector<TableId>& tables, |
1311 | 0 | const vector<string>& producer_bootstrap_ids) { |
1312 | 0 | master::SetupUniverseReplicationRequestPB req; |
1313 | 0 | master::SetupUniverseReplicationResponsePB resp; |
1314 | 0 | req.set_producer_id(producer_uuid); |
1315 | |
1316 | 0 | req.mutable_producer_master_addresses()->Reserve(narrow_cast<int>(producer_addresses.size())); |
1317 | 0 | for (const auto& addr : producer_addresses) { |
1318 | | // HostPort::FromString() expects a default port. |
1319 | 0 | auto hp = VERIFY_RESULT(HostPort::FromString(addr, master::kMasterDefaultPort)); |
1320 | 0 | HostPortToPB(hp, req.add_producer_master_addresses()); |
1321 | 0 | } |
1322 | | |
1323 | 0 | req.mutable_producer_table_ids()->Reserve(narrow_cast<int>(tables.size())); |
1324 | 0 | for (const auto& table : tables) { |
1325 | 0 | req.add_producer_table_ids(table); |
1326 | 0 | } |
1327 | |
1328 | 0 | for (const auto& producer_bootstrap_id : producer_bootstrap_ids) { |
1329 | 0 | req.add_producer_bootstrap_ids(producer_bootstrap_id); |
1330 | 0 | } |
1331 | |
1332 | 0 | RpcController rpc; |
1333 | 0 | rpc.set_timeout(timeout_); |
1334 | 0 | auto setup_result_status = master_replication_proxy_->SetupUniverseReplication(req, &resp, &rpc); |
1335 | | |
1336 | | // Clean up config files if setup fails. |
1337 | 0 | if (!setup_result_status.ok()) { |
1338 | 0 | CleanupEnvironmentOnSetupUniverseReplicationFailure(producer_uuid, setup_result_status); |
1339 | 0 | return setup_result_status; |
1340 | 0 | } |
1341 | | |
1342 | 0 | if (resp.has_error()) { |
1343 | 0 | cout << "Error setting up universe replication: " << resp.error().status().message() << endl; |
1344 | 0 | Status status_from_error = StatusFromPB(resp.error().status()); |
1345 | 0 | CleanupEnvironmentOnSetupUniverseReplicationFailure(producer_uuid, status_from_error); |
1346 | |
1347 | 0 | return status_from_error; |
1348 | 0 | } |
1349 | | |
1350 | 0 | setup_result_status = WaitForSetupUniverseReplicationToFinish(producer_uuid); |
1351 | | |
1352 | | // Clean up config files if setup fails to complete. |
1353 | 0 | if (!setup_result_status.ok()) { |
1354 | 0 | cout << "Error waiting for universe replication setup to complete: " |
1355 | 0 | << setup_result_status.message().ToBuffer() << endl; |
1356 | 0 | CleanupEnvironmentOnSetupUniverseReplicationFailure(producer_uuid, setup_result_status); |
1357 | 0 | return setup_result_status; |
1358 | 0 | } |
1359 | | |
1360 | 0 | cout << "Replication setup completed successfully" << endl; |
1361 | 0 | return Status::OK(); |
1362 | 0 | } |
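In the request above, producer_bootstrap_ids are matched to producer_table_ids purely by position, and SetupUniverseReplication() forwards both lists without checking that their sizes agree; AlterUniverseReplication() further below does perform that check. A minimal sketch of a hypothetical pre-flight validation that could run before the request is built:

// Editor's sketch (hypothetical, not in the original source): bootstrap ids,
// when supplied, must line up one-to-one with the table ids.
Status ValidateBootstrapIds(
    const std::vector<TableId>& tables, const std::vector<std::string>& bootstrap_ids) {
  if (!bootstrap_ids.empty() && bootstrap_ids.size() != tables.size()) {
    return STATUS(InvalidArgument,
                  "The number of bootstrap ids must equal the number of table ids");
  }
  return Status::OK();
}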
1363 | | |
1364 | | // Helper function for deleting the universe if SetupUniverseReplication fails. |
1365 | | void ClusterAdminClient::CleanupEnvironmentOnSetupUniverseReplicationFailure( |
1366 | 0 | const std::string& producer_uuid, const Status& failure_status) { |
1367 | | // We don't need to delete the universe if the call to SetupUniverseReplication |
1368 | | // failed due to one of the sanity checks. |
1369 | 0 | if (failure_status.IsInvalidArgument()) { |
1370 | 0 | return; |
1371 | 0 | } |
1372 | | |
1373 | 0 | cout << "Replication setup failed, cleaning up environment" << endl; |
1374 | |
1375 | 0 | Status delete_result_status = DeleteUniverseReplication(producer_uuid, false); |
1376 | 0 | if (!delete_result_status.ok()) { |
1377 | 0 | cout << "Could not clean up environment: " << delete_result_status.message() << endl; |
1378 | 0 | } else { |
1379 | 0 | cout << "Successfully cleaned up environment" << endl; |
1380 | 0 | } |
1381 | 0 | } |
1382 | | |
1383 | | Status ClusterAdminClient::DeleteUniverseReplication(const std::string& producer_id, |
1384 | 0 | bool ignore_errors) { |
1385 | 0 | master::DeleteUniverseReplicationRequestPB req; |
1386 | 0 | master::DeleteUniverseReplicationResponsePB resp; |
1387 | 0 | req.set_producer_id(producer_id); |
1388 | 0 | req.set_ignore_errors(ignore_errors); |
1389 | |
1390 | 0 | RpcController rpc; |
1391 | 0 | rpc.set_timeout(timeout_); |
1392 | 0 | RETURN_NOT_OK(master_replication_proxy_->DeleteUniverseReplication(req, &resp, &rpc)); |
1393 | | |
1394 | 0 | if (resp.has_error()) { |
1395 | 0 | cout << "Error deleting universe replication: " << resp.error().status().message() << endl; |
1396 | 0 | return StatusFromPB(resp.error().status()); |
1397 | 0 | } |
1398 | | |
1399 | 0 | if (resp.warnings().size() > 0) { |
1400 | 0 | cout << "Encountered the following warnings while running delete_universe_replication:" << endl; |
1401 | 0 | for (const auto& warning : resp.warnings()) { |
1402 | 0 | cout << " - " << warning.message() << endl; |
1403 | 0 | } |
1404 | 0 | } |
1405 | |
1406 | 0 | cout << "Replication deleted successfully" << endl; |
1407 | 0 | return Status::OK(); |
1408 | 0 | } |
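The cleanup helper above always calls this function with ignore_errors set to false. The flag presumably exists so removal can proceed even when parts of the replication state cannot be cleanly torn down; a minimal sketch of a hypothetical wrapper that retries a failed delete with ignore_errors enabled:

// Editor's sketch (hypothetical wrapper, not in the original source): retry a
// failed delete in best-effort mode, e.g. when the producer universe is gone.
Status ClusterAdminClient::ForceDeleteUniverseReplication(const std::string& producer_id) {
  Status s = DeleteUniverseReplication(producer_id, false /* ignore_errors */);
  if (s.ok()) {
    return s;
  }
  cout << "Delete failed (" << s.ToString() << "), retrying with ignore_errors=true" << endl;
  return DeleteUniverseReplication(producer_id, true /* ignore_errors */);
}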
1409 | | |
1410 | | Status ClusterAdminClient::AlterUniverseReplication(const std::string& producer_uuid, |
1411 | | const std::vector<std::string>& producer_addresses, |
1412 | | const std::vector<TableId>& add_tables, |
1413 | | const std::vector<TableId>& remove_tables, |
1414 | | const std::vector<std::string>& producer_bootstrap_ids_to_add, |
1415 | 0 | const std::string& new_producer_universe_id) { |
1416 | 0 | master::AlterUniverseReplicationRequestPB req; |
1417 | 0 | master::AlterUniverseReplicationResponsePB resp; |
1418 | 0 | req.set_producer_id(producer_uuid); |
1419 | |
1420 | 0 | if (!producer_addresses.empty()) { |
1421 | 0 | req.mutable_producer_master_addresses()->Reserve(narrow_cast<int>(producer_addresses.size())); |
1422 | 0 | for (const auto& addr : producer_addresses) { |
1423 | | // HostPort::FromString() expects a default port. |
1424 | 0 | auto hp = VERIFY_RESULT(HostPort::FromString(addr, master::kMasterDefaultPort)); |
1425 | 0 | HostPortToPB(hp, req.add_producer_master_addresses()); |
1426 | 0 | } |
1427 | 0 | } |
1428 | | |
1429 | 0 | if (!add_tables.empty()) { |
1430 | 0 | req.mutable_producer_table_ids_to_add()->Reserve(narrow_cast<int>(add_tables.size())); |
1431 | 0 | for (const auto& table : add_tables) { |
1432 | 0 | req.add_producer_table_ids_to_add(table); |
1433 | 0 | } |
1434 | |
1435 | 0 | if (!producer_bootstrap_ids_to_add.empty()) { |
1436 | | // There must be a bootstrap id for every table id. |
1437 | 0 | if (producer_bootstrap_ids_to_add.size() != add_tables.size()) { |
1438 | 0 | cout << "The number of bootstrap ids must equal the number of table ids. " |
1439 | 0 | << "Use separate alter commands if only some tables are being bootstrapped." << endl; |
1440 | 0 | return STATUS(InternalError, "Invalid number of bootstrap ids"); |
1441 | 0 | } |
1442 | | |
1443 | 0 | req.mutable_producer_bootstrap_ids_to_add()->Reserve( |
1444 | 0 | narrow_cast<int>(producer_bootstrap_ids_to_add.size())); |
1445 | 0 | for (const auto& bootstrap_id : producer_bootstrap_ids_to_add) { |
1446 | 0 | req.add_producer_bootstrap_ids_to_add(bootstrap_id); |
1447 | 0 | } |
1448 | 0 | } |
1449 | 0 | } |
1450 | | |
1451 | 0 | if (!remove_tables.empty()) { |
1452 | 0 | req.mutable_producer_table_ids_to_remove()->Reserve(narrow_cast<int>(remove_tables.size())); |
1453 | 0 | for (const auto& table : remove_tables) { |
1454 | 0 | req.add_producer_table_ids_to_remove(table); |
1455 | 0 | } |
1456 | 0 | } |
1457 | |
1458 | 0 | if (!new_producer_universe_id.empty()) { |
1459 | 0 | req.set_new_producer_universe_id(new_producer_universe_id); |
1460 | 0 | } |
1461 | |
1462 | 0 | RpcController rpc; |
1463 | 0 | rpc.set_timeout(timeout_); |
1464 | 0 | RETURN_NOT_OK(master_replication_proxy_->AlterUniverseReplication(req, &resp, &rpc)); |
1465 | | |
1466 | 0 | if (resp.has_error()) { |
1467 | 0 | cout << "Error altering universe replication: " << resp.error().status().message() << endl; |
1468 | 0 | return StatusFromPB(resp.error().status()); |
1469 | 0 | } |
1470 | | |
1471 | 0 | if (!add_tables.empty()) { |
1472 | | // If we are adding tables, then wait for the altered producer to be deleted (this happens once |
1473 | | // it is merged with the original). |
1474 | 0 | RETURN_NOT_OK(WaitForSetupUniverseReplicationToFinish(producer_uuid + ".ALTER")); |
1475 | 0 | } |
1476 | | |
1477 | 0 | cout << "Replication altered successfully" << endl; |
1478 | 0 | return Status::OK(); |
1479 | 0 | } |
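When tables are added, the wait above targets a temporary replication group whose id is the producer id with an ".ALTER" suffix; per the comment above, that temporary group is deleted once it has been merged into the original one. A minimal sketch that simply makes the naming convention explicit (the helper is hypothetical):

// Editor's sketch (hypothetical helper, not in the original source): the id of
// the temporary universe used while an alter-with-add is in flight.
std::string AlterUniverseId(const std::string& producer_uuid) {
  return producer_uuid + ".ALTER";
}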
1480 | | |
1481 | | CHECKED_STATUS ClusterAdminClient::SetUniverseReplicationEnabled(const std::string& producer_id, |
1482 | 0 | bool is_enabled) { |
1483 | 0 | master::SetUniverseReplicationEnabledRequestPB req; |
1484 | 0 | master::SetUniverseReplicationEnabledResponsePB resp; |
1485 | 0 | req.set_producer_id(producer_id); |
1486 | 0 | req.set_is_enabled(is_enabled); |
1487 | 0 | const string toggle = (is_enabled ? "enabl" : "disabl"); |
1488 | |
1489 | 0 | RpcController rpc; |
1490 | 0 | rpc.set_timeout(timeout_); |
1491 | 0 | RETURN_NOT_OK(master_replication_proxy_->SetUniverseReplicationEnabled(req, &resp, &rpc)); |
1492 | | |
1493 | 0 | if (resp.has_error()) { |
1494 | 0 | cout << "Error " << toggle << "ing " |
1495 | 0 | << "universe replication: " << resp.error().status().message() << endl; |
1496 | 0 | return StatusFromPB(resp.error().status()); |
1497 | 0 | } |
1498 | | |
1499 | 0 | cout << "Replication " << toggle << "ed successfully" << endl; |
1500 | 0 | return Status::OK(); |
1501 | 0 | } |
1502 | | |
1503 | 0 | Result<HostPort> ClusterAdminClient::GetFirstRpcAddressForTS() { |
1504 | 0 | RepeatedPtrField<ListTabletServersResponsePB::Entry> servers; |
1505 | 0 | RETURN_NOT_OK(ListTabletServers(&servers)); |
1506 | 0 | for (const ListTabletServersResponsePB::Entry& server : servers) { |
1507 | 0 | if (server.has_registration() && |
1508 | 0 | !server.registration().common().private_rpc_addresses().empty()) { |
1509 | 0 | return HostPortFromPB(server.registration().common().private_rpc_addresses(0)); |
1510 | 0 | } |
1511 | 0 | } |
1512 | | |
1513 | 0 | return STATUS(NotFound, "Didn't find a server registered with the Master"); |
1514 | 0 | } |
1515 | | |
1516 | 0 | Status ClusterAdminClient::BootstrapProducer(const vector<TableId>& table_ids) { |
1517 | |
1518 | 0 | HostPort ts_addr = VERIFY_RESULT(GetFirstRpcAddressForTS()); |
1519 | 0 | auto cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(proxy_cache_.get(), ts_addr); |
1520 | |
1521 | 0 | cdc::BootstrapProducerRequestPB bootstrap_req; |
1522 | 0 | cdc::BootstrapProducerResponsePB bootstrap_resp; |
1523 | 0 | for (const auto& table_id : table_ids) { |
1524 | 0 | bootstrap_req.add_table_ids(table_id); |
1525 | 0 | } |
1526 | 0 | RpcController rpc; |
1527 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(std::max(timeout_.ToSeconds(), 120.0))); |
1528 | 0 | RETURN_NOT_OK(cdc_proxy->BootstrapProducer(bootstrap_req, &bootstrap_resp, &rpc)); |
1529 | | |
1530 | 0 | if (bootstrap_resp.has_error()) { |
1531 | 0 | cout << "Error bootstrapping consumer: " << bootstrap_resp.error().status().message() << endl; |
1532 | 0 | return StatusFromPB(bootstrap_resp.error().status()); |
1533 | 0 | } |
1534 | | |
1535 | 0 | if (implicit_cast<size_t>(bootstrap_resp.cdc_bootstrap_ids().size()) != table_ids.size()) { |
1536 | 0 | cout << "Received invalid number of bootstrap ids: " << bootstrap_resp.ShortDebugString(); |
1537 | 0 | return STATUS(InternalError, "Invalid number of bootstrap ids"); |
1538 | 0 | } |
1539 | | |
1540 | 0 | int i = 0; |
1541 | 0 | for (const auto& bootstrap_id : bootstrap_resp.cdc_bootstrap_ids()) { |
1542 | 0 | cout << "table id: " << table_ids[i++] << ", CDC bootstrap id: " << bootstrap_id << endl; |
1543 | 0 | } |
1544 | 0 | return Status::OK(); |
1545 | 0 | } |
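The cdc_bootstrap_ids in the response come back in the same order as the table_ids in the request, which is what the printing loop above relies on, so the two can be zipped together for a later setup_universe_replication call. A minimal sketch, assuming <unordered_map> is available in this translation unit:

// Editor's sketch (hypothetical helper, not in the original source): pair each
// table id with the bootstrap id returned for it, preserving request order.
std::unordered_map<TableId, std::string> MapBootstrapIds(
    const std::vector<TableId>& table_ids, const cdc::BootstrapProducerResponsePB& resp) {
  std::unordered_map<TableId, std::string> table_to_bootstrap_id;
  for (int i = 0; i < resp.cdc_bootstrap_ids_size() && i < narrow_cast<int>(table_ids.size()); ++i) {
    table_to_bootstrap_id.emplace(table_ids[i], resp.cdc_bootstrap_ids(i));
  }
  return table_to_bootstrap_id;
}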
1546 | | |
1547 | | } // namespace enterprise |
1548 | | } // namespace tools |
1549 | | } // namespace yb |