/Users/deen/code/yugabyte-db/ent/src/yb/master/catalog_manager_ent.cc
Line | Count | Source
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include <memory> |
14 | | #include <regex> |
15 | | #include <set> |
16 | | #include <unordered_set> |
17 | | #include <google/protobuf/util/message_differencer.h> |
18 | | |
19 | | #include "yb/common/common_fwd.h" |
20 | | #include "yb/master/catalog_entity_info.h" |
21 | | #include "yb/master/catalog_manager-internal.h" |
22 | | #include "yb/master/cdc_consumer_registry_service.h" |
23 | | #include "yb/master/cdc_rpc_tasks.h" |
24 | | #include "yb/master/cluster_balance.h" |
25 | | #include "yb/master/master.h" |
26 | | #include "yb/master/master_backup.pb.h" |
27 | | #include "yb/master/master_error.h" |
28 | | |
29 | | #include "yb/cdc/cdc_consumer.pb.h" |
30 | | #include "yb/cdc/cdc_service.h" |
31 | | |
32 | | #include "yb/client/client-internal.h" |
33 | | #include "yb/client/schema.h" |
34 | | #include "yb/client/session.h" |
35 | | #include "yb/client/table.h" |
36 | | #include "yb/client/table_alterer.h" |
37 | | #include "yb/client/table_handle.h" |
38 | | #include "yb/client/table_info.h" |
39 | | #include "yb/client/yb_op.h" |
40 | | #include "yb/client/yb_table_name.h" |
41 | | |
42 | | #include "yb/common/common.pb.h" |
43 | | #include "yb/common/entity_ids.h" |
44 | | #include "yb/common/ql_name.h" |
45 | | #include "yb/common/ql_type.h" |
46 | | #include "yb/common/schema.h" |
47 | | #include "yb/common/wire_protocol.h" |
48 | | #include "yb/consensus/consensus.h" |
49 | | |
50 | | #include "yb/docdb/consensus_frontier.h" |
51 | | #include "yb/docdb/doc_write_batch.h" |
52 | | |
53 | | #include "yb/gutil/bind.h" |
54 | | #include "yb/gutil/casts.h" |
55 | | #include "yb/gutil/strings/join.h" |
56 | | #include "yb/gutil/strings/substitute.h" |
57 | | #include "yb/master/master_client.pb.h" |
58 | | #include "yb/master/master_ddl.pb.h" |
59 | | #include "yb/master/master_defaults.h" |
60 | | #include "yb/master/master_heartbeat.pb.h" |
61 | | #include "yb/master/master_replication.pb.h" |
62 | | #include "yb/master/master_util.h" |
63 | | #include "yb/master/sys_catalog.h" |
64 | | #include "yb/master/sys_catalog-internal.h" |
65 | | #include "yb/master/async_snapshot_tasks.h" |
66 | | #include "yb/master/async_rpc_tasks.h" |
67 | | #include "yb/master/encryption_manager.h" |
68 | | #include "yb/master/restore_sys_catalog_state.h" |
69 | | #include "yb/master/scoped_leader_shared_lock.h" |
70 | | #include "yb/master/scoped_leader_shared_lock-internal.h" |
71 | | |
72 | | #include "yb/rpc/messenger.h" |
73 | | |
74 | | #include "yb/tablet/operations/snapshot_operation.h" |
75 | | #include "yb/tablet/tablet_metadata.h" |
76 | | #include "yb/tablet/tablet_snapshots.h" |
77 | | |
78 | | #include "yb/tserver/backup.proxy.h" |
79 | | #include "yb/tserver/service_util.h" |
80 | | |
81 | | #include "yb/util/cast.h" |
82 | | #include "yb/util/date_time.h" |
83 | | #include "yb/util/flag_tags.h" |
84 | | #include "yb/util/format.h" |
85 | | #include "yb/util/logging.h" |
86 | | #include "yb/util/random_util.h" |
87 | | #include "yb/util/scope_exit.h" |
88 | | #include "yb/util/service_util.h" |
89 | | #include "yb/util/status.h" |
90 | | #include "yb/util/status_format.h" |
91 | | #include "yb/util/status_log.h" |
92 | | #include "yb/util/tostring.h" |
93 | | #include "yb/util/string_util.h" |
94 | | #include "yb/util/trace.h" |
95 | | |
96 | | #include "yb/yql/cql/ql/util/statement_result.h" |
97 | | |
98 | | using namespace std::literals; |
99 | | using namespace std::placeholders; |
100 | | |
101 | | using std::string; |
102 | | using std::unique_ptr; |
103 | | using std::vector; |
104 | | |
105 | | using google::protobuf::RepeatedPtrField; |
106 | | using google::protobuf::util::MessageDifferencer; |
107 | | using strings::Substitute; |
108 | | |
109 | | DEFINE_int32(cdc_state_table_num_tablets, 0, |
110 | | "Number of tablets to use when creating the CDC state table. " |
111 | | "0 to use the same default num tablets as for regular tables."); |
112 | | |
113 | | DEFINE_int32(cdc_wal_retention_time_secs, 4 * 3600, |
114 | | "WAL retention time in seconds to be used for tables for which a CDC stream was " |
115 | | "created."); |
116 | | DECLARE_int32(master_rpc_timeout_ms); |
117 | | |
118 | | DEFINE_bool(enable_transaction_snapshots, true, |
119 | | "Enables the use of transaction-aware snapshots.");
120 | | TAG_FLAG(enable_transaction_snapshots, hidden); |
121 | | TAG_FLAG(enable_transaction_snapshots, advanced); |
122 | | TAG_FLAG(enable_transaction_snapshots, runtime); |
123 | | |
124 | | DEFINE_test_flag(bool, disable_cdc_state_insert_on_setup, false, |
125 | | "Disable inserting new entries into cdc state as part of the setup flow."); |
126 | | |
127 | | DEFINE_bool(allow_consecutive_restore, true, |
128 | | "Whether it is allowed to restore to a time before the most recent restoration.");
129 | | TAG_FLAG(allow_consecutive_restore, runtime); |
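
The DEFINE_*/TAG_FLAG block above is YB's runtime-flag pattern: a gflags definition plus tags controlling visibility and runtime mutability. A minimal, self-contained sketch of the same idiom using plain gflags; the flag name example_state_table_num_tablets and the fallback default of 8 are made up for illustration, and TAG_FLAG is YB-specific so it is omitted here:

#include <iostream>
#include <gflags/gflags.h>

// Hypothetical flag mirroring cdc_state_table_num_tablets: 0 means
// "fall back to the regular per-table default".
DEFINE_int32(example_state_table_num_tablets, 0,
             "Number of tablets to use when creating the example state table. "
             "0 to use the same default num tablets as for regular tables.");

int main(int argc, char** argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  const int kRegularTableDefault = 8;  // assumed default, for the sketch only
  const int num_tablets = FLAGS_example_state_table_num_tablets > 0
                              ? FLAGS_example_state_table_num_tablets
                              : kRegularTableDefault;
  std::cout << "Creating state table with " << num_tablets << " tablets\n";
  return 0;
}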
130 | | |
131 | | namespace yb { |
132 | | |
133 | | using rpc::RpcContext; |
134 | | using pb_util::ParseFromSlice; |
135 | | |
136 | | namespace master { |
137 | | namespace enterprise { |
138 | | |
139 | | namespace { |
140 | | |
141 | 456 | CHECKED_STATUS CheckStatus(const Status& status, const char* action) { |
142 | 456 | if (status.ok()) { |
143 | 456 | return status; |
144 | 456 | } |
145 | | |
146 | 0 | const Status s = status.CloneAndPrepend(std::string("An error occurred while ") + action); |
147 | 0 | LOG(WARNING) << s; |
148 | 0 | return s; |
149 | 456 | } |
150 | | |
151 | 0 | CHECKED_STATUS CheckLeaderStatus(const Status& status, const char* action) { |
152 | 0 | return CheckIfNoLongerLeader(CheckStatus(status, action)); |
153 | 0 | } |
154 | | |
155 | | template<class RespClass> |
156 | | CHECKED_STATUS CheckLeaderStatusAndSetupError( |
157 | 312 | const Status& status, const char* action, RespClass* resp) { |
158 | 312 | return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp); |
159 | 312 | }
------------------
| catalog_manager_ent.cc:yb::Status yb::master::enterprise::(anonymous namespace)::CheckLeaderStatusAndSetupError<yb::master::CreateCDCStreamResponsePB>(yb::Status const&, char const*, yb::master::CreateCDCStreamResponsePB*):
|  157 | 310 | const Status& status, const char* action, RespClass* resp) {
|  158 | 310 | return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp);
|  159 | 310 | }
------------------
| catalog_manager_ent.cc:yb::Status yb::master::enterprise::(anonymous namespace)::CheckLeaderStatusAndSetupError<yb::master::SetupUniverseReplicationResponsePB>(yb::Status const&, char const*, yb::master::SetupUniverseReplicationResponsePB*):
|  157 | 2 | const Status& status, const char* action, RespClass* resp) {
|  158 | 2 | return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp);
|  159 | 2 | }
------------------
160 | | |
161 | | } // namespace |
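
CheckStatus above returns OK statuses untouched and, on failure, prepends the failed action to the message before logging and propagating it; CheckLeaderStatus and CheckLeaderStatusAndSetupError layer leadership checks on top. A self-contained sketch of the clone-and-prepend idiom, with a toy Status type standing in for yb::Status:

#include <iostream>
#include <string>

// Toy stand-in for yb::Status; only the pieces the helper needs.
struct Status {
  bool is_ok = true;
  std::string message;
  bool ok() const { return is_ok; }
  Status CloneAndPrepend(const std::string& prefix) const {
    return Status{is_ok, prefix + ": " + message};
  }
};

Status CheckStatus(const Status& status, const char* action) {
  if (status.ok()) {
    return status;
  }
  const Status s = status.CloneAndPrepend(std::string("An error occurred while ") + action);
  std::cerr << "W: " << s.message << "\n";  // stands in for LOG(WARNING)
  return s;
}

int main() {
  CheckStatus(Status{false, "tablet not found"}, "inserting snapshot into sys-catalog");
  return 0;
}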
162 | | |
163 | | //////////////////////////////////////////////////////////// |
164 | | // Snapshot Loader |
165 | | //////////////////////////////////////////////////////////// |
166 | | |
167 | | class SnapshotLoader : public Visitor<PersistentSnapshotInfo> { |
168 | | public: |
169 | 3.75k | explicit SnapshotLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {} |
170 | | |
171 | 8 | CHECKED_STATUS Visit(const SnapshotId& snapshot_id, const SysSnapshotEntryPB& metadata) override { |
172 | 8 | if (TryFullyDecodeTxnSnapshotId(snapshot_id)) { |
173 | | // Transaction aware snapshots should be already loaded. |
174 | 8 | return Status::OK(); |
175 | 8 | } |
176 | 0 | return VisitNonTransactionAwareSnapshot(snapshot_id, metadata); |
177 | 8 | } |
178 | | |
179 | | CHECKED_STATUS VisitNonTransactionAwareSnapshot( |
180 | 0 | const SnapshotId& snapshot_id, const SysSnapshotEntryPB& metadata) { |
181 | | |
182 | | // Set up the snapshot info.
183 | 0 | auto snapshot_info = make_scoped_refptr<SnapshotInfo>(snapshot_id); |
184 | 0 | auto l = snapshot_info->LockForWrite(); |
185 | 0 | l.mutable_data()->pb.CopyFrom(metadata); |
186 | | |
187 | | // Add the snapshot to the IDs map (if the snapshot is not deleted). |
188 | 0 | auto emplace_result = catalog_manager_->non_txn_snapshot_ids_map_.emplace( |
189 | 0 | snapshot_id, std::move(snapshot_info)); |
190 | 0 | CHECK(emplace_result.second) << "Snapshot already exists: " << snapshot_id; |
191 | |
192 | 0 | LOG(INFO) << "Loaded metadata for snapshot (id=" << snapshot_id << "): " |
193 | 0 | << emplace_result.first->second->ToString() << ": " << metadata.ShortDebugString(); |
194 | 0 | l.Commit(); |
195 | 0 | return Status::OK(); |
196 | 0 | } |
197 | | |
198 | | private: |
199 | | CatalogManager *catalog_manager_; |
200 | | |
201 | | DISALLOW_COPY_AND_ASSIGN(SnapshotLoader); |
202 | | }; |
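
SnapshotLoader is one of several sys-catalog visitors: RunLoaders hands it to sys_catalog_->Visit(), which replays every persisted (id, metadata) pair through Visit(). A self-contained sketch of that loader pattern with simplified stand-in types (the real visitor is typed on PersistentSnapshotInfo and fills non_txn_snapshot_ids_map_):

#include <iostream>
#include <map>
#include <string>

using SnapshotId = std::string;
struct SysSnapshotEntry { std::string state; };  // stand-in for SysSnapshotEntryPB

class Visitor {
 public:
  virtual ~Visitor() = default;
  virtual void Visit(const SnapshotId& id, const SysSnapshotEntry& meta) = 0;
};

// Stand-in for SysCatalogTable::Visit(): replay each persisted entry.
void VisitAll(const std::map<SnapshotId, SysSnapshotEntry>& sys_catalog, Visitor* visitor) {
  for (const auto& entry : sys_catalog) visitor->Visit(entry.first, entry.second);
}

class SnapshotLoader : public Visitor {
 public:
  void Visit(const SnapshotId& id, const SysSnapshotEntry& meta) override {
    loaded_[id] = meta;  // mirrors inserting into the in-memory snapshot map
    std::cout << "Loaded metadata for snapshot (id=" << id << "): " << meta.state << "\n";
  }
 private:
  std::map<SnapshotId, SysSnapshotEntry> loaded_;
};

int main() {
  const std::map<SnapshotId, SysSnapshotEntry> catalog{{"snap-1", {"COMPLETE"}}};
  SnapshotLoader loader;
  VisitAll(catalog, &loader);
  return 0;
}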
203 | | |
204 | | |
205 | | //////////////////////////////////////////////////////////// |
206 | | // CDC Stream Loader |
207 | | //////////////////////////////////////////////////////////// |
208 | | |
209 | | class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> { |
210 | | public: |
211 | 3.75k | explicit CDCStreamLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {} |
212 | | |
213 | | Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata) |
214 | 0 | REQUIRES(catalog_manager_->mutex_) { |
215 | 0 | DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id)) |
216 | 0 | << "CDC stream already exists: " << stream_id; |
217 | |
218 | 0 | scoped_refptr<NamespaceInfo> ns; |
219 | 0 | scoped_refptr<TableInfo> table; |
220 | |
221 | 0 | if (metadata.has_namespace_id()) { |
222 | 0 | ns = FindPtrOrNull(catalog_manager_->namespace_ids_map_, metadata.namespace_id()); |
223 | |
224 | 0 | if (!ns) { |
225 | 0 | LOG(DFATAL) << "Invalid namespace ID " << metadata.namespace_id() << " for stream " |
226 | 0 | << stream_id; |
227 | | // TODO (#2059): Potentially signals a race condition where the namespace got deleted
228 | | // while the stream was being created.
229 | | // Log error and continue without loading the stream. |
230 | 0 | return Status::OK(); |
231 | 0 | } |
232 | 0 | } else { |
233 | 0 | table = FindPtrOrNull(*catalog_manager_->table_ids_map_, metadata.table_id(0)); |
234 | 0 | if (!table) { |
235 | 0 | LOG(ERROR) << "Invalid table ID " << metadata.table_id(0) << " for stream " << stream_id; |
236 | | // TODO (#2059): Potentially signals a race condition where the table got deleted while the
237 | | // stream was being created.
238 | | // Log error and continue without loading the stream. |
239 | 0 | return Status::OK(); |
240 | 0 | } |
241 | 0 | } |
242 | | |
243 | | // Set up the CDC stream info.
244 | 0 | auto stream = make_scoped_refptr<CDCStreamInfo>(stream_id); |
245 | 0 | auto l = stream->LockForWrite(); |
246 | 0 | l.mutable_data()->pb.CopyFrom(metadata); |
247 | | |
248 | | // If the table has been deleted, mark this stream as DELETING so it can be deleted by the
249 | | // catalog manager background thread. Otherwise, if this stream is missing a state entry,
250 | | // mark its state as ACTIVE.
251 | |
252 | 0 | if (((table && table->LockForRead()->is_deleting()) || |
253 | 0 | (ns && ns->state() == SysNamespaceEntryPB::DELETING)) && |
254 | 0 | !l.data().is_deleting()) { |
255 | 0 | l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING); |
256 | 0 | } else if (!l.mutable_data()->pb.has_state()) { |
257 | 0 | l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::ACTIVE); |
258 | 0 | } |
259 | | |
260 | | // Add the CDC stream to the CDC stream map. |
261 | 0 | catalog_manager_->cdc_stream_map_[stream->id()] = stream; |
262 | 0 | if (table) { |
263 | 0 | catalog_manager_->cdc_stream_tables_count_map_[metadata.table_id(0)]++; |
264 | 0 | } |
265 | |
266 | 0 | l.Commit(); |
267 | |
268 | 0 | LOG(INFO) << "Loaded metadata for CDC stream " << stream->ToString() << ": " |
269 | 0 | << metadata.ShortDebugString(); |
270 | |
271 | 0 | return Status::OK(); |
272 | 0 | } |
273 | | |
274 | | private: |
275 | | CatalogManager *catalog_manager_; |
276 | | |
277 | | DISALLOW_COPY_AND_ASSIGN(CDCStreamLoader); |
278 | | }; |
279 | | |
280 | | //////////////////////////////////////////////////////////// |
281 | | // Universe Replication Loader |
282 | | //////////////////////////////////////////////////////////// |
283 | | |
284 | | class UniverseReplicationLoader : public Visitor<PersistentUniverseReplicationInfo> { |
285 | | public: |
286 | | explicit UniverseReplicationLoader(CatalogManager* catalog_manager) |
287 | 3.75k | : catalog_manager_(catalog_manager) {} |
288 | | |
289 | | Status Visit(const std::string& producer_id, const SysUniverseReplicationEntryPB& metadata) |
290 | 0 | REQUIRES(catalog_manager_->mutex_) { |
291 | 0 | DCHECK(!ContainsKey(catalog_manager_->universe_replication_map_, producer_id)) |
292 | 0 | << "Producer universe already exists: " << producer_id; |
293 | | |
294 | | // Set up the universe replication info.
295 | 0 | UniverseReplicationInfo* const ri = new UniverseReplicationInfo(producer_id); |
296 | 0 | { |
297 | 0 | auto l = ri->LockForWrite(); |
298 | 0 | l.mutable_data()->pb.CopyFrom(metadata); |
299 | |
300 | 0 | if (!l->is_active() && !l->is_deleted_or_failed()) { |
301 | | // Replication was not fully set up.
302 | 0 | LOG(WARNING) << "Universe replication in transient state: " << producer_id; |
303 | | |
304 | | // TODO: Should we delete all failed universe replication items? |
305 | 0 | } |
306 | | |
307 | | // Add universe replication info to the universe replication map. |
308 | 0 | catalog_manager_->universe_replication_map_[ri->id()] = ri; |
309 | |
310 | 0 | l.Commit(); |
311 | 0 | } |
312 | | |
313 | | // Also keep track of consumer tables. |
314 | 0 | for (const auto& table : metadata.validated_tables()) { |
315 | 0 | CDCStreamId stream_id = FindWithDefault(metadata.table_streams(), table.first, ""); |
316 | 0 | if (stream_id.empty()) { |
317 | 0 | LOG(WARNING) << "Unable to find stream id for table: " << table.first; |
318 | 0 | continue; |
319 | 0 | } |
320 | 0 | catalog_manager_->xcluster_consumer_tables_to_stream_map_[table.second].emplace( |
321 | 0 | metadata.producer_id(), stream_id); |
322 | 0 | } |
323 | |
324 | 0 | LOG(INFO) << "Loaded metadata for universe replication " << ri->ToString(); |
325 | 0 | VLOG(1) << "Metadata for universe replication " << ri->ToString() << ": " |
326 | 0 | << metadata.ShortDebugString(); |
327 | |
328 | 0 | return Status::OK(); |
329 | 0 | } |
330 | | |
331 | | private: |
332 | | CatalogManager *catalog_manager_; |
333 | | |
334 | | DISALLOW_COPY_AND_ASSIGN(UniverseReplicationLoader); |
335 | | }; |
336 | | |
337 | | //////////////////////////////////////////////////////////// |
338 | | // CatalogManager |
339 | | //////////////////////////////////////////////////////////// |
340 | | |
341 | 92 | CatalogManager::~CatalogManager() { |
342 | 92 | if (StartShutdown()) { |
343 | 6 | CompleteShutdown(); |
344 | 6 | } |
345 | 92 | } |
346 | | |
347 | 94 | void CatalogManager::CompleteShutdown() { |
348 | 94 | snapshot_coordinator_.Shutdown(); |
349 | | // Call shutdown on the base class before exiting the derived class destructor,
350 | | // because BgTasks is part of the base and uses this derived class on Shutdown.
351 | 94 | super::CompleteShutdown(); |
352 | 94 | } |
353 | | |
354 | 3.75k | Status CatalogManager::RunLoaders(int64_t term) { |
355 | 3.75k | RETURN_NOT_OK(super::RunLoaders(term)); |
356 | | |
357 | | // Clear the snapshots. |
358 | 3.75k | non_txn_snapshot_ids_map_.clear(); |
359 | | |
360 | | // Clear CDC stream map. |
361 | 3.75k | cdc_stream_map_.clear(); |
362 | 3.75k | cdc_stream_tables_count_map_.clear(); |
363 | | |
364 | | // Clear universe replication map. |
365 | 3.75k | universe_replication_map_.clear(); |
366 | 3.75k | xcluster_consumer_tables_to_stream_map_.clear(); |
367 | | |
368 | 3.75k | LOG_WITH_FUNC(INFO) << "Loading snapshots into memory."; |
369 | 3.75k | unique_ptr<SnapshotLoader> snapshot_loader(new SnapshotLoader(this)); |
370 | 3.75k | RETURN_NOT_OK_PREPEND( |
371 | 3.75k | sys_catalog_->Visit(snapshot_loader.get()), |
372 | 3.75k | "Failed while visiting snapshots in sys catalog"); |
373 | | |
374 | 3.75k | LOG_WITH_FUNC(INFO) << "Loading CDC streams into memory."; |
375 | 3.75k | auto cdc_stream_loader = std::make_unique<CDCStreamLoader>(this); |
376 | 3.75k | RETURN_NOT_OK_PREPEND( |
377 | 3.75k | sys_catalog_->Visit(cdc_stream_loader.get()), |
378 | 3.75k | "Failed while visiting CDC streams in sys catalog"); |
379 | | |
380 | 3.75k | LOG_WITH_FUNC(INFO) << "Loading universe replication info into memory."; |
381 | 3.75k | auto universe_replication_loader = std::make_unique<UniverseReplicationLoader>(this); |
382 | 3.75k | RETURN_NOT_OK_PREPEND( |
383 | 3.75k | sys_catalog_->Visit(universe_replication_loader.get()), |
384 | 3.75k | "Failed while visiting universe replication info in sys catalog"); |
385 | | |
386 | 3.75k | return Status::OK(); |
387 | 3.75k | } |
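
RunLoaders first clears every in-memory map, then rebuilds each one from the sys catalog, wrapping each failure with context via RETURN_NOT_OK_PREPEND. A self-contained sketch of that early-return macro idiom (Status here is a toy stand-in, not yb::Status):

#include <iostream>
#include <string>

struct Status {
  std::string message;  // empty means OK
  bool ok() const { return message.empty(); }
  static Status OK() { return {}; }
};

#define RETURN_NOT_OK_PREPEND(expr, msg)                                  \
  do {                                                                    \
    const Status _s = (expr);                                             \
    if (!_s.ok()) return Status{std::string(msg) + ": " + _s.message};    \
  } while (false)

Status VisitSnapshots() { return Status{"corrupt entry"}; }  // pretend loader failure

Status RunLoaders() {
  RETURN_NOT_OK_PREPEND(VisitSnapshots(), "Failed while visiting snapshots in sys catalog");
  return Status::OK();
}

int main() {
  std::cout << RunLoaders().message << "\n";
  return 0;
}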
388 | | |
389 | | Status CatalogManager::CreateSnapshot(const CreateSnapshotRequestPB* req, |
390 | | CreateSnapshotResponsePB* resp, |
391 | 3 | RpcContext* rpc) { |
392 | 3 | LOG(INFO) << "Servicing CreateSnapshot request: " << req->ShortDebugString(); |
393 | | |
394 | 3 | if (FLAGS_enable_transaction_snapshots && req->transaction_aware()) { |
395 | 2 | return CreateTransactionAwareSnapshot(*req, resp, rpc); |
396 | 2 | } |
397 | | |
398 | 1 | if (req->has_schedule_id()) { |
399 | 1 | auto schedule_id = VERIFY_RESULT(FullyDecodeSnapshotScheduleId(req->schedule_id())); |
400 | 0 | auto snapshot_id = snapshot_coordinator_.CreateForSchedule( |
401 | 1 | schedule_id, leader_ready_term(), rpc->GetClientDeadline()); |
402 | 1 | if (!snapshot_id.ok()) { |
403 | 0 | LOG(INFO) << "Create snapshot failed: " << snapshot_id.status(); |
404 | 0 | return snapshot_id.status(); |
405 | 0 | } |
406 | 1 | resp->set_snapshot_id(snapshot_id->data(), snapshot_id->size()); |
407 | 1 | return Status::OK(); |
408 | 1 | } |
409 | | |
410 | 0 | return CreateNonTransactionAwareSnapshot(req, resp, rpc); |
411 | 1 | } |
412 | | |
413 | | Status CatalogManager::CreateNonTransactionAwareSnapshot( |
414 | | const CreateSnapshotRequestPB* req, |
415 | | CreateSnapshotResponsePB* resp, |
416 | 0 | RpcContext* rpc) { |
417 | 0 | SnapshotId snapshot_id; |
418 | 0 | { |
419 | 0 | LockGuard lock(mutex_); |
420 | 0 | TRACE("Acquired catalog manager lock"); |
421 | | |
422 | | // Verify that the system is not in snapshot creating/restoring state. |
423 | 0 | if (!current_snapshot_id_.empty()) { |
424 | 0 | return STATUS(IllegalState, |
425 | 0 | Format( |
426 | 0 | "Current snapshot id: $0. Parallel snapshot operations are not supported" |
427 | 0 | ": $1", current_snapshot_id_, req), |
428 | 0 | MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION)); |
429 | 0 | } |
430 | | |
431 | | // Create a new snapshot UUID. |
432 | 0 | snapshot_id = GenerateIdUnlocked(SysRowEntryType::SNAPSHOT); |
433 | 0 | } |
434 | | |
435 | 0 | vector<scoped_refptr<TabletInfo>> all_tablets; |
436 | | |
437 | | // Create the in-memory snapshot data descriptor.
438 | 0 | scoped_refptr<SnapshotInfo> snapshot(new SnapshotInfo(snapshot_id)); |
439 | 0 | snapshot->mutable_metadata()->StartMutation(); |
440 | 0 | snapshot->mutable_metadata()->mutable_dirty()->pb.set_state(SysSnapshotEntryPB::CREATING); |
441 | |
442 | 0 | auto tables = VERIFY_RESULT(CollectTables(req->tables(), |
443 | 0 | req->add_indexes(), |
444 | 0 | true /* include_parent_colocated_table */)); |
445 | 0 | std::unordered_set<NamespaceId> added_namespaces; |
446 | 0 | for (const auto& table : tables) { |
447 | 0 | snapshot->AddEntries(table, &added_namespaces); |
448 | 0 | all_tablets.insert(all_tablets.end(), table.tablet_infos.begin(), table.tablet_infos.end()); |
449 | 0 | } |
450 | |
451 | 0 | VLOG(1) << "Snapshot " << snapshot->ToString() |
452 | 0 | << ": PB=" << snapshot->mutable_metadata()->mutable_dirty()->pb.DebugString(); |
453 | | |
454 | | // Write the snapshot data descriptor to the system catalog (in "creating" state). |
455 | 0 | RETURN_NOT_OK(CheckLeaderStatus( |
456 | 0 | sys_catalog_->Upsert(leader_ready_term(), snapshot), |
457 | 0 | "inserting snapshot into sys-catalog")); |
458 | 0 | TRACE("Wrote snapshot to system catalog"); |
459 | | |
460 | | // Commit the in-memory snapshot data descriptor.
461 | 0 | snapshot->mutable_metadata()->CommitMutation(); |
462 | | |
463 | | // Add the snapshot data descriptor to the catalog manager.
464 | 0 | { |
465 | 0 | LockGuard lock(mutex_); |
466 | 0 | TRACE("Acquired catalog manager lock"); |
467 | | |
468 | | // Verify that the snapshot does not exist. |
469 | 0 | auto inserted = non_txn_snapshot_ids_map_.emplace(snapshot_id, snapshot).second; |
470 | 0 | RSTATUS_DCHECK(inserted, IllegalState, Format("Snapshot already exists: $0", snapshot_id)); |
471 | 0 | current_snapshot_id_ = snapshot_id; |
472 | 0 | } |
473 | | |
474 | | // Send CreateSnapshot requests to all TServers (one tablet - one request). |
475 | 0 | for (const scoped_refptr<TabletInfo>& tablet : all_tablets) { |
476 | 0 | TRACE("Locking tablet"); |
477 | 0 | auto l = tablet->LockForRead(); |
478 | |
479 | 0 | LOG(INFO) << "Sending CreateTabletSnapshot to tablet: " << tablet->ToString(); |
480 | | |
481 | | // Send Create Tablet Snapshot request to each tablet leader. |
482 | 0 | auto call = CreateAsyncTabletSnapshotOp( |
483 | 0 | tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::CREATE_ON_TABLET, |
484 | 0 | TabletSnapshotOperationCallback()); |
485 | 0 | ScheduleTabletSnapshotOp(call); |
486 | 0 | } |
487 | |
488 | 0 | resp->set_snapshot_id(snapshot_id); |
489 | 0 | LOG(INFO) << "Successfully started snapshot " << snapshot_id << " creation"; |
490 | 0 | return Status::OK(); |
491 | 0 | } |
492 | | |
493 | 52 | void CatalogManager::Submit(std::unique_ptr<tablet::Operation> operation, int64_t leader_term) { |
494 | 52 | operation->SetTablet(tablet_peer()->tablet()); |
495 | 52 | tablet_peer()->Submit(std::move(operation), leader_term); |
496 | 52 | } |
497 | | |
498 | | Result<SysRowEntries> CatalogManager::CollectEntries( |
499 | | const google::protobuf::RepeatedPtrField<TableIdentifierPB>& table_identifiers, |
500 | 41 | CollectFlags flags) { |
501 | 41 | RETURN_NOT_OK(CheckIsLeaderAndReady()); |
502 | 41 | SysRowEntries entries; |
503 | 41 | std::unordered_set<NamespaceId> namespaces; |
504 | 41 | auto tables = VERIFY_RESULT(CollectTables(table_identifiers, flags, &namespaces)); |
505 | 41 | if (!namespaces.empty()) { |
506 | 39 | SharedLock lock(mutex_); |
507 | 39 | for (const auto& ns_id : namespaces) { |
508 | 39 | auto ns_info = VERIFY_RESULT(FindNamespaceByIdUnlocked(ns_id)); |
509 | 0 | AddInfoEntry(ns_info.get(), entries.mutable_entries()); |
510 | 39 | } |
511 | 39 | } |
512 | 41 | for (const auto& table : tables) { |
513 | | // TODO(txn_snapshot) use single lock to resolve all tables to tablets |
514 | 30 | SnapshotInfo::AddEntries(table, entries.mutable_entries(), /* tablet_infos= */ nullptr, |
515 | 30 | &namespaces); |
516 | 30 | } |
517 | | |
518 | 41 | return entries; |
519 | 41 | } |
520 | | |
521 | | Result<SysRowEntries> CatalogManager::CollectEntriesForSnapshot( |
522 | 39 | const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables) { |
523 | 39 | return CollectEntries( |
524 | 39 | tables, |
525 | 39 | CollectFlags{CollectFlag::kAddIndexes, CollectFlag::kIncludeParentColocatedTable, |
526 | 39 | CollectFlag::kSucceedIfCreateInProgress}); |
527 | 39 | } |
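
CollectEntriesForSnapshot fixes a set of collection flags, while CreateTransactionAwareSnapshot builds the set conditionally with SetIf. A self-contained sketch of that flag-set idiom, modeled with std::set as an assumption (the real CollectFlags is an enum bit-set type):

#include <initializer_list>
#include <iostream>
#include <set>

enum class CollectFlag { kAddIndexes, kIncludeParentColocatedTable, kSucceedIfCreateInProgress };

class CollectFlags {
 public:
  CollectFlags(std::initializer_list<CollectFlag> flags) : flags_(flags) {}
  // Mirrors flags.SetIf(CollectFlag::kAddIndexes, req.add_indexes()).
  void SetIf(CollectFlag flag, bool condition) {
    if (condition) flags_.insert(flag);
  }
  bool Test(CollectFlag flag) const { return flags_.count(flag) > 0; }
 private:
  std::set<CollectFlag> flags_;
};

int main() {
  CollectFlags flags{CollectFlag::kIncludeParentColocatedTable};
  flags.SetIf(CollectFlag::kAddIndexes, /* req.add_indexes() */ true);
  std::cout << flags.Test(CollectFlag::kAddIndexes) << "\n";  // prints 1
  return 0;
}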
528 | | |
529 | 318k | server::Clock* CatalogManager::Clock() { |
530 | 318k | return master_->clock(); |
531 | 318k | } |
532 | | |
533 | | Status CatalogManager::CreateTransactionAwareSnapshot( |
534 | 2 | const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) { |
535 | 2 | CollectFlags flags{CollectFlag::kIncludeParentColocatedTable}; |
536 | 2 | flags.SetIf(CollectFlag::kAddIndexes, req.add_indexes()); |
537 | 2 | SysRowEntries entries = VERIFY_RESULT(CollectEntries(req.tables(), flags)); |
538 | | |
539 | 2 | auto snapshot_id = VERIFY_RESULT(snapshot_coordinator_.Create( |
540 | 2 | entries, req.imported(), leader_ready_term(), rpc->GetClientDeadline())); |
541 | 0 | resp->set_snapshot_id(snapshot_id.data(), snapshot_id.size()); |
542 | 2 | return Status::OK(); |
543 | 2 | } |
544 | | |
545 | | Status CatalogManager::ListSnapshots(const ListSnapshotsRequestPB* req, |
546 | 9 | ListSnapshotsResponsePB* resp) { |
547 | 9 | auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id()); |
548 | 9 | { |
549 | 9 | SharedLock lock(mutex_); |
550 | 9 | TRACE("Acquired catalog manager lock"); |
551 | | |
552 | 9 | if (!current_snapshot_id_.empty()) { |
553 | 0 | resp->set_current_snapshot_id(current_snapshot_id_); |
554 | 0 | } |
555 | | |
556 | 9 | auto setup_snapshot_pb_lambda = [resp](scoped_refptr<SnapshotInfo> snapshot_info) { |
557 | 0 | auto snapshot_lock = snapshot_info->LockForRead(); |
558 | |
559 | 0 | SnapshotInfoPB* const snapshot = resp->add_snapshots(); |
560 | 0 | snapshot->set_id(snapshot_info->id()); |
561 | 0 | *snapshot->mutable_entry() = snapshot_info->metadata().state().pb; |
562 | 0 | }; |
563 | | |
564 | 9 | if (req->has_snapshot_id()) { |
565 | 8 | if (!txn_snapshot_id) { |
566 | 0 | TRACE("Looking up snapshot"); |
567 | 0 | scoped_refptr<SnapshotInfo> snapshot_info = |
568 | 0 | FindPtrOrNull(non_txn_snapshot_ids_map_, req->snapshot_id()); |
569 | 0 | if (snapshot_info == nullptr) { |
570 | 0 | return STATUS(InvalidArgument, "Could not find snapshot", req->snapshot_id(), |
571 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
572 | 0 | } |
573 | | |
574 | 0 | setup_snapshot_pb_lambda(snapshot_info); |
575 | 0 | } |
576 | 8 | } else { |
577 | 1 | for (const SnapshotInfoMap::value_type& entry : non_txn_snapshot_ids_map_) { |
578 | 0 | setup_snapshot_pb_lambda(entry.second); |
579 | 0 | } |
580 | 1 | } |
581 | 9 | } |
582 | | |
583 | 9 | if (req->prepare_for_backup() && (!req->has_snapshot_id() || !txn_snapshot_id)) {
584 | 0 | return STATUS( |
585 | 0 | InvalidArgument, "Request must have correct snapshot_id", (req->has_snapshot_id() ? |
586 | 0 | req->snapshot_id() : "None"), MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
587 | 0 | } |
588 | | |
589 | 9 | RETURN_NOT_OK(snapshot_coordinator_.ListSnapshots( |
590 | 9 | txn_snapshot_id, req->list_deleted_snapshots(), resp)); |
591 | | |
592 | 9 | if (req->prepare_for_backup()) { |
593 | 0 | SharedLock lock(mutex_); |
594 | 0 | TRACE("Acquired catalog manager lock"); |
595 | | |
596 | | // Repack & extend the backup row entries. |
597 | 0 | for (SnapshotInfoPB& snapshot : *resp->mutable_snapshots()) { |
598 | 0 | snapshot.set_format_version(2); |
599 | 0 | SysSnapshotEntryPB& sys_entry = *snapshot.mutable_entry(); |
600 | 0 | snapshot.mutable_backup_entries()->Reserve(sys_entry.entries_size()); |
601 | |
602 | 0 | for (SysRowEntry& entry : *sys_entry.mutable_entries()) { |
603 | 0 | BackupRowEntryPB* const backup_entry = snapshot.add_backup_entries(); |
604 | | // Set up BackupRowEntryPB fields.
605 | | // Set BackupRowEntryPB::pg_schema_name for YSQL table to disambiguate in case tables |
606 | | // in different schema have same name. |
607 | 0 | if (entry.type() == SysRowEntryType::TABLE) { |
608 | 0 | TRACE("Looking up table"); |
609 | 0 | scoped_refptr<TableInfo> table_info = FindPtrOrNull(*table_ids_map_, entry.id()); |
610 | 0 | if (table_info == nullptr) { |
611 | 0 | return STATUS( |
612 | 0 | InvalidArgument, "Table not found by ID", entry.id(), |
613 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
614 | 0 | } |
615 | | |
616 | 0 | TRACE("Locking table"); |
617 | 0 | auto l = table_info->LockForRead(); |
618 | | // The PG schema name is available for YSQL tables only,
619 | | // except for the '<uuid>.colocated.parent.uuid' table ID.
620 | 0 | if (l->table_type() == PGSQL_TABLE_TYPE && !IsColocatedParentTableId(entry.id())) { |
621 | 0 | const string pg_schema_name = VERIFY_RESULT(GetPgSchemaName(table_info)); |
622 | 0 | VLOG(1) << "PG Schema: " << pg_schema_name << " for table " << table_info->ToString(); |
623 | 0 | backup_entry->set_pg_schema_name(pg_schema_name); |
624 | 0 | } |
625 | 0 | } |
626 | | |
627 | | // Init BackupRowEntryPB::entry. |
628 | 0 | backup_entry->mutable_entry()->Swap(&entry); |
629 | 0 | } |
630 | | |
631 | 0 | sys_entry.clear_entries(); |
632 | 0 | } |
633 | 0 | } |
634 | | |
635 | 9 | return Status::OK(); |
636 | 9 | } |
637 | | |
638 | | Status CatalogManager::ListSnapshotRestorations(const ListSnapshotRestorationsRequestPB* req, |
639 | 6 | ListSnapshotRestorationsResponsePB* resp) { |
640 | 6 | TxnSnapshotRestorationId restoration_id = TxnSnapshotRestorationId::Nil(); |
641 | 6 | if (!req->restoration_id().empty()) { |
642 | 5 | restoration_id = VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId(req->restoration_id())); |
643 | 5 | } |
644 | 6 | TxnSnapshotId snapshot_id = TxnSnapshotId::Nil(); |
645 | 6 | if (!req->snapshot_id().empty()) { |
646 | 0 | snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(req->snapshot_id())); |
647 | 0 | } |
648 | | |
649 | 6 | return snapshot_coordinator_.ListRestorations(restoration_id, snapshot_id, resp); |
650 | 6 | } |
651 | | |
652 | | Status CatalogManager::RestoreSnapshot(const RestoreSnapshotRequestPB* req, |
653 | 3 | RestoreSnapshotResponsePB* resp) { |
654 | 3 | LOG(INFO) << "Servicing RestoreSnapshot request: " << req->ShortDebugString(); |
655 | | |
656 | 3 | auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id()); |
657 | 3 | if (txn_snapshot_id) { |
658 | 3 | HybridTime ht; |
659 | 3 | if (req->has_restore_ht()) { |
660 | 3 | ht = HybridTime(req->restore_ht()); |
661 | 3 | } |
662 | 3 | TxnSnapshotRestorationId id = VERIFY_RESULT(snapshot_coordinator_.Restore( |
663 | 3 | txn_snapshot_id, ht, leader_ready_term())); |
664 | 0 | resp->set_restoration_id(id.data(), id.size()); |
665 | 3 | return Status::OK(); |
666 | 3 | } |
667 | | |
668 | 0 | return RestoreNonTransactionAwareSnapshot(req->snapshot_id()); |
669 | 3 | } |
670 | | |
671 | 0 | Status CatalogManager::RestoreNonTransactionAwareSnapshot(const string& snapshot_id) { |
672 | 0 | LockGuard lock(mutex_); |
673 | 0 | TRACE("Acquired catalog manager lock"); |
674 | |
675 | 0 | if (!current_snapshot_id_.empty()) { |
676 | 0 | return STATUS( |
677 | 0 | IllegalState, |
678 | 0 | Format( |
679 | 0 | "Current snapshot id: $0. Parallel snapshot operations are not supported: $1", |
680 | 0 | current_snapshot_id_, snapshot_id), |
681 | 0 | MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION)); |
682 | 0 | } |
683 | | |
684 | 0 | TRACE("Looking up snapshot"); |
685 | 0 | scoped_refptr<SnapshotInfo> snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, snapshot_id); |
686 | 0 | if (snapshot == nullptr) { |
687 | 0 | return STATUS(InvalidArgument, "Could not find snapshot", snapshot_id, |
688 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
689 | 0 | } |
690 | | |
691 | 0 | auto snapshot_lock = snapshot->LockForWrite(); |
692 | |
693 | 0 | if (snapshot_lock->started_deleting()) { |
694 | 0 | return STATUS(NotFound, "The snapshot was deleted", snapshot_id, |
695 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
696 | 0 | } |
697 | | |
698 | 0 | if (!snapshot_lock->is_complete()) { |
699 | 0 | return STATUS(IllegalState, "The snapshot state is not complete", snapshot_id, |
700 | 0 | MasterError(MasterErrorPB::SNAPSHOT_IS_NOT_READY)); |
701 | 0 | } |
702 | | |
703 | 0 | TRACE("Updating snapshot metadata on disk"); |
704 | 0 | SysSnapshotEntryPB& snapshot_pb = snapshot_lock.mutable_data()->pb; |
705 | 0 | snapshot_pb.set_state(SysSnapshotEntryPB::RESTORING); |
706 | | |
707 | | // Update tablet states. |
708 | 0 | SetTabletSnapshotsState(SysSnapshotEntryPB::RESTORING, &snapshot_pb); |
709 | | |
710 | | // Update sys-catalog with the updated snapshot state. |
711 | | // The mutation will be aborted when 'snapshot_lock' exits the scope on early return.
712 | 0 | RETURN_NOT_OK(CheckLeaderStatus( |
713 | 0 | sys_catalog_->Upsert(leader_ready_term(), snapshot), |
714 | 0 | "updating snapshot in sys-catalog")); |
715 | | |
716 | | // CatalogManager lock 'lock_' is still locked here.
717 | 0 | current_snapshot_id_ = snapshot_id; |
718 | | |
719 | | // Restore all entries. |
720 | 0 | for (const SysRowEntry& entry : snapshot_pb.entries()) { |
721 | 0 | RETURN_NOT_OK(RestoreEntry(entry, snapshot_id)); |
722 | 0 | } |
723 | | |
724 | | // Commit the in-memory snapshot data descriptor.
725 | 0 | TRACE("Committing in-memory snapshot state"); |
726 | 0 | snapshot_lock.Commit(); |
727 | |
728 | 0 | LOG(INFO) << "Successfully started snapshot " << snapshot->ToString() << " restoring"; |
729 | 0 | return Status::OK(); |
730 | 0 | } |
731 | | |
732 | 0 | Status CatalogManager::RestoreEntry(const SysRowEntry& entry, const SnapshotId& snapshot_id) { |
733 | 0 | switch (entry.type()) { |
734 | 0 | case SysRowEntryType::NAMESPACE: { // Restore NAMESPACES. |
735 | 0 | TRACE("Looking up namespace"); |
736 | 0 | scoped_refptr<NamespaceInfo> ns = FindPtrOrNull(namespace_ids_map_, entry.id()); |
737 | 0 | if (ns == nullptr) { |
738 | | // Restore Namespace. |
739 | | // TODO: implement |
740 | 0 | LOG(INFO) << "Restoring: NAMESPACE id = " << entry.id(); |
741 | |
742 | 0 | return STATUS(NotSupported, Substitute( |
743 | 0 | "Not implemented: restoring namespace: id=$0", entry.type())); |
744 | 0 | } |
745 | 0 | break; |
746 | 0 | } |
747 | 0 | case SysRowEntryType::TABLE: { // Restore TABLES. |
748 | 0 | TRACE("Looking up table"); |
749 | 0 | scoped_refptr<TableInfo> table = FindPtrOrNull(*table_ids_map_, entry.id()); |
750 | 0 | if (table == nullptr) { |
751 | | // Restore Table. |
752 | | // TODO: implement |
753 | 0 | LOG(INFO) << "Restoring: TABLE id = " << entry.id(); |
754 | |
755 | 0 | return STATUS(NotSupported, Substitute( |
756 | 0 | "Not implemented: restoring table: id=$0", entry.type())); |
757 | 0 | } |
758 | 0 | break; |
759 | 0 | } |
760 | 0 | case SysRowEntryType::TABLET: { // Restore TABLETS. |
761 | 0 | TRACE("Looking up tablet"); |
762 | 0 | scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id()); |
763 | 0 | if (tablet == nullptr) { |
764 | | // Restore Tablet. |
765 | | // TODO: implement |
766 | 0 | LOG(INFO) << "Restoring: TABLET id = " << entry.id(); |
767 | |
768 | 0 | return STATUS(NotSupported, Substitute( |
769 | 0 | "Not implemented: restoring tablet: id=$0", entry.type())); |
770 | 0 | } else { |
771 | 0 | TRACE("Locking tablet"); |
772 | 0 | auto l = tablet->LockForRead(); |
773 | |
774 | 0 | LOG(INFO) << "Sending RestoreTabletSnapshot to tablet: " << tablet->ToString(); |
775 | | // Send RestoreSnapshot requests to all TServers (one tablet - one request). |
776 | 0 | auto task = CreateAsyncTabletSnapshotOp( |
777 | 0 | tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::RESTORE_ON_TABLET, |
778 | 0 | TabletSnapshotOperationCallback()); |
779 | 0 | ScheduleTabletSnapshotOp(task); |
780 | 0 | } |
781 | 0 | break; |
782 | 0 | } |
783 | 0 | default: |
784 | 0 | return STATUS_FORMAT( |
785 | 0 | InternalError, "Unexpected entry type in the snapshot: $0", entry.type()); |
786 | 0 | } |
787 | | |
788 | 0 | return Status::OK(); |
789 | 0 | } |
790 | | |
791 | | Status CatalogManager::DeleteSnapshot(const DeleteSnapshotRequestPB* req, |
792 | | DeleteSnapshotResponsePB* resp, |
793 | 0 | RpcContext* rpc) { |
794 | 0 | LOG(INFO) << "Servicing DeleteSnapshot request: " << req->ShortDebugString(); |
795 | |
796 | 0 | auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id()); |
797 | 0 | if (txn_snapshot_id) { |
798 | 0 | return snapshot_coordinator_.Delete( |
799 | 0 | txn_snapshot_id, leader_ready_term(), rpc->GetClientDeadline()); |
800 | 0 | } |
801 | | |
802 | 0 | return DeleteNonTransactionAwareSnapshot(req->snapshot_id()); |
803 | 0 | } |
804 | | |
805 | 0 | Status CatalogManager::DeleteNonTransactionAwareSnapshot(const SnapshotId& snapshot_id) { |
806 | 0 | LockGuard lock(mutex_); |
807 | 0 | TRACE("Acquired catalog manager lock"); |
808 | |
809 | 0 | TRACE("Looking up snapshot"); |
810 | 0 | scoped_refptr<SnapshotInfo> snapshot = FindPtrOrNull( |
811 | 0 | non_txn_snapshot_ids_map_, snapshot_id); |
812 | 0 | if (snapshot == nullptr) { |
813 | 0 | return STATUS(InvalidArgument, "Could not find snapshot", snapshot_id, |
814 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
815 | 0 | } |
816 | | |
817 | 0 | auto snapshot_lock = snapshot->LockForWrite(); |
818 | |
819 | 0 | if (snapshot_lock->started_deleting()) { |
820 | 0 | return STATUS(NotFound, "The snapshot was deleted", snapshot_id, |
821 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
822 | 0 | } |
823 | | |
824 | 0 | if (snapshot_lock->is_restoring()) { |
825 | 0 | return STATUS(InvalidArgument, "The snapshot is being restored now", snapshot_id, |
826 | 0 | MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION)); |
827 | 0 | } |
828 | | |
829 | 0 | TRACE("Updating snapshot metadata on disk"); |
830 | 0 | SysSnapshotEntryPB& snapshot_pb = snapshot_lock.mutable_data()->pb; |
831 | 0 | snapshot_pb.set_state(SysSnapshotEntryPB::DELETING); |
832 | | |
833 | | // Update tablet states. |
834 | 0 | SetTabletSnapshotsState(SysSnapshotEntryPB::DELETING, &snapshot_pb); |
835 | | |
836 | | // Update sys-catalog with the updated snapshot state. |
837 | | // The mutation will be aborted when 'snapshot_lock' exits the scope on early return.
838 | 0 | RETURN_NOT_OK(CheckStatus( |
839 | 0 | sys_catalog_->Upsert(leader_ready_term(), snapshot), |
840 | 0 | "updating snapshot in sys-catalog")); |
841 | | |
842 | | // Send DeleteSnapshot requests to all TServers (one tablet - one request). |
843 | 0 | for (const SysRowEntry& entry : snapshot_pb.entries()) { |
844 | 0 | if (entry.type() == SysRowEntryType::TABLET) { |
845 | 0 | TRACE("Looking up tablet"); |
846 | 0 | scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id()); |
847 | 0 | if (tablet == nullptr) { |
848 | 0 | LOG(WARNING) << "Deleting tablet not found " << entry.id(); |
849 | 0 | } else { |
850 | 0 | TRACE("Locking tablet"); |
851 | 0 | auto l = tablet->LockForRead(); |
852 | |
853 | 0 | LOG(INFO) << "Sending DeleteTabletSnapshot to tablet: " << tablet->ToString(); |
854 | | // Send DeleteSnapshot requests to all TServers (one tablet - one request). |
855 | 0 | auto task = CreateAsyncTabletSnapshotOp( |
856 | 0 | tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::DELETE_ON_TABLET, |
857 | 0 | TabletSnapshotOperationCallback()); |
858 | 0 | ScheduleTabletSnapshotOp(task); |
859 | 0 | } |
860 | 0 | } |
861 | 0 | } |
862 | | |
863 | | // Commit the in-memory snapshot data descriptor.
864 | 0 | TRACE("Committing in-memory snapshot state"); |
865 | 0 | snapshot_lock.Commit(); |
866 | |
867 | 0 | LOG(INFO) << "Successfully started snapshot " << snapshot->ToString() << " deletion"; |
868 | 0 | return Status::OK(); |
869 | 0 | } |
870 | | |
871 | | Status CatalogManager::ImportSnapshotPreprocess(const SnapshotInfoPB& snapshot_pb, |
872 | | ImportSnapshotMetaResponsePB* resp, |
873 | | NamespaceMap* namespace_map, |
874 | 0 | ExternalTableSnapshotDataMap* tables_data) { |
875 | 0 | for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) { |
876 | 0 | const SysRowEntry& entry = backup_entry.entry(); |
877 | 0 | switch (entry.type()) { |
878 | 0 | case SysRowEntryType::NAMESPACE: // Recreate NAMESPACE. |
879 | 0 | RETURN_NOT_OK(ImportNamespaceEntry(entry, namespace_map)); |
880 | 0 | break; |
881 | 0 | case SysRowEntryType::TABLE: { // Create TABLE metadata. |
882 | 0 | LOG_IF(DFATAL, entry.id().empty()) << "Empty entry id"; |
883 | 0 | ExternalTableSnapshotData& data = (*tables_data)[entry.id()]; |
884 | |
885 | 0 | if (data.old_table_id.empty()) { |
886 | 0 | data.old_table_id = entry.id(); |
887 | 0 | data.table_meta = resp->mutable_tables_meta()->Add(); |
888 | 0 | data.tablet_id_map = data.table_meta->mutable_tablets_ids(); |
889 | 0 | data.table_entry_pb = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data())); |
890 | |
892 | 0 | data.pg_schema_name = backup_entry.pg_schema_name(); |
893 | 0 | } |
894 | 0 | } else { |
895 | 0 | LOG_WITH_FUNC(WARNING) << "Ignoring duplicate table with id " << entry.id(); |
896 | 0 | } |
897 | | |
898 | 0 | LOG_IF(DFATAL, data.old_table_id.empty()) << "Not initialized table id"; |
899 | 0 | } |
900 | 0 | break; |
901 | 0 | case SysRowEntryType::TABLET: // Preprocess original tablets. |
902 | 0 | RETURN_NOT_OK(PreprocessTabletEntry(entry, tables_data)); |
903 | 0 | break; |
904 | 0 | case SysRowEntryType::CLUSTER_CONFIG: FALLTHROUGH_INTENDED; |
905 | 0 | case SysRowEntryType::REDIS_CONFIG: FALLTHROUGH_INTENDED; |
906 | 0 | case SysRowEntryType::UDTYPE: FALLTHROUGH_INTENDED; |
907 | 0 | case SysRowEntryType::ROLE: FALLTHROUGH_INTENDED; |
908 | 0 | case SysRowEntryType::SYS_CONFIG: FALLTHROUGH_INTENDED; |
909 | 0 | case SysRowEntryType::CDC_STREAM: FALLTHROUGH_INTENDED; |
910 | 0 | case SysRowEntryType::UNIVERSE_REPLICATION: FALLTHROUGH_INTENDED; |
911 | 0 | case SysRowEntryType::SNAPSHOT: FALLTHROUGH_INTENDED; |
912 | 0 | case SysRowEntryType::SNAPSHOT_SCHEDULE: FALLTHROUGH_INTENDED; |
913 | 0 | case SysRowEntryType::DDL_LOG_ENTRY: FALLTHROUGH_INTENDED; |
914 | 0 | case SysRowEntryType::UNKNOWN: |
915 | 0 | FATAL_INVALID_ENUM_VALUE(SysRowEntryType, entry.type()); |
916 | 0 | } |
917 | 0 | } |
918 | | |
919 | 0 | return Status::OK(); |
920 | 0 | } |
921 | | |
922 | | Status CatalogManager::ImportSnapshotCreateObject(const SnapshotInfoPB& snapshot_pb, |
923 | | ImportSnapshotMetaResponsePB* resp, |
924 | | NamespaceMap* namespace_map, |
925 | | ExternalTableSnapshotDataMap* tables_data, |
926 | 0 | CreateObjects create_objects) { |
927 | | // Create ONLY TABLES or ONLY INDEXES according to the argument.
928 | 0 | for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) { |
929 | 0 | const SysRowEntry& entry = backup_entry.entry(); |
930 | 0 | if (entry.type() == SysRowEntryType::TABLE) { |
931 | 0 | ExternalTableSnapshotData& data = (*tables_data)[entry.id()]; |
932 | 0 | if ((create_objects == CreateObjects::kOnlyIndexes) == data.is_index()) { |
933 | 0 | RETURN_NOT_OK(ImportTableEntry(*namespace_map, *tables_data, &data)); |
934 | 0 | } |
935 | 0 | } |
936 | 0 | } |
937 | | |
938 | 0 | return Status::OK(); |
939 | 0 | } |
940 | | |
941 | | Status CatalogManager::ImportSnapshotWaitForTables(const SnapshotInfoPB& snapshot_pb, |
942 | | ImportSnapshotMetaResponsePB* resp, |
943 | | ExternalTableSnapshotDataMap* tables_data, |
944 | 0 | CoarseTimePoint deadline) { |
945 | 0 | for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) { |
946 | 0 | const SysRowEntry& entry = backup_entry.entry(); |
947 | 0 | if (entry.type() == SysRowEntryType::TABLE) { |
948 | 0 | ExternalTableSnapshotData& data = (*tables_data)[entry.id()]; |
949 | 0 | if (!data.is_index()) { |
950 | 0 | RETURN_NOT_OK(WaitForCreateTableToFinish(data.new_table_id, deadline)); |
951 | 0 | } |
952 | 0 | } |
953 | 0 | } |
954 | | |
955 | 0 | return Status::OK(); |
956 | 0 | } |
957 | | |
958 | | Status CatalogManager::ImportSnapshotProcessTablets(const SnapshotInfoPB& snapshot_pb, |
959 | | ImportSnapshotMetaResponsePB* resp, |
960 | 0 | ExternalTableSnapshotDataMap* tables_data) { |
961 | 0 | for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) { |
962 | 0 | const SysRowEntry& entry = backup_entry.entry(); |
963 | 0 | if (entry.type() == SysRowEntryType::TABLET) { |
964 | | // Create tablets IDs map. |
965 | 0 | RETURN_NOT_OK(ImportTabletEntry(entry, tables_data)); |
966 | 0 | } |
967 | 0 | } |
968 | | |
969 | 0 | return Status::OK(); |
970 | 0 | } |
971 | | |
972 | | template <class RespClass> |
973 | | void ProcessDeleteObjectStatus(const string& obj_name, |
974 | | const string& id, |
975 | | const RespClass& resp, |
976 | 0 | const Status& s) { |
977 | 0 | Status result = s; |
978 | 0 | if (result.ok() && resp.has_error()) { |
979 | 0 | result = StatusFromPB(resp.error().status()); |
980 | 0 | LOG_IF(DFATAL, result.ok()) << "Expecting error status"; |
981 | 0 | } |
982 | |
983 | 0 | if (!result.ok()) { |
984 | 0 | LOG_WITH_FUNC(WARNING) << "Failed to delete new " << obj_name << " with id=" << id |
985 | 0 | << ": " << result; |
986 | 0 | } |
987 | 0 | }
------------------
| Unexecuted instantiation: void yb::master::enterprise::ProcessDeleteObjectStatus<yb::master::DeleteTableResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteTableResponsePB const&, yb::Status const&)
| Unexecuted instantiation: void yb::master::enterprise::ProcessDeleteObjectStatus<yb::master::DeleteNamespaceResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteNamespaceResponsePB const&, yb::Status const&)
------------------
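
ProcessDeleteObjectStatus folds two failure channels into one: the RPC-level Status, and an application error embedded in the response. A self-contained sketch of that two-level check with toy Status and response types (the real code converts resp.error().status() via StatusFromPB):

#include <iostream>
#include <optional>
#include <string>

struct Status {
  std::string message;  // empty means OK
  bool ok() const { return message.empty(); }
};

struct DeleteResponse {
  std::optional<std::string> error;  // mirrors resp.has_error()/resp.error()
};

void ProcessDeleteObjectStatus(const std::string& obj_name, const std::string& id,
                               const DeleteResponse& resp, const Status& s) {
  Status result = s;
  if (result.ok() && resp.error) {
    result = Status{*resp.error};  // promote the embedded application error
  }
  if (!result.ok()) {
    std::cerr << "Failed to delete new " << obj_name << " with id=" << id
              << ": " << result.message << "\n";
  }
}

int main() {
  ProcessDeleteObjectStatus("table", "t-123", DeleteResponse{"table not found"}, Status{});
  return 0;
}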
988 | | |
989 | | void CatalogManager::DeleteNewSnapshotObjects(const NamespaceMap& namespace_map, |
990 | 0 | const ExternalTableSnapshotDataMap& tables_data) { |
991 | 0 | for (const ExternalTableSnapshotDataMap::value_type& entry : tables_data) { |
992 | 0 | const TableId& old_id = entry.first; |
993 | 0 | const TableId& new_id = entry.second.new_table_id; |
994 | 0 | const TableType type = entry.second.table_entry_pb.table_type(); |
995 | | |
996 | | // Do not delete YSQL objects - they must be deleted via the PG API.
997 | 0 | if (new_id.empty() || new_id == old_id || type == TableType::PGSQL_TABLE_TYPE) { |
998 | 0 | continue; |
999 | 0 | } |
1000 | | |
1001 | 0 | LOG_WITH_FUNC(INFO) << "Deleting new table with id=" << new_id << " old id=" << old_id; |
1002 | 0 | DeleteTableRequestPB req; |
1003 | 0 | DeleteTableResponsePB resp; |
1004 | 0 | req.mutable_table()->set_table_id(new_id); |
1005 | 0 | req.set_is_index_table(entry.second.is_index()); |
1006 | 0 | ProcessDeleteObjectStatus("table", new_id, resp, DeleteTable(&req, &resp, nullptr)); |
1007 | 0 | } |
1008 | |
1009 | 0 | for (const NamespaceMap::value_type& entry : namespace_map) { |
1010 | 0 | const NamespaceId& old_id = entry.first; |
1011 | 0 | const NamespaceId& new_id = entry.second.first; |
1012 | 0 | const YQLDatabase& db_type = entry.second.second; |
1013 | | |
1014 | | // Do not delete YSQL objects - they must be deleted via the PG API.
1015 | 0 | if (new_id.empty() || new_id == old_id || db_type == YQL_DATABASE_PGSQL) { |
1016 | 0 | continue; |
1017 | 0 | } |
1018 | | |
1019 | 0 | LOG_WITH_FUNC(INFO) << "Deleting new namespace with id=" << new_id << " old id=" << old_id; |
1020 | 0 | DeleteNamespaceRequestPB req; |
1021 | 0 | DeleteNamespaceResponsePB resp; |
1022 | 0 | req.mutable_namespace_()->set_id(new_id); |
1023 | 0 | ProcessDeleteObjectStatus( |
1024 | 0 | "namespace", new_id, resp, DeleteNamespace(&req, &resp, nullptr)); |
1025 | 0 | } |
1026 | 0 | } |
1027 | | |
1028 | | Status CatalogManager::ImportSnapshotMeta(const ImportSnapshotMetaRequestPB* req, |
1029 | | ImportSnapshotMetaResponsePB* resp, |
1030 | 0 | rpc::RpcContext* rpc) { |
1031 | 0 | LOG(INFO) << "Servicing ImportSnapshotMeta request: " << req->ShortDebugString(); |
1032 | |
1033 | 0 | NamespaceMap namespace_map; |
1034 | 0 | ExternalTableSnapshotDataMap tables_data; |
1035 | 0 | bool successful_exit = false; |
1036 | |
1037 | 0 | auto se = ScopeExit([this, &namespace_map, &tables_data, &successful_exit] { |
1038 | 0 | if (!successful_exit) { |
1039 | 0 | DeleteNewSnapshotObjects(namespace_map, tables_data); |
1040 | 0 | } |
1041 | 0 | }); |
1042 | |
1043 | 0 | const SnapshotInfoPB& snapshot_pb = req->snapshot(); |
1044 | |
1045 | 0 | if (!snapshot_pb.has_format_version() || snapshot_pb.format_version() != 2) { |
1046 | 0 | return STATUS(InternalError, "Expected snapshot data in format 2", |
1047 | 0 | snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1048 | 0 | } |
1049 | | |
1050 | 0 | if (snapshot_pb.backup_entries_size() == 0) { |
1051 | 0 | return STATUS(InternalError, "Expected snapshot data prepared for backup", |
1052 | 0 | snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1053 | 0 | } |
1054 | | |
1055 | | // PHASE 1: Recreate namespaces and create table metadata.
1056 | 0 | RETURN_NOT_OK(ImportSnapshotPreprocess(snapshot_pb, resp, &namespace_map, &tables_data)); |
1057 | | |
1058 | | // PHASE 2: Recreate ONLY tables. |
1059 | 0 | RETURN_NOT_OK(ImportSnapshotCreateObject( |
1060 | 0 | snapshot_pb, resp, &namespace_map, &tables_data, CreateObjects::kOnlyTables)); |
1061 | | |
1062 | | // PHASE 3: Wait for all table creation to complete.
1063 | 0 | RETURN_NOT_OK(ImportSnapshotWaitForTables( |
1064 | 0 | snapshot_pb, resp, &tables_data, rpc->GetClientDeadline())); |
1065 | | |
1066 | | // PHASE 4: Recreate ONLY indexes. |
1067 | 0 | RETURN_NOT_OK(ImportSnapshotCreateObject( |
1068 | 0 | snapshot_pb, resp, &namespace_map, &tables_data, CreateObjects::kOnlyIndexes)); |
1069 | | |
1070 | | // PHASE 5: Restore tablets. |
1071 | 0 | RETURN_NOT_OK(ImportSnapshotProcessTablets(snapshot_pb, resp, &tables_data)); |
1072 | | |
1073 | 0 | successful_exit = true; |
1074 | 0 | return Status::OK(); |
1075 | 0 | } |
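
ImportSnapshotMeta arms a ScopeExit guard before running the five phases, so any early return rolls back the partially created objects; setting successful_exit = true at the end disarms the cleanup. A self-contained sketch of that guard idiom (C++17; a simplified stand-in for yb::ScopeExit):

#include <iostream>
#include <utility>

template <class F>
class ScopeExit {
 public:
  explicit ScopeExit(F f) : f_(std::move(f)) {}
  ~ScopeExit() { f_(); }  // runs on every exit path, normal or early return
 private:
  F f_;
};

int main() {
  bool successful_exit = false;
  {
    ScopeExit guard([&] {
      if (!successful_exit) {
        std::cout << "DeleteNewSnapshotObjects(...): rolling back partial import\n";
      }
    });
    // ... the import phases would run here; a failure would leave the flag false ...
    successful_exit = true;  // reached only when every phase succeeded
  }
  return 0;
}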
1076 | | |
1077 | | Status CatalogManager::ChangeEncryptionInfo(const ChangeEncryptionInfoRequestPB* req, |
1078 | 17 | ChangeEncryptionInfoResponsePB* resp) { |
1079 | 17 | auto cluster_config = ClusterConfig(); |
1080 | 17 | auto l = cluster_config->LockForWrite(); |
1081 | 17 | auto encryption_info = l.mutable_data()->pb.mutable_encryption_info(); |
1082 | | |
1083 | 17 | RETURN_NOT_OK(encryption_manager_->ChangeEncryptionInfo(req, encryption_info)); |
1084 | | |
1085 | 17 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
1086 | 17 | RETURN_NOT_OK(CheckStatus( |
1087 | 17 | sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), |
1088 | 17 | "updating cluster config in sys-catalog")); |
1089 | 17 | l.Commit(); |
1090 | | |
1091 | 17 | std::lock_guard<simple_spinlock> lock(should_send_universe_key_registry_mutex_); |
1092 | 17 | for (auto& entry : should_send_universe_key_registry_) { |
1093 | 1 | entry.second = true; |
1094 | 1 | } |
1095 | | |
1096 | 17 | return Status::OK(); |
1097 | 17 | } |
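
ChangeEncryptionInfo follows the copy-on-write metadata pattern used throughout the catalog manager: LockForWrite() yields a mutable copy, the copy is persisted to the sys catalog, and only then does Commit() publish it to readers. A simplified sketch of that ordering with plain structs (the real code goes through cluster-config locks and sys_catalog_->Upsert):

#include <iostream>
#include <string>

struct ClusterConfigPB {
  int version = 0;
  std::string encryption_key;
};

int main() {
  ClusterConfigPB committed{1, "old-key"};   // what readers currently see
  ClusterConfigPB dirty = committed;         // LockForWrite(): mutate a private copy
  dirty.encryption_key = "new-key";
  dirty.version = committed.version + 1;     // bump the config version, as above
  // ... persist `dirty` to the sys catalog here; on failure, return early and
  // readers keep seeing `committed` unchanged ...
  committed = dirty;                         // l.Commit(): publish to readers
  std::cout << committed.version << " " << committed.encryption_key << "\n";
  return 0;
}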
1098 | | |
1099 | | Status CatalogManager::IsEncryptionEnabled(const IsEncryptionEnabledRequestPB* req, |
1100 | 17 | IsEncryptionEnabledResponsePB* resp) { |
1101 | 17 | return encryption_manager_->IsEncryptionEnabled( |
1102 | 17 | ClusterConfig()->LockForRead()->pb.encryption_info(), resp); |
1103 | 17 | } |
1104 | | |
1105 | | Status CatalogManager::ImportNamespaceEntry(const SysRowEntry& entry, |
1106 | 0 | NamespaceMap* namespace_map) { |
1107 | 0 | LOG_IF(DFATAL, entry.type() != SysRowEntryType::NAMESPACE) |
1108 | 0 | << "Unexpected entry type: " << entry.type(); |
1109 | |
1110 | 0 | SysNamespaceEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysNamespaceEntryPB>(entry.data())); |
1111 | 0 | const YQLDatabase db_type = GetDatabaseType(meta); |
1112 | 0 | NamespaceData& ns_data = (*namespace_map)[entry.id()]; |
1113 | 0 | ns_data.second = db_type; |
1114 | |
1115 | 0 | TRACE("Looking up namespace"); |
1116 | | // First, try to find the namespace by ID. This works if we are restoring the backup
1117 | | // on the original cluster where the backup was created.
1118 | 0 | scoped_refptr<NamespaceInfo> ns; |
1119 | 0 | { |
1120 | 0 | SharedLock lock(mutex_); |
1121 | 0 | ns = FindPtrOrNull(namespace_ids_map_, entry.id()); |
1122 | 0 | } |
1123 | |
1124 | 0 | if (ns != nullptr && ns->name() == meta.name() && ns->state() == SysNamespaceEntryPB::RUNNING) { |
1125 | 0 | ns_data.first = entry.id(); |
1126 | 0 | return Status::OK(); |
1127 | 0 | } |
1128 | | |
1129 | | // If the namespace was not found by ID, that is fine on a new cluster, or when the
1130 | | // namespace was deleted and created again. In both cases the namespace can be found by NAME.
1131 | 0 | if (db_type == YQL_DATABASE_PGSQL) { |
1132 | | // YSQL database must be created via external call. Find it by name. |
1133 | 0 | { |
1134 | 0 | SharedLock lock(mutex_); |
1135 | 0 | ns = FindPtrOrNull(namespace_names_mapper_[db_type], meta.name()); |
1136 | 0 | } |
1137 | |
1138 | 0 | if (ns == nullptr) { |
1139 | 0 | const string msg = Format("YSQL database must exist: $0", meta.name()); |
1140 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1141 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND)); |
1142 | 0 | } |
1143 | 0 | if (ns->state() != SysNamespaceEntryPB::RUNNING) { |
1144 | 0 | const string msg = Format("Found YSQL database must be running: $0", meta.name()); |
1145 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1146 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND)); |
1147 | 0 | } |
1148 | | |
1149 | 0 | auto ns_lock = ns->LockForRead(); |
1150 | 0 | ns_data.first = ns->id(); |
1151 | 0 | } else { |
1152 | 0 | CreateNamespaceRequestPB req; |
1153 | 0 | CreateNamespaceResponsePB resp; |
1154 | 0 | req.set_name(meta.name()); |
1155 | 0 | const Status s = CreateNamespace(&req, &resp, nullptr); |
1156 | |
1157 | 0 | if (!s.ok() && !s.IsAlreadyPresent()) { |
1158 | 0 | return s.CloneAndAppend("Failed to create namespace"); |
1159 | 0 | } |
1160 | | |
1161 | 0 | if (s.IsAlreadyPresent()) { |
1162 | 0 | LOG_WITH_FUNC(INFO) << "Using existing namespace '" << meta.name() << "': " << resp.id(); |
1163 | 0 | } |
1164 | |
1165 | 0 | ns_data.first = resp.id(); |
1166 | 0 | } |
1167 | 0 | return Status::OK(); |
1168 | 0 | } |
1169 | | |
1170 | | Status CatalogManager::RecreateTable(const NamespaceId& new_namespace_id, |
1171 | | const ExternalTableSnapshotDataMap& table_map, |
1172 | 0 | ExternalTableSnapshotData* table_data) { |
1173 | 0 | const SysTablesEntryPB& meta = DCHECK_NOTNULL(table_data)->table_entry_pb; |
1174 | |
1175 | 0 | CreateTableRequestPB req; |
1176 | 0 | CreateTableResponsePB resp; |
1177 | 0 | req.set_name(meta.name()); |
1178 | 0 | req.set_table_type(meta.table_type()); |
1179 | 0 | req.set_num_tablets(narrow_cast<int32_t>(table_data->num_tablets)); |
1180 | 0 | for (const auto& p : table_data->partitions) { |
1181 | 0 | *req.add_partitions() = p; |
1182 | 0 | } |
1183 | 0 | req.mutable_namespace_()->set_id(new_namespace_id); |
1184 | 0 | *req.mutable_partition_schema() = meta.partition_schema(); |
1185 | 0 | *req.mutable_replication_info() = meta.replication_info(); |
1186 | |
1187 | 0 | SchemaPB* const schema = req.mutable_schema(); |
1188 | 0 | *schema = meta.schema(); |
1189 | | |
1190 | | // Set up index info.
1191 | 0 | if (table_data->is_index()) { |
1192 | 0 | TRACE("Looking up indexed table"); |
1193 | | // First, try to attach to the new copy of the referenced table, because the
1194 | | // table restored from the snapshot is preferred. To do that, map the old
1195 | | // indexed table ID to the new table ID.
1196 | 0 | ExternalTableSnapshotDataMap::const_iterator it = table_map.find(meta.indexed_table_id()); |
1197 | 0 | const bool using_existing_table = (it == table_map.end()); |
1198 | |
1199 | 0 | if (using_existing_table) { |
1200 | 0 | LOG_WITH_FUNC(INFO) << "Try to use old indexed table id " << meta.indexed_table_id(); |
1201 | 0 | req.set_indexed_table_id(meta.indexed_table_id()); |
1202 | 0 | } else { |
1203 | 0 | LOG_WITH_FUNC(INFO) << "Found new table id " << it->second.new_table_id |
1204 | 0 | << " for old table id " << meta.indexed_table_id() |
1205 | 0 | << " from the snapshot"; |
1206 | 0 | req.set_indexed_table_id(it->second.new_table_id); |
1207 | 0 | } |
1208 | |
1209 | 0 | scoped_refptr<TableInfo> indexed_table; |
1210 | 0 | { |
1211 | 0 | SharedLock lock(mutex_); |
1212 | | // Try to find the specified indexed table by id. |
1213 | 0 | indexed_table = FindPtrOrNull(*table_ids_map_, req.indexed_table_id()); |
1214 | 0 | } |
1215 | |
1216 | 0 | if (indexed_table == nullptr) { |
1217 | 0 | const string msg = Format("Indexed table not found by id: $0", req.indexed_table_id()); |
1218 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1219 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
1220 | 0 | } |
1221 | | |
1222 | 0 | LOG_WITH_FUNC(INFO) << "Found indexed table by id " << req.indexed_table_id(); |
1223 | | |
1224 | | // Ensure the main table schema (including column ids) was not changed. |
1225 | 0 | if (!using_existing_table) { |
1226 | 0 | Schema new_indexed_schema, src_indexed_schema; |
1227 | 0 | RETURN_NOT_OK(indexed_table->GetSchema(&new_indexed_schema)); |
1228 | 0 | RETURN_NOT_OK(SchemaFromPB(it->second.table_entry_pb.schema(), &src_indexed_schema)); |
1229 | | |
1230 | 0 | if (!new_indexed_schema.Equals(src_indexed_schema)) { |
1231 | 0 | const string msg = Format( |
1232 | 0 | "Recreated table has changes in schema: new schema={$0}, source schema={$1}", |
1233 | 0 | new_indexed_schema.ToString(), src_indexed_schema.ToString()); |
1234 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1235 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1236 | 0 | } |
1237 | | |
1238 | 0 | if (new_indexed_schema.column_ids() != src_indexed_schema.column_ids()) { |
1239 | 0 | const string msg = Format( |
1240 | 0 | "Recreated table has changes in column ids: new ids=$0, source ids=$1", |
1241 | 0 | ToString(new_indexed_schema.column_ids()), ToString(src_indexed_schema.column_ids())); |
1242 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1243 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1244 | 0 | } |
1245 | 0 | } |
1246 | | |
1247 | 0 | req.set_is_local_index(meta.is_local_index()); |
1248 | 0 | req.set_is_unique_index(meta.is_unique_index()); |
1249 | 0 | req.set_skip_index_backfill(true); |
1250 | | // Setup IndexInfoPB - self descriptor. |
1251 | 0 | IndexInfoPB* const index_info_pb = req.mutable_index_info(); |
1252 | 0 | *index_info_pb = meta.index_info(); |
1253 | 0 | index_info_pb->clear_table_id(); |
1254 | 0 | index_info_pb->set_indexed_table_id(req.indexed_table_id()); |
1255 | | |
1256 | | // Reset column ids. |
1257 | 0 | for (int i = 0; i < index_info_pb->columns_size(); ++i) { |
1258 | 0 | index_info_pb->mutable_columns(i)->clear_column_id(); |
1259 | 0 | } |
1260 | 0 | } |
1261 | | |
1262 | 0 | RETURN_NOT_OK(CreateTable(&req, &resp, /* RpcContext */nullptr)); |
1263 | 0 | table_data->new_table_id = resp.table_id(); |
1264 | 0 | LOG_WITH_FUNC(INFO) << "New table id " << table_data->new_table_id << " for " |
1265 | 0 | << table_data->old_table_id; |
1266 | 0 | return Status::OK(); |
1267 | 0 | } |
1268 | | |
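RecreateTable's index handling above prefers the restored copy of the indexed table and falls back to the old id only when the snapshot did not contain that table. The lookup-with-fallback in isolation (SnapshotData is a hypothetical stand-in for ExternalTableSnapshotData):

#include <string>
#include <unordered_map>

struct SnapshotData { std::string new_table_id; };

// Prefer the freshly restored copy; otherwise keep the old id and rely on the
// table still existing in the destination cluster.
std::string ResolveIndexedTableId(
    const std::unordered_map<std::string, SnapshotData>& table_map,
    const std::string& old_indexed_table_id) {
  auto it = table_map.find(old_indexed_table_id);
  return it == table_map.end() ? old_indexed_table_id : it->second.new_table_id;
}
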
1269 | | Status CatalogManager::RepartitionTable(const scoped_refptr<TableInfo> table, |
1270 | 0 | const ExternalTableSnapshotData* table_data) { |
1271 | 0 | DCHECK_EQ(table->id(), table_data->new_table_id); |
1272 | 0 | if (table->GetTableType() != PGSQL_TABLE_TYPE) { |
1273 | 0 | return STATUS_FORMAT(InvalidArgument, |
1274 | 0 | "Cannot repartition non-YSQL table: got $0", |
1275 | 0 | TableType_Name(table->GetTableType())); |
1276 | 0 | } |
1277 | 0 | LOG_WITH_FUNC(INFO) << "Repartition table " << table->id() |
1278 | 0 | << " using external snapshot table " << table_data->old_table_id; |
1279 | | |
1280 | | // Get partitions from external snapshot. |
1281 | 0 | size_t i = 0; |
1282 | 0 | vector<Partition> partitions(table_data->partitions.size()); |
1283 | 0 | for (const auto& partition_pb : table_data->partitions) { |
1284 | 0 | Partition::FromPB(partition_pb, &partitions[i++]); |
1285 | 0 | } |
1286 | 0 | VLOG_WITH_FUNC(3) << "Got " << partitions.size() |
1287 | 0 | << " partitions from external snapshot for table " << table->id(); |
1288 | | |
1289 | | // Change TableInfo to point to the new tablets. |
1290 | 0 | string deletion_msg; |
1291 | 0 | vector<scoped_refptr<TabletInfo>> new_tablets; |
1292 | 0 | vector<scoped_refptr<TabletInfo>> old_tablets; |
1293 | 0 | { |
1294 | | // Acquire the TableInfo pb write lock. Although it is not required for some of the individual |
1295 | | // steps, we want to hold it through so that we guarantee the state does not change during the |
1296 | | // whole process. Consequently, we hold it through some steps that require mutex_, but since |
1297 | | // taking mutex_ after TableInfo pb lock is prohibited for deadlock reasons, acquire mutex_ |
1298 | | // first, then release it when it is no longer needed, still holding table pb lock. |
1299 | 0 | TableInfo::WriteLock table_lock; |
1300 | 0 | { |
1301 | 0 | LockGuard lock(mutex_); |
1302 | 0 | TRACE("Acquired catalog manager lock"); |
1303 | | |
1304 | | // Make sure the table is in RUNNING state. |
1305 | | // This by itself doesn't need a pb write lock: just a read lock. However, we want to prevent |
1306 | | // other writers from entering from this point forward, so take the write lock now. |
1307 | 0 | table_lock = table->LockForWrite(); |
1308 | 0 | if (table->old_pb().state() != SysTablesEntryPB::RUNNING) { |
1309 | 0 | return STATUS_FORMAT(IllegalState, |
1310 | 0 | "Table $0 not running: $1", |
1311 | 0 | table->ToString(), |
1312 | 0 | SysTablesEntryPB_State_Name(table->old_pb().state())); |
1313 | 0 | } |
1314 | | // Make sure the table's tablets can be deleted. |
1315 | 0 | RETURN_NOT_OK_PREPEND(CheckIfForbiddenToDeleteTabletOf(table), |
1316 | 0 | Format("Cannot repartition table $0", table->id())); |
1317 | | |
1318 | | // Create and mark new tablets for creation. |
1319 | | |
1320 | | // Use partitions from external snapshot to create new tablets in state PREPARING. The tablets |
1321 | | // will start CREATING once they are committed in memory. |
1322 | 0 | for (const auto& partition : partitions) { |
1323 | 0 | PartitionPB partition_pb; |
1324 | 0 | partition.ToPB(&partition_pb); |
1325 | 0 | new_tablets.push_back(CreateTabletInfo(table.get(), partition_pb)); |
1326 | 0 | } |
1327 | | |
1328 | | // Add tablets to catalog manager tablet_map_. This should be safe to do after creating |
1329 | | // tablets since we're still holding mutex_. |
1330 | 0 | auto tablet_map_checkout = tablet_map_.CheckOut(); |
1331 | 0 | for (auto& new_tablet : new_tablets) { |
1332 | 0 | InsertOrDie(tablet_map_checkout.get_ptr(), new_tablet->tablet_id(), new_tablet); |
1333 | 0 | } |
1334 | 0 | VLOG_WITH_FUNC(3) << "Prepared creation of " << new_tablets.size() |
1335 | 0 | << " new tablets for table " << table->id(); |
1336 | | |
1337 | | // mutex_ is no longer needed, so release by going out of scope. |
1338 | 0 | } |
1339 | | // The table pb write lock is still held, ensuring that the table state does not change. Later |
1340 | | // steps, like GetTablets or AddTablets, will acquire/release the TableInfo lock_, but it's |
1341 | | // probably fine that they are released between the steps since the table pb write lock is held |
1342 | | // throughout. In other words, there should be no risk that TableInfo tablets_ changes between |
1343 | | // GetTablets and RemoveTablets. |
1344 | | |
1345 | | // Abort tablet mutations in case of early returns. |
1346 | 0 | ScopedInfoCommitter<TabletInfo> unlocker_new(&new_tablets); |
1347 | | |
1348 | | // Mark old tablets for deletion. |
1349 | 0 | old_tablets = table->GetTablets(IncludeInactive::kTrue); |
1350 | | // Sort so that locking can be done in a deterministic order. |
1351 | 0 | std::sort(old_tablets.begin(), old_tablets.end(), [](const auto& lhs, const auto& rhs) { |
1352 | 0 | return lhs->tablet_id() < rhs->tablet_id(); |
1353 | 0 | }); |
1354 | 0 | deletion_msg = Format("Old tablets of table $0 deleted at $1", |
1355 | 0 | table->id(), LocalTimeAsString()); |
1356 | 0 | for (auto& old_tablet : old_tablets) { |
1357 | 0 | old_tablet->mutable_metadata()->StartMutation(); |
1358 | 0 | old_tablet->mutable_metadata()->mutable_dirty()->set_state( |
1359 | 0 | SysTabletsEntryPB::DELETED, deletion_msg); |
1360 | 0 | } |
1361 | 0 | VLOG_WITH_FUNC(3) << "Prepared deletion of " << old_tablets.size() << " old tablets for table " |
1362 | 0 | << table->id(); |
1363 | | |
1364 | | // Abort tablet mutations in case of early returns. |
1365 | 0 | ScopedInfoCommitter<TabletInfo> unlocker_old(&old_tablets); |
1366 | | |
1367 | | // Change table's partition schema to the external snapshot's. |
1368 | 0 | auto& table_pb = table_lock.mutable_data()->pb; |
1369 | 0 | table_pb.mutable_partition_schema()->CopyFrom( |
1370 | 0 | table_data->table_entry_pb.partition_schema()); |
1371 | 0 | table_pb.set_partition_list_version(table_pb.partition_list_version() + 1); |
1372 | | |
1373 | | // Remove old tablets from TableInfo. |
1374 | 0 | table->RemoveTablets(old_tablets); |
1375 | | // Add new tablets to TableInfo. This must be done after removing tablets because |
1376 | | // TableInfo::partitions_ has key PartitionKey, which old and new tablets may conflict on. |
1377 | 0 | table->AddTablets(new_tablets); |
1378 | | |
1379 | | // Commit table and tablets to disk. |
1380 | 0 | RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table, new_tablets, old_tablets)); |
1381 | 0 | VLOG_WITH_FUNC(2) << "Committed to disk: table " << table->id() << " repartition from " |
1382 | 0 | << old_tablets.size() << " tablets to " << new_tablets.size() << " tablets"; |
1383 | | |
1384 | | // Commit to memory. Commit new tablets (addition) first since that doesn't break anything. |
1385 | | // Commit table next since new tablets are already committed and ready to be referenced. Commit |
1386 | | // old tablets (deletion) last since the table is not referencing them anymore. |
1387 | 0 | unlocker_new.Commit(); |
1388 | 0 | table_lock.Commit(); |
1389 | 0 | unlocker_old.Commit(); |
1390 | 0 | VLOG_WITH_FUNC(1) << "Committed to memory: table " << table->id() << " repartition from " |
1391 | 0 | << old_tablets.size() << " tablets to " << new_tablets.size() << " tablets"; |
1392 | 0 | } |
1393 | | |
1394 | | // Finally, now that everything is committed, send the delete tablet requests. |
1395 | 0 | for (auto& old_tablet : old_tablets) { |
1396 | 0 | DeleteTabletReplicas(old_tablet.get(), deletion_msg, HideOnly::kFalse); |
1397 | 0 | } |
1398 | 0 | VLOG_WITH_FUNC(2) << "Sent delete tablet requests for " << old_tablets.size() << " old tablets" |
1399 | 0 | << " of table " << table->id(); |
1400 | | // The create tablet requests should be handled by bg tasks which find the PREPARING tablets after |
1401 | | // commit. |
1402 | |
1403 | 0 | return Status::OK(); |
1404 | 0 | } |
1405 | | |
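The comment block inside RepartitionTable spells out the locking discipline: mutex_ must be taken strictly before the TableInfo pb lock, and is released early while the table lock stays held to the end. A sketch of that discipline with plain std::mutex (illustrative types, not YB's LockGuard/TableInfo::WriteLock):

#include <mutex>

std::mutex catalog_mutex;  // stand-in for CatalogManager::mutex_

struct Table { std::mutex pb_mutex; };

void RepartitionSketch(Table& table) {
  std::unique_lock<std::mutex> table_lock;  // empty; acquired below
  {
    // Global lock first: taking catalog_mutex while already holding the table
    // lock would invert the order and risk deadlock.
    std::lock_guard<std::mutex> catalog_lock(catalog_mutex);
    table_lock = std::unique_lock<std::mutex>(table.pb_mutex);
    // ... steps needing both locks (create tablets, update the tablet map) ...
  }  // catalog_mutex released here; table_lock is still held.
  // ... steps needing only the table lock (swap tablets, commit) ...
}
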
1406 | | // Helper function for ImportTableEntry. |
1407 | | // |
1408 | | // Given an internal table and an external table snapshot, do some checks to determine if we should |
1409 | | // move forward with using this internal table for import. |
1410 | | // |
1411 | | // table: internal table's info |
1412 | | // snapshot_data: external table's snapshot data |
1413 | | Result<bool> CatalogManager::CheckTableForImport(scoped_refptr<TableInfo> table, |
1414 | 0 | ExternalTableSnapshotData* snapshot_data) { |
1415 | 0 | auto table_lock = table->LockForRead(); |
1416 | | |
1417 | | // Check if table is live. |
1418 | 0 | if (!table_lock->visible_to_client()) { |
1419 | 0 | VLOG_WITH_FUNC(2) << "Table not visible to client: " << table->ToString(); |
1420 | 0 | return false; |
1421 | 0 | } |
1422 | | // Check if table names match. |
1423 | 0 | const string& external_table_name = snapshot_data->table_entry_pb.name(); |
1424 | 0 | if (table_lock->name() != external_table_name) { |
1425 | 0 | VLOG_WITH_FUNC(2) << "Table names do not match: " |
1426 | 0 | << table_lock->name() << " vs " << external_table_name |
1427 | 0 | << " for " << table->ToString(); |
1428 | 0 | return false; |
1429 | 0 | } |
1430 | | // Check index vs table. |
1431 | 0 | if (snapshot_data->is_index() ? table->indexed_table_id().empty() |
1432 | 0 | : !table->indexed_table_id().empty()) { |
1433 | 0 | VLOG_WITH_FUNC(2) << "External snapshot table is " << (snapshot_data->is_index() ? "" : "not ") |
1434 | 0 | << "index but internal table is the opposite: " << table->ToString(); |
1435 | 0 | return false; |
1436 | 0 | } |
1437 | | // Check if table schemas match (if present in snapshot). |
1438 | 0 | if (!snapshot_data->pg_schema_name.empty()) { |
1439 | 0 | if (table->GetTableType() != PGSQL_TABLE_TYPE) { |
1440 | 0 | LOG_WITH_FUNC(DFATAL) << "ExternalTableSnapshotData.pg_schema_name set when table type is not" |
1441 | 0 | << " PGSQL: schema name: " << snapshot_data->pg_schema_name |
1442 | 0 | << ", table type: " << TableType_Name(table->GetTableType()); |
1443 | | // If not a debug build, ignore pg_schema_name. |
1444 | 0 | } else { |
1445 | 0 | const string internal_schema_name = VERIFY_RESULT(GetPgSchemaName(table)); |
1446 | 0 | const string& external_schema_name = snapshot_data->pg_schema_name; |
1447 | 0 | if (internal_schema_name != external_schema_name) { |
1448 | 0 | LOG_WITH_FUNC(INFO) << "Schema names do not match: " |
1449 | 0 | << internal_schema_name << " vs " << external_schema_name |
1450 | 0 | << " for " << table->ToString(); |
1451 | 0 | return false; |
1452 | 0 | } |
1453 | 0 | } |
1454 | 0 | } |
1455 | | |
1456 | 0 | return true; |
1457 | 0 | } |
1458 | | |
1459 | | Status CatalogManager::ImportTableEntry(const NamespaceMap& namespace_map, |
1460 | | const ExternalTableSnapshotDataMap& table_map, |
1461 | 0 | ExternalTableSnapshotData* table_data) { |
1462 | 0 | const SysTablesEntryPB& meta = DCHECK_NOTNULL(table_data)->table_entry_pb; |
1463 | 0 | bool is_parent_colocated_table = false; |
1464 | |
1465 | 0 | table_data->old_namespace_id = meta.namespace_id(); |
1466 | 0 | LOG_IF(DFATAL, table_data->old_namespace_id.empty()) << "No namespace id"; |
1467 | |
1468 | 0 | LOG_IF(DFATAL, namespace_map.find(table_data->old_namespace_id) == namespace_map.end()) |
1469 | 0 | << "Namespace not found: " << table_data->old_namespace_id; |
1470 | 0 | const NamespaceId new_namespace_id = |
1471 | 0 | namespace_map.find(table_data->old_namespace_id)->second.first; |
1472 | 0 | LOG_IF(DFATAL, new_namespace_id.empty()) << "No namespace id"; |
1473 | |
1474 | 0 | Schema schema; |
1475 | 0 | RETURN_NOT_OK(SchemaFromPB(meta.schema(), &schema)); |
1476 | 0 | const vector<ColumnId>& column_ids = schema.column_ids(); |
1477 | 0 | scoped_refptr<TableInfo> table; |
1478 | | |
1479 | | // First, check if namespace id and table id match. If, in addition, other properties match, we |
1480 | | // found the destination table. |
1481 | 0 | if (new_namespace_id == table_data->old_namespace_id) { |
1482 | 0 | TRACE("Looking up table"); |
1483 | 0 | { |
1484 | 0 | SharedLock lock(mutex_); |
1485 | 0 | table = FindPtrOrNull(*table_ids_map_, table_data->old_table_id); |
1486 | 0 | } |
1487 | |
1488 | 0 | if (table != nullptr) { |
1489 | 0 | VLOG_WITH_PREFIX(3) << "Begin first search"; |
1490 | | // At this point, namespace id and table id match. Check other properties, like whether the |
1491 | | // table is active and whether table name matches. |
1492 | 0 | SharedLock lock(mutex_); |
1493 | 0 | if (VERIFY_RESULT(CheckTableForImport(table, table_data))) { |
1494 | 0 | LOG_WITH_FUNC(INFO) << "Found existing table: '" << table->ToString() << "'"; |
1495 | 0 | } else { |
1496 | | // A property did not match, so this search by ids failed. |
1497 | 0 | auto table_lock = table->LockForRead(); |
1498 | 0 | LOG_WITH_FUNC(WARNING) << "Existing table " << table->ToString() << " not suitable: " |
1499 | 0 | << table_lock->pb.ShortDebugString() |
1500 | 0 | << ", name: " << table->name() << " vs " << meta.name(); |
1501 | 0 | table.reset(); |
1502 | 0 | } |
1503 | 0 | } |
1504 | 0 | } |
1505 | | |
1506 | | // Second, if we still didn't find a match... |
1507 | 0 | if (table == nullptr) { |
1508 | 0 | VLOG_WITH_PREFIX(3) << "Begin second search"; |
1509 | 0 | switch (meta.table_type()) { |
1510 | 0 | case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
1511 | 0 | case TableType::REDIS_TABLE_TYPE: { |
1512 | | // For YCQL and YEDIS, simply create the missing table. |
1513 | 0 | RETURN_NOT_OK(RecreateTable(new_namespace_id, table_map, table_data)); |
1514 | 0 | break; |
1515 | 0 | } |
1516 | 0 | case TableType::PGSQL_TABLE_TYPE: { |
1517 | | // For YSQL, the table must be created via external call. Therefore, continue the search for |
1518 | | // the table, this time checking for name matches rather than id matches. |
1519 | |
1520 | 0 | if (meta.colocated() && IsColocatedParentTableId(table_data->old_table_id)) { |
1521 | | // For the parent colocated table we need to generate the new_table_id ourselves |
1522 | | // since the names will not match. |
1523 | | // For normal colocated tables, we are still able to follow the normal table flow, so no |
1524 | | // need to generate the new_table_id ourselves. |
1525 | 0 | table_data->new_table_id = new_namespace_id + kColocatedParentTableIdSuffix; |
1526 | 0 | is_parent_colocated_table = true; |
1527 | 0 | } else { |
1528 | 0 | if (!table_data->new_table_id.empty()) { |
1529 | 0 | const string msg = Format( |
1530 | 0 | "$0 expected empty new table id but $1 found", __func__, table_data->new_table_id); |
1531 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1532 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1533 | 0 | } |
1534 | 0 | SharedLock lock(mutex_); |
1535 | |
1536 | 0 | for (const auto& entry : *table_ids_map_) { |
1537 | 0 | table = entry.second; |
1538 | |
1539 | 0 | if (new_namespace_id != table->namespace_id()) { |
1540 | 0 | VLOG_WITH_FUNC(3) << "Namespace ids do not match: " |
1541 | 0 | << table->namespace_id() << " vs " << new_namespace_id |
1542 | 0 | << " for " << table->ToString(); |
1543 | 0 | continue; |
1544 | 0 | } |
1545 | 0 | if (!VERIFY_RESULT(CheckTableForImport(table, table_data))) { |
1546 | | // Some other check failed. |
1547 | 0 | continue; |
1548 | 0 | } |
1549 | | // Also check if table is user-created. |
1550 | 0 | if (!IsUserCreatedTableUnlocked(*table)) { |
1551 | 0 | VLOG_WITH_FUNC(2) << "Table not user created: " << table->ToString(); |
1552 | 0 | continue; |
1553 | 0 | } |
1554 | | |
1555 | | // Found the new YSQL table by name. |
1556 | 0 | if (table_data->new_table_id.empty()) { |
1557 | 0 | LOG_WITH_FUNC(INFO) << "Found existing table " << entry.first << " for " |
1558 | 0 | << new_namespace_id << "/" << meta.name() << " (old table " |
1559 | 0 | << table_data->old_table_id << ") with schema " |
1560 | 0 | << table_data->pg_schema_name; |
1561 | 0 | table_data->new_table_id = entry.first; |
1562 | 0 | } else if (table_data->new_table_id != entry.first) { |
1563 | 0 | const string msg = Format( |
1564 | 0 | "Found 2 YSQL tables with the same name: $0 - $1, $2", |
1565 | 0 | meta.name(), table_data->new_table_id, entry.first); |
1566 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1567 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1568 | 0 | } |
1569 | 0 | } |
1570 | | |
1571 | 0 | if (table_data->new_table_id.empty()) { |
1572 | 0 | const string msg = Format("YSQL table not found: $0", meta.name()); |
1573 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1574 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
1575 | 0 | } |
1576 | 0 | } |
1577 | 0 | break; |
1578 | 0 | } |
1579 | 0 | case TableType::TRANSACTION_STATUS_TABLE_TYPE: { |
1580 | 0 | return STATUS( |
1581 | 0 | InvalidArgument, |
1582 | 0 | Format("Unexpected table type: $0", TableType_Name(meta.table_type())), |
1583 | 0 | MasterError(MasterErrorPB::INVALID_TABLE_TYPE)); |
1584 | 0 | } |
1585 | 0 | } |
1586 | 0 | } else { |
1587 | 0 | table_data->new_table_id = table_data->old_table_id; |
1588 | 0 | LOG_WITH_FUNC(INFO) << "Using existing table " << table_data->new_table_id;
1589 | 0 | } |
1590 | | |
1591 | | // The destination table should be found or created by now. |
1592 | 0 | TRACE("Looking up new table"); |
1593 | 0 | { |
1594 | 0 | SharedLock lock(mutex_); |
1595 | 0 | table = FindPtrOrNull(*table_ids_map_, table_data->new_table_id); |
1596 | 0 | } |
1597 | 0 | if (table == nullptr) { |
1598 | 0 | const string msg = Format("Created table not found: $0", table_data->new_table_id); |
1599 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1600 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
1601 | 0 | } |
1602 | | |
1603 | | // Don't do schema validation/column updates on the parent colocated table. |
1604 | | // However, still do the validation for regular colocated tables. |
1605 | 0 | if (!is_parent_colocated_table) { |
1606 | 0 | Schema persisted_schema; |
1607 | 0 | size_t new_num_tablets = 0; |
1608 | 0 | { |
1609 | 0 | TRACE("Locking table"); |
1610 | 0 | auto table_lock = table->LockForRead(); |
1611 | 0 | RETURN_NOT_OK(table->GetSchema(&persisted_schema)); |
1612 | 0 | new_num_tablets = table->NumPartitions(); |
1613 | 0 | } |
1614 | | |
1615 | | // Ignore 'nullable' attribute - due to difference in implementation |
1616 | | // of PgCreateTable::AddColumn() and PgAlterTable::AddColumn(). |
1617 | 0 | struct CompareColumnsExceptNullable { |
1618 | 0 | bool operator ()(const ColumnSchema& a, const ColumnSchema& b) { |
1619 | 0 | return ColumnSchema::CompHashKey(a, b) && ColumnSchema::CompSortingType(a, b) && |
1620 | 0 | ColumnSchema::CompTypeInfo(a, b) && ColumnSchema::CompName(a, b); |
1621 | 0 | } |
1622 | 0 | } comparator; |
1623 | | // Schema::Equals() compares only column names & types. It does not compare the column ids. |
1624 | 0 | if (!persisted_schema.Equals(schema, comparator) |
1625 | 0 | || persisted_schema.column_ids().size() != column_ids.size()) { |
1626 | 0 | const string msg = Format( |
1627 | 0 | "Invalid created $0 table '$1' in namespace id $2: schema={$3}, expected={$4}", |
1628 | 0 | TableType_Name(meta.table_type()), meta.name(), new_namespace_id, |
1629 | 0 | persisted_schema, schema); |
1630 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1631 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1632 | 0 | } |
1633 | | |
1634 | 0 | if (table_data->num_tablets > 0) { |
1635 | 0 | if (meta.table_type() == TableType::PGSQL_TABLE_TYPE) { |
1636 | 0 | bool partitions_match = true; |
1637 | 0 | if (new_num_tablets != table_data->num_tablets) { |
1638 | 0 | partitions_match = false; |
1639 | 0 | } else { |
1640 | | // Check if partition boundaries match. Only check the starts; assume the ends are fine. |
1641 | 0 | size_t i = 0; |
1642 | 0 | vector<PartitionKey> partition_starts(table_data->num_tablets); |
1643 | 0 | for (const auto& partition_pb : table_data->partitions) { |
1644 | 0 | partition_starts[i] = partition_pb.partition_key_start(); |
1645 | 0 | LOG_IF(DFATAL, (i == 0) ? partition_starts[i] != "" |
1646 | 0 | : partition_starts[i] <= partition_starts[i-1]) |
1647 | 0 | << "Wrong partition key start: " << b2a_hex(partition_starts[i]); |
1648 | 0 | i++; |
1649 | 0 | } |
1650 | 0 | if (!table->HasPartitions(partition_starts)) { |
1651 | 0 | LOG_WITH_FUNC(INFO) << "Partition boundaries mismatch for table " << table->id(); |
1652 | 0 | partitions_match = false; |
1653 | 0 | } |
1654 | 0 | } |
1655 | |
1656 | 0 | if (!partitions_match) { |
1657 | 0 | RETURN_NOT_OK(RepartitionTable(table, table_data)); |
1658 | 0 | } |
1659 | 0 | } else { // not PGSQL_TABLE_TYPE |
1660 | 0 | if (new_num_tablets != table_data->num_tablets) { |
1661 | 0 | const string msg = Format( |
1662 | 0 | "Wrong number of tablets in created $0 table '$1' in namespace id $2:" |
1663 | 0 | " $3 (expected $4)", |
1664 | 0 | TableType_Name(meta.table_type()), meta.name(), new_namespace_id, |
1665 | 0 | new_num_tablets, table_data->num_tablets); |
1666 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1667 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1668 | 0 | } |
1669 | 0 | } |
1670 | 0 | } |
1671 | | |
1672 | | // Update the table column ids if it's not equal to the stored ids. |
1673 | 0 | if (persisted_schema.column_ids() != column_ids) { |
1674 | 0 | if (meta.table_type() != TableType::PGSQL_TABLE_TYPE) { |
1675 | 0 | LOG_WITH_FUNC(WARNING) << "Unexpected wrong column ids in " |
1676 | 0 | << TableType_Name(meta.table_type()) << " table '" << meta.name() |
1677 | 0 | << "' in namespace id " << new_namespace_id; |
1678 | 0 | } |
1679 | |
1680 | 0 | LOG_WITH_FUNC(INFO) << "Restoring column ids in " << TableType_Name(meta.table_type()) |
1681 | 0 | << " table '" << meta.name() << "' in namespace id " |
1682 | 0 | << new_namespace_id; |
1683 | 0 | auto l = table->LockForWrite(); |
1684 | 0 | size_t col_idx = 0; |
1685 | 0 | for (auto& column : *l.mutable_data()->pb.mutable_schema()->mutable_columns()) { |
1686 | | // The schema is expected to be correct here (column order, names, types), with only the
1687 | | // column ids wrong. Column order and column names are verified below.
1688 | 0 | if (column.name() != schema.column(col_idx).name()) { |
1689 | 0 | const string msg = Format( |
1690 | 0 | "Unexpected column name for index=$0: name=$1, expected name=$2", |
1691 | 0 | col_idx, column.name(), schema.column(col_idx).name());
1692 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1693 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1694 | 0 | } |
1695 | | // Copy the column id from imported (original) schema. |
1696 | 0 | column.set_id(column_ids[col_idx++]); |
1697 | 0 | } |
1698 | | |
1699 | 0 | l.mutable_data()->pb.set_next_column_id(schema.max_col_id() + 1); |
1700 | 0 | l.mutable_data()->pb.set_version(l->pb.version() + 1); |
1701 | | // Update sys-catalog with the new table schema. |
1702 | 0 | RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table)); |
1703 | 0 | l.Commit(); |
1704 | | // Update the new table schema in tablets. |
1705 | 0 | RETURN_NOT_OK(SendAlterTableRequest(table)); |
1706 | 0 | } |
1707 | 0 | } |
1708 | | |
1709 | | // Set the type of the table in the response pb (default is TABLE so only set if colocated). |
1710 | 0 | if (meta.colocated()) { |
1711 | 0 | if (is_parent_colocated_table) { |
1712 | 0 | table_data->table_meta->set_table_type( |
1713 | 0 | ImportSnapshotMetaResponsePB_TableType_PARENT_COLOCATED_TABLE); |
1714 | 0 | } else { |
1715 | 0 | table_data->table_meta->set_table_type( |
1716 | 0 | ImportSnapshotMetaResponsePB_TableType_COLOCATED_TABLE); |
1717 | 0 | } |
1718 | 0 | } |
1719 | |
1720 | 0 | TabletInfos new_tablets; |
1721 | 0 | { |
1722 | 0 | TRACE("Locking table"); |
1723 | 0 | auto table_lock = table->LockForRead(); |
1724 | 0 | new_tablets = table->GetTablets(); |
1725 | 0 | } |
1726 | |
1727 | 0 | for (const scoped_refptr<TabletInfo>& tablet : new_tablets) { |
1728 | 0 | auto tablet_lock = tablet->LockForRead(); |
1729 | 0 | const PartitionPB& partition_pb = tablet->metadata().state().pb.partition(); |
1730 | 0 | const ExternalTableSnapshotData::PartitionKeys key( |
1731 | 0 | partition_pb.partition_key_start(), partition_pb.partition_key_end()); |
1732 | 0 | table_data->new_tablets_map[key] = tablet->id(); |
1733 | 0 | } |
1734 | |
1735 | 0 | IdPairPB* const namespace_ids = table_data->table_meta->mutable_namespace_ids(); |
1736 | 0 | namespace_ids->set_new_id(new_namespace_id); |
1737 | 0 | namespace_ids->set_old_id(table_data->old_namespace_id); |
1738 | |
1739 | 0 | IdPairPB* const table_ids = table_data->table_meta->mutable_table_ids(); |
1740 | 0 | table_ids->set_new_id(table_data->new_table_id); |
1741 | 0 | table_ids->set_old_id(table_data->old_table_id); |
1742 | |
1743 | 0 | return Status::OK(); |
1744 | 0 | } |
1745 | | |
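The partition-boundary comparison in ImportTableEntry inspects only the partition starts: the first must be the empty key and each subsequent start must be strictly greater. The same validation as a standalone predicate (the code above reports violations through LOG_IF(DFATAL, ...) instead of returning a bool):

#include <cstddef>
#include <string>
#include <vector>

bool PartitionStartsValid(const std::vector<std::string>& starts) {
  for (std::size_t i = 0; i < starts.size(); ++i) {
    // First start must be "", later starts must strictly increase.
    if (i == 0 ? !starts[i].empty() : starts[i] <= starts[i - 1]) {
      return false;
    }
  }
  return true;
}
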
1746 | | Status CatalogManager::PreprocessTabletEntry(const SysRowEntry& entry, |
1747 | 0 | ExternalTableSnapshotDataMap* table_map) { |
1748 | 0 | LOG_IF(DFATAL, entry.type() != SysRowEntryType::TABLET) |
1749 | 0 | << "Unexpected entry type: " << entry.type(); |
1750 | |
1751 | 0 | SysTabletsEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysTabletsEntryPB>(entry.data())); |
1752 | | |
1753 | 0 | ExternalTableSnapshotData& table_data = (*table_map)[meta.table_id()]; |
1754 | 0 | ++table_data.num_tablets; |
1755 | 0 | if (meta.has_partition()) { |
1756 | 0 | table_data.partitions.push_back(meta.partition()); |
1757 | 0 | } |
1758 | 0 | return Status::OK(); |
1759 | 0 | } |
1760 | | |
1761 | | Status CatalogManager::ImportTabletEntry(const SysRowEntry& entry, |
1762 | 0 | ExternalTableSnapshotDataMap* table_map) { |
1763 | 0 | LOG_IF(DFATAL, entry.type() != SysRowEntryType::TABLET) |
1764 | 0 | << "Unexpected entry type: " << entry.type(); |
1765 | |
1766 | 0 | SysTabletsEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysTabletsEntryPB>(entry.data())); |
1767 | | |
1768 | 0 | LOG_IF(DFATAL, table_map->find(meta.table_id()) == table_map->end()) |
1769 | 0 | << "Table not found: " << meta.table_id(); |
1770 | 0 | ExternalTableSnapshotData& table_data = (*table_map)[meta.table_id()]; |
1771 | |
1772 | 0 | if (meta.colocated() && table_data.tablet_id_map->size() >= 1) { |
1773 | 0 | LOG_WITH_FUNC(INFO) << "Already processed this colocated tablet: " << entry.id(); |
1774 | 0 | return Status::OK(); |
1775 | 0 | } |
1776 | | |
1777 | | // Update tablets IDs map. |
1778 | 0 | if (table_data.new_table_id == table_data.old_table_id) { |
1779 | 0 | TRACE("Looking up tablet"); |
1780 | 0 | SharedLock lock(mutex_); |
1781 | 0 | scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id()); |
1782 | |
1783 | 0 | if (tablet != nullptr) { |
1784 | 0 | IdPairPB* const pair = table_data.tablet_id_map->Add(); |
1785 | 0 | pair->set_old_id(entry.id()); |
1786 | 0 | pair->set_new_id(entry.id()); |
1787 | 0 | return Status::OK(); |
1788 | 0 | } |
1789 | 0 | } |
1790 | | |
1791 | 0 | const PartitionPB& partition_pb = meta.partition(); |
1792 | 0 | const ExternalTableSnapshotData::PartitionKeys key( |
1793 | 0 | partition_pb.partition_key_start(), partition_pb.partition_key_end()); |
1794 | 0 | const ExternalTableSnapshotData::PartitionToIdMap::const_iterator it = |
1795 | 0 | table_data.new_tablets_map.find(key); |
1796 | |
1797 | 0 | if (it == table_data.new_tablets_map.end()) { |
1798 | 0 | const string msg = Format( |
1799 | 0 | "For new table $0 (old table $1, expecting $2 tablets) not found new tablet with " |
1800 | 0 | "expected [$3]", table_data.new_table_id, table_data.old_table_id, |
1801 | 0 | table_data.num_tablets, partition_pb); |
1802 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1803 | 0 | return STATUS(NotFound, msg, MasterError(MasterErrorPB::INTERNAL_ERROR)); |
1804 | 0 | } |
1805 | | |
1806 | 0 | IdPairPB* const pair = table_data.tablet_id_map->Add(); |
1807 | 0 | pair->set_old_id(entry.id()); |
1808 | 0 | pair->set_new_id(it->second); |
1809 | 0 | return Status::OK(); |
1810 | 0 | } |
1811 | | |
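ImportTabletEntry maps an old tablet to its replacement through the exact [start, end) partition-key pair recorded in new_tablets_map. A minimal sketch of that keyed lookup, with aliases mirroring (but not identical to) ExternalTableSnapshotData::PartitionKeys and PartitionToIdMap:

#include <map>
#include <optional>
#include <string>
#include <utility>

using PartitionKeys = std::pair<std::string, std::string>;      // [start, end)
using PartitionToIdMap = std::map<PartitionKeys, std::string>;  // range -> tablet id

std::optional<std::string> FindNewTabletId(
    const PartitionToIdMap& new_tablets,
    const std::string& start, const std::string& end) {
  auto it = new_tablets.find(PartitionKeys(start, end));
  if (it == new_tablets.end()) {
    return std::nullopt;  // caller turns this into a NotFound status
  }
  return it->second;
}
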
1812 | 284 | const Schema& CatalogManager::schema() { |
1813 | 284 | return sys_catalog()->schema(); |
1814 | 284 | } |
1815 | | |
1816 | 17 | TabletInfos CatalogManager::GetTabletInfos(const std::vector<TabletId>& ids) { |
1817 | 17 | TabletInfos result; |
1818 | 17 | result.reserve(ids.size()); |
1819 | 17 | SharedLock lock(mutex_); |
1820 | 45 | for (const auto& id : ids) { |
1821 | 45 | auto it = tablet_map_->find(id); |
1822 | 45 | result.push_back(it != tablet_map_->end() ? it->second : nullptr);
1823 | 45 | } |
1824 | 17 | return result; |
1825 | 17 | } |
1826 | | |
1827 | | AsyncTabletSnapshotOpPtr CatalogManager::CreateAsyncTabletSnapshotOp( |
1828 | | const TabletInfoPtr& tablet, const std::string& snapshot_id, |
1829 | | tserver::TabletSnapshotOpRequestPB::Operation operation, |
1830 | 45 | TabletSnapshotOperationCallback callback) { |
1831 | 45 | auto result = std::make_shared<AsyncTabletSnapshotOp>( |
1832 | 45 | master_, AsyncTaskPool(), tablet, snapshot_id, operation); |
1833 | 45 | result->SetCallback(std::move(callback)); |
1834 | 45 | tablet->table()->AddTask(result); |
1835 | 45 | return result; |
1836 | 45 | } |
1837 | | |
1838 | 45 | void CatalogManager::ScheduleTabletSnapshotOp(const AsyncTabletSnapshotOpPtr& task) { |
1839 | 45 | WARN_NOT_OK(ScheduleTask(task), "Failed to send create snapshot request"); |
1840 | 45 | } |
1841 | | |
1842 | | Status CatalogManager::RestoreSysCatalog( |
1843 | 9 | SnapshotScheduleRestoration* restoration, tablet::Tablet* tablet, Status* complete_status) { |
1844 | 9 | VLOG_WITH_PREFIX_AND_FUNC(1) << restoration->restoration_id;
1845 | | // Restore master snapshot and load it to RocksDB. |
1846 | 9 | auto dir = VERIFY_RESULT(tablet->snapshots().RestoreToTemporary( |
1847 | 9 | restoration->snapshot_id, restoration->restore_at)); |
1848 | 0 | rocksdb::Options rocksdb_options; |
1849 | 9 | std::string log_prefix = LogPrefix(); |
1850 | | // Strip the trailing ": " so a suffix can be appended to the log prefix.
1851 | 9 | log_prefix.erase(log_prefix.size() - 2); |
1852 | 9 | tablet->InitRocksDBOptions(&rocksdb_options, log_prefix + " [TMP]: "); |
1853 | 9 | auto db = VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, dir)); |
1854 | | |
1855 | 0 | auto doc_db = docdb::DocDB::FromRegularUnbounded(db.get()); |
1856 | | |
1857 | | // Load objects to restore and determine obsolete objects. |
1858 | 9 | RestoreSysCatalogState state(restoration); |
1859 | 9 | RETURN_NOT_OK(state.LoadRestoringObjects(schema(), doc_db)); |
1860 | | // Load existing objects from RocksDB because on followers they are NOT present in loaded sys |
1861 | | // catalog state. |
1862 | 9 | RETURN_NOT_OK(state.LoadExistingObjects(schema(), tablet->doc_db())); |
1863 | 9 | RETURN_NOT_OK(state.Process()); |
1864 | | |
1865 | 9 | docdb::DocWriteBatch write_batch( |
1866 | 9 | tablet->doc_db(), docdb::InitMarkerBehavior::kOptional); |
1867 | | |
1868 | 9 | if (FLAGS_enable_ysql) { |
1869 | | // Restore the pg_catalog tables. |
1870 | 6 | const auto* meta = tablet->metadata(); |
1871 | 6 | const auto& pg_yb_catalog_version_schema = |
1872 | 6 | *VERIFY_RESULT(meta->GetTableInfo(kPgYbCatalogVersionTableId))->schema; |
1873 | 6 | RETURN_NOT_OK(state.ProcessPgCatalogRestores( |
1874 | 6 | pg_yb_catalog_version_schema, doc_db, tablet->doc_db(), &write_batch)); |
1875 | 6 | } |
1876 | | |
1877 | | // Restore the other tables. |
1878 | 9 | RETURN_NOT_OK(state.PrepareWriteBatch(schema(), &write_batch)); |
1879 | | |
1880 | | // Apply write batch to RocksDB. |
1881 | 9 | state.WriteToRocksDB(&write_batch, restoration->write_time, restoration->op_id, tablet); |
1882 | | |
1883 | | // TODO(pitr) Handle master leader failover. |
1884 | 9 | RETURN_NOT_OK(ElectedAsLeaderCb()); |
1885 | | |
1886 | 9 | return Status::OK(); |
1887 | 9 | } |
1888 | | |
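RestoreSysCatalog materializes the snapshot into a temporary directory and opens it as a second RocksDB instance alongside the live one. A sketch of such an open using the stock RocksDB API; this is an assumption for illustration only, since the code above goes through YB's RocksDB fork (a Result-returning Open) and the read-only open here is a simplification:

#include <memory>
#include <string>
#include <rocksdb/db.h>

std::unique_ptr<rocksdb::DB> OpenRestoredDb(const std::string& dir) {
  rocksdb::Options options;
  options.create_if_missing = false;  // the directory was just materialized
  rocksdb::DB* raw = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenForReadOnly(options, dir, &raw);
  if (!s.ok()) {
    return nullptr;  // real code would surface the status to the caller
  }
  return std::unique_ptr<rocksdb::DB>(raw);
}
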
1889 | 3 | Status CatalogManager::VerifyRestoredObjects(const SnapshotScheduleRestoration& restoration) { |
1890 | 3 | auto entries = VERIFY_RESULT(CollectEntriesForSnapshot( |
1891 | 3 | restoration.schedules[0].second.tables().tables())); |
1892 | 0 | auto objects_to_restore = restoration.non_system_objects_to_restore; |
1893 | 3 | VLOG_WITH_PREFIX(1) << "Objects to restore: " << AsString(objects_to_restore);
1894 | 17 | for (const auto& entry : entries.entries()) { |
1895 | 17 | VLOG_WITH_PREFIX(1)
1896 | 0 | << "Alive " << SysRowEntryType_Name(entry.type()) << ": " << entry.id(); |
1897 | 17 | auto it = objects_to_restore.find(entry.id()); |
1898 | 17 | if (it == objects_to_restore.end()) { |
1899 | 0 | return STATUS_FORMAT(IllegalState, "Object $0/$1 present, but should not be restored", |
1900 | 0 | SysRowEntryType_Name(entry.type()), entry.id()); |
1901 | 0 | } |
1902 | 17 | if (it->second != entry.type()) { |
1903 | 0 | return STATUS_FORMAT( |
1904 | 0 | IllegalState, "Restored object $0 has wrong type $1, while $2 expected", |
1905 | 0 | entry.id(), SysRowEntryType_Name(entry.type()), SysRowEntryType_Name(it->second)); |
1906 | 0 | } |
1907 | 17 | objects_to_restore.erase(it); |
1908 | 17 | } |
1909 | 3 | for (const auto& id_and_type : objects_to_restore) { |
1910 | 0 | return STATUS_FORMAT( |
1911 | 0 | IllegalState, "Expected to restore $0/$1, but it is not present after restoration", |
1912 | 0 | SysRowEntryType_Name(id_and_type.second), id_and_type.first); |
1913 | 0 | } |
1914 | 3 | return Status::OK(); |
1915 | 3 | } |
1916 | | |
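VerifyRestoredObjects enforces an exact two-way match: every live entry must appear in the expected set with the right type, and erasing matches as they are found leaves exactly the expected-but-missing objects behind. The core of that check in standalone form (string ids and int types stand in for the PB types):

#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

bool SetsMatchExactly(std::unordered_map<std::string, int> expected,
                      const std::vector<std::pair<std::string, int>>& live) {
  for (const auto& [id, type] : live) {
    auto it = expected.find(id);
    if (it == expected.end() || it->second != type) {
      return false;  // present but unexpected, or wrong type
    }
    expected.erase(it);  // consume the match
  }
  return expected.empty();  // leftovers were expected but not restored
}
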
1917 | 310k | void CatalogManager::CleanupHiddenObjects(const ScheduleMinRestoreTime& schedule_min_restore_time) { |
1918 | 310k | VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(schedule_min_restore_time);
1919 | | |
1920 | 310k | std::vector<TabletInfoPtr> hidden_tablets; |
1921 | 310k | std::vector<TableInfoPtr> tables; |
1922 | 310k | { |
1923 | 310k | SharedLock lock(mutex_); |
1924 | 310k | hidden_tablets = hidden_tablets_; |
1925 | 310k | tables.reserve(table_ids_map_->size()); |
1926 | 17.1M | for (const auto& p : *table_ids_map_) { |
1927 | 17.1M | if (!p.second->is_system()) { |
1928 | 291k | tables.push_back(p.second); |
1929 | 291k | } |
1930 | 17.1M | } |
1931 | 310k | } |
1932 | 310k | CleanupHiddenTablets(hidden_tablets, schedule_min_restore_time); |
1933 | 310k | CleanupHiddenTables(std::move(tables)); |
1934 | 310k | } |
1935 | | |
1936 | | void CatalogManager::CleanupHiddenTablets( |
1937 | | const std::vector<TabletInfoPtr>& hidden_tablets, |
1938 | 310k | const ScheduleMinRestoreTime& schedule_min_restore_time) { |
1939 | 310k | if (hidden_tablets.empty()) { |
1940 | 310k | return; |
1941 | 310k | } |
1942 | 54 | std::vector<TabletInfoPtr> tablets_to_delete; |
1943 | 54 | std::vector<TabletInfoPtr> tablets_to_remove_from_hidden; |
1944 | | |
1945 | 162 | for (const auto& tablet : hidden_tablets) { |
1946 | 162 | auto lock = tablet->LockForRead(); |
1947 | 162 | if (!lock->ListedAsHidden()) { |
1948 | 0 | tablets_to_remove_from_hidden.push_back(tablet); |
1949 | 0 | continue; |
1950 | 0 | } |
1951 | 162 | auto hide_hybrid_time = HybridTime::FromPB(lock->pb.hide_hybrid_time()); |
1952 | 162 | bool cleanup = true; |
1953 | 162 | for (const auto& schedule_id_str : lock->pb.retained_by_snapshot_schedules()) { |
1954 | 162 | auto schedule_id = TryFullyDecodeSnapshotScheduleId(schedule_id_str); |
1955 | 162 | auto it = schedule_min_restore_time.find(schedule_id); |
1956 | | // If the schedule is not present in schedule_min_restore_time, it means the schedule
1957 | | // was deleted, so it should not retain the tablet.
1958 | 162 | if (it != schedule_min_restore_time.end() && it->second <= hide_hybrid_time) { |
1959 | 162 | VLOG_WITH_PREFIX(1)
1960 | 0 | << "Retaining tablet: " << tablet->tablet_id() << ", hide hybrid time: " |
1961 | 0 | << hide_hybrid_time << ", because of schedule: " << schedule_id |
1962 | 0 | << ", min restore time: " << it->second; |
1963 | 162 | cleanup = false; |
1964 | 162 | break; |
1965 | 162 | } |
1966 | 162 | } |
1967 | 162 | if (cleanup) { |
1968 | 0 | SharedLock read_lock(mutex_); |
1969 | 0 | if (IsTableCdcProducer(*tablet->table())) { |
1970 | | // We also need to check if this tablet is being kept for xcluster replication. |
1971 | 0 | auto l = tablet->table()->LockForRead(); |
1972 | 0 | cleanup = hide_hybrid_time.AddSeconds(l->pb.wal_retention_secs()) < master_->clock()->Now(); |
1973 | 0 | } |
1974 | 0 | } |
1975 | 162 | if (cleanup) { |
1976 | 0 | tablets_to_delete.push_back(tablet); |
1977 | 0 | } |
1978 | 162 | } |
1979 | 54 | if (!tablets_to_delete.empty()) { |
1980 | 0 | LOG_WITH_PREFIX(INFO) << "Cleanup hidden tablets: " << AsString(tablets_to_delete); |
1981 | 0 | WARN_NOT_OK(DeleteTabletListAndSendRequests(tablets_to_delete, "Cleanup hidden tablets", {}), |
1982 | 0 | "Failed to cleanup hidden tablets"); |
1983 | 0 | } |
1984 | | |
1985 | 54 | if (!tablets_to_remove_from_hidden.empty()) { |
1986 | 0 | auto it = tablets_to_remove_from_hidden.begin(); |
1987 | 0 | LockGuard lock(mutex_); |
1988 | | // Order of tablets in tablets_to_remove_from_hidden matches order in hidden_tablets_, |
1989 | | // so we can avoid searching in tablets_to_remove_from_hidden.
1990 | 0 | auto filter = [&it, end = tablets_to_remove_from_hidden.end()](const TabletInfoPtr& tablet) { |
1991 | 0 | if (it != end && tablet.get() == it->get()) { |
1992 | 0 | ++it; |
1993 | 0 | return true; |
1994 | 0 | } |
1995 | 0 | return false; |
1996 | 0 | }; |
1997 | 0 | hidden_tablets_.erase(std::remove_if(hidden_tablets_.begin(), hidden_tablets_.end(), filter), |
1998 | 0 | hidden_tablets_.end()); |
1999 | 0 | } |
2000 | 54 | } |
2001 | | |
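The erase at the end of CleanupHiddenTablets leans on an ordering guarantee: tablets_to_remove_from_hidden preserves the relative order of hidden_tablets_, so one remove_if pass with a moving cursor removes the whole subsequence without any per-element search. The trick in isolation:

#include <algorithm>
#include <string>
#include <vector>

void RemoveOrderedSubsequence(std::vector<std::string>* all,
                              const std::vector<std::string>& gone) {
  auto it = gone.begin();
  auto filter = [&it, end = gone.end()](const std::string& item) {
    if (it != end && item == *it) {
      ++it;          // advance the cursor past the matched element
      return true;   // drop it from 'all'
    }
    return false;
  };
  all->erase(std::remove_if(all->begin(), all->end(), filter), all->end());
}
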
2002 | 310k | void CatalogManager::CleanupHiddenTables(std::vector<TableInfoPtr> tables) { |
2003 | 310k | std::vector<TableInfo::WriteLock> locks; |
2004 | 310k | EraseIf([this, &locks](const TableInfoPtr& table) { |
2005 | 291k | { |
2006 | 291k | auto lock = table->LockForRead(); |
2007 | 291k | if (!lock->is_hidden() || lock->started_deleting() || !table->AreAllTabletsDeleted()) {
2008 | 291k | return true; |
2009 | 291k | } |
2010 | 291k | } |
2011 | 0 | auto lock = table->LockForWrite(); |
2012 | 0 | if (lock->started_deleting()) { |
2013 | 0 | return true; |
2014 | 0 | } |
2015 | 0 | LOG_WITH_PREFIX(INFO) << "Should delete table: " << AsString(table); |
2016 | 0 | lock.mutable_data()->set_state( |
2017 | 0 | SysTablesEntryPB::DELETED, Format("Cleanup hidden table at $0", LocalTimeAsString())); |
2018 | 0 | locks.push_back(std::move(lock)); |
2019 | 0 | return false; |
2020 | 0 | }, &tables); |
2021 | 310k | if (tables.empty()) { |
2022 | 310k | return; |
2023 | 310k | } |
2024 | | |
2025 | 0 | Status s = sys_catalog_->Upsert(leader_ready_term(), tables); |
2026 | 0 | if (!s.ok()) { |
2027 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to mark tables as deleted: " << s; |
2028 | 0 | return; |
2029 | 0 | } |
2030 | 0 | for (auto& lock : locks) { |
2031 | 0 | lock.Commit(); |
2032 | 0 | } |
2033 | 0 | } |
2034 | | |
2035 | 7.94k | rpc::Scheduler& CatalogManager::Scheduler() { |
2036 | 7.94k | return master_->messenger()->scheduler(); |
2037 | 7.94k | } |
2038 | | |
2039 | 695k | int64_t CatalogManager::LeaderTerm() { |
2040 | 695k | auto peer = tablet_peer(); |
2041 | 695k | if (!peer) { |
2042 | 159 | return 0;  // Not the leader.
2043 | 159 | } |
2044 | 694k | auto consensus = peer->shared_consensus(); |
2045 | 694k | if (!consensus) { |
2046 | 2 | return 0;  // Not the leader.
2047 | 2 | } |
2048 | 694k | return consensus->GetLeaderState(/* allow_stale= */ true).term; |
2049 | 694k | } |
2050 | | |
2051 | 25 | void CatalogManager::HandleCreateTabletSnapshotResponse(TabletInfo *tablet, bool error) { |
2052 | 25 | LOG(INFO) << "Handling Create Tablet Snapshot Response for tablet " |
2053 | 25 | << DCHECK_NOTNULL(tablet)->ToString() << (error ? " ERROR" : " OK");
2054 | | |
2055 | | // Get the snapshot data descriptor from the catalog manager. |
2056 | 25 | scoped_refptr<SnapshotInfo> snapshot; |
2057 | 25 | { |
2058 | 25 | LockGuard lock(mutex_); |
2059 | 25 | TRACE("Acquired catalog manager lock"); |
2060 | | |
2061 | 25 | if (current_snapshot_id_.empty()) { |
2062 | 25 | LOG(WARNING) << "No active snapshot: " << current_snapshot_id_; |
2063 | 25 | return; |
2064 | 25 | } |
2065 | | |
2066 | 0 | snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, current_snapshot_id_); |
2067 | |
2068 | 0 | if (!snapshot) { |
2069 | 0 | LOG(WARNING) << "Snapshot not found: " << current_snapshot_id_; |
2070 | 0 | return; |
2071 | 0 | } |
2072 | 0 | } |
2073 | | |
2074 | 0 | if (!snapshot->IsCreateInProgress()) { |
2075 | 0 | LOG(WARNING) << "Snapshot is not in creating state: " << snapshot->id(); |
2076 | 0 | return; |
2077 | 0 | } |
2078 | | |
2079 | 0 | auto tablet_l = tablet->LockForRead(); |
2080 | 0 | auto l = snapshot->LockForWrite(); |
2081 | 0 | RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots = |
2082 | 0 | l.mutable_data()->pb.mutable_tablet_snapshots(); |
2083 | 0 | int num_tablets_complete = 0; |
2084 | |
2085 | 0 | for (int i = 0; i < tablet_snapshots->size(); ++i) { |
2086 | 0 | SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i); |
2087 | |
2088 | 0 | if (tablet_info->id() == tablet->id()) { |
2089 | 0 | tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::COMPLETE); |
2090 | 0 | } |
2091 | |
2092 | 0 | if (tablet_info->state() == SysSnapshotEntryPB::COMPLETE) { |
2093 | 0 | ++num_tablets_complete; |
2094 | 0 | } |
2095 | 0 | } |
2096 | | |
2097 | | // Finish the snapshot. |
2098 | 0 | bool finished = true; |
2099 | 0 | if (error) { |
2100 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED); |
2101 | 0 | LOG(WARNING) << "Failed snapshot " << snapshot->id() << " on tablet " << tablet->id(); |
2102 | 0 | } else if (num_tablets_complete == tablet_snapshots->size()) { |
2103 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::COMPLETE); |
2104 | 0 | LOG(INFO) << "Completed snapshot " << snapshot->id(); |
2105 | 0 | } else { |
2106 | 0 | finished = false; |
2107 | 0 | } |
2108 | |
2109 | 0 | if (finished) { |
2110 | 0 | LockGuard lock(mutex_); |
2111 | 0 | TRACE("Acquired catalog manager lock"); |
2112 | 0 | current_snapshot_id_ = ""; |
2113 | 0 | } |
2114 | |
2115 | 0 | VLOG(1) << "Snapshot: " << snapshot->id() |
2116 | 0 | << " PB: " << l.mutable_data()->pb.DebugString() |
2117 | 0 | << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size(); |
2118 | |
2119 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot); |
2120 | |
2121 | 0 | l.CommitOrWarn(s, "updating snapshot in sys-catalog"); |
2122 | 0 | } |
2123 | | |
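All three tablet-snapshot response handlers share this aggregation shape: flip the reporting tablet's per-tablet state, then the snapshot is finished once every tablet is COMPLETE, with any error failing the whole snapshot immediately. A condensed sketch with a hypothetical enum instead of SysSnapshotEntryPB:

#include <cstddef>
#include <vector>

enum class TabletState { kInProgress, kComplete, kFailed };

// Returns true when the snapshot as a whole is finished (failed or complete).
bool UpdateAndCheckFinished(std::vector<TabletState>* tablets,
                            std::size_t reporting_idx, bool error) {
  (*tablets)[reporting_idx] =
      error ? TabletState::kFailed : TabletState::kComplete;
  if (error) {
    return true;  // one failure fails the snapshot immediately
  }
  std::size_t num_complete = 0;
  for (TabletState state : *tablets) {
    if (state == TabletState::kComplete) {
      ++num_complete;
    }
  }
  return num_complete == tablets->size();
}
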
2124 | 10 | void CatalogManager::HandleRestoreTabletSnapshotResponse(TabletInfo *tablet, bool error) { |
2125 | 10 | LOG(INFO) << "Handling Restore Tablet Snapshot Response for tablet " |
2126 | 10 | << DCHECK_NOTNULL(tablet)->ToString() << (error ? " ERROR" : " OK");
2127 | | |
2128 | | // Get the snapshot data descriptor from the catalog manager. |
2129 | 10 | scoped_refptr<SnapshotInfo> snapshot; |
2130 | 10 | { |
2131 | 10 | LockGuard lock(mutex_); |
2132 | 10 | TRACE("Acquired catalog manager lock"); |
2133 | | |
2134 | 10 | if (current_snapshot_id_.empty()) { |
2135 | 10 | LOG(WARNING) << "No restoring snapshot: " << current_snapshot_id_; |
2136 | 10 | return; |
2137 | 10 | } |
2138 | | |
2139 | 0 | snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, current_snapshot_id_); |
2140 | |
2141 | 0 | if (!snapshot) { |
2142 | 0 | LOG(WARNING) << "Restoring snapshot not found: " << current_snapshot_id_; |
2143 | 0 | return; |
2144 | 0 | } |
2145 | 0 | } |
2146 | | |
2147 | 0 | if (!snapshot->IsRestoreInProgress()) { |
2148 | 0 | LOG(WARNING) << "Snapshot is not in restoring state: " << snapshot->id(); |
2149 | 0 | return; |
2150 | 0 | } |
2151 | | |
2152 | 0 | auto tablet_l = tablet->LockForRead(); |
2153 | 0 | auto l = snapshot->LockForWrite(); |
2154 | 0 | RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots = |
2155 | 0 | l.mutable_data()->pb.mutable_tablet_snapshots(); |
2156 | 0 | int num_tablets_complete = 0; |
2157 | |
2158 | 0 | for (int i = 0; i < tablet_snapshots->size(); ++i) { |
2159 | 0 | SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i); |
2160 | |
2161 | 0 | if (tablet_info->id() == tablet->id()) { |
2162 | 0 | tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::COMPLETE); |
2163 | 0 | } |
2164 | |
2165 | 0 | if (tablet_info->state() == SysSnapshotEntryPB::COMPLETE) { |
2166 | 0 | ++num_tablets_complete; |
2167 | 0 | } |
2168 | 0 | } |
2169 | | |
2170 | | // Finish the snapshot. |
2171 | 0 | if (error || num_tablets_complete == tablet_snapshots->size()) { |
2172 | 0 | if (error) { |
2173 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED); |
2174 | 0 | LOG(WARNING) << "Failed restoring snapshot " << snapshot->id() |
2175 | 0 | << " on tablet " << tablet->id(); |
2176 | 0 | } else { |
2177 | 0 | LOG_IF(DFATAL, num_tablets_complete != tablet_snapshots->size()) |
2178 | 0 | << "Wrong number of tablets"; |
2179 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::COMPLETE); |
2180 | 0 | LOG(INFO) << "Restored snapshot " << snapshot->id(); |
2181 | 0 | } |
2182 | |
2183 | 0 | LockGuard lock(mutex_); |
2184 | 0 | TRACE("Acquired catalog manager lock"); |
2185 | 0 | current_snapshot_id_ = ""; |
2186 | 0 | } |
2187 | |
2188 | 0 | VLOG(1) << "Snapshot: " << snapshot->id() |
2189 | 0 | << " PB: " << l.mutable_data()->pb.DebugString() |
2190 | 0 | << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size(); |
2191 | |
2192 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot); |
2193 | |
2194 | 0 | l.CommitOrWarn(s, "updating snapshot in sys-catalog"); |
2195 | 0 | } |
2196 | | |
2197 | | void CatalogManager::HandleDeleteTabletSnapshotResponse( |
2198 | 0 | const SnapshotId& snapshot_id, TabletInfo *tablet, bool error) { |
2199 | 0 | LOG(INFO) << "Handling Delete Tablet Snapshot Response for tablet " |
2200 | 0 | << DCHECK_NOTNULL(tablet)->ToString() << (error ? " ERROR" : " OK"); |
2201 | | |
2202 | | // Get the snapshot data descriptor from the catalog manager. |
2203 | 0 | scoped_refptr<SnapshotInfo> snapshot; |
2204 | 0 | { |
2205 | 0 | LockGuard lock(mutex_); |
2206 | 0 | TRACE("Acquired catalog manager lock"); |
2207 | |
2208 | 0 | snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, snapshot_id); |
2209 | |
2210 | 0 | if (!snapshot) { |
2211 | 0 | LOG(WARNING) << __func__ << " Snapshot not found: " << snapshot_id; |
2212 | 0 | return; |
2213 | 0 | } |
2214 | 0 | } |
2215 | | |
2216 | 0 | if (!snapshot->IsDeleteInProgress()) { |
2217 | 0 | LOG(WARNING) << "Snapshot is not in deleting state: " << snapshot->id(); |
2218 | 0 | return; |
2219 | 0 | } |
2220 | | |
2221 | 0 | auto tablet_l = tablet->LockForRead(); |
2222 | 0 | auto l = snapshot->LockForWrite(); |
2223 | 0 | RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots = |
2224 | 0 | l.mutable_data()->pb.mutable_tablet_snapshots(); |
2225 | 0 | int num_tablets_complete = 0; |
2226 | |
2227 | 0 | for (int i = 0; i < tablet_snapshots->size(); ++i) { |
2228 | 0 | SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i); |
2229 | |
2230 | 0 | if (tablet_info->id() == tablet->id()) { |
2231 | 0 | tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::DELETED); |
2232 | 0 | } |
2233 | |
2234 | 0 | if (tablet_info->state() != SysSnapshotEntryPB::DELETING) { |
2235 | 0 | ++num_tablets_complete; |
2236 | 0 | } |
2237 | 0 | } |
2238 | |
2239 | 0 | if (num_tablets_complete == tablet_snapshots->size()) { |
2240 | | // Delete the snapshot. |
2241 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::DELETED); |
2242 | 0 | LOG(INFO) << "Deleted snapshot " << snapshot->id(); |
2243 | |
2244 | 0 | const Status s = sys_catalog_->Delete(leader_ready_term(), snapshot); |
2245 | |
2246 | 0 | LockGuard lock(mutex_); |
2247 | 0 | TRACE("Acquired catalog manager lock"); |
2248 | |
2249 | 0 | if (current_snapshot_id_ == snapshot_id) { |
2250 | 0 | current_snapshot_id_ = ""; |
2251 | 0 | } |
2252 | | |
2253 | | // Remove it from the maps. |
2254 | 0 | TRACE("Removing from maps"); |
2255 | 0 | if (non_txn_snapshot_ids_map_.erase(snapshot_id) < 1) { |
2256 | 0 | LOG(WARNING) << "Could not remove snapshot " << snapshot_id << " from map"; |
2257 | 0 | } |
2258 | |
2259 | 0 | l.CommitOrWarn(s, "deleting snapshot from sys-catalog"); |
2260 | 0 | } else if (error) { |
2261 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED); |
2262 | 0 | LOG(WARNING) << "Failed snapshot " << snapshot->id() << " deletion on tablet " << tablet->id(); |
2263 | |
2264 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot); |
2265 | 0 | l.CommitOrWarn(s, "updating snapshot in sys-catalog"); |
2266 | 0 | } |
2267 | |
2268 | 0 | VLOG(1) << "Deleting snapshot: " << snapshot->id() |
2269 | 0 | << " PB: " << l.mutable_data()->pb.DebugString() |
2270 | 0 | << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size(); |
2271 | 0 | } |
2272 | | |
2273 | | Status CatalogManager::CreateSnapshotSchedule(const CreateSnapshotScheduleRequestPB* req, |
2274 | | CreateSnapshotScheduleResponsePB* resp, |
2275 | 8 | rpc::RpcContext* rpc) { |
2276 | 8 | auto id = VERIFY_RESULT(snapshot_coordinator_.CreateSchedule( |
2277 | 8 | *req, leader_ready_term(), rpc->GetClientDeadline())); |
2278 | 0 | resp->set_snapshot_schedule_id(id.data(), id.size()); |
2279 | 8 | return Status::OK(); |
2280 | 8 | } |
2281 | | |
2282 | | Status CatalogManager::ListSnapshotSchedules(const ListSnapshotSchedulesRequestPB* req, |
2283 | | ListSnapshotSchedulesResponsePB* resp, |
2284 | 11 | rpc::RpcContext* rpc) { |
2285 | 11 | auto snapshot_schedule_id = TryFullyDecodeSnapshotScheduleId(req->snapshot_schedule_id()); |
2286 | | |
2287 | 11 | return snapshot_coordinator_.ListSnapshotSchedules(snapshot_schedule_id, resp); |
2288 | 11 | } |
2289 | | |
2290 | | Status CatalogManager::DeleteSnapshotSchedule(const DeleteSnapshotScheduleRequestPB* req, |
2291 | | DeleteSnapshotScheduleResponsePB* resp, |
2292 | 0 | rpc::RpcContext* rpc) { |
2293 | 0 | auto snapshot_schedule_id = TryFullyDecodeSnapshotScheduleId(req->snapshot_schedule_id()); |
2294 | |
2295 | 0 | return snapshot_coordinator_.DeleteSnapshotSchedule( |
2296 | 0 | snapshot_schedule_id, leader_ready_term(), rpc->GetClientDeadline()); |
2297 | 0 | } |
2298 | | |
2299 | 0 | void CatalogManager::DumpState(std::ostream* out, bool on_disk_dump) const { |
2300 | 0 | super::DumpState(out, on_disk_dump); |
2301 | | |
2302 | | // TODO: dump snapshots |
2303 | 0 | } |
2304 | | |
2305 | | template <typename Registry, typename Mutex> |
2306 | | bool ShouldResendRegistry( |
2307 | 144k | const std::string& ts_uuid, bool has_registration, Registry* registry, Mutex* mutex) { |
2308 | 144k | bool should_resend_registry; |
2309 | 144k | { |
2310 | 144k | std::lock_guard<Mutex> lock(*mutex); |
2311 | 144k | auto it = registry->find(ts_uuid); |
2312 | 144k | should_resend_registry = (it == registry->end() || it->second || has_registration);
2313 | 144k | if (it == registry->end()) { |
2314 | 40 | registry->emplace(ts_uuid, false); |
2315 | 144k | } else { |
2316 | 144k | it->second = false; |
2317 | 144k | } |
2318 | 144k | } |
2319 | 144k | return should_resend_registry; |
2320 | 144k | } |
2321 | | |
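ShouldResendRegistry above is a template over the registry and mutex types; specialized to concrete standard-library types, the same logic reads as below (names here are illustrative): resend on first contact, when a resend was queued, or when the tserver re-registered, and always clear the pending flag afterwards.

#include <mutex>
#include <string>
#include <unordered_map>

std::mutex registry_mutex;
std::unordered_map<std::string, bool> resend_registry;  // uuid -> resend queued

bool ShouldResend(const std::string& ts_uuid, bool has_registration) {
  std::lock_guard<std::mutex> lock(registry_mutex);
  auto it = resend_registry.find(ts_uuid);
  bool resend =
      (it == resend_registry.end() || it->second || has_registration);
  if (it == resend_registry.end()) {
    resend_registry.emplace(ts_uuid, false);
  } else {
    it->second = false;  // consume any queued resend
  }
  return resend;
}
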
2322 | | Status CatalogManager::FillHeartbeatResponse(const TSHeartbeatRequestPB* req, |
2323 | 4.81M | TSHeartbeatResponsePB* resp) { |
2324 | 4.81M | SysClusterConfigEntryPB cluster_config; |
2325 | 4.81M | RETURN_NOT_OK(GetClusterConfig(&cluster_config)); |
2326 | 4.81M | RETURN_NOT_OK(FillHeartbeatResponseEncryption(cluster_config, req, resp)); |
2327 | 4.81M | RETURN_NOT_OK(snapshot_coordinator_.FillHeartbeatResponse(resp)); |
2328 | 4.81M | return FillHeartbeatResponseCDC(cluster_config, req, resp); |
2329 | 4.81M | } |
2330 | | |
2331 | | Status CatalogManager::FillHeartbeatResponseCDC(const SysClusterConfigEntryPB& cluster_config, |
2332 | | const TSHeartbeatRequestPB* req, |
2333 | 4.81M | TSHeartbeatResponsePB* resp) { |
2334 | 4.81M | resp->set_cluster_config_version(cluster_config.version()); |
2335 | 4.81M | if (!cluster_config.has_consumer_registry() || |
2336 | 4.81M | req->cluster_config_version() >= cluster_config.version()) {
2337 | 4.81M | return Status::OK(); |
2338 | 4.81M | } |
2339 | 70 | *resp->mutable_consumer_registry() = cluster_config.consumer_registry(); |
2340 | 70 | return Status::OK(); |
2341 | 4.81M | } |
2342 | | |
2343 | | Status CatalogManager::FillHeartbeatResponseEncryption( |
2344 | | const SysClusterConfigEntryPB& cluster_config, |
2345 | | const TSHeartbeatRequestPB* req, |
2346 | 4.81M | TSHeartbeatResponsePB* resp) { |
2347 | 4.81M | const auto& ts_uuid = req->common().ts_instance().permanent_uuid(); |
2348 | 4.81M | if (!cluster_config.has_encryption_info() || |
2349 | 4.81M | !ShouldResendRegistry(ts_uuid, req->has_registration(), &should_send_universe_key_registry_, |
2350 | 4.81M | &should_send_universe_key_registry_mutex_)) { |
2351 | 4.81M | return Status::OK(); |
2352 | 4.81M | } |
2353 | | |
2354 | 519 | const auto& encryption_info = cluster_config.encryption_info(); |
2355 | 519 | RETURN_NOT_OK(encryption_manager_->FillHeartbeatResponseEncryption(encryption_info, resp)); |
2356 | | |
2357 | 519 | return Status::OK(); |
2358 | 519 | } |
2359 | | |
2360 | | void CatalogManager::SetTabletSnapshotsState(SysSnapshotEntryPB::State state, |
2361 | 0 | SysSnapshotEntryPB* snapshot_pb) { |
2362 | 0 | RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots = |
2363 | 0 | snapshot_pb->mutable_tablet_snapshots(); |
2364 | |
2365 | 0 | for (int i = 0; i < tablet_snapshots->size(); ++i) { |
2366 | 0 | SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i); |
2367 | 0 | tablet_info->set_state(state); |
2368 | 0 | } |
2369 | 0 | } |
2370 | | |
2371 | 310 | Status CatalogManager::CreateCdcStateTableIfNeeded(rpc::RpcContext *rpc) { |
2372 | | // If the CDC state table exists, do nothing; otherwise create it.
2373 | 310 | if (VERIFY_RESULT(TableExists(kSystemNamespaceName, kCdcStateTableName))) { |
2374 | 208 | return Status::OK(); |
2375 | 208 | } |
2376 | | // Set up a CreateTable request internally. |
2377 | 102 | CreateTableRequestPB req; |
2378 | 102 | CreateTableResponsePB resp; |
2379 | 102 | req.set_name(kCdcStateTableName); |
2380 | 102 | req.mutable_namespace_()->set_name(kSystemNamespaceName); |
2381 | 102 | req.set_table_type(TableType::YQL_TABLE_TYPE); |
2382 | | |
2383 | 102 | client::YBSchemaBuilder schema_builder; |
2384 | 102 | schema_builder.AddColumn(master::kCdcTabletId)->HashPrimaryKey()->Type(DataType::STRING); |
2385 | 102 | schema_builder.AddColumn(master::kCdcStreamId)->PrimaryKey()->Type(DataType::STRING); |
2386 | 102 | schema_builder.AddColumn(master::kCdcCheckpoint)->Type(DataType::STRING); |
2387 | 102 | schema_builder.AddColumn(master::kCdcData)->Type(QLType::CreateTypeMap( |
2388 | 102 | DataType::STRING, DataType::STRING)); |
2389 | 102 | schema_builder.AddColumn(master::kCdcLastReplicationTime)->Type(DataType::TIMESTAMP); |
2390 | | |
2391 | 102 | client::YBSchema yb_schema; |
2392 | 102 | CHECK_OK(schema_builder.Build(&yb_schema)); |
2393 | | |
2394 | 102 | auto schema = yb::client::internal::GetSchema(yb_schema); |
2395 | 102 | SchemaToPB(schema, req.mutable_schema()); |
2396 | | // Explicitly set the number of tablets if the corresponding flag is set; otherwise CreateTable
2397 | | // will use the same defaults as for regular tables. |
2398 | 102 | if (FLAGS_cdc_state_table_num_tablets > 0) { |
2399 | 4 | req.mutable_schema()->mutable_table_properties()->set_num_tablets( |
2400 | 4 | FLAGS_cdc_state_table_num_tablets); |
2401 | 4 | } |
2402 | | |
2403 | 102 | Status s = CreateTable(&req, &resp, rpc); |
2404 | | // We do not lock here so it is technically possible that the table was already created. |
2405 | | // If so, there is nothing to do so we just ignore the "AlreadyPresent" error. |
2406 | 102 | if (!s.ok() && !s.IsAlreadyPresent()) {
2407 | 0 | return s; |
2408 | 0 | } |
2409 | 102 | return Status::OK(); |
2410 | 102 | } |
2411 | | |
2412 | 0 | Status CatalogManager::IsCdcStateTableCreated(IsCreateTableDoneResponsePB* resp) { |
2413 | 0 | IsCreateTableDoneRequestPB req; |
2414 | |
2415 | 0 | req.mutable_table()->set_table_name(kCdcStateTableName); |
2416 | 0 | req.mutable_table()->mutable_namespace_()->set_name(kSystemNamespaceName); |
2417 | |
2418 | 0 | return IsCreateTableDone(&req, resp); |
2419 | 0 | } |
2420 | | |
2421 | | // Helper function to print a vector of CDCStreamInfo pointers as a CSV line.
2422 | | namespace { |
2423 | | |
2424 | | template<class CDCStreamInfoPointer> |
2425 | 192 | std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) { |
2426 | 192 | std::vector<CDCStreamId> cdc_stream_ids; |
2427 | 192 | for (const auto& cdc_stream : cdc_streams) { |
2428 | 192 | cdc_stream_ids.push_back(cdc_stream->id()); |
2429 | 192 | } |
2430 | 192 | return JoinCSVLine(cdc_stream_ids); |
2431 | 192 | }
2432 | | |
2433 | | } // namespace |
2434 | | |
2435 | 5.58k | Status CatalogManager::DeleteCDCStreamsForTable(const TableId& table_id) { |
2436 | 5.58k | return DeleteCDCStreamsForTables({table_id}); |
2437 | 5.58k | } |
2438 | | |
2439 | 5.67k | Status CatalogManager::DeleteCDCStreamsForTables(const vector<TableId>& table_ids) { |
2440 | 5.67k | std::ostringstream tid_stream; |
2441 | 13.6k | for (const auto& tid : table_ids) { |
2442 | 13.6k | tid_stream << " " << tid; |
2443 | 13.6k | } |
2444 | 5.67k | LOG(INFO) << "Deleting CDC streams for tables:" << tid_stream.str(); |
2445 | | |
2446 | 5.67k | std::vector<scoped_refptr<CDCStreamInfo>> streams; |
2447 | 13.6k | for (const auto& tid : table_ids) { |
2448 | 13.6k | auto newstreams = FindCDCStreamsForTable(tid); |
2449 | 13.6k | streams.insert(streams.end(), newstreams.begin(), newstreams.end()); |
2450 | 13.6k | } |
2451 | | |
2452 | 5.67k | if (streams.empty()) { |
2453 | 5.61k | return Status::OK(); |
2454 | 5.61k | } |
2455 | | |
2456 | | // Do not delete them here, just mark them as DELETING and the catalog manager background thread |
2457 | | // will handle the deletion. |
2458 | 63 | return MarkCDCStreamsAsDeleting(streams); |
2459 | 5.67k | } |
2460 | | |
2461 | | std::vector<scoped_refptr<CDCStreamInfo>> CatalogManager::FindCDCStreamsForTable( |
2462 | 13.8k | const TableId& table_id) const { |
2463 | 13.8k | std::vector<scoped_refptr<CDCStreamInfo>> streams; |
2464 | 13.8k | SharedLock lock(mutex_); |
2465 | | |
2466 | 13.8k | for (const auto& entry : cdc_stream_map_) { |
2467 | 72 | auto ltm = entry.second->LockForRead(); |
2468 | | // For xCluster, the first entry will be the table_id.
2469 | 72 | if (ltm->table_id().Get(0) == table_id && !ltm->started_deleting()) {
2470 | 62 | streams.push_back(entry.second); |
2471 | 62 | } |
2472 | 72 | } |
2473 | 13.8k | return streams; |
2474 | 13.8k | } |
2475 | | |
2476 | 0 | void CatalogManager::GetAllCDCStreams(std::vector<scoped_refptr<CDCStreamInfo>>* streams) { |
2477 | 0 | streams->clear(); |
2478 | 0 | SharedLock lock(mutex_); |
2479 | 0 | streams->reserve(cdc_stream_map_.size()); |
2480 | 0 | for (const CDCStreamInfoMap::value_type& e : cdc_stream_map_) { |
2481 | 0 | if (!e.second->LockForRead()->is_deleting()) { |
2482 | 0 | streams->push_back(e.second); |
2483 | 0 | } |
2484 | 0 | } |
2485 | 0 | } |
2486 | | |
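| | // Annotation (added in this listing): two paths, inferred from the branches below.
| | //  - Without db_stream_id: allocate and persist a brand-new stream (per-table for
| | //    xCluster, per-namespace for CDCSDK when the kIdType option is kNamespaceId);
| | //    for active, non-bootstrapped table-level streams, seed one cdc_state row per
| | //    tablet of the table.
| | //  - With db_stream_id: append the table to an existing CDCSDK DB stream.
| | // For table-level streams, WAL retention is raised first via AlterTable so the
| | // producer keeps enough log while the stream catches up.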
2487 | | Status CatalogManager::CreateCDCStream(const CreateCDCStreamRequestPB* req, |
2488 | | CreateCDCStreamResponsePB* resp, |
2489 | 5.50k | rpc::RpcContext* rpc) { |
2490 | 5.50k | LOG(INFO) << "CreateCDCStream from " << RequestorString(rpc) << ": " << req->ShortDebugString(); |
2491 | | |
2492 | 5.50k | std::string id_type_option_value(cdc::kTableId); |
2493 | | |
2494 | 22.2k | for (auto option : req->options()) { |
2495 | 22.2k | if (option.key() == cdc::kIdType) { |
2496 | 306 | id_type_option_value = option.value(); |
2497 | 306 | } |
2498 | 22.2k | } |
2499 | | |
2500 | | |
2501 | 5.50k | if (id_type_option_value != cdc::kNamespaceId) { |
2502 | 5.19k | scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTableById(req->table_id()));
2503 | 0 | { |
2504 | 5.19k | auto l = table->LockForRead(); |
2505 | 5.19k | if (l->started_deleting()) { |
2506 | 0 | return STATUS( |
2507 | 0 | NotFound, "Table does not exist", req->table_id(), |
2508 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2509 | 0 | } |
2510 | 5.19k | } |
2511 | | |
2512 | 5.19k | AlterTableRequestPB alter_table_req; |
2513 | 5.19k | alter_table_req.mutable_table()->set_table_id(req->table_id()); |
2514 | 5.19k | alter_table_req.set_wal_retention_secs(FLAGS_cdc_wal_retention_time_secs); |
2515 | 5.19k | AlterTableResponsePB alter_table_resp; |
2516 | 5.19k | Status s = this->AlterTable(&alter_table_req, &alter_table_resp, rpc); |
2517 | 5.19k | if (!s.ok()) { |
2518 | 0 | return STATUS(InternalError, |
2519 | 0 | "Unable to change the WAL retention time for table", req->table_id(), |
2520 | 0 | MasterError(MasterErrorPB::INTERNAL_ERROR)); |
2521 | 0 | } |
2522 | 5.19k | } |
2523 | | |
2524 | 5.49k | scoped_refptr<CDCStreamInfo> stream; |
2525 | 5.49k | if (!req->has_db_stream_id()) { |
2526 | 310 | { |
2527 | 310 | TRACE("Acquired catalog manager lock"); |
2528 | 310 | LockGuard lock(mutex_); |
2529 | | // Construct the CDC stream if the producer wasn't bootstrapped. |
2530 | 310 | CDCStreamId stream_id; |
2531 | 310 | stream_id = GenerateIdUnlocked(SysRowEntryType::CDC_STREAM); |
2532 | | |
2533 | 310 | stream = make_scoped_refptr<CDCStreamInfo>(stream_id); |
2534 | 310 | stream->mutable_metadata()->StartMutation(); |
2535 | 310 | SysCDCStreamEntryPB* metadata = &stream->mutable_metadata()->mutable_dirty()->pb; |
2536 | 310 | bool create_namespace = id_type_option_value == cdc::kNamespaceId; |
2537 | 310 | if (create_namespace) { |
2538 | 306 | metadata->set_namespace_id(req->table_id()); |
2539 | 306 | } else { |
2540 | 4 | metadata->add_table_id(req->table_id()); |
2541 | 4 | } |
2542 | 310 | metadata->mutable_options()->CopyFrom(req->options()); |
2543 | 310 | metadata->set_state( |
2544 | 310 | req->has_initial_state() ? req->initial_state() : SysCDCStreamEntryPB::ACTIVE);
2545 | | |
2546 | | // Add the stream to the in-memory map. |
2547 | 310 | cdc_stream_map_[stream->id()] = stream; |
2548 | 310 | if (!create_namespace) { |
2549 | 4 | cdc_stream_tables_count_map_[req->table_id()]++; |
2550 | 4 | } |
2551 | 310 | resp->set_stream_id(stream->id()); |
2552 | 310 | } |
2553 | 310 | TRACE("Inserted new CDC stream into CatalogManager maps"); |
2554 | | |
2555 | | // Update the on-disk system catalog. |
2556 | 310 | RETURN_NOT_OK(CheckLeaderStatusAndSetupError( |
2557 | 310 | sys_catalog_->Upsert(leader_ready_term(), stream), |
2558 | 310 | "inserting CDC stream into sys-catalog", resp)); |
2559 | 310 | TRACE("Wrote CDC stream to sys-catalog"); |
2560 | | |
2561 | | // Commit the in-memory state. |
2562 | 310 | stream->mutable_metadata()->CommitMutation(); |
2563 | 310 | LOG(INFO) << "Created CDC stream " << stream->ToString(); |
2564 | | |
2565 | 310 | RETURN_NOT_OK(CreateCdcStateTableIfNeeded(rpc)); |
2566 | 310 | if (!PREDICT_FALSE(FLAGS_TEST_disable_cdc_state_insert_on_setup) && |
2567 | 310 | (!req->has_initial_state() ||
2568 | 306 | (req->initial_state() == master::SysCDCStreamEntryPB::ACTIVE)) && |
2569 | 310 | (id_type_option_value != cdc::kNamespaceId)) {
2570 | | // Create the cdc state entries for the tablets in this table from scratch since we have no |
2571 | | // data to bootstrap. If we have data to bootstrap, let the BootstrapProducer logic take care of
2572 | | // populating entries in cdc_state. |
2573 | 0 | auto ybclient = master_->cdc_state_client_initializer().client(); |
2574 | 0 | if (!ybclient) { |
2575 | 0 | return STATUS(IllegalState, "Client not initialized or shutting down"); |
2576 | 0 | } |
2577 | 0 | client::TableHandle cdc_table; |
2578 | 0 | const client::YBTableName cdc_state_table_name( |
2579 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
2580 | 0 | RETURN_NOT_OK(ybclient->WaitForCreateTableToFinish(cdc_state_table_name)); |
2581 | 0 | RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient)); |
2582 | 0 | std::shared_ptr<client::YBSession> session = ybclient->NewSession(); |
2583 | 0 | scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTableById(req->table_id())); |
2584 | 0 | auto tablets = table->GetTablets(); |
2585 | 0 | for (const auto& tablet : tablets) { |
2586 | 0 | const auto op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
2587 | 0 | auto* const req = op->mutable_request(); |
2588 | 0 | QLAddStringHashValue(req, tablet->id()); |
2589 | 0 | QLAddStringRangeValue(req, stream->id()); |
2590 | 0 | cdc_table.AddStringColumnValue(req, master::kCdcCheckpoint, OpId().ToString()); |
2591 | 0 | cdc_table.AddTimestampColumnValue( |
2592 | 0 | req, master::kCdcLastReplicationTime, GetCurrentTimeMicros()); |
2593 | 0 | session->Apply(op); |
2594 | 0 | } |
2595 | 0 | RETURN_NOT_OK(session->Flush()); |
2596 | 0 | } |
2597 | 5.18k | } else { |
2598 | | // Update and add table_id. |
2599 | 5.18k | { |
2600 | 5.18k | SharedLock lock(mutex_); |
2601 | 5.18k | stream = FindPtrOrNull(cdc_stream_map_, req->db_stream_id()); |
2602 | 5.18k | } |
2603 | | |
2604 | 5.18k | if (stream == nullptr) { |
2605 | 0 | return STATUS( |
2606 | 0 | NotFound, "Could not find CDC stream", req->ShortDebugString(), |
2607 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2608 | 0 | } |
2609 | | |
2610 | 5.18k | auto stream_lock = stream->LockForWrite(); |
2611 | 5.18k | if (stream_lock->is_deleting()) { |
2612 | 0 | return STATUS( |
2613 | 0 | NotFound, "CDC stream has been deleted", req->ShortDebugString(), |
2614 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2615 | 0 | } |
2616 | 5.18k | stream_lock.mutable_data()->pb.add_table_id(req->table_id()); |
2617 | | |
2618 | | // Also need to persist changes in sys catalog. |
2619 | 5.18k | RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream)); |
2620 | 5.18k | stream_lock.Commit(); |
2621 | 5.18k | } |
2622 | 5.49k | return Status::OK(); |
2623 | 5.49k | } |
2624 | | |
2625 | | Status CatalogManager::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, |
2626 | | DeleteCDCStreamResponsePB* resp, |
2627 | 2 | rpc::RpcContext* rpc) { |
2628 | 2 | LOG(INFO) << "Servicing DeleteCDCStream request from " << RequestorString(rpc) |
2629 | 2 | << ": " << req->ShortDebugString(); |
2630 | | |
2631 | 2 | if (req->stream_id_size() < 1) { |
2632 | 0 | return STATUS(InvalidArgument, "No CDC Stream ID given", req->ShortDebugString(), |
2633 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
2634 | 0 | } |
2635 | | |
2636 | 2 | std::vector<scoped_refptr<CDCStreamInfo>> streams; |
2637 | 2 | { |
2638 | 2 | SharedLock lock(mutex_); |
2639 | 2 | for (const auto& stream_id : req->stream_id()) { |
2640 | 2 | auto stream = FindPtrOrNull(cdc_stream_map_, stream_id); |
2641 | | |
2642 | 2 | if (stream == nullptr || stream->LockForRead()->is_deleting()) { |
2643 | 0 | resp->add_not_found_stream_ids(stream_id); |
2644 | 0 | LOG(WARNING) << "CDC stream does not exist: " << stream_id; |
2645 | 2 | } else { |
2646 | 2 | auto ltm = stream->LockForRead(); |
2647 | 2 | bool active = (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE); |
2648 | 2 | bool is_WAL = false; |
2649 | 5 | for (const auto& option : ltm->pb.options()) { |
2650 | 5 | if (option.key() == "record_format" && option.value() == "WAL") {
2651 | 0 | is_WAL = true; |
2652 | 0 | } |
2653 | 5 | } |
2654 | 2 | if (!req->force_delete() && is_WAL && active) {
2655 | 0 | return STATUS(NotSupported, |
2656 | 0 | "Cannot delete an xCluster Stream in replication. " |
2657 | 0 | "Use 'force_delete' to override", |
2658 | 0 | req->ShortDebugString(), |
2659 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
2660 | 0 | } |
2661 | 2 | streams.push_back(stream); |
2662 | 2 | } |
2663 | 2 | } |
2664 | 2 | } |
2665 | | |
2666 | 2 | if (!resp->not_found_stream_ids().empty() && !req->ignore_errors()) {
2667 | 0 | string missing_streams = JoinElementsIterator(resp->not_found_stream_ids().begin(), |
2668 | 0 | resp->not_found_stream_ids().end(), |
2669 | 0 | ","); |
2670 | 0 | return STATUS( |
2671 | 0 | NotFound, |
2672 | 0 | Substitute("Did not find all requested CDC streams. Missing streams: [$0]. Request: $1", |
2673 | 0 | missing_streams, req->ShortDebugString()), |
2674 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2675 | 0 | } |
2676 | | |
2677 | | // Do not delete them here, just mark them as DELETING and the catalog manager background thread |
2678 | | // will handle the deletion. |
2679 | 2 | Status s = MarkCDCStreamsAsDeleting(streams); |
2680 | 2 | if (!s.ok()) { |
2681 | 0 | if (s.IsIllegalState()) { |
2682 | 0 | PANIC_RPC(rpc, s.message().ToString()); |
2683 | 0 | } |
2684 | 0 | return CheckIfNoLongerLeaderAndSetupError(s, resp); |
2685 | 0 | } |
2686 | | |
2687 | 2 | LOG(INFO) << "Successfully deleted CDC streams " << JoinStreamsCSVLine(streams) |
2688 | 2 | << " per request from " << RequestorString(rpc); |
2689 | | |
2690 | 2 | return Status::OK(); |
2691 | 2 | } |
2692 | | |
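| | // Annotation (added in this listing): the copy-on-write metadata pattern used
| | // below and throughout this file, as a minimal sketch with names taken from this
| | // file (error handling elided):
| | //
| | //   auto l = info->LockForWrite();        // take a mutable "dirty" copy
| | //   l.mutable_data()->pb.set_state(...);  // mutate only the dirty copy
| | //   RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), info));
| | //   l.Commit();                           // publish; aborted on early return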
2693 | | Status CatalogManager::MarkCDCStreamsAsDeleting( |
2694 | 64 | const std::vector<scoped_refptr<CDCStreamInfo>>& streams) { |
2695 | 64 | if (streams.empty()) { |
2696 | 0 | return Status::OK(); |
2697 | 0 | } |
2698 | 64 | std::vector<CDCStreamInfo::WriteLock> locks; |
2699 | 64 | std::vector<CDCStreamInfo*> streams_to_mark; |
2700 | 64 | locks.reserve(streams.size()); |
2701 | 64 | for (auto& stream : streams) { |
2702 | 64 | auto l = stream->LockForWrite(); |
2703 | 64 | l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING); |
2704 | 64 | locks.push_back(std::move(l)); |
2705 | 64 | streams_to_mark.push_back(stream.get()); |
2706 | 64 | } |
2707 | | // The mutation will be aborted when 'l' exits the scope on early return. |
2708 | 64 | RETURN_NOT_OK(CheckStatus( |
2709 | 64 | sys_catalog_->Upsert(leader_ready_term(), streams_to_mark), |
2710 | 64 | "updating CDC streams in sys-catalog")); |
2711 | 64 | LOG(INFO) << "Successfully marked streams " << JoinStreamsCSVLine(streams_to_mark) |
2712 | 64 | << " as DELETING in sys catalog"; |
2713 | 64 | for (auto& lock : locks) { |
2714 | 64 | lock.Commit(); |
2715 | 64 | } |
2716 | 64 | return Status::OK(); |
2717 | 64 | } |
2718 | | |
2719 | | Status CatalogManager::FindCDCStreamsMarkedAsDeleting( |
2720 | 1.56M | std::vector<scoped_refptr<CDCStreamInfo>>* streams) { |
2721 | 1.56M | TRACE("Acquired catalog manager lock"); |
2722 | 1.56M | SharedLock lock(mutex_); |
2723 | 1.56M | for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) { |
2724 | 1.71k | auto ltm = entry.second->LockForRead(); |
2725 | 1.71k | if (ltm->is_deleting()) { |
2726 | 63 | LOG(INFO) << "Stream " << entry.second->id() << " was marked as DELETING"; |
2727 | 63 | streams->push_back(entry.second); |
2728 | 63 | } |
2729 | 1.71k | } |
2730 | 1.56M | return Status::OK(); |
2731 | 1.56M | } |
2732 | | |
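| | // Annotation (added in this listing; inferred from the body below): cleanup is
| | // ordered so that a failure can only leave retryable work behind. The cdc_state
| | // rows are deleted first, then the sys-catalog entries, then the in-memory maps.
| | // Streams whose cdc_state deletes fail are skipped and remain in DELETING so a
| | // later background pass retries them.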
2733 | | Status CatalogManager::CleanUpDeletedCDCStreams( |
2734 | 63 | const std::vector<scoped_refptr<CDCStreamInfo>>& streams) { |
2735 | 63 | auto ybclient = master_->cdc_state_client_initializer().client(); |
2736 | 63 | if (!ybclient) { |
2737 | 0 | return STATUS(IllegalState, "Client not initialized or shutting down"); |
2738 | 0 | } |
2739 | | |
2740 | | // First, for each deleted stream, delete the cdc_state rows.
2741 | | // Delete all the entries in the cdc_state table that belong to the deleted cdc streams.
2742 | 63 | client::TableHandle cdc_table; |
2743 | 63 | const client::YBTableName cdc_state_table_name( |
2744 | 63 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
2745 | 63 | Status s = cdc_table.Open(cdc_state_table_name, ybclient); |
2746 | 63 | if (!s.ok()) { |
2747 | 0 | LOG(WARNING) << "Unable to open table " << master::kCdcStateTableName |
2748 | 0 | << " to delete stream ids entries: " << s; |
2749 | 0 | return s.CloneAndPrepend("Unable to open cdc_state table"); |
2750 | 0 | } |
2751 | | |
2752 | 63 | std::shared_ptr<client::YBSession> session = ybclient->NewSession(); |
2753 | 63 | std::vector<std::pair<CDCStreamId, std::shared_ptr<client::YBqlWriteOp>>> stream_ops; |
2754 | 63 | std::set<CDCStreamId> failed_streams; |
2755 | 63 | for (const auto& stream : streams) { |
2756 | 63 | LOG(INFO) << "Deleting rows for stream " << stream->id(); |
2757 | 63 | for (const auto& table_id : stream->table_id()) { |
2758 | 62 | TabletInfos tablets; |
2759 | 62 | scoped_refptr<TableInfo> table; |
2760 | 62 | { |
2761 | 62 | TRACE("Acquired catalog manager lock"); |
2762 | 62 | SharedLock lock(mutex_); |
2763 | 62 | table = FindPtrOrNull(*table_ids_map_, table_id); |
2764 | 62 | } |
2765 | | // GetTablets locks lock_ in shared mode. |
2766 | 62 | if (table) { |
2767 | 62 | tablets = table->GetTablets(); |
2768 | 62 | } |
2769 | | |
2770 | 62 | for (const auto& tablet : tablets) { |
2771 | 0 | const auto delete_op = cdc_table.NewDeleteOp(); |
2772 | 0 | auto* delete_req = delete_op->mutable_request(); |
2773 | |
2774 | 0 | QLAddStringHashValue(delete_req, tablet->tablet_id()); |
2775 | 0 | QLAddStringRangeValue(delete_req, stream->id()); |
2776 | 0 | session->Apply(delete_op); |
2777 | 0 | stream_ops.push_back(std::make_pair(stream->id(), delete_op)); |
2778 | 0 | LOG(INFO) << "Deleting stream " << stream->id() << " for tablet " << tablet->tablet_id() |
2779 | 0 | << " with request " << delete_req->ShortDebugString(); |
2780 | 0 | } |
2781 | 62 | } |
2782 | 63 | } |
2783 | | // Flush all the delete operations. |
2784 | 63 | s = session->Flush(); |
2785 | 63 | if (!s.ok()) { |
2786 | 0 | LOG(ERROR) << "Unable to flush operations to delete cdc streams: " << s; |
2787 | 0 | return s.CloneAndPrepend("Error deleting cdc stream rows from cdc_state table"); |
2788 | 0 | } |
2789 | | |
2790 | 63 | for (const auto& e : stream_ops) { |
2791 | 0 | if (!e.second->succeeded()) { |
2792 | 0 | LOG(WARNING) << "Error deleting cdc_state row with tablet id " |
2793 | 0 | << e.second->request().hashed_column_values(0).value().string_value() |
2794 | 0 | << " and stream id " |
2795 | 0 | << e.second->request().range_column_values(0).value().string_value() |
2796 | 0 | << ": " << e.second->response().status(); |
2797 | 0 | failed_streams.insert(e.first); |
2798 | 0 | } |
2799 | 0 | } |
2800 | | |
2801 | | // TODO: Read the cdc_state table to verify that no rows remain for the specified cdc streams,
2802 | | // and keep any streams that still have rows in the map in the DELETED state to retry later.
2803 | | |
2804 | 63 | std::vector<CDCStreamInfo::WriteLock> locks; |
2805 | 63 | locks.reserve(streams.size() - failed_streams.size()); |
2806 | 63 | std::vector<CDCStreamInfo*> streams_to_delete; |
2807 | 63 | streams_to_delete.reserve(streams.size() - failed_streams.size()); |
2808 | | |
2809 | | // Delete from sys catalog only those streams that were successfully deleted from cdc_state.
2810 | 63 | for (auto& stream : streams) { |
2811 | 63 | if (failed_streams.find(stream->id()) == failed_streams.end()) { |
2812 | 63 | locks.push_back(stream->LockForWrite()); |
2813 | 63 | streams_to_delete.push_back(stream.get()); |
2814 | 63 | } |
2815 | 63 | } |
2816 | | |
2817 | | // The mutation will be aborted when 'l' exits the scope on early return. |
2818 | 63 | RETURN_NOT_OK(CheckStatus( |
2819 | 63 | sys_catalog_->Delete(leader_ready_term(), streams_to_delete), |
2820 | 63 | "deleting CDC streams from sys-catalog")); |
2821 | 63 | LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete) |
2822 | 63 | << " from sys catalog"; |
2823 | | |
2824 | | // Remove it from the map. |
2825 | 63 | TRACE("Removing from CDC stream maps"); |
2826 | 63 | { |
2827 | 63 | LockGuard lock(mutex_); |
2828 | 63 | for (const auto& stream : streams_to_delete) { |
2829 | 63 | if (cdc_stream_map_.erase(stream->id()) < 1) { |
2830 | 0 | return STATUS(IllegalState, "Could not remove CDC stream from map", stream->id()); |
2831 | 0 | } |
2832 | 63 | for (auto& id : stream->table_id()) { |
2833 | 62 | cdc_stream_tables_count_map_[id]--; |
2834 | 62 | } |
2835 | 63 | } |
2836 | 63 | } |
2837 | 63 | LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete) |
2838 | 63 | << " from stream map"; |
2839 | | |
2840 | 63 | for (auto& lock : locks) { |
2841 | 63 | lock.Commit(); |
2842 | 63 | } |
2843 | 63 | return Status::OK(); |
2844 | 63 | } |
2845 | | |
2846 | | Status CatalogManager::GetCDCStream(const GetCDCStreamRequestPB* req, |
2847 | | GetCDCStreamResponsePB* resp, |
2848 | 6 | rpc::RpcContext* rpc) { |
2849 | 6 | LOG(INFO) << "GetCDCStream from " << RequestorString(rpc) |
2850 | 6 | << ": " << req->DebugString(); |
2851 | | |
2852 | 6 | if (!req->has_stream_id()) { |
2853 | 0 | return STATUS(InvalidArgument, "CDC Stream ID must be provided", req->ShortDebugString(), |
2854 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
2855 | 0 | } |
2856 | | |
2857 | 6 | scoped_refptr<CDCStreamInfo> stream; |
2858 | 6 | { |
2859 | 6 | SharedLock lock(mutex_); |
2860 | 6 | stream = FindPtrOrNull(cdc_stream_map_, req->stream_id()); |
2861 | 6 | } |
2862 | | |
2863 | 6 | if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2864 | 2 | return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(), |
2865 | 2 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2866 | 2 | } |
2867 | | |
2868 | 4 | auto stream_lock = stream->LockForRead(); |
2869 | | |
2870 | 4 | CDCStreamInfoPB* stream_info = resp->mutable_stream(); |
2871 | | |
2872 | 4 | stream_info->set_stream_id(stream->id()); |
2873 | 4 | std::string id_type_option_value(cdc::kTableId); |
2874 | | |
2875 | 4 | for (auto option : stream_lock->options()) { |
2876 | 0 | if (option.has_key() && option.key() == cdc::kIdType) |
2877 | 0 | id_type_option_value = option.value(); |
2878 | 0 | } |
2879 | 4 | if (id_type_option_value == cdc::kNamespaceId) { |
2880 | 0 | stream_info->set_namespace_id(stream_lock->namespace_id()); |
2881 | 0 | } |
2882 | | |
2883 | 4 | for (auto& table_id : stream_lock->table_id()) { |
2884 | 4 | stream_info->add_table_id(table_id); |
2885 | 4 | } |
2886 | | |
2887 | 4 | stream_info->mutable_options()->CopyFrom(stream_lock->options()); |
2888 | | |
2889 | 4 | return Status::OK(); |
2890 | 6 | } |
2891 | | |
2892 | | Status CatalogManager::GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req, |
2893 | 22 | GetCDCDBStreamInfoResponsePB* resp) { |
2894 | | |
2895 | 22 | if (!req->has_db_stream_id()) { |
2896 | 0 | return STATUS(InvalidArgument, "CDC DB Stream ID must be provided", req->ShortDebugString(), |
2897 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
2898 | 0 | } |
2899 | | |
2900 | 22 | scoped_refptr<CDCStreamInfo> stream; |
2901 | 22 | { |
2902 | 22 | SharedLock lock(mutex_); |
2903 | 22 | stream = FindPtrOrNull(cdc_stream_map_, req->db_stream_id()); |
2904 | 22 | } |
2905 | | |
2906 | 22 | if (stream == nullptr || stream->LockForRead()->is_deleting()) { |
2907 | 0 | return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(), |
2908 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2909 | 0 | } |
2910 | | |
2911 | 22 | auto stream_lock = stream->LockForRead(); |
2912 | | |
2913 | 22 | if (!stream->namespace_id().empty()) { |
2914 | 22 | resp->set_namespace_id(stream->namespace_id()); |
2915 | 22 | } |
2916 | | |
2917 | 22 | for (const auto& table_id : stream_lock->table_id()) { |
2918 | 22 | const auto table_info = resp->add_table_info(); |
2919 | 22 | table_info->set_stream_id(req->db_stream_id()); |
2920 | 22 | table_info->set_table_id(table_id); |
2921 | 22 | } |
2922 | | |
2923 | 22 | return Status::OK(); |
2924 | 22 | } |
2925 | | |
2926 | | Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req, |
2927 | 1 | ListCDCStreamsResponsePB* resp) { |
2928 | | |
2929 | 1 | scoped_refptr<TableInfo> table; |
2930 | 1 | bool filter_table = req->has_table_id(); |
2931 | 1 | if (filter_table) { |
2932 | 0 | table = VERIFY_RESULT(FindTableById(req->table_id())); |
2933 | 0 | } |
2934 | | |
2935 | 1 | SharedLock lock(mutex_); |
2936 | | |
2937 | 1 | for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) { |
2938 | 1 | bool skip_stream = false; |
2939 | 1 | bool id_type_option_present = false; |
2940 | | |
2941 | | // If the request is to list the DB streams of a specific namespace, then the other
2942 | | // namespaces should not be considered.
2943 | 1 | if (req->has_namespace_id() && (req->namespace_id() != entry.second->namespace_id())) {
2944 | 0 | continue; |
2945 | 0 | } |
2946 | | |
2947 | 1 | if (filter_table && table->id() != entry.second->table_id().Get(0)) {
2948 | 0 | continue; // Skip streams from other tables.
2949 | 0 | } |
2950 | | |
2951 | 1 | auto ltm = entry.second->LockForRead(); |
2952 | | |
2953 | 1 | if (ltm->is_deleting()) { |
2954 | 0 | continue; |
2955 | 0 | } |
2956 | | |
2957 | 1 | for (const auto& option : ltm->options()) { |
2958 | 0 | if (option.key() == cdc::kIdType) { |
2959 | 0 | id_type_option_present = true; |
2960 | 0 | if (req->has_id_type()) { |
2961 | 0 | if (req->id_type() == IdTypePB::NAMESPACE_ID && |
2962 | 0 | option.value() != cdc::kNamespaceId) { |
2963 | 0 | skip_stream = true; |
2964 | 0 | break; |
2965 | 0 | } |
2966 | 0 | if (req->id_type() == IdTypePB::TABLE_ID && |
2967 | 0 | option.value() == cdc::kNamespaceId) { |
2968 | 0 | skip_stream = true; |
2969 | 0 | break; |
2970 | 0 | } |
2971 | 0 | } |
2972 | 0 | } |
2973 | 0 | } |
2974 | | |
2975 | 1 | if ((!id_type_option_present && req->id_type() == IdTypePB::NAMESPACE_ID) || |
2976 | 1 | skip_stream) |
2977 | 0 | continue; |
2978 | | |
2979 | 1 | CDCStreamInfoPB* stream = resp->add_streams(); |
2980 | 1 | stream->set_stream_id(entry.second->id()); |
2981 | 1 | for (const auto& table_id : ltm->table_id()) { |
2982 | 1 | stream->add_table_id(table_id); |
2983 | 1 | } |
2984 | 1 | stream->mutable_options()->CopyFrom(ltm->options()); |
2985 | | // Also add an option for the current state. |
2986 | 1 | if (ltm->pb.has_state()) { |
2987 | 1 | auto state_option = stream->add_options(); |
2988 | 1 | state_option->set_key("state"); |
2989 | 1 | state_option->set_value(master::SysCDCStreamEntryPB::State_Name(ltm->pb.state())); |
2990 | 1 | } |
2991 | 1 | } |
2992 | 1 | return Status::OK(); |
2993 | 1 | } |
2994 | | |
2995 | 310 | bool CatalogManager::CDCStreamExistsUnlocked(const CDCStreamId& stream_id) { |
2996 | 310 | scoped_refptr<CDCStreamInfo> stream = FindPtrOrNull(cdc_stream_map_, stream_id); |
2997 | 310 | if (stream == nullptr || stream->LockForRead()->is_deleting()) {
2998 | 310 | return false; |
2999 | 310 | } |
3000 | 0 | return true; |
3001 | 310 | } |
3002 | | |
3003 | | Status CatalogManager::UpdateCDCStream(const UpdateCDCStreamRequestPB *req, |
3004 | | UpdateCDCStreamResponsePB* resp, |
3005 | 0 | rpc::RpcContext* rpc) { |
3006 | 0 | LOG(INFO) << "UpdateCDCStream from " << RequestorString(rpc) |
3007 | 0 | << ": " << req->DebugString(); |
3008 | | |
3009 | | // Check fields. |
3010 | 0 | if (!req->has_stream_id()) { |
3011 | 0 | return STATUS(InvalidArgument, "Stream ID must be provided", |
3012 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3013 | 0 | } |
3014 | 0 | if (!req->has_entry()) { |
3015 | 0 | return STATUS(InvalidArgument, "CDC Stream Entry must be provided", |
3016 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3017 | 0 | } |
3018 | | |
3019 | 0 | scoped_refptr<CDCStreamInfo> stream; |
3020 | 0 | { |
3021 | 0 | SharedLock lock(mutex_); |
3022 | 0 | stream = FindPtrOrNull(cdc_stream_map_, req->stream_id()); |
3023 | 0 | } |
3024 | 0 | if (stream == nullptr) { |
3025 | 0 | return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(), |
3026 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
3027 | 0 | } |
3028 | | |
3029 | 0 | auto stream_lock = stream->LockForWrite(); |
3030 | 0 | if (stream_lock->is_deleting()) { |
3031 | 0 | return STATUS(NotFound, "CDC stream has been deleted", req->ShortDebugString(), |
3032 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
3033 | 0 | } |
3034 | | |
3035 | 0 | stream_lock.mutable_data()->pb.CopyFrom(req->entry()); |
3036 | | // Also need to persist changes in sys catalog. |
3037 | 0 | RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream)); |
3038 | | |
3039 | 0 | stream_lock.Commit(); |
3040 | |
3041 | 0 | return Status::OK(); |
3042 | 0 | } |
3043 | | |
3044 | | /* |
3045 | | * UniverseReplication is set up in 4 stages within the Catalog Manager:
3046 | | * 1. SetupUniverseReplication: Validates user input & requests Producer schema.
3047 | | * 2. GetTableSchemaCallback: Validates Schema compatibility & requests Producer CDC init.
3048 | | * 3. AddCDCStreamToUniverseAndInitConsumer: Sets up RPC connections for CDC streaming.
3049 | | * 4. InitCDCConsumer: Initializes the Consumer settings to begin tailing data.
3050 | | */ |
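| | // Annotation (added in this listing): a minimal sketch of the request this RPC
| | // validates below, assuming standard protobuf-generated setters for the fields
| | // read in the body; the values are made up:
| | //
| | //   SetupUniverseReplicationRequestPB req;
| | //   req.set_producer_id("producer-universe-uuid");
| | //   auto* addr = req.add_producer_master_addresses();
| | //   addr->set_host("10.0.0.1");
| | //   addr->set_port(7100);
| | //   req.add_producer_table_ids("producer-table-id");
| | //   req.add_producer_bootstrap_ids("bootstrap-id");  // optional; parallel to table ids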
3051 | | Status CatalogManager::SetupUniverseReplication(const SetupUniverseReplicationRequestPB* req, |
3052 | | SetupUniverseReplicationResponsePB* resp, |
3053 | 2 | rpc::RpcContext* rpc) { |
3054 | 2 | LOG(INFO) << "SetupUniverseReplication from " << RequestorString(rpc) |
3055 | 2 | << ": " << req->DebugString(); |
3056 | | |
3057 | | // Sanity checking section. |
3058 | 2 | if (!req->has_producer_id()) { |
3059 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
3060 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3061 | 0 | } |
3062 | | |
3063 | 2 | if (req->producer_master_addresses_size() <= 0) { |
3064 | 0 | return STATUS(InvalidArgument, "Producer master address must be provided", |
3065 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3066 | 0 | } |
3067 | | |
3068 | 2 | if (req->producer_bootstrap_ids().size() > 0 && |
3069 | 2 | req->producer_bootstrap_ids().size() != req->producer_table_ids().size()) {
3070 | 0 | return STATUS(InvalidArgument, "Number of bootstrap ids must be equal to number of tables", |
3071 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3072 | 0 | } |
3073 | | |
3074 | 2 | { |
3075 | 2 | auto l = ClusterConfig()->LockForRead(); |
3076 | 2 | if (l->pb.cluster_uuid() == req->producer_id()) { |
3077 | 0 | return STATUS(InvalidArgument, "The request UUID and cluster UUID are identical.", |
3078 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3079 | 0 | } |
3080 | 2 | } |
3081 | | |
3082 | 2 | { |
3083 | 2 | auto request_master_addresses = req->producer_master_addresses(); |
3084 | 2 | std::vector<ServerEntryPB> cluster_master_addresses; |
3085 | 2 | RETURN_NOT_OK(master_->ListMasters(&cluster_master_addresses)); |
3086 | 2 | for (const auto &req_elem : request_master_addresses) { |
3087 | 2 | for (const auto &cluster_elem : cluster_master_addresses) { |
3088 | 2 | if (cluster_elem.has_registration()) { |
3089 | 2 | auto p_rpc_addresses = cluster_elem.registration().private_rpc_addresses(); |
3090 | 2 | for (const auto &p_rpc_elem : p_rpc_addresses) { |
3091 | 2 | if (req_elem.host() == p_rpc_elem.host() && req_elem.port() == p_rpc_elem.port()) {
3092 | 0 | return STATUS(InvalidArgument, |
3093 | 0 | "Duplicate between request master addresses and private RPC addresses", |
3094 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3095 | 0 | } |
3096 | 2 | } |
3097 | | |
3098 | 2 | auto broadcast_addresses = cluster_elem.registration().broadcast_addresses(); |
3099 | 2 | for (const auto &bc_elem : broadcast_addresses) { |
3100 | 2 | if (req_elem.host() == bc_elem.host() && req_elem.port() == bc_elem.port()) {
3101 | 0 | return STATUS(InvalidArgument, |
3102 | 0 | "Duplicate between request master addresses and broadcast addresses", |
3103 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3104 | 0 | } |
3105 | 2 | } |
3106 | 2 | } |
3107 | 2 | } |
3108 | 2 | } |
3109 | 2 | } |
3110 | | |
3111 | 2 | std::unordered_map<TableId, std::string> table_id_to_bootstrap_id; |
3112 | | |
3113 | 2 | if (req->producer_bootstrap_ids_size() > 0) { |
3114 | 0 | for (int i = 0; i < req->producer_table_ids().size(); i++) { |
3115 | 0 | table_id_to_bootstrap_id[req->producer_table_ids(i)] = req->producer_bootstrap_ids(i); |
3116 | 0 | } |
3117 | 0 | } |
3118 | | |
3119 | | // We assume that the list of table ids is unique. |
3120 | 2 | if (req->producer_bootstrap_ids().size() > 0 && |
3121 | 2 | implicit_cast<size_t>(req->producer_table_ids().size()) != table_id_to_bootstrap_id.size()) {
3122 | 0 | return STATUS(InvalidArgument, "When providing bootstrap ids, " |
3123 | 0 | "the list of tables must be unique", req->ShortDebugString(), |
3124 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
3125 | 0 | } |
3126 | | |
3127 | 2 | scoped_refptr<UniverseReplicationInfo> ri; |
3128 | 2 | { |
3129 | 2 | TRACE("Acquired catalog manager lock"); |
3130 | 2 | SharedLock lock(mutex_); |
3131 | | |
3132 | 2 | if (FindPtrOrNull(universe_replication_map_, req->producer_id()) != nullptr) { |
3133 | 0 | return STATUS(InvalidArgument, "Producer already present", req->producer_id(), |
3134 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
3135 | 0 | } |
3136 | 2 | } |
3137 | | |
3138 | | // Create an entry in the system catalog DocDB for this new universe replication. |
3139 | 2 | ri = new UniverseReplicationInfo(req->producer_id()); |
3140 | 2 | ri->mutable_metadata()->StartMutation(); |
3141 | 2 | SysUniverseReplicationEntryPB *metadata = &ri->mutable_metadata()->mutable_dirty()->pb; |
3142 | 2 | metadata->set_producer_id(req->producer_id()); |
3143 | 2 | metadata->mutable_producer_master_addresses()->CopyFrom(req->producer_master_addresses()); |
3144 | 2 | metadata->mutable_tables()->CopyFrom(req->producer_table_ids()); |
3145 | 2 | metadata->set_state(SysUniverseReplicationEntryPB::INITIALIZING); |
3146 | | |
3147 | 2 | RETURN_NOT_OK(CheckLeaderStatusAndSetupError( |
3148 | 2 | sys_catalog_->Upsert(leader_ready_term(), ri), |
3149 | 2 | "inserting universe replication info into sys-catalog", resp)); |
3150 | 2 | TRACE("Wrote universe replication info to sys-catalog"); |
3151 | | |
3152 | | // Commit the in-memory state now that it's added to the persistent catalog. |
3153 | 2 | ri->mutable_metadata()->CommitMutation(); |
3154 | 2 | LOG(INFO) << "Setup universe replication from producer " << ri->ToString(); |
3155 | | |
3156 | 2 | { |
3157 | 2 | LockGuard lock(mutex_); |
3158 | 2 | universe_replication_map_[ri->id()] = ri; |
3159 | 2 | } |
3160 | | |
3161 | | // Initialize the CDC Stream by querying the Producer server for RPC sanity checks. |
3162 | 2 | auto result = ri->GetOrCreateCDCRpcTasks(req->producer_master_addresses()); |
3163 | 2 | LOG(INFO) << "GetOrCreateCDCRpcTasks: " << result.ok(); |
3164 | 2 | if (!result.ok()) { |
3165 | 0 | MarkUniverseReplicationFailed(ri, ResultToStatus(result)); |
3166 | 0 | return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, result.status()); |
3167 | 0 | } |
3168 | 2 | std::shared_ptr<CDCRpcTasks> cdc_rpc = *result; |
3169 | | |
3170 | | // For each table, run an async RPC task to verify a sufficient Producer:Consumer schema match. |
3171 | 2 | for (int i = 0; i < req->producer_table_ids_size(); i++) {
3172 | | |
3173 | | // SETUP CONTINUES after this async call. |
3174 | 0 | Status s; |
3175 | 0 | if (IsColocatedParentTableId(req->producer_table_ids(i))) { |
3176 | 0 | auto tables_info = std::make_shared<std::vector<client::YBTableInfo>>(); |
3177 | 0 | s = cdc_rpc->client()->GetColocatedTabletSchemaById( |
3178 | 0 | req->producer_table_ids(i), tables_info, |
3179 | 0 | Bind(&enterprise::CatalogManager::GetColocatedTabletSchemaCallback, Unretained(this), |
3180 | 0 | ri->id(), tables_info, table_id_to_bootstrap_id)); |
3181 | 0 | } else if (IsTablegroupParentTableId(req->producer_table_ids(i))) { |
3182 | 0 | auto tablegroup_info = std::make_shared<std::vector<client::YBTableInfo>>(); |
3183 | 0 | s = cdc_rpc->client()->GetTablegroupSchemaById( |
3184 | 0 | req->producer_table_ids(i), tablegroup_info, |
3185 | 0 | Bind(&enterprise::CatalogManager::GetTablegroupSchemaCallback, Unretained(this), |
3186 | 0 | ri->id(), tablegroup_info, req->producer_table_ids(i), table_id_to_bootstrap_id)); |
3187 | 0 | } else { |
3188 | 0 | auto table_info = std::make_shared<client::YBTableInfo>(); |
3189 | 0 | s = cdc_rpc->client()->GetTableSchemaById( |
3190 | 0 | req->producer_table_ids(i), table_info, |
3191 | 0 | Bind(&enterprise::CatalogManager::GetTableSchemaCallback, Unretained(this), |
3192 | 0 | ri->id(), table_info, table_id_to_bootstrap_id)); |
3193 | 0 | } |
3194 | |
3195 | 0 | if (!s.ok()) { |
3196 | 0 | MarkUniverseReplicationFailed(ri, s); |
3197 | 0 | return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); |
3198 | 0 | } |
3199 | 0 | } |
3200 | | |
3201 | 2 | LOG(INFO) << "Started schema validation for universe replication " << ri->ToString(); |
3202 | 2 | return Status::OK(); |
3203 | 2 | } |
3204 | | |
3205 | | void CatalogManager::MarkUniverseReplicationFailed( |
3206 | 0 | scoped_refptr<UniverseReplicationInfo> universe, const Status& failure_status) { |
3207 | 0 | auto l = universe->LockForWrite(); |
3208 | 0 | if (l->pb.state() == SysUniverseReplicationEntryPB::DELETED) { |
3209 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED_ERROR); |
3210 | 0 | } else { |
3211 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); |
3212 | 0 | } |
3213 | |
3214 | 0 | universe->SetSetupUniverseReplicationErrorStatus(failure_status); |
3215 | | |
3216 | | // Update sys_catalog. |
3217 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), universe); |
3218 | |
3219 | 0 | l.CommitOrWarn(s, "updating universe replication info in sys-catalog"); |
3220 | 0 | } |
3221 | | |
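| | // Annotation (added in this listing): resolves the consumer table that matches
| | // the producer table by name (via ListTables, a YSQL workaround noted below),
| | // then requires the schemas to be equivalent for data copy and, for colocated
| | // tables, the colocation IDs to match exactly.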
3222 | | Status CatalogManager::ValidateTableSchema( |
3223 | | const std::shared_ptr<client::YBTableInfo>& info, |
3224 | | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, |
3225 | 0 | GetTableSchemaResponsePB* resp) { |
3226 | | // Get corresponding table schema on local universe. |
3227 | 0 | GetTableSchemaRequestPB req; |
3228 | |
3229 | 0 | auto* table = req.mutable_table(); |
3230 | 0 | table->set_table_name(info->table_name.table_name()); |
3231 | 0 | table->mutable_namespace_()->set_name(info->table_name.namespace_name()); |
3232 | 0 | table->mutable_namespace_()->set_database_type( |
3233 | 0 | GetDatabaseTypeForTable(client::ClientToPBTableType(info->table_type))); |
3234 | | |
3235 | | // Since YSQL tables are not present in table map, we first need to list tables to get the table |
3236 | | // ID and then get table schema. |
3237 | | // Remove this once table maps are fixed for YSQL. |
3238 | 0 | ListTablesRequestPB list_req; |
3239 | 0 | ListTablesResponsePB list_resp; |
3240 | |
3241 | 0 | list_req.set_name_filter(info->table_name.table_name()); |
3242 | 0 | Status status = ListTables(&list_req, &list_resp); |
3243 | 0 | if (!status.ok() || list_resp.has_error()) { |
3244 | 0 | return STATUS(NotFound, Substitute("Error while listing table: $0", status.ToString())); |
3245 | 0 | } |
3246 | | |
3247 | | // TODO: This does not work for the situation where tables in different YSQL schemas have the same
3248 | | // name. This will be fixed as part of #1476. |
3249 | 0 | for (const auto& t : list_resp.tables()) { |
3250 | 0 | if (t.name() == info->table_name.table_name() && |
3251 | 0 | t.namespace_().name() == info->table_name.namespace_name()) { |
3252 | 0 | table->set_table_id(t.id()); |
3253 | 0 | break; |
3254 | 0 | } |
3255 | 0 | } |
3256 | |
3257 | 0 | if (!table->has_table_id()) { |
3258 | 0 | return STATUS(NotFound, |
3259 | 0 | Substitute("Could not find matching table for $0", info->table_name.ToString())); |
3260 | 0 | } |
3261 | | |
3262 | | // We have a table match. Now get the table schema and validate. |
3263 | 0 | status = GetTableSchema(&req, resp); |
3264 | 0 | if (!status.ok() || resp->has_error()) { |
3265 | 0 | return STATUS(NotFound, Substitute("Error while getting table schema: $0", status.ToString())); |
3266 | 0 | } |
3267 | | |
3268 | 0 | auto result = info->schema.EquivalentForDataCopy(resp->schema()); |
3269 | 0 | if (!result.ok() || !*result) { |
3270 | 0 | return STATUS(IllegalState, |
3271 | 0 | Substitute("Source and target schemas don't match: " |
3272 | 0 | "Source: $0, Target: $1, Source schema: $2, Target schema: $3", |
3273 | 0 | info->table_id, resp->identifier().table_id(), |
3274 | 0 | info->schema.ToString(), resp->schema().DebugString())); |
3275 | 0 | } |
3276 | | |
3277 | | // Still need to map the producer table id to the consumer (resp) table id, to add to the validated map.
3278 | | // For colocated tables, only add the parent table since we only added the parent table to the |
3279 | | // original pb (we use the number of tables in the pb to determine when validation is done). |
3280 | 0 | if (info->colocated) { |
3281 | | // We require that colocated tables have the same colocation ID. |
3282 | | // |
3283 | | // Backward compatibility: tables created prior to #7378 use YSQL table OID as a colocation ID. |
3284 | 0 | auto source_clc_id = info->schema.has_colocation_id() |
3285 | 0 | ? info->schema.colocation_id() |
3286 | 0 | : CHECK_RESULT(GetPgsqlTableOid(info->table_id)); |
3287 | 0 | auto target_clc_id = (resp->schema().has_colocated_table_id() && |
3288 | 0 | resp->schema().colocated_table_id().has_colocation_id()) |
3289 | 0 | ? resp->schema().colocated_table_id().colocation_id() |
3290 | 0 | : CHECK_RESULT(GetPgsqlTableOid(resp->identifier().table_id())); |
3291 | 0 | if (source_clc_id != target_clc_id) { |
3292 | 0 | return STATUS(IllegalState, |
3293 | 0 | Substitute("Source and target colocation IDs don't match for colocated table: " |
3294 | 0 | "Source: $0, Target: $1, Source colocation ID: $2, Target colocation ID: $3", |
3295 | 0 | info->table_id, resp->identifier().table_id(), source_clc_id, target_clc_id)); |
3296 | 0 | } |
3297 | 0 | } |
3298 | | |
3299 | 0 | return Status::OK(); |
3300 | 0 | } |
3301 | | |
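| | // Annotation (added in this listing): records the producer-to-consumer table
| | // mapping; once every table in the universe entry is validated, the state flips
| | // to VALIDATED and one producer-side CDC stream is created per table, reusing an
| | // existing stream via GetCDCStream when a bootstrap id was supplied and creating
| | // a fresh WAL/XCLUSTER stream otherwise.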
3302 | | Status CatalogManager::AddValidatedTableAndCreateCdcStreams( |
3303 | | scoped_refptr<UniverseReplicationInfo> universe, |
3304 | | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, |
3305 | | const TableId& producer_table, |
3306 | 0 | const TableId& consumer_table) { |
3307 | 0 | auto l = universe->LockForWrite(); |
3308 | 0 | auto master_addresses = l->pb.producer_master_addresses(); |
3309 | |
3310 | 0 | auto res = universe->GetOrCreateCDCRpcTasks(master_addresses); |
3311 | 0 | if (!res.ok()) { |
3312 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); |
3313 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), universe); |
3314 | 0 | if (!s.ok()) { |
3315 | 0 | return CheckStatus(s, "updating universe replication info in sys-catalog"); |
3316 | 0 | } |
3317 | 0 | l.Commit(); |
3318 | 0 | return STATUS(InternalError, |
3319 | 0 | Substitute("Error while setting up client for producer $0", universe->id())); |
3320 | 0 | } |
3321 | 0 | std::shared_ptr<CDCRpcTasks> cdc_rpc = *res; |
3322 | 0 | vector<TableId> validated_tables; |
3323 | |
3324 | 0 | if (l->is_deleted_or_failed()) { |
3325 | | // Nothing to do since universe is being deleted. |
3326 | 0 | return STATUS(Aborted, "Universe is being deleted"); |
3327 | 0 | } |
3328 | | |
3329 | 0 | auto map = l.mutable_data()->pb.mutable_validated_tables(); |
3330 | 0 | (*map)[producer_table] = consumer_table; |
3331 | | |
3332 | | // Now, all tables are validated. |
3333 | 0 | if (l.mutable_data()->pb.validated_tables_size() == l.mutable_data()->pb.tables_size()) { |
3334 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::VALIDATED); |
3335 | 0 | auto tbl_iter = l->pb.tables(); |
3336 | 0 | validated_tables.insert(validated_tables.begin(), tbl_iter.begin(), tbl_iter.end()); |
3337 | 0 | } |
3338 | | |
3339 | | // TODO: end of config validation should be where SetupUniverseReplication exits back to user |
3340 | 0 | LOG(INFO) << "UpdateItem in AddValidatedTable"; |
3341 | | |
3342 | | // Update sys_catalog. |
3343 | 0 | Status status = sys_catalog_->Upsert(leader_ready_term(), universe); |
3344 | 0 | if (!status.ok()) { |
3345 | 0 | LOG(ERROR) << "Error during UpdateItem: " << status; |
3346 | 0 | return CheckStatus(status, "updating universe replication info in sys-catalog"); |
3347 | 0 | } |
3348 | 0 | l.Commit(); |
3349 | | |
3350 | | // Create CDC stream for each validated table, after persisting the replication state change. |
3351 | 0 | if (!validated_tables.empty()) { |
3352 | 0 | std::unordered_map<std::string, std::string> options; |
3353 | 0 | options.reserve(4); |
3354 | 0 | options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); |
3355 | 0 | options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); |
3356 | 0 | options.emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); |
3357 | 0 | options.emplace(cdc::kCheckpointType, CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT)); |
3358 | |
3359 | 0 | for (const auto& table : validated_tables) { |
3360 | 0 | string producer_bootstrap_id; |
3361 | 0 | auto it = table_bootstrap_ids.find(table); |
3362 | 0 | if (it != table_bootstrap_ids.end()) { |
3363 | 0 | producer_bootstrap_id = it->second; |
3364 | 0 | } |
3365 | 0 | if (!producer_bootstrap_id.empty()) { |
3366 | 0 | auto table_id = std::make_shared<TableId>(); |
3367 | 0 | auto stream_options = std::make_shared<std::unordered_map<std::string, std::string>>(); |
3368 | 0 | cdc_rpc->client()->GetCDCStream(producer_bootstrap_id, table_id, stream_options, |
3369 | 0 | std::bind(&enterprise::CatalogManager::GetCDCStreamCallback, this, |
3370 | 0 | producer_bootstrap_id, table_id, stream_options, universe->id(), table, cdc_rpc, |
3371 | 0 | std::placeholders::_1)); |
3372 | 0 | } else { |
3373 | 0 | cdc_rpc->client()->CreateCDCStream( |
3374 | 0 | table, options, |
3375 | 0 | std::bind(&enterprise::CatalogManager::AddCDCStreamToUniverseAndInitConsumer, this, |
3376 | 0 | universe->id(), table, std::placeholders::_1, nullptr /* on_success_cb */)); |
3377 | 0 | } |
3378 | 0 | } |
3379 | 0 | } |
3380 | 0 | return Status::OK(); |
3381 | 0 | } |
3382 | | |
3383 | | void CatalogManager::GetTableSchemaCallback( |
3384 | | const std::string& universe_id, const std::shared_ptr<client::YBTableInfo>& info, |
3385 | 0 | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, const Status& s) { |
3386 | | // First get the universe. |
3387 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
3388 | 0 | { |
3389 | 0 | SharedLock lock(mutex_); |
3390 | 0 | TRACE("Acquired catalog manager lock"); |
3391 | |
3392 | 0 | universe = FindPtrOrNull(universe_replication_map_, universe_id); |
3393 | 0 | if (universe == nullptr) { |
3394 | 0 | LOG(ERROR) << "Universe not found: " << universe_id; |
3395 | 0 | return; |
3396 | 0 | } |
3397 | 0 | } |
3398 | | |
3399 | 0 | if (!s.ok()) { |
3400 | 0 | MarkUniverseReplicationFailed(universe, s); |
3401 | 0 | LOG(ERROR) << "Error getting schema for table " << info->table_id << ": " << s; |
3402 | 0 | return; |
3403 | 0 | } |
3404 | | |
3405 | | // Validate the table schema. |
3406 | 0 | GetTableSchemaResponsePB resp; |
3407 | 0 | Status status = ValidateTableSchema(info, table_bootstrap_ids, &resp); |
3408 | 0 | if (!status.ok()) { |
3409 | 0 | MarkUniverseReplicationFailed(universe, status); |
3410 | 0 | LOG(ERROR) << "Found error while validating table schema for table " << info->table_id |
3411 | 0 | << ": " << status; |
3412 | 0 | return; |
3413 | 0 | } |
3414 | | |
3415 | 0 | status = AddValidatedTableAndCreateCdcStreams(universe, |
3416 | 0 | table_bootstrap_ids, |
3417 | 0 | info->table_id, |
3418 | 0 | resp.identifier().table_id()); |
3419 | 0 | if (!status.ok()) { |
3420 | 0 | LOG(ERROR) << "Found error while adding validated table to system catalog: " << info->table_id |
3421 | 0 | << ": " << status; |
3422 | 0 | return; |
3423 | 0 | } |
3424 | 0 | } |
3425 | | |
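| | // Annotation (added in this listing): validates every member table of the
| | // producer tablegroup, reverse-looks-up the consumer tablegroup from one
| | // validated table, and then requires set equality between the validated consumer
| | // tables and the consumer tablegroup's members before registering the pair.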
3426 | | void CatalogManager::GetTablegroupSchemaCallback( |
3427 | | const std::string& universe_id, |
3428 | | const std::shared_ptr<std::vector<client::YBTableInfo>>& infos, |
3429 | | const TablegroupId& producer_tablegroup_id, |
3430 | | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, |
3431 | 0 | const Status& s) { |
3432 | | // First get the universe. |
3433 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
3434 | 0 | { |
3435 | 0 | SharedLock lock(mutex_); |
3436 | 0 | TRACE("Acquired catalog manager lock"); |
3437 | |
3438 | 0 | universe = FindPtrOrNull(universe_replication_map_, universe_id); |
3439 | 0 | if (universe == nullptr) { |
3440 | 0 | LOG(ERROR) << "Universe not found: " << universe_id; |
3441 | 0 | return; |
3442 | 0 | } |
3443 | 0 | } |
3444 | | |
3445 | 0 | if (!s.ok()) { |
3446 | 0 | MarkUniverseReplicationFailed(universe, s); |
3447 | 0 | std::ostringstream oss; |
3448 | 0 | for (size_t i = 0; i < infos->size(); ++i) { |
3449 | 0 | oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id; |
3450 | 0 | } |
3451 | 0 | LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s; |
3452 | 0 | return; |
3453 | 0 | } |
3454 | | |
3455 | 0 | if (infos->empty()) { |
3456 | 0 | LOG(WARNING) << "Received empty list of tables to validate: " << s; |
3457 | 0 | return; |
3458 | 0 | } |
3459 | | |
3460 | | // validated_consumer_tables contains the consumer table IDs corresponding to
3461 | | // the producer tables.
3462 | 0 | std::unordered_set<TableId> validated_consumer_tables; |
3463 | 0 | for (const auto& info : *infos) { |
3464 | | // Validate each of the member table in the tablegroup. |
3465 | 0 | GetTableSchemaResponsePB resp; |
3466 | 0 | Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info), |
3467 | 0 | table_bootstrap_ids, |
3468 | 0 | &resp); |
3469 | |
3470 | 0 | if (!table_status.ok()) { |
3471 | 0 | MarkUniverseReplicationFailed(universe, table_status); |
3472 | 0 | LOG(ERROR) << "Found error while validating table schema for table " << info.table_id |
3473 | 0 | << ": " << table_status; |
3474 | 0 | return; |
3475 | 0 | } |
3476 | | |
3477 | 0 | validated_consumer_tables.insert(resp.identifier().table_id()); |
3478 | 0 | } |
3479 | | |
3480 | | // Get the consumer tablegroup ID. Since this call is expensive (one needs to reverse lookup |
3481 | | // the tablegroup ID from table ID), we only do this call once and do validation afterward. |
3482 | 0 | TablegroupId consumer_tablegroup_id; |
3483 | 0 | { |
3484 | 0 | const auto& result = FindTablegroupByTableId(*validated_consumer_tables.begin()); |
3485 | 0 | if (!result.has_value()) { |
3486 | 0 | std::string message = |
3487 | 0 | Format("No consumer tablegroup found for producer tablegroup: $0", |
3488 | 0 | producer_tablegroup_id); |
3489 | 0 | MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message)); |
3490 | 0 | LOG(ERROR) << message; |
3491 | 0 | return; |
3492 | 0 | } |
3493 | 0 | consumer_tablegroup_id = result.value(); |
3494 | 0 | } |
3495 | | |
3496 | | // tables_in_consumer_tablegroup are the tables listed within the consumer_tablegroup_id. |
3497 | | // We need validated_consumer_tables and tables_in_consumer_tablegroup to be identical. |
3498 | 0 | std::unordered_set<TableId> tables_in_consumer_tablegroup; |
3499 | 0 | { |
3500 | 0 | GetTablegroupSchemaRequestPB req; |
3501 | 0 | GetTablegroupSchemaResponsePB resp; |
3502 | 0 | req.mutable_parent_tablegroup()->set_id(consumer_tablegroup_id); |
3503 | 0 | Status status = GetTablegroupSchema(&req, &resp); |
3504 | 0 | if (!status.ok() || resp.has_error()) { |
3505 | 0 | std::string message = Format("Error when getting consumer tablegroup schema: $0", |
3506 | 0 | consumer_tablegroup_id); |
3507 | 0 | MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message)); |
3508 | 0 | LOG(ERROR) << message; |
3509 | 0 | return; |
3510 | 0 | } |
3511 | | |
3512 | 0 | for (const auto& info : resp.get_table_schema_response_pbs()) { |
3513 | 0 | tables_in_consumer_tablegroup.insert(info.identifier().table_id()); |
3514 | 0 | } |
3515 | 0 | } |
3516 | | |
3517 | 0 | if (validated_consumer_tables != tables_in_consumer_tablegroup) { |
3518 | 0 | std::ostringstream validated_tables_oss; |
3519 | 0 | for (auto it = validated_consumer_tables.begin(); |
3520 | 0 | it != validated_consumer_tables.end(); it++) { |
3521 | 0 | validated_tables_oss << (it == validated_consumer_tables.begin() ? "" : ",") << *it; |
3522 | 0 | } |
3523 | 0 | std::ostringstream consumer_tables_oss; |
3524 | 0 | for (auto it = tables_in_consumer_tablegroup.begin(); |
3525 | 0 | it != tables_in_consumer_tablegroup.end(); it++) { |
3526 | 0 | consumer_tables_oss << (it == tables_in_consumer_tablegroup.begin() ? "" : ",") << *it; |
3527 | 0 | } |
3528 | |
3529 | 0 | std::string message = |
3530 | 0 | Format("Mismatch between tables associated with producer tablegroup $0 and " |
3531 | 0 | "tables in consumer tablegroup $1: ($2) vs ($3).", |
3532 | 0 | producer_tablegroup_id, consumer_tablegroup_id, |
3533 | 0 | validated_tables_oss.str(), consumer_tables_oss.str()); |
3534 | 0 | MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message)); |
3535 | 0 | LOG(ERROR) << message; |
3536 | 0 | return; |
3537 | 0 | } |
3538 | | |
3539 | 0 | Status status = AddValidatedTableAndCreateCdcStreams(universe, |
3540 | 0 | table_bootstrap_ids, |
3541 | 0 | producer_tablegroup_id, |
3542 | 0 | consumer_tablegroup_id); |
3543 | 0 | if (!status.ok()) { |
3544 | 0 | LOG(ERROR) << "Found error while adding validated table to system catalog: " |
3545 | 0 | << producer_tablegroup_id << ": " << status; |
3546 | 0 | return; |
3547 | 0 | } |
3548 | 0 | } |
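
The two diagnostic joins above hand-roll comma separation with iterator comparisons. The same pattern as a reusable helper, a minimal sketch (JoinIds is a hypothetical name, not a function in this file):

#include <sstream>
#include <string>
#include <unordered_set>

// Join a set of IDs into "a,b,c" for log and error messages.
std::string JoinIds(const std::unordered_set<std::string>& ids) {
  std::ostringstream oss;
  bool first = true;
  for (const auto& id : ids) {
    oss << (first ? "" : ",") << id;
    first = false;
  }
  return oss.str();
}
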
3549 | | |
3550 | | void CatalogManager::GetColocatedTabletSchemaCallback( |
3551 | | const std::string& universe_id, const std::shared_ptr<std::vector<client::YBTableInfo>>& infos, |
3552 | 0 | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, const Status& s) { |
3553 | | // First get the universe. |
3554 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
3555 | 0 | { |
3556 | 0 | SharedLock lock(mutex_); |
3557 | 0 | TRACE("Acquired catalog manager lock"); |
3558 | |
3559 | 0 | universe = FindPtrOrNull(universe_replication_map_, universe_id); |
3560 | 0 | if (universe == nullptr) { |
3561 | 0 | LOG(ERROR) << "Universe not found: " << universe_id; |
3562 | 0 | return; |
3563 | 0 | } |
3564 | 0 | } |
3565 | | |
3566 | 0 | if (!s.ok()) { |
3567 | 0 | MarkUniverseReplicationFailed(universe, s); |
3568 | 0 | std::ostringstream oss; |
3569 | 0 | for (size_t i = 0; i < infos->size(); ++i) { |
3570 | 0 | oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id; |
3571 | 0 | } |
3572 | 0 | LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s; |
3573 | 0 | return; |
3574 | 0 | } |
3575 | | |
3576 | 0 | if (infos->empty()) { |
3577 | 0 | LOG(WARNING) << "Received empty list of tables to validate";
3578 | 0 | return; |
3579 | 0 | } |
3580 | | |
3581 | | // Validate table schemas. |
3582 | 0 | std::unordered_set<TableId> producer_parent_table_ids; |
3583 | 0 | std::unordered_set<TableId> consumer_parent_table_ids; |
3584 | 0 | for (const auto& info : *infos) { |
3585 | | // Verify that we have a colocated table. |
3586 | 0 | if (!info.colocated) { |
3587 | 0 | MarkUniverseReplicationFailed(universe, |
3588 | 0 | STATUS(InvalidArgument, Substitute("Received non-colocated table: $0", info.table_id))); |
3589 | 0 | LOG(ERROR) << "Received non-colocated table: " << info.table_id; |
3590 | 0 | return; |
3591 | 0 | } |
3592 | | // Validate each table, and get the parent colocated table id for the consumer. |
3593 | 0 | GetTableSchemaResponsePB resp; |
3594 | 0 | Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info), |
3595 | 0 | table_bootstrap_ids, |
3596 | 0 | &resp); |
3597 | 0 | if (!table_status.ok()) { |
3598 | 0 | MarkUniverseReplicationFailed(universe, table_status); |
3599 | 0 | LOG(ERROR) << "Found error while validating table schema for table " << info.table_id |
3600 | 0 | << ": " << table_status; |
3601 | 0 | return; |
3602 | 0 | } |
3603 | | // Store the parent table ids. |
3604 | 0 | producer_parent_table_ids.insert( |
3605 | 0 | info.table_name.namespace_id() + kColocatedParentTableIdSuffix); |
3606 | 0 | consumer_parent_table_ids.insert( |
3607 | 0 | resp.identifier().namespace_().id() + kColocatedParentTableIdSuffix); |
3608 | 0 | } |
3609 | | |
3610 | | // Verify that we only found one producer and one consumer colocated parent table id. |
3611 | 0 | if (producer_parent_table_ids.size() != 1) { |
3612 | 0 | std::ostringstream oss; |
3613 | 0 | for (auto it = producer_parent_table_ids.begin(); it != producer_parent_table_ids.end(); ++it) { |
3614 | 0 | oss << ((it == producer_parent_table_ids.begin()) ? "" : ", ") << *it; |
3615 | 0 | } |
3616 | 0 | MarkUniverseReplicationFailed(universe, STATUS(InvalidArgument, |
3617 | 0 | Substitute("Found incorrect number of producer colocated parent table ids. " |
3618 | 0 | "Expected 1, but found: [ $0 ]", oss.str()))); |
3619 | 0 | LOG(ERROR) << "Found incorrect number of producer colocated parent table ids. " |
3620 | 0 | << "Expected 1, but found: [ " << oss.str() << " ]"; |
3621 | 0 | return; |
3622 | 0 | } |
3623 | 0 | if (consumer_parent_table_ids.size() != 1) { |
3624 | 0 | std::ostringstream oss; |
3625 | 0 | for (auto it = consumer_parent_table_ids.begin(); it != consumer_parent_table_ids.end(); ++it) { |
3626 | 0 | oss << ((it == consumer_parent_table_ids.begin()) ? "" : ", ") << *it; |
3627 | 0 | } |
3628 | 0 | MarkUniverseReplicationFailed(universe, STATUS(InvalidArgument, |
3629 | 0 | Substitute("Found incorrect number of consumer colocated parent table ids. " |
3630 | 0 | "Expected 1, but found: [ $0 ]", oss.str()))); |
3631 | 0 | LOG(ERROR) << "Found incorrect number of consumer colocated parent table ids. " |
3632 | 0 | << "Expected 1, but found: [ " << oss.str() << " ]"; |
3633 | 0 | return; |
3634 | 0 | } |
3635 | | |
3636 | 0 | Status status = AddValidatedTableAndCreateCdcStreams(universe, |
3637 | 0 | table_bootstrap_ids, |
3638 | 0 | *producer_parent_table_ids.begin(), |
3639 | 0 | *consumer_parent_table_ids.begin()); |
3640 | 0 | if (!status.ok()) { |
3641 | 0 | LOG(ERROR) << "Found error while adding validated table to system catalog: " |
3642 | 0 | << *producer_parent_table_ids.begin() << ": " << status; |
3643 | 0 | return; |
3644 | 0 | } |
3645 | 0 | } |
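
The size-one checks above work because every member table of a colocated namespace maps to the same synthetic parent table ID, formed by appending a fixed suffix to the namespace ID. A sketch of that derivation, with an illustrative suffix value standing in for the real kColocatedParentTableIdSuffix:

#include <string>

constexpr char kParentSuffix[] = ".colocated.parent.uuid";  // illustrative value

std::string ParentTableId(const std::string& namespace_id) {
  return namespace_id + kParentSuffix;
}

// Inserting ParentTableId(ns_id) for every member table of one namespace
// leaves a set of size 1, which is exactly what the checks above require.
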
3646 | | |
3647 | | void CatalogManager::GetCDCStreamCallback( |
3648 | | const CDCStreamId& bootstrap_id, |
3649 | | std::shared_ptr<TableId> table_id, |
3650 | | std::shared_ptr<std::unordered_map<std::string, std::string>> options, |
3651 | | const std::string& universe_id, |
3652 | | const TableId& table, |
3653 | | std::shared_ptr<CDCRpcTasks> cdc_rpc, |
3654 | 0 | const Status& s) { |
3655 | 0 | if (!s.ok()) { |
3656 | 0 | LOG(ERROR) << "Unable to find bootstrap id " << bootstrap_id; |
3657 | 0 | AddCDCStreamToUniverseAndInitConsumer(universe_id, table, s); |
3658 | 0 | } else { |
3659 | 0 | if (*table_id != table) { |
3660 | 0 | const Status invalid_bootstrap_id_status = STATUS_FORMAT( |
3661 | 0 | InvalidArgument, "Invalid bootstrap id for table $0. Bootstrap id $1 belongs to table $2", |
3662 | 0 | table, bootstrap_id, *table_id); |
3663 | 0 | LOG(ERROR) << invalid_bootstrap_id_status; |
3664 | 0 | AddCDCStreamToUniverseAndInitConsumer(universe_id, table, invalid_bootstrap_id_status); |
3665 | 0 | return; |
3666 | 0 | } |
3667 | | // TODO: check options.
3668 | 0 | AddCDCStreamToUniverseAndInitConsumer(universe_id, table, bootstrap_id, [&] () { |
3669 | | // Extra callback on universe setup success - update the producer to let it know that |
3670 | | // the bootstrapping is complete. |
3671 | 0 | SysCDCStreamEntryPB new_entry; |
3672 | 0 | new_entry.add_table_id(*table_id); |
3673 | 0 | new_entry.mutable_options()->Reserve(narrow_cast<int>(options->size())); |
3674 | 0 | for (const auto& option : *options) { |
3675 | 0 | auto new_option = new_entry.add_options(); |
3676 | 0 | new_option->set_key(option.first); |
3677 | 0 | new_option->set_value(option.second); |
3678 | 0 | } |
3679 | 0 | new_entry.set_state(master::SysCDCStreamEntryPB::ACTIVE); |
3680 | |
3681 | 0 | WARN_NOT_OK(cdc_rpc->client()->UpdateCDCStream(bootstrap_id, new_entry), |
3682 | 0 | "Unable to update CDC stream options"); |
3683 | 0 | }); |
3684 | 0 | } |
3685 | 0 | } |
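
The success callback above copies the stream options map into a repeated protobuf field, reserving capacity once up front. The same copy pattern with plain structs standing in for the protobuf types (Option and CopyOptions are illustrative):

#include <string>
#include <unordered_map>
#include <vector>

struct Option { std::string key, value; };  // stand-in for the PB option entry

std::vector<Option> CopyOptions(const std::unordered_map<std::string, std::string>& in) {
  std::vector<Option> out;
  out.reserve(in.size());  // single reservation, mirroring Reserve() above
  for (const auto& kv : in) {
    out.push_back({kv.first, kv.second});
  }
  return out;
}
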
3686 | | |
3687 | | void CatalogManager::AddCDCStreamToUniverseAndInitConsumer( |
3688 | | const std::string& universe_id, const TableId& table_id, const Result<CDCStreamId>& stream_id, |
3689 | 0 | std::function<void()> on_success_cb) { |
3690 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
3691 | 0 | { |
3692 | 0 | SharedLock lock(mutex_); |
3693 | 0 | TRACE("Acquired catalog manager lock"); |
3694 | |
3695 | 0 | universe = FindPtrOrNull(universe_replication_map_, universe_id); |
3696 | 0 | if (universe == nullptr) { |
3697 | 0 | LOG(ERROR) << "Universe not found: " << universe_id; |
3698 | 0 | return; |
3699 | 0 | } |
3700 | 0 | } |
3701 | | |
3702 | 0 | if (!stream_id.ok()) { |
3703 | 0 | LOG(ERROR) << "Error setting up CDC stream for table " << table_id; |
3704 | 0 | MarkUniverseReplicationFailed(universe, ResultToStatus(stream_id)); |
3705 | 0 | return; |
3706 | 0 | } |
3707 | | |
3708 | 0 | bool merge_alter = false; |
3709 | 0 | bool validated_all_tables = false; |
3710 | 0 | { |
3711 | 0 | auto l = universe->LockForWrite(); |
3712 | 0 | if (l->is_deleted_or_failed()) { |
3713 | | // Nothing to do if universe is being deleted. |
3714 | 0 | return; |
3715 | 0 | } |
3716 | | |
3717 | 0 | auto map = l.mutable_data()->pb.mutable_table_streams(); |
3718 | 0 | (*map)[table_id] = *stream_id; |
3719 | | |
3720 | | // This functions as a barrier: waiting for the last RPC call from GetTableSchemaCallback. |
3721 | 0 | if (l.mutable_data()->pb.table_streams_size() == l->pb.tables_size()) { |
3722 | | // All tables successfully validated! Register CDC consumers & start replication. |
3723 | 0 | validated_all_tables = true; |
3724 | 0 | LOG(INFO) << "Registering CDC consumers for universe " << universe->id(); |
3725 | |
3726 | 0 | auto& validated_tables = l->pb.validated_tables(); |
3727 | |
3728 | 0 | std::vector<CDCConsumerStreamInfo> consumer_info; |
3729 | 0 | consumer_info.reserve(l->pb.tables_size()); |
3730 | 0 | for (const auto& table : validated_tables) { |
3731 | 0 | CDCConsumerStreamInfo info; |
3732 | 0 | info.producer_table_id = table.first; |
3733 | 0 | info.consumer_table_id = table.second; |
3734 | 0 | info.stream_id = (*map)[info.producer_table_id]; |
3735 | 0 | consumer_info.push_back(info); |
3736 | 0 | } |
3737 | |
3738 | 0 | std::vector<HostPort> hp; |
3739 | 0 | HostPortsFromPBs(l->pb.producer_master_addresses(), &hp); |
3740 | |
3741 | 0 | auto cdc_rpc_tasks_result = |
3742 | 0 | universe->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses()); |
3743 | 0 | if (!cdc_rpc_tasks_result.ok()) { |
3744 | 0 | LOG(WARNING) << "CDC streams won't be created: " << cdc_rpc_tasks_result; |
3745 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); |
3746 | 0 | } else { |
3747 | 0 | auto cdc_rpc_tasks = *cdc_rpc_tasks_result; |
3748 | 0 | Status s = InitCDCConsumer( |
3749 | 0 | consumer_info, HostPort::ToCommaSeparatedString(hp), l->pb.producer_id(), |
3750 | 0 | cdc_rpc_tasks); |
3751 | 0 | if (!s.ok()) { |
3752 | 0 | LOG(ERROR) << "Error registering subscriber: " << s; |
3753 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); |
3754 | 0 | } else { |
3755 | 0 | GStringPiece original_producer_id(universe->id()); |
3756 | 0 | if (original_producer_id.ends_with(".ALTER")) { |
3757 | | // Don't enable ALTER universes, merge them into the main universe instead. |
3758 | 0 | merge_alter = true; |
3759 | 0 | } else { |
3760 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE); |
3761 | 0 | } |
3762 | 0 | } |
3763 | 0 | } |
3764 | 0 | } |
3765 | | |
3766 | | // Update sys_catalog with new producer table id info. |
3767 | 0 | Status status = sys_catalog_->Upsert(leader_ready_term(), universe); |
3768 | | |
3769 | | // Before committing, run any callbacks on success. |
3770 | 0 | if (status.ok() && on_success_cb && |
3771 | 0 | (l.mutable_data()->pb.state() == SysUniverseReplicationEntryPB::ACTIVE || merge_alter)) { |
3772 | 0 | on_success_cb(); |
3773 | 0 | } |
3774 | |
3775 | 0 | l.CommitOrWarn(status, "updating universe replication info in sys-catalog"); |
3776 | 0 | } |
3777 | | |
3778 | 0 | if (validated_all_tables) { |
3779 | | // Also update the set of consumer tables. |
3780 | 0 | LockGuard lock(mutex_); |
3781 | 0 | auto l = universe->LockForRead(); |
3782 | 0 | for (const auto& table : l->pb.validated_tables()) { |
3783 | 0 | xcluster_consumer_tables_to_stream_map_[table.second].emplace(universe->id(), *stream_id); |
3784 | 0 | } |
3785 | 0 | } |
3786 | | |
3787 | | // If this is an 'alter', merge it back into the primary universe now that setup succeeded.
3788 | 0 | if (merge_alter) { |
3789 | 0 | MergeUniverseReplication(universe); |
3790 | 0 | } |
3791 | 0 | } |
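
The barrier above is just a comparison of two sizes under the universe lock: every callback records its stream ID, and only the callback whose insertion completes the map goes on to register consumers. Reduced to its essentials (names hypothetical; the caller is assumed to hold the lock, as the code above does):

#include <cstddef>
#include <map>
#include <string>

// Returns true only for the call whose insertion completes the map.
bool CompletesBarrier(std::map<std::string, std::string>* table_streams,
                      const std::string& table_id, const std::string& stream_id,
                      std::size_t expected_tables) {
  (*table_streams)[table_id] = stream_id;
  return table_streams->size() == expected_tables;
}
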
3792 | | |
3793 | | /* |
3794 | | * UpdateXClusterConsumerOnTabletSplit updates the consumer -> producer tablet mapping after a local |
3795 | | * tablet split. |
3796 | | */ |
3797 | | Status CatalogManager::UpdateXClusterConsumerOnTabletSplit( |
3798 | | const TableId& consumer_table_id, |
3799 | 140 | const SplitTabletIds& split_tablet_ids) { |
3800 | | // Check if this table is consuming a stream. |
3801 | 140 | XClusterConsumerTableStreamInfoMap stream_infos = |
3802 | 140 | GetXClusterStreamInfoForConsumerTable(consumer_table_id); |
3803 | | |
3804 | 140 | if (stream_infos.empty()) { |
3805 | 140 | return Status::OK(); |
3806 | 140 | } |
3807 | | |
3808 | 0 | auto cluster_config = ClusterConfig(); |
3809 | 0 | auto l = cluster_config->LockForWrite(); |
3810 | 0 | for (const auto& stream_info : stream_infos) { |
3811 | 0 | std::string universe_id = stream_info.first; |
3812 | 0 | CDCStreamId stream_id = stream_info.second; |
3813 | | // Fetch the stream entry so we can update the mappings. |
3814 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
3815 | 0 | auto producer_entry = FindOrNull(*producer_map, universe_id); |
3816 | 0 | if (!producer_entry) { |
3817 | 0 | return STATUS_FORMAT(NotFound, |
3818 | 0 | "Unable to find the producer entry for universe $0", |
3819 | 0 | universe_id); |
3820 | 0 | } |
3821 | 0 | auto stream_entry = FindOrNull(*producer_entry->mutable_stream_map(), stream_id); |
3822 | 0 | if (!stream_entry) { |
3823 | 0 | return STATUS_FORMAT(NotFound, |
3824 | 0 | "Unable to find the stream entry for universe $0, stream $1", |
3825 | 0 | universe_id, |
3826 | 0 | stream_id); |
3827 | 0 | } |
3828 | | |
3829 | 0 | RETURN_NOT_OK(UpdateTableMappingOnTabletSplit(stream_entry, split_tablet_ids)); |
3830 | | |
3831 | | // We also need to mark this stream as no longer being able to perform 1-1 mappings. |
3832 | 0 | stream_entry->set_same_num_producer_consumer_tablets(false); |
3833 | 0 | } |
3834 | | |
3835 | | // Also bump the cluster_config_ version so that changes are propagated to tservers. |
3836 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
3837 | |
3838 | 0 | RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), |
3839 | 0 | "Updating cluster config in sys-catalog")); |
3840 | 0 | l.Commit(); |
3841 | |
3842 | 0 | return Status::OK(); |
3843 | 0 | } |
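
Bumping the cluster config version is what actually ships the new mapping: tservers compare the version they last applied against the one carried by the master heartbeat. A sketch of that consumer-side check (hypothetical names; the real tserver logic lives outside this file):

#include <cstdint>

bool ShouldRefreshConsumerRegistry(int32_t applied_version, int32_t heartbeat_version) {
  return heartbeat_version > applied_version;
}
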
3844 | | |
3845 | | Status CatalogManager::UpdateXClusterProducerOnTabletSplit( |
3846 | | const TableId& producer_table_id, |
3847 | 140 | const SplitTabletIds& split_tablet_ids) { |
3848 | | // First check if this table has any streams associated with it. |
3849 | 140 | auto streams = FindCDCStreamsForTable(producer_table_id); |
3850 | | |
3851 | 140 | if (!streams.empty()) { |
3852 | | // For each stream, need to add in the children entries to the cdc_state table. |
3853 | 0 | client::TableHandle cdc_table; |
3854 | 0 | const client::YBTableName cdc_state_table_name( |
3855 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
3856 | 0 | auto ybclient = master_->cdc_state_client_initializer().client(); |
3857 | 0 | if (!ybclient) { |
3858 | 0 | return STATUS(IllegalState, "Client not initialized or shutting down"); |
3859 | 0 | } |
3860 | 0 | RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient)); |
3861 | 0 | std::shared_ptr<client::YBSession> session = ybclient->NewSession(); |
3862 | |
3863 | 0 | for (const auto& stream : streams) { |
3864 | 0 | for (const auto& child_tablet_id : |
3865 | 0 | {split_tablet_ids.children.first, split_tablet_ids.children.second}) { |
3866 | 0 | const auto insert_op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
3867 | 0 | auto* insert_req = insert_op->mutable_request(); |
3868 | 0 | QLAddStringHashValue(insert_req, child_tablet_id); |
3869 | 0 | QLAddStringRangeValue(insert_req, stream->id()); |
3870 | | // TODO(JHE) set the checkpoint to a different value? OpId of the SPLIT_OP itself perhaps? |
3871 | 0 | cdc_table.AddStringColumnValue(insert_req, master::kCdcCheckpoint, OpId().ToString()); |
3872 | | // TODO(JHE) what to set the time to? |
3873 | | // cdc_table.AddTimestampColumnValue( |
3874 | | // insert_req, master::kCdcLastReplicationTime, GetCurrentTimeMicros()); |
3875 | 0 | session->Apply(insert_op); |
3876 | 0 | } |
3877 | 0 | } |
3878 | 0 | RETURN_NOT_OK(session->Flush()); |
3879 | 0 | } |
3880 | | |
3881 | 140 | return Status::OK(); |
3882 | 140 | } |
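
The nested loops above amount to a cross product: every active stream needs a cdc_state row for each of the two split children. The enumeration in isolation (types simplified; the real rows also seed a checkpoint column):

#include <string>
#include <utility>
#include <vector>

using Row = std::pair<std::string, std::string>;  // (tablet_id, stream_id)

std::vector<Row> CdcStateRowsForSplit(const std::vector<std::string>& stream_ids,
                                      const std::string& child1,
                                      const std::string& child2) {
  std::vector<Row> rows;
  for (const auto& stream : stream_ids) {
    for (const auto& child : {child1, child2}) {
      rows.emplace_back(child, stream);
    }
  }
  return rows;
}
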
3883 | | |
3884 | | Status CatalogManager::InitCDCConsumer( |
3885 | | const std::vector<CDCConsumerStreamInfo>& consumer_info, |
3886 | | const std::string& master_addrs, |
3887 | | const std::string& producer_universe_uuid, |
3888 | 0 | std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks) { |
3889 | |
3890 | 0 | std::unordered_set<HostPort, HostPortHash> tserver_addrs; |
3891 | | // Get the tablets in the consumer table. |
3892 | 0 | cdc::ProducerEntryPB producer_entry; |
3893 | 0 | for (const auto& stream_info : consumer_info) { |
3894 | 0 | GetTableLocationsRequestPB consumer_table_req; |
3895 | 0 | consumer_table_req.set_max_returned_locations(std::numeric_limits<int32_t>::max()); |
3896 | 0 | GetTableLocationsResponsePB consumer_table_resp; |
3897 | 0 | TableIdentifierPB table_identifier;
3898 | 0 | table_identifier.set_table_id(stream_info.consumer_table_id);
3899 | 0 | *(consumer_table_req.mutable_table()) = table_identifier;
3900 | 0 | RETURN_NOT_OK(GetTableLocations(&consumer_table_req, &consumer_table_resp)); |
3901 | | |
3902 | 0 | cdc::StreamEntryPB stream_entry; |
3903 | | // Get producer tablets and map them to the consumer tablets |
3904 | 0 | RETURN_NOT_OK(CreateTabletMapping( |
3905 | 0 | stream_info.producer_table_id, stream_info.consumer_table_id, producer_universe_uuid, |
3906 | 0 | master_addrs, consumer_table_resp, &tserver_addrs, &stream_entry, cdc_rpc_tasks)); |
3907 | 0 | (*producer_entry.mutable_stream_map())[stream_info.stream_id] = std::move(stream_entry); |
3908 | 0 | } |
3909 | | |
3910 | | // Record the network topology of the producer cluster.
3911 | 0 | auto master_addrs_list = StringSplit(master_addrs, ','); |
3912 | 0 | producer_entry.mutable_master_addrs()->Reserve(narrow_cast<int>(master_addrs_list.size())); |
3913 | 0 | for (const auto& addr : master_addrs_list) { |
3914 | 0 | auto hp = VERIFY_RESULT(HostPort::FromString(addr, 0)); |
3915 | 0 | HostPortToPB(hp, producer_entry.add_master_addrs()); |
3916 | 0 | } |
3917 | | |
3918 | 0 | producer_entry.mutable_tserver_addrs()->Reserve(narrow_cast<int>(tserver_addrs.size())); |
3919 | 0 | for (const auto& addr : tserver_addrs) { |
3920 | 0 | HostPortToPB(addr, producer_entry.add_tserver_addrs()); |
3921 | 0 | } |
3922 | |
3923 | 0 | auto cluster_config = ClusterConfig(); |
3924 | 0 | auto l = cluster_config->LockForWrite(); |
3925 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
3926 | 0 | auto it = producer_map->find(producer_universe_uuid); |
3927 | 0 | if (it != producer_map->end()) { |
3928 | 0 | return STATUS(InvalidArgument, "Already created a consumer for this universe"); |
3929 | 0 | } |
3930 | | |
3931 | | // TServers will use the ClusterConfig to create CDC Consumers for applicable local tablets. |
3932 | 0 | (*producer_map)[producer_universe_uuid] = std::move(producer_entry); |
3933 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
3934 | 0 | RETURN_NOT_OK(CheckStatus( |
3935 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), |
3936 | 0 | "updating cluster config in sys-catalog")); |
3937 | 0 | l.Commit(); |
3938 | |
3939 | 0 | return Status::OK(); |
3940 | 0 | } |
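
master_addrs arrives as a single comma-separated string and is split before each entry is parsed into a HostPort. A self-contained sketch of the split step (SplitAddresses is hypothetical; the code above uses StringSplit and HostPort::FromString):

#include <sstream>
#include <string>
#include <vector>

std::vector<std::string> SplitAddresses(const std::string& csv) {
  std::vector<std::string> out;
  std::stringstream ss(csv);
  std::string item;
  while (std::getline(ss, item, ',')) {
    if (!item.empty()) out.push_back(item);
  }
  return out;
}

// SplitAddresses("127.0.0.1:7100,127.0.0.2:7100") yields two entries.
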
3941 | | |
3942 | 0 | void CatalogManager::MergeUniverseReplication(scoped_refptr<UniverseReplicationInfo> universe) { |
3943 | | // Merge back into the primary universe now that setup is a success.
3944 | 0 | GStringPiece original_producer_id(universe->id()); |
3945 | 0 | if (!original_producer_id.ends_with(".ALTER")) { |
3946 | 0 | return; |
3947 | 0 | } |
3948 | 0 | original_producer_id.remove_suffix(sizeof(".ALTER")-1 /* exclude \0 ending */); |
3949 | 0 | LOG(INFO) << "Merging CDC universe: " << universe->id() |
3950 | 0 | << " into " << original_producer_id.ToString(); |
3951 | |
3952 | 0 | scoped_refptr<UniverseReplicationInfo> original_universe; |
3953 | 0 | { |
3954 | 0 | SharedLock lock(mutex_); |
3955 | 0 | TRACE("Acquired catalog manager lock"); |
3956 | |
3957 | 0 | original_universe = FindPtrOrNull(universe_replication_map_, original_producer_id.ToString()); |
3958 | 0 | if (original_universe == nullptr) { |
3959 | 0 | LOG(ERROR) << "Universe not found: " << original_producer_id.ToString(); |
3960 | 0 | return; |
3961 | 0 | } |
3962 | 0 | } |
3963 | | // Merge Cluster Config for TServers. |
3964 | 0 | { |
3965 | 0 | auto cluster_config = ClusterConfig(); |
3966 | 0 | auto cl = cluster_config->LockForWrite(); |
3967 | 0 | auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
3968 | 0 | auto original_producer_entry = pm->find(original_universe->id()); |
3969 | 0 | auto alter_producer_entry = pm->find(universe->id()); |
3970 | 0 | if (original_producer_entry != pm->end() && alter_producer_entry != pm->end()) { |
3971 | | // Merge the Tables from the Alter into the original. |
3972 | 0 | auto as = alter_producer_entry->second.stream_map(); |
3973 | 0 | original_producer_entry->second.mutable_stream_map()->insert(as.begin(), as.end()); |
3974 | | // Delete the Alter |
3975 | 0 | pm->erase(alter_producer_entry); |
3976 | 0 | } else { |
3977 | 0 | LOG(WARNING) << "Could not find both universes in Cluster Config: " << universe->id(); |
3978 | 0 | } |
3979 | 0 | cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1); |
3980 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()); |
3981 | 0 | cl.CommitOrWarn(s, "updating cluster config in sys-catalog"); |
3982 | 0 | } |
3983 | | // Merge Master Config on Consumer. (no need for Producer changes, since it uses stream_id) |
3984 | 0 | { |
3985 | 0 | auto original_lock = original_universe->LockForWrite(); |
3986 | 0 | auto alter_lock = universe->LockForWrite(); |
3987 | | // Merge Table->StreamID mapping. |
3988 | 0 | auto at = alter_lock.mutable_data()->pb.mutable_tables(); |
3989 | 0 | original_lock.mutable_data()->pb.mutable_tables()->MergeFrom(*at); |
3990 | 0 | at->Clear(); |
3991 | 0 | auto as = alter_lock.mutable_data()->pb.mutable_table_streams(); |
3992 | 0 | original_lock.mutable_data()->pb.mutable_table_streams()->insert(as->begin(), as->end()); |
3993 | 0 | as->clear(); |
3994 | 0 | auto av = alter_lock.mutable_data()->pb.mutable_validated_tables(); |
3995 | 0 | original_lock.mutable_data()->pb.mutable_validated_tables()->insert(av->begin(), av->end()); |
3996 | 0 | av->clear(); |
3997 | 0 | alter_lock.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED); |
3998 | |
3999 | 0 | vector<UniverseReplicationInfo*> universes{original_universe.get(), universe.get()}; |
4000 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), universes); |
4001 | 0 | alter_lock.CommitOrWarn(s, "updating universe replication entries in sys-catalog"); |
4002 | 0 | if (s.ok()) { |
4003 | 0 | original_lock.Commit(); |
4004 | 0 | } |
4005 | 0 | } |
4006 | | // TODO: universe_replication_map_.erase(universe->id()) at a later time. |
4007 | | // TwoDCTest.AlterUniverseReplicationTables crashes due to undiagnosed race right now. |
4008 | 0 | LOG(INFO) << "Done with Merging " << universe->id() << " into " << original_universe->id(); |
4009 | 0 | } |
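
The remove_suffix(sizeof(".ALTER") - 1) call above leans on sizeof counting a string literal's trailing '\0'. The idiom in isolation (StripAlterSuffix is a hypothetical helper):

#include <cassert>
#include <cstddef>
#include <string>

std::string StripAlterSuffix(std::string id) {
  static const char kSuffix[] = ".ALTER";
  constexpr std::size_t kLen = sizeof(kSuffix) - 1;  // 6: excludes the '\0'
  assert(id.size() > kLen && id.compare(id.size() - kLen, kLen, kSuffix) == 0);
  id.resize(id.size() - kLen);
  return id;
}

// StripAlterSuffix("uuid.ALTER") == "uuid"
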
4010 | | |
4011 | | Status ReturnErrorOrAddWarning(const Status& s, |
4012 | | const DeleteUniverseReplicationRequestPB* req, |
4013 | 0 | DeleteUniverseReplicationResponsePB* resp) { |
4014 | 0 | if (!s.ok()) { |
4015 | 0 | if (req->ignore_errors()) { |
4016 | | // Continue executing, save the status as a warning. |
4017 | 0 | AppStatusPB* warning = resp->add_warnings(); |
4018 | 0 | StatusToPB(s, warning); |
4019 | 0 | return Status::OK(); |
4020 | 0 | } |
4021 | 0 | return s.CloneAndAppend("\nUse 'ignore-errors' to ignore this error."); |
4022 | 0 | } |
4023 | 0 | return s; |
4024 | 0 | } |
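
ReturnErrorOrAddWarning gives each teardown step below a uniform ignore-errors contract: a failure either aborts the RPC or is downgraded to a warning carried back in the response. The control flow in miniature (FakeResp and HandleStep are illustrative stand-ins):

#include <string>
#include <vector>

struct FakeResp { std::vector<std::string> warnings; };

// Returns false when the caller should propagate the error to the client.
bool HandleStep(bool step_ok, const std::string& error_msg,
                bool ignore_errors, FakeResp* resp) {
  if (step_ok) return true;
  if (ignore_errors) {
    resp->warnings.push_back(error_msg);  // keep going, but surface the problem
    return true;
  }
  return false;
}
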
4025 | | |
4026 | | Status CatalogManager::DeleteUniverseReplication(const DeleteUniverseReplicationRequestPB* req, |
4027 | | DeleteUniverseReplicationResponsePB* resp, |
4028 | 0 | rpc::RpcContext* rpc) { |
4029 | 0 | LOG(INFO) << "Servicing DeleteUniverseReplication request from " << RequestorString(rpc) |
4030 | 0 | << ": " << req->ShortDebugString(); |
4031 | |
4032 | 0 | if (!req->has_producer_id()) { |
4033 | 0 | return STATUS(InvalidArgument, "Producer universe ID required", req->ShortDebugString(), |
4034 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
4035 | 0 | } |
4036 | | |
4037 | 0 | scoped_refptr<UniverseReplicationInfo> ri; |
4038 | 0 | { |
4039 | 0 | SharedLock lock(mutex_); |
4040 | 0 | TRACE("Acquired catalog manager lock"); |
4041 | |
4042 | 0 | ri = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4043 | 0 | if (ri == nullptr) { |
4044 | 0 | return STATUS(NotFound, "Universe replication info does not exist", |
4045 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4046 | 0 | } |
4047 | 0 | } |
4048 | | |
4049 | 0 | auto l = ri->LockForWrite(); |
4050 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED); |
4051 | | |
4052 | | // Delete subscribers on the Consumer Registry (removes from TServers). |
4053 | 0 | LOG(INFO) << "Deleting subscribers for producer " << req->producer_id(); |
4054 | 0 | { |
4055 | 0 | auto cluster_config = ClusterConfig(); |
4056 | 0 | auto cl = cluster_config->LockForWrite(); |
4057 | 0 | auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4058 | 0 | auto it = producer_map->find(req->producer_id()); |
4059 | 0 | if (it != producer_map->end()) { |
4060 | 0 | producer_map->erase(it); |
4061 | 0 | cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1); |
4062 | 0 | RETURN_NOT_OK(CheckStatus( |
4063 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), |
4064 | 0 | "updating cluster config in sys-catalog")); |
4065 | 0 | cl.Commit(); |
4066 | 0 | } |
4067 | 0 | } |
4068 | | |
4069 | | // Delete CDC stream config on the Producer. |
4070 | 0 | if (!l->pb.table_streams().empty()) { |
4071 | 0 | auto result = ri->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses()); |
4072 | 0 | if (!result.ok()) { |
4073 | 0 | LOG(WARNING) << "Unable to create cdc rpc task. CDC streams won't be deleted: " << result; |
4074 | 0 | } else { |
4075 | 0 | auto cdc_rpc = *result; |
4076 | 0 | vector<CDCStreamId> streams; |
4077 | 0 | std::unordered_map<CDCStreamId, TableId> stream_to_producer_table_id; |
4078 | 0 | for (const auto& table : l->pb.table_streams()) { |
4079 | 0 | streams.push_back(table.second); |
4080 | 0 | stream_to_producer_table_id.emplace(table.second, table.first); |
4081 | 0 | } |
4082 | |
4083 | 0 | DeleteCDCStreamResponsePB delete_cdc_stream_resp; |
4084 | | // Set force_delete=true since we are deleting active xCluster streams. |
4085 | 0 | auto s = cdc_rpc->client()->DeleteCDCStream(streams, |
4086 | 0 | true, /* force_delete */ |
4087 | 0 | req->ignore_errors(), |
4088 | 0 | &delete_cdc_stream_resp); |
4089 | |
4090 | 0 | if (delete_cdc_stream_resp.not_found_stream_ids().size() > 0) { |
4091 | 0 | std::ostringstream missing_streams; |
4092 | 0 | for (auto it = delete_cdc_stream_resp.not_found_stream_ids().begin(); |
4093 | 0 | it != delete_cdc_stream_resp.not_found_stream_ids().end(); |
4094 | 0 | ++it) { |
4095 | 0 | if (it != delete_cdc_stream_resp.not_found_stream_ids().begin()) { |
4096 | 0 | missing_streams << ","; |
4097 | 0 | } |
4098 | 0 | missing_streams << *it << " (table_id: " << stream_to_producer_table_id[*it] << ")"; |
4099 | 0 | } |
4100 | 0 | if (s.ok()) { |
4101 | | // Returned but did not find some streams, so still need to warn the user about those. |
4102 | 0 | s = STATUS(NotFound, |
4103 | 0 | "Could not find the following streams: [" + missing_streams.str() + "]."); |
4104 | 0 | } else { |
4105 | 0 | s = s.CloneAndPrepend( |
4106 | 0 | "Could not find the following streams: [" + missing_streams.str() + "]."); |
4107 | 0 | } |
4108 | 0 | } |
4109 | |
4110 | 0 | RETURN_NOT_OK(ReturnErrorOrAddWarning(s, req, resp)); |
4111 | 0 | } |
4112 | 0 | } |
4113 | | |
4114 | | // Delete universe in the Universe Config. |
4115 | 0 | RETURN_NOT_OK(ReturnErrorOrAddWarning(DeleteUniverseReplicationUnlocked(ri), req, resp)); |
4116 | 0 | l.Commit(); |
4117 | |
4118 | 0 | LOG(INFO) << "Processed delete universe replication " << ri->ToString() |
4119 | 0 | << " per request from " << RequestorString(rpc); |
4120 | |
4121 | 0 | return Status::OK(); |
4122 | 0 | } |
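
The consumer-registry step above erases the producer entry only when it is present and bumps the config version in the same write, so a retried delete stays harmless. The core of that, with a plain map standing in for the protobuf registry (names illustrative):

#include <cstdint>
#include <map>
#include <string>

bool RemoveProducerEntry(std::map<std::string, int>* producer_map,
                         const std::string& producer_id, int32_t* version) {
  auto it = producer_map->find(producer_id);
  if (it == producer_map->end()) return false;  // already gone: retry is a no-op
  producer_map->erase(it);
  ++*version;  // tservers pick up the removal via the version bump
  return true;
}
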
4123 | | |
4124 | | Status CatalogManager::DeleteUniverseReplicationUnlocked( |
4125 | 0 | scoped_refptr<UniverseReplicationInfo> universe) { |
4126 | | // Assumes that caller has locked universe. |
4127 | 0 | RETURN_NOT_OK_PREPEND( |
4128 | 0 | sys_catalog_->Delete(leader_ready_term(), universe), |
4129 | 0 | Substitute("An error occurred while updating sys-catalog, universe_id: $0", universe->id())); |
4130 | | |
4131 | | // Remove it from the map. |
4132 | 0 | LockGuard lock(mutex_); |
4133 | 0 | if (universe_replication_map_.erase(universe->id()) < 1) { |
4134 | 0 | LOG(WARNING) << "Failed to remove replication info from map: universe_id: " << universe->id(); |
4135 | 0 | } |
4136 | | // Also update the mapping of consumer tables. |
4137 | 0 | for (const auto& table : universe->metadata().state().pb.validated_tables()) { |
4138 | 0 | if (xcluster_consumer_tables_to_stream_map_[table.second].erase(universe->id()) < 1) { |
4139 | 0 | LOG(WARNING) << "Failed to remove consumer table from mapping. " |
4140 | 0 | << "table_id: " << table.second << ": universe_id: " << universe->id(); |
4141 | 0 | } |
4142 | 0 | if (xcluster_consumer_tables_to_stream_map_[table.second].empty()) { |
4143 | 0 | xcluster_consumer_tables_to_stream_map_.erase(table.second); |
4144 | 0 | } |
4145 | 0 | } |
4146 | 0 | return Status::OK(); |
4147 | 0 | } |
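
The cleanup loop above keeps xcluster_consumer_tables_to_stream_map_ free of empty buckets: drop the universe's entry for each table, then erase the table key once nothing references it. As a standalone sketch (EraseUniverseForTable is hypothetical):

#include <string>
#include <unordered_map>
#include <unordered_set>

using TableToUniverses =
    std::unordered_map<std::string, std::unordered_set<std::string>>;

void EraseUniverseForTable(TableToUniverses* map, const std::string& table_id,
                           const std::string& universe_id) {
  auto it = map->find(table_id);
  if (it == map->end()) return;
  it->second.erase(universe_id);
  if (it->second.empty()) map->erase(it);  // leave no empty buckets behind
}
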
4148 | | |
4149 | | Status CatalogManager::SetUniverseReplicationEnabled( |
4150 | | const SetUniverseReplicationEnabledRequestPB* req, |
4151 | | SetUniverseReplicationEnabledResponsePB* resp, |
4152 | 0 | rpc::RpcContext* rpc) { |
4153 | 0 | LOG(INFO) << "Servicing SetUniverseReplicationEnabled request from " << RequestorString(rpc) |
4154 | 0 | << ": " << req->ShortDebugString(); |
4155 | | |
4156 | | // Sanity Checking Cluster State and Input. |
4157 | 0 | if (!req->has_producer_id()) { |
4158 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4159 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4160 | 0 | } |
4161 | 0 | if (!req->has_is_enabled()) { |
4162 | 0 | return STATUS(InvalidArgument, "Must explicitly set whether to enable", |
4163 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4164 | 0 | } |
4165 | | |
4166 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
4167 | 0 | { |
4168 | 0 | SharedLock lock(mutex_); |
4169 | |
4170 | 0 | universe = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4171 | 0 | if (universe == nullptr) { |
4172 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4173 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4174 | 0 | } |
4175 | 0 | } |
4176 | | |
4177 | | // Update the Master's Universe Config with the new state. |
4178 | 0 | { |
4179 | 0 | auto l = universe->LockForWrite(); |
4180 | 0 | if (l->pb.state() != SysUniverseReplicationEntryPB::DISABLED && |
4181 | 0 | l->pb.state() != SysUniverseReplicationEntryPB::ACTIVE) { |
4182 | 0 | return STATUS( |
4183 | 0 | InvalidArgument, |
4184 | 0 | Format("Universe Replication in invalid state: $0. Retry or Delete.", |
4185 | 0 | SysUniverseReplicationEntryPB::State_Name(l->pb.state())), |
4186 | 0 | req->ShortDebugString(), |
4187 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
4188 | 0 | } |
4189 | 0 | if (req->is_enabled()) { |
4190 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE); |
4191 | 0 | } else { // DISABLE. |
4192 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DISABLED); |
4193 | 0 | } |
4194 | 0 | RETURN_NOT_OK(CheckStatus( |
4195 | 0 | sys_catalog_->Upsert(leader_ready_term(), universe), |
4196 | 0 | "updating universe replication info in sys-catalog")); |
4197 | 0 | l.Commit(); |
4198 | 0 | } |
4199 | | |
4200 | | // Modify the Consumer Registry, which will fan out this info to all TServers on heartbeat. |
4201 | 0 | { |
4202 | 0 | auto cluster_config = ClusterConfig(); |
4203 | 0 | auto l = cluster_config->LockForWrite(); |
4204 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4205 | 0 | auto it = producer_map->find(req->producer_id()); |
4206 | 0 | if (it == producer_map->end()) { |
4207 | 0 | LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id(); |
4208 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4209 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4210 | 0 | } |
4211 | 0 | (*it).second.set_disable_stream(!req->is_enabled()); |
4212 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
4213 | 0 | RETURN_NOT_OK(CheckStatus( |
4214 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), |
4215 | 0 | "updating cluster config in sys-catalog")); |
4216 | 0 | l.Commit(); |
4217 | 0 | } |
4218 | | |
4219 | 0 | return Status::OK(); |
4220 | 0 | } |
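
The enable/disable path only accepts a universe sitting in ACTIVE or DISABLED; anything mid-setup, failed, or deleted has to be retried or removed first. The guard on its own (the enum is an illustrative subset of the SysUniverseReplicationEntryPB states):

enum class ReplicationState { INITIALIZING, ACTIVE, DISABLED, FAILED, DELETED };

bool CanToggleReplication(ReplicationState state) {
  return state == ReplicationState::ACTIVE || state == ReplicationState::DISABLED;
}
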
4221 | | |
4222 | | Status CatalogManager::AlterUniverseReplication(const AlterUniverseReplicationRequestPB* req, |
4223 | | AlterUniverseReplicationResponsePB* resp, |
4224 | 0 | rpc::RpcContext* rpc) { |
4225 | 0 | LOG(INFO) << "Servicing AlterUniverseReplication request from " << RequestorString(rpc) |
4226 | 0 | << ": " << req->ShortDebugString(); |
4227 | | |
4228 | | // Sanity Checking Cluster State and Input. |
4229 | 0 | if (!req->has_producer_id()) { |
4230 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4231 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4232 | 0 | } |
4233 | | |
4234 | | // Verify that there is an existing Universe config |
4235 | 0 | scoped_refptr<UniverseReplicationInfo> original_ri; |
4236 | 0 | { |
4237 | 0 | SharedLock lock(mutex_); |
4238 | |
4239 | 0 | original_ri = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4240 | 0 | if (original_ri == nullptr) { |
4241 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4242 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4243 | 0 | } |
4244 | 0 | } |
4245 | | |
4246 | | // Currently, config options are mutually exclusive to simplify transactionality. |
4247 | 0 | int config_count = (req->producer_master_addresses_size() > 0 ? 1 : 0) + |
4248 | 0 | (req->producer_table_ids_to_remove_size() > 0 ? 1 : 0) + |
4249 | 0 | (req->producer_table_ids_to_add_size() > 0 ? 1 : 0) + |
4250 | 0 | (req->has_new_producer_universe_id() ? 1 : 0); |
4251 | 0 | if (config_count != 1) { |
4252 | 0 | return STATUS(InvalidArgument, "Only 1 Alter operation per request currently supported", |
4253 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4254 | 0 | } |
4255 | | |
4256 | | // Config logic... |
4257 | 0 | if (req->producer_master_addresses_size() > 0) { |
4258 | | // 'set_master_addresses' |
4259 | | // TODO: Verify the input. Set up an RPC task, call ListTables, and ensure the lists match.
4260 | | |
4261 | | // 1a. Persistent Config: Update the Universe Config for Master. |
4262 | 0 | { |
4263 | 0 | auto l = original_ri->LockForWrite(); |
4264 | 0 | l.mutable_data()->pb.mutable_producer_master_addresses()->CopyFrom( |
4265 | 0 | req->producer_master_addresses()); |
4266 | 0 | RETURN_NOT_OK(CheckStatus( |
4267 | 0 | sys_catalog_->Upsert(leader_ready_term(), original_ri), |
4268 | 0 | "updating universe replication info in sys-catalog")); |
4269 | 0 | l.Commit(); |
4270 | 0 | } |
4271 | | // 1b. Persistent Config: Update the Consumer Registry (updates TServers) |
4272 | 0 | { |
4273 | 0 | auto cluster_config = ClusterConfig(); |
4274 | 0 | auto l = cluster_config->LockForWrite(); |
4275 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4276 | 0 | auto it = producer_map->find(req->producer_id()); |
4277 | 0 | if (it == producer_map->end()) { |
4278 | 0 | LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id(); |
4279 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4280 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4281 | 0 | } |
4282 | 0 | (*it).second.mutable_master_addrs()->CopyFrom(req->producer_master_addresses()); |
4283 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
4284 | 0 | RETURN_NOT_OK(CheckStatus( |
4285 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), |
4286 | 0 | "updating cluster config in sys-catalog")); |
4287 | 0 | l.Commit(); |
4288 | 0 | } |
4289 | | // 2. Memory Update: Change cdc_rpc_tasks (Master cache) |
4290 | 0 | { |
4291 | 0 | auto result = original_ri->GetOrCreateCDCRpcTasks(req->producer_master_addresses()); |
4292 | 0 | if (!result.ok()) { |
4293 | 0 | return SetupError(resp->mutable_error(), MasterErrorPB::INTERNAL_ERROR, result.status()); |
4294 | 0 | } |
4295 | 0 | } |
4296 | 0 | } else if (req->producer_table_ids_to_remove_size() > 0) { |
4297 | | // 'remove_table' |
4298 | 0 | auto it = req->producer_table_ids_to_remove(); |
4299 | 0 | std::set<string> table_ids_to_remove(it.begin(), it.end()); |
4300 | | // Filter out any tables that aren't in the existing replication config. |
4301 | 0 | { |
4302 | 0 | auto l = original_ri->LockForRead(); |
4303 | 0 | auto tbl_iter = l->pb.tables(); |
4304 | 0 | std::set<string> existing_tables(tbl_iter.begin(), tbl_iter.end()), filtered_list; |
4305 | 0 | set_intersection(table_ids_to_remove.begin(), table_ids_to_remove.end(), |
4306 | 0 | existing_tables.begin(), existing_tables.end(), |
4307 | 0 | std::inserter(filtered_list, filtered_list.begin())); |
4308 | 0 | filtered_list.swap(table_ids_to_remove); |
4309 | 0 | } |
4310 | |
4311 | 0 | vector<CDCStreamId> streams_to_remove; |
4312 | | // 1. Update the Consumer Registry (removes from TServers). |
4313 | 0 | { |
4314 | 0 | auto cluster_config = ClusterConfig(); |
4315 | 0 | auto cl = cluster_config->LockForWrite(); |
4316 | 0 | auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4317 | 0 | auto producer_entry = pm->find(req->producer_id()); |
4318 | 0 | if (producer_entry != pm->end()) { |
4319 | | // Remove the specified tables (not part of the key).
4320 | 0 | auto stream_map = producer_entry->second.mutable_stream_map(); |
4321 | 0 | for (auto& p : *stream_map) { |
4322 | 0 | if (table_ids_to_remove.count(p.second.producer_table_id()) > 0) { |
4323 | 0 | streams_to_remove.push_back(p.first); |
4324 | 0 | } |
4325 | 0 | } |
4326 | 0 | if (streams_to_remove.size() == stream_map->size()) { |
4327 | | // If this ends with an empty Map, disallow and force user to delete. |
4328 | 0 | LOG(WARNING) << "CDC 'remove_table' tried to remove all tables: " << req->producer_id();
4329 | 0 | return STATUS( |
4330 | 0 | InvalidArgument, |
4331 | 0 | "Cannot remove all tables with alter. Use delete_universe_replication instead.", |
4332 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4333 | 0 | } else if (streams_to_remove.empty()) { |
4334 | | // If this doesn't delete anything, notify the user. |
4335 | 0 | return STATUS(InvalidArgument, "Removal matched no entries.", |
4336 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4337 | 0 | } |
4338 | 0 | for (auto& key : streams_to_remove) { |
4339 | 0 | stream_map->erase(stream_map->find(key)); |
4340 | 0 | } |
4341 | 0 | } |
4342 | 0 | cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1); |
4343 | 0 | RETURN_NOT_OK(CheckStatus( |
4344 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), |
4345 | 0 | "updating cluster config in sys-catalog")); |
4346 | 0 | cl.Commit(); |
4347 | 0 | } |
4348 | | // 2. Remove from Master Configs on Producer and Consumer. |
4349 | 0 | { |
4350 | 0 | auto l = original_ri->LockForWrite(); |
4351 | 0 | if (!l->pb.table_streams().empty()) { |
4352 | | // Delete Relevant Table->StreamID mappings on Consumer. |
4353 | 0 | auto table_streams = l.mutable_data()->pb.mutable_table_streams(); |
4354 | 0 | auto validated_tables = l.mutable_data()->pb.mutable_validated_tables(); |
4355 | 0 | for (auto& key : table_ids_to_remove) { |
4356 | 0 | table_streams->erase(table_streams->find(key)); |
4357 | 0 | validated_tables->erase(validated_tables->find(key)); |
4358 | 0 | } |
4359 | 0 | for (int i = 0; i < l.mutable_data()->pb.tables_size(); i++) { |
4360 | 0 | if (table_ids_to_remove.count(l.mutable_data()->pb.tables(i)) > 0) { |
4361 | 0 | l.mutable_data()->pb.mutable_tables()->DeleteSubrange(i, 1); |
4362 | 0 | --i; |
4363 | 0 | } |
4364 | 0 | } |
4365 | | // Delete CDC stream config on the Producer. |
4366 | 0 | auto result = original_ri->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses()); |
4367 | 0 | if (!result.ok()) { |
4368 | 0 | LOG(WARNING) << "Unable to create cdc rpc task. CDC streams won't be deleted: " << result; |
4369 | 0 | } else { |
4370 | 0 | auto s = (*result)->client()->DeleteCDCStream(streams_to_remove, |
4371 | 0 | true /* force_delete */); |
4372 | 0 | if (!s.ok()) { |
4373 | 0 | std::stringstream os; |
4374 | 0 | std::copy(streams_to_remove.begin(), streams_to_remove.end(), |
4375 | 0 | std::ostream_iterator<CDCStreamId>(os, ", ")); |
4376 | 0 | LOG(WARNING) << "Unable to delete CDC streams: " << os.str() << s; |
4377 | 0 | } |
4378 | 0 | } |
4379 | 0 | } |
4380 | 0 | RETURN_NOT_OK(CheckStatus( |
4381 | 0 | sys_catalog_->Upsert(leader_ready_term(), original_ri), |
4382 | 0 | "updating universe replication info in sys-catalog")); |
4383 | 0 | l.Commit(); |
4384 | 0 | } |
4385 | 0 | } else if (req->producer_table_ids_to_add_size() > 0) { |
4386 | | // 'add_table' |
4387 | 0 | string alter_producer_id = req->producer_id() + ".ALTER"; |
4388 | | |
4389 | | // If user passed in bootstrap ids, check that there is a bootstrap id for every table. |
4390 | 0 | if (req->producer_bootstrap_ids_to_add().size() > 0 && |
4391 | 0 | req->producer_table_ids_to_add().size() != req->producer_bootstrap_ids_to_add().size()) { |
4392 | 0 | return STATUS(InvalidArgument, "Number of bootstrap ids must be equal to number of tables", |
4393 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4394 | 0 | } |
4395 | | |
4396 | | // Verify no 'alter' command running. |
4397 | 0 | scoped_refptr<UniverseReplicationInfo> alter_ri; |
4398 | 0 | { |
4399 | 0 | SharedLock lock(mutex_); |
4400 | 0 | alter_ri = FindPtrOrNull(universe_replication_map_, alter_producer_id); |
4401 | 0 | } |
4402 | 0 | { |
4403 | 0 | if (alter_ri != nullptr) { |
4404 | 0 | LOG(INFO) << "Found " << alter_producer_id << "... Removing"; |
4405 | 0 | if (alter_ri->LockForRead()->is_deleted_or_failed()) { |
4406 | | // Delete previous Alter if it's completed but failed. |
4407 | 0 | master::DeleteUniverseReplicationRequestPB delete_req; |
4408 | 0 | delete_req.set_producer_id(alter_ri->id()); |
4409 | 0 | master::DeleteUniverseReplicationResponsePB delete_resp; |
4410 | 0 | Status s = DeleteUniverseReplication(&delete_req, &delete_resp, rpc); |
4411 | 0 | if (!s.ok()) { |
4412 | 0 | if (delete_resp.has_error()) { |
4413 | 0 | resp->mutable_error()->Swap(delete_resp.mutable_error()); |
4414 | 0 | return s; |
4415 | 0 | } |
4416 | 0 | return SetupError(resp->mutable_error(), s); |
4417 | 0 | } |
4418 | 0 | } else { |
4419 | 0 | return STATUS(InvalidArgument, "Alter for CDC producer currently running", |
4420 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4421 | 0 | } |
4422 | 0 | } |
4423 | 0 | } |
4424 | | |
4425 | | // Map each table id to its corresponding bootstrap id. |
4426 | 0 | std::unordered_map<TableId, std::string> table_id_to_bootstrap_id; |
4427 | 0 | if (req->producer_bootstrap_ids_to_add().size() > 0) { |
4428 | 0 | for (int i = 0; i < req->producer_table_ids_to_add().size(); i++) { |
4429 | 0 | table_id_to_bootstrap_id[req->producer_table_ids_to_add(i)] |
4430 | 0 | = req->producer_bootstrap_ids_to_add(i); |
4431 | 0 | } |
4432 | | |
4433 | | // Ensure that table ids are unique. We need to do this here even though |
4434 | | // the same check is performed by SetupUniverseReplication because |
4435 | | // duplicate table ids can cause a bootstrap id entry in table_id_to_bootstrap_id |
4436 | | // to be overwritten. |
4437 | 0 | if (table_id_to_bootstrap_id.size() != |
4438 | 0 | implicit_cast<size_t>(req->producer_table_ids_to_add().size())) { |
4439 | 0 | return STATUS(InvalidArgument, "When providing bootstrap ids, " |
4440 | 0 | "the list of tables must be unique", req->ShortDebugString(), |
4441 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
4442 | 0 | } |
4443 | 0 | } |
4444 | | |
4445 | | // Only add new tables. Ignore tables that are currently being replicated. |
4446 | 0 | auto tid_iter = req->producer_table_ids_to_add(); |
4447 | 0 | std::unordered_set<string> new_tables(tid_iter.begin(), tid_iter.end()); |
4448 | 0 | { |
4449 | 0 | auto l = original_ri->LockForRead(); |
4450 | 0 | for (const auto& t : l->pb.tables()) {
4451 | 0 | auto pos = new_tables.find(t); |
4452 | 0 | if (pos != new_tables.end()) { |
4453 | 0 | new_tables.erase(pos); |
4454 | 0 | } |
4455 | 0 | } |
4456 | 0 | } |
4457 | 0 | if (new_tables.empty()) { |
4458 | 0 | return STATUS(InvalidArgument, "CDC producer already contains all requested tables", |
4459 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4460 | 0 | } |
4461 | | |
4462 | | // 1. create an ALTER table request that mirrors the original 'setup_replication'. |
4463 | 0 | master::SetupUniverseReplicationRequestPB setup_req; |
4464 | 0 | master::SetupUniverseReplicationResponsePB setup_resp; |
4465 | 0 | setup_req.set_producer_id(alter_producer_id); |
4466 | 0 | setup_req.mutable_producer_master_addresses()->CopyFrom( |
4467 | 0 | original_ri->LockForRead()->pb.producer_master_addresses()); |
4468 | 0 | for (auto t : new_tables) { |
4469 | 0 | setup_req.add_producer_table_ids(t); |
4470 | | |
4471 | | // Add bootstrap id to request if it exists. |
4472 | 0 | auto bootstrap_id_lookup_result = table_id_to_bootstrap_id.find(t); |
4473 | 0 | if (bootstrap_id_lookup_result != table_id_to_bootstrap_id.end()) { |
4474 | 0 | setup_req.add_producer_bootstrap_ids(bootstrap_id_lookup_result->second); |
4475 | 0 | } |
4476 | 0 | } |
4477 | | |
4478 | | // 2. run the 'setup_replication' pipeline on the ALTER Table |
4479 | 0 | Status s = SetupUniverseReplication(&setup_req, &setup_resp, rpc); |
4480 | 0 | if (!s.ok()) { |
4481 | 0 | if (setup_resp.has_error()) { |
4482 | 0 | resp->mutable_error()->Swap(setup_resp.mutable_error()); |
4483 | 0 | return s; |
4484 | 0 | } |
4485 | 0 | return SetupError(resp->mutable_error(), s); |
4486 | 0 | } |
4487 | | // NOTE: ALTER merges back into original after completion. |
4488 | 0 | } else if (req->has_new_producer_universe_id()) { |
4489 | 0 | Status s = RenameUniverseReplication(original_ri, req, resp, rpc); |
4490 | 0 | if (!s.ok()) { |
4491 | 0 | return SetupError(resp->mutable_error(), s); |
4492 | 0 | } |
4493 | 0 | } |
4494 | | |
4495 | 0 | return Status::OK(); |
4496 | 0 | } |
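
The config_count arithmetic near the top of AlterUniverseReplication enforces exactly one operation per request by summing indicators. The same check factored out (CountAlterOps is hypothetical):

int CountAlterOps(bool set_masters, bool remove_tables, bool add_tables, bool rename) {
  return static_cast<int>(set_masters) + static_cast<int>(remove_tables) +
         static_cast<int>(add_tables) + static_cast<int>(rename);
}

// A request is well-formed only when CountAlterOps(...) == 1.
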
4497 | | |
4498 | | Status CatalogManager::RenameUniverseReplication( |
4499 | | scoped_refptr<UniverseReplicationInfo> universe, |
4500 | | const AlterUniverseReplicationRequestPB* req, |
4501 | | AlterUniverseReplicationResponsePB* resp, |
4502 | 0 | rpc::RpcContext* rpc) { |
4503 | 0 | const string old_universe_replication_id = universe->id(); |
4504 | 0 | const string new_producer_universe_id = req->new_producer_universe_id(); |
4505 | 0 | if (old_universe_replication_id == new_producer_universe_id) { |
4506 | 0 | return STATUS(InvalidArgument, "Old and new replication ids must be different", |
4507 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4508 | 0 | } |
4509 | | |
4510 | 0 | { |
4511 | 0 | LockGuard lock(mutex_); |
4512 | 0 | auto l = universe->LockForWrite(); |
4513 | 0 | scoped_refptr<UniverseReplicationInfo> new_ri; |
4514 | | |
4515 | | // Assert that new_producer_universe_id isn't already in use.
4516 | 0 | if (FindPtrOrNull(universe_replication_map_, new_producer_universe_id) != nullptr) { |
4517 | 0 | return STATUS(InvalidArgument, "New replication id is already in use", |
4518 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4519 | 0 | } |
4520 | | |
4521 | | // Since the producer_id is used as the key, we need to create a new UniverseReplicationInfo. |
4522 | 0 | new_ri = new UniverseReplicationInfo(new_producer_universe_id); |
4523 | 0 | new_ri->mutable_metadata()->StartMutation(); |
4524 | 0 | SysUniverseReplicationEntryPB *metadata = &new_ri->mutable_metadata()->mutable_dirty()->pb; |
4525 | 0 | metadata->CopyFrom(l->pb); |
4526 | 0 | metadata->set_producer_id(new_producer_universe_id); |
4527 | | |
4528 | | // Also need to update internal maps. |
4529 | 0 | auto cluster_config = ClusterConfig(); |
4530 | 0 | auto cl = cluster_config->LockForWrite(); |
4531 | 0 | auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4532 | 0 | (*producer_map)[new_producer_universe_id] = |
4533 | 0 | std::move((*producer_map)[old_universe_replication_id]); |
4534 | 0 | producer_map->erase(old_universe_replication_id); |
4535 | |
4536 | 0 | { |
4537 | | // Need both these updates to be atomic. |
4538 | 0 | auto w = sys_catalog_->NewWriter(leader_ready_term()); |
4539 | 0 | RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_DELETE, universe.get())); |
4540 | 0 | RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_UPDATE, |
4541 | 0 | new_ri.get(), |
4542 | 0 | cluster_config.get())); |
4543 | 0 | RETURN_NOT_OK(CheckStatus( |
4544 | 0 | sys_catalog_->SyncWrite(w.get()), |
4545 | 0 | "Updating universe replication info and cluster config in sys-catalog")); |
4546 | 0 | } |
4547 | 0 | new_ri->mutable_metadata()->CommitMutation(); |
4548 | 0 | cl.Commit(); |
4549 | | |
4550 | | // Update universe_replication_map after persistent data is saved. |
4551 | 0 | universe_replication_map_[new_producer_universe_id] = new_ri; |
4552 | 0 | universe_replication_map_.erase(old_universe_replication_id); |
4553 | 0 | } |
4554 | | |
4555 | 0 | return Status::OK(); |
4556 | 0 | } |
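
Because the producer ID doubles as the map key, a rename is a delete of the old record plus an insert of the new one, and both must commit together so a crash cannot leave zero or two universes behind. An in-memory analogue of the key move (RenameKey is illustrative; durability comes from the single sys-catalog write above):

#include <map>
#include <string>
#include <utility>

bool RenameKey(std::map<std::string, std::string>* map,
               const std::string& old_key, const std::string& new_key) {
  auto it = map->find(old_key);
  if (it == map->end() || map->count(new_key) > 0) return false;
  (*map)[new_key] = std::move(it->second);
  map->erase(old_key);
  return true;
}
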
4557 | | |
4558 | | Status CatalogManager::GetUniverseReplication(const GetUniverseReplicationRequestPB* req, |
4559 | | GetUniverseReplicationResponsePB* resp, |
4560 | 0 | rpc::RpcContext* rpc) { |
4561 | 0 | LOG(INFO) << "GetUniverseReplication from " << RequestorString(rpc) |
4562 | 0 | << ": " << req->DebugString(); |
4563 | |
4564 | 0 | if (!req->has_producer_id()) { |
4565 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4566 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4567 | 0 | } |
4568 | | |
4569 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
4570 | 0 | { |
4571 | 0 | SharedLock lock(mutex_); |
4572 | |
4573 | 0 | universe = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4574 | 0 | if (universe == nullptr) { |
4575 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4576 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4577 | 0 | } |
4578 | 0 | } |
4579 | | |
4580 | 0 | resp->mutable_entry()->CopyFrom(universe->LockForRead()->pb); |
4581 | 0 | return Status::OK(); |
4582 | 0 | } |
4583 | | |
4584 | | /* |
4585 | | * Checks if the universe replication setup has completed. |
4586 | | * Returns Status::OK() if this call succeeds, and uses resp->done() to determine if the setup has |
4587 | | * completed (either failed or succeeded). If the setup has failed, then resp->replication_error() |
4588 | | * is also set. |
4589 | | */ |
4590 | | Status CatalogManager::IsSetupUniverseReplicationDone( |
4591 | | const IsSetupUniverseReplicationDoneRequestPB* req, |
4592 | | IsSetupUniverseReplicationDoneResponsePB* resp, |
4593 | 0 | rpc::RpcContext* rpc) { |
4594 | 0 | LOG(INFO) << "IsSetupUniverseReplicationDone from " << RequestorString(rpc) |
4595 | 0 | << ": " << req->DebugString(); |
4596 | |
4597 | 0 | if (!req->has_producer_id()) { |
4598 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4599 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4600 | 0 | } |
4601 | | |
4602 | 0 | GetUniverseReplicationRequestPB universe_req; |
4603 | 0 | GetUniverseReplicationResponsePB universe_resp; |
4604 | 0 | universe_req.set_producer_id(req->producer_id()); |
4605 | |
4606 | 0 | RETURN_NOT_OK(GetUniverseReplication(&universe_req, &universe_resp, /* RpcContext */ nullptr)); |
4607 | 0 | if (universe_resp.has_error()) { |
4608 | 0 | RETURN_NOT_OK(StatusFromPB(universe_resp.error().status())); |
4609 | 0 | } |
4610 | | |
4611 | | // Two cases for completion: |
4612 | | // - For a regular SetupUniverseReplication, we want to wait for the universe to become ACTIVE. |
4613 | | // - For an AlterUniverseReplication, we need to wait until the .ALTER universe gets merged with |
4614 | | // the main universe - at which point the .ALTER universe is deleted. |
4615 | 0 | bool isAlterRequest = GStringPiece(req->producer_id()).ends_with(".ALTER"); |
4616 | 0 | if ((!isAlterRequest && universe_resp.entry().state() == SysUniverseReplicationEntryPB::ACTIVE) || |
4617 | 0 | (isAlterRequest && universe_resp.entry().state() == SysUniverseReplicationEntryPB::DELETED)) { |
4618 | 0 | resp->set_done(true); |
4619 | 0 | StatusToPB(Status::OK(), resp->mutable_replication_error()); |
4620 | 0 | return Status::OK(); |
4621 | 0 | } |
4622 | | |
4623 | | // Otherwise we have either failed (see MarkUniverseReplicationFailed), or are still working. |
4624 | 0 | if (universe_resp.entry().state() == SysUniverseReplicationEntryPB::DELETED_ERROR || |
4625 | 0 | universe_resp.entry().state() == SysUniverseReplicationEntryPB::FAILED) { |
4626 | 0 | resp->set_done(true); |
4627 | | |
4628 | | // Get the more detailed error. |
4629 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
4630 | 0 | { |
4631 | 0 | SharedLock lock(mutex_); |
4632 | 0 | universe = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4633 | 0 | if (universe == nullptr) { |
4634 | 0 | StatusToPB( |
4635 | 0 | STATUS(InternalError, "Could not find CDC producer universe after having failed."), |
4636 | 0 | resp->mutable_replication_error()); |
4637 | 0 | return Status::OK(); |
4638 | 0 | } |
4639 | 0 | } |
4640 | 0 | if (!universe->GetSetupUniverseReplicationErrorStatus().ok()) { |
4641 | 0 | StatusToPB(universe->GetSetupUniverseReplicationErrorStatus(), |
4642 | 0 | resp->mutable_replication_error()); |
4643 | 0 | } else { |
4644 | 0 | LOG(WARNING) << "Did not find setup universe replication error status."; |
4645 | 0 | StatusToPB(STATUS(InternalError, "unknown error"), resp->mutable_replication_error()); |
4646 | 0 | } |
4647 | 0 | return Status::OK(); |
4648 | 0 | } |
4649 | | |
4650 | | // Not done yet. |
4651 | 0 | resp->set_done(false); |
4652 | 0 | return Status::OK(); |
4653 | 0 | } |
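
The two completion cases collapse into one predicate: a regular setup is done at ACTIVE, while an '.ALTER' setup is done once its temporary universe has been merged away and DELETED. A compact restatement (the enum is an illustrative subset):

#include <string>

enum class State { ACTIVE, DELETED, FAILED, DELETED_ERROR, PENDING };

bool SetupReplicationDone(const std::string& producer_id, State state) {
  static const std::string kAlterSuffix = ".ALTER";
  const bool is_alter =
      producer_id.size() >= kAlterSuffix.size() &&
      producer_id.compare(producer_id.size() - kAlterSuffix.size(),
                          kAlterSuffix.size(), kAlterSuffix) == 0;
  return is_alter ? state == State::DELETED : state == State::ACTIVE;
}
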
4654 | | |
4655 | | Status CatalogManager::UpdateConsumerOnProducerSplit( |
4656 | | const UpdateConsumerOnProducerSplitRequestPB* req, |
4657 | | UpdateConsumerOnProducerSplitResponsePB* resp, |
4658 | 0 | rpc::RpcContext* rpc) { |
4659 | 0 | LOG(INFO) << "UpdateConsumerOnProducerSplit from " << RequestorString(rpc) |
4660 | 0 | << ": " << req->DebugString(); |
4661 | |
4662 | 0 | if (!req->has_producer_id()) { |
4663 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4664 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4665 | 0 | } |
4666 | 0 | if (!req->has_stream_id()) { |
4667 | 0 | return STATUS(InvalidArgument, "Stream ID must be provided", |
4668 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4669 | 0 | } |
4670 | 0 | if (!req->has_producer_split_tablet_info()) { |
4671 | 0 | return STATUS(InvalidArgument, "Producer split tablet info must be provided", |
4672 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4673 | 0 | } |
4674 | | |
4675 | 0 | auto cluster_config = ClusterConfig(); |
4676 | 0 | auto l = cluster_config->LockForWrite(); |
4677 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4678 | 0 | auto producer_entry = FindOrNull(*producer_map, req->producer_id()); |
4679 | 0 | if (!producer_entry) { |
4680 | 0 | return STATUS_FORMAT( |
4681 | 0 | NotFound, "Unable to find the producer entry for universe $0", req->producer_id()); |
4682 | 0 | } |
4683 | 0 | auto stream_entry = FindOrNull(*producer_entry->mutable_stream_map(), req->stream_id()); |
4684 | 0 | if (!stream_entry) { |
4685 | 0 | return STATUS_FORMAT( |
4686 | 0 | NotFound, "Unable to find the stream entry for universe $0, stream $1", |
4687 | 0 | req->producer_id(), req->stream_id()); |
4688 | 0 | } |
4689 | | |
4690 | | // Find the parent tablet in the tablet mapping. |
4691 | 0 | bool found = false; |
4692 | 0 | auto mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); |
4693 | | // Also keep track of whether we see the split children tablets.
4694 | 0 | vector<string> split_child_tablet_ids{req->producer_split_tablet_info().new_tablet1_id(), |
4695 | 0 | req->producer_split_tablet_info().new_tablet2_id()}; |
4696 | 0 | for (auto& consumer_tablet_to_producer_tablets : *mutable_map) { |
4697 | 0 | auto& producer_tablets_list = consumer_tablet_to_producer_tablets.second; |
4698 | 0 | auto producer_tablets = producer_tablets_list.mutable_tablets(); |
4699 | 0 | for (auto tablet = producer_tablets->begin(); tablet < producer_tablets->end(); ++tablet) { |
4700 | 0 | if (*tablet == req->producer_split_tablet_info().tablet_id()) { |
4701 | | // Remove the parent tablet id. |
4702 | 0 | producer_tablets->erase(tablet); |
4703 | | // For now we add the children tablets to the same consumer tablet. |
4704 | | // See github issue #10186 for further improvements. |
4705 | 0 | producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet1_id()); |
4706 | 0 | producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet2_id()); |
4707 | | // There should be only one copy of each producer tablet per stream, so we can exit early.
4708 | 0 | found = true; |
4709 | 0 | break; |
4710 | 0 | } |
4711 | | // Check if this is one of the child split tablets. |
4712 | 0 | auto it = std::find(split_child_tablet_ids.begin(), split_child_tablet_ids.end(), *tablet); |
4713 | 0 | if (it != split_child_tablet_ids.end()) { |
4714 | 0 | split_child_tablet_ids.erase(it); |
4715 | 0 | } |
4716 | 0 | } |
4717 | 0 | if (found) { |
4718 | 0 | break; |
4719 | 0 | } |
4720 | 0 | } |
4721 | |
4722 | 0 | if (!found) { |
4723 | | // Did not find the source tablet, but did find the children - this means that we have
4724 | | // already processed this SPLIT_OP, so for idempotency we can return OK.
4725 | 0 | if (split_child_tablet_ids.empty()) { |
4726 | 0 | LOG(INFO) << "Already processed this tablet split: " << req->DebugString(); |
4727 | 0 | return Status::OK(); |
4728 | 0 | } |
4729 | | |
4730 | | // When there are sequential SPLIT_OPs, we may try to reprocess an older SPLIT_OP. However, if |
4731 | | // one or both of those children have also already been split and processed, then we'll end up |
4732 | | // here (!found && !split_child_tablet_ids.empty()). |
4733 | | // This is alright; we log a warning and then continue (so as not to block later records).
4734 | 0 | LOG(WARNING) |
4735 | 0 | << "Unable to find matching source tablet " << req->producer_split_tablet_info().tablet_id() |
4736 | 0 | << " for universe " << req->producer_id() << " stream " << req->stream_id(); |
4737 | |
4738 | 0 | return Status::OK(); |
4739 | 0 | } |
4740 | | |
4741 | | // Also make sure that we switch off the 1-1 mapping optimization.
4742 | 0 | stream_entry->set_same_num_producer_consumer_tablets(false); |
4743 | | |
4744 | | // Also bump the cluster_config_ version so that changes are propagated to tservers (and new |
4745 | | // pollers are created for the new tablets). |
4746 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
4747 | |
4748 | 0 | RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), |
4749 | 0 | "Updating cluster config in sys-catalog")); |
4750 | 0 | l.Commit(); |
4751 | |
4752 | 0 | return Status::OK(); |
4753 | 0 | } |
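
// A minimal sketch (not in the original file) of the remapping done above, on
// plain containers: swap the split parent tablet for its two children in a
// producer tablet list, and treat a missing parent as an already-processed
// (idempotent) SPLIT_OP. The helper name ReplaceParentWithChildren is
// hypothetical.
bool ReplaceParentWithChildren(
    std::vector<std::string>* tablets, const std::string& parent,
    const std::string& child1, const std::string& child2) {
  auto it = std::find(tablets->begin(), tablets->end(), parent);
  if (it == tablets->end()) {
    // Parent already replaced: a repeated SPLIT_OP is a no-op.
    return false;
  }
  *it = child1;              // Reuse the parent's slot for the first child.
  tablets->push_back(child2);
  return true;
}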
4754 | | |
4755 | 93.3M | bool CatalogManager::IsTableCdcProducer(const TableInfo& table_info) const { |
4756 | 93.3M | auto it = cdc_stream_tables_count_map_.find(table_info.id()); |
4757 | 93.3M | if (it == cdc_stream_tables_count_map_.end()) { |
4758 | 93.3M | return false; |
4759 | 93.3M | } else if (it->second > 0) {
4760 | 5 | auto tid = table_info.id(); |
4761 | 5 | for (const auto& entry : cdc_stream_map_) { |
4762 | 5 | auto s = entry.second->LockForRead(); |
4763 | | // For xCluster, the first entry will be the table_id.
4764 | 5 | const auto& table_id = s->table_id(); |
4765 | 5 | if (!table_id.empty() && table_id.Get(0) == tid && !(s->is_deleting() || s->is_deleted())) { |
4766 | 5 | return true; |
4767 | 5 | } |
4768 | 5 | } |
4769 | 5 | } |
4770 | 18.4E | return false; |
4771 | 93.3M | } |
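
// A minimal sketch (not in the original file) of the scan above: a table is
// an active CDC producer when some stream that is neither deleting nor
// deleted lists it as its first table id (the xCluster convention noted in
// the comment above). The StreamView type and the helper name are
// hypothetical stand-ins for cdc_stream_map_'s locked entries.
struct StreamView {
  std::vector<std::string> table_ids;
  bool deleting_or_deleted;
};

bool HasActiveStreamForTable(
    const std::vector<StreamView>& streams, const std::string& table_id) {
  for (const auto& stream : streams) {
    if (!stream.table_ids.empty() && stream.table_ids.front() == table_id &&
        !stream.deleting_or_deleted) {
      return true;
    }
  }
  return false;
}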
4772 | | |
4773 | 93.3M | bool CatalogManager::IsTableCdcConsumer(const TableInfo& table_info) const { |
4774 | 93.3M | auto it = xcluster_consumer_tables_to_stream_map_.find(table_info.id()); |
4775 | 93.3M | if (it == xcluster_consumer_tables_to_stream_map_.end()) { |
4776 | 93.3M | return false; |
4777 | 93.3M | } |
4778 | 2 | return !it->second.empty(); |
4779 | 93.3M | } |
4780 | | |
4781 | | CatalogManager::XClusterConsumerTableStreamInfoMap |
4782 | 140 | CatalogManager::GetXClusterStreamInfoForConsumerTable(const TableId& table_id) const { |
4783 | 140 | SharedLock lock(mutex_); |
4784 | 140 | auto it = xcluster_consumer_tables_to_stream_map_.find(table_id); |
4785 | 140 | if (it == xcluster_consumer_tables_to_stream_map_.end()) { |
4786 | 140 | return {}; |
4787 | 140 | } |
4788 | | |
4789 | 0 | return it->second; |
4790 | 140 | } |
4791 | | |
4792 | | bool CatalogManager::IsCdcEnabled( |
4793 | 93.3M | const TableInfo& table_info) const { |
4794 | 93.3M | SharedLock lock(mutex_); |
4795 | 93.3M | return IsTableCdcProducer(table_info) || IsTableCdcConsumer(table_info);
4796 | 93.3M | } |
4797 | | |
4798 | 7.94k | void CatalogManager::Started() { |
4799 | 7.94k | snapshot_coordinator_.Start(); |
4800 | 7.94k | } |
4801 | | |
4802 | | Result<SnapshotSchedulesToObjectIdsMap> CatalogManager::MakeSnapshotSchedulesToObjectIdsMap( |
4803 | 15.4k | SysRowEntryType type) { |
4804 | 15.4k | return snapshot_coordinator_.MakeSnapshotSchedulesToObjectIdsMap(type); |
4805 | 15.4k | } |
4806 | | |
4807 | 0 | Result<bool> CatalogManager::IsTablePartOfSomeSnapshotSchedule(const TableInfo& table_info) { |
4808 | 0 | return snapshot_coordinator_.IsTableCoveredBySomeSnapshotSchedule(table_info); |
4809 | 0 | } |
4810 | | |
4811 | 3.00k | void CatalogManager::SysCatalogLoaded(int64_t term) { |
4812 | 3.00k | return snapshot_coordinator_.SysCatalogLoaded(term); |
4813 | 3.00k | } |
4814 | | |
4815 | 270 | Result<size_t> CatalogManager::GetNumLiveTServersForActiveCluster() { |
4816 | 270 | BlacklistSet blacklist = VERIFY_RESULT(BlacklistSetFromPB());
4817 | 0 | TSDescriptorVector ts_descs; |
4818 | 268 | auto uuid = VERIFY_RESULT(placement_uuid()); |
4819 | 0 | master_->ts_manager()->GetAllLiveDescriptorsInCluster(&ts_descs, uuid, blacklist); |
4820 | 268 | return ts_descs.size(); |
4821 | 268 | } |
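
// A minimal sketch (not in the original file) of the computation above: the
// live tserver count for the active cluster is the number of live servers in
// the current placement that are not blacklisted. All names here are
// hypothetical stand-ins for the TSManager bookkeeping.
size_t CountLiveNonBlacklistedTServers(
    const std::vector<std::string>& live_ts_uuids,
    const std::unordered_set<std::string>& blacklisted_uuids) {
  size_t count = 0;
  for (const auto& uuid : live_ts_uuids) {
    if (blacklisted_uuids.count(uuid) == 0) {
      ++count;  // Live and not blacklisted: counts toward the active cluster.
    }
  }
  return count;
}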
4822 | | |
4823 | | } // namespace enterprise |
4824 | | } // namespace master |
4825 | | } // namespace yb |