/Users/deen/code/yugabyte-db/ent/src/yb/master/catalog_manager_ent.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include <memory> |
14 | | #include <regex> |
15 | | #include <set> |
16 | | #include <unordered_set> |
17 | | #include <google/protobuf/util/message_differencer.h> |
18 | | |
19 | | #include "yb/common/common_fwd.h" |
20 | | #include "yb/master/catalog_entity_info.h" |
21 | | #include "yb/master/catalog_manager-internal.h" |
22 | | #include "yb/master/cdc_consumer_registry_service.h" |
23 | | #include "yb/master/cdc_rpc_tasks.h" |
24 | | #include "yb/master/cluster_balance.h" |
25 | | #include "yb/master/master.h" |
26 | | #include "yb/master/master_backup.pb.h" |
27 | | #include "yb/master/master_error.h" |
28 | | |
29 | | #include "yb/cdc/cdc_consumer.pb.h" |
30 | | #include "yb/cdc/cdc_service.h" |
31 | | |
32 | | #include "yb/client/client-internal.h" |
33 | | #include "yb/client/schema.h" |
34 | | #include "yb/client/session.h" |
35 | | #include "yb/client/table.h" |
36 | | #include "yb/client/table_alterer.h" |
37 | | #include "yb/client/table_handle.h" |
38 | | #include "yb/client/table_info.h" |
39 | | #include "yb/client/yb_op.h" |
40 | | #include "yb/client/yb_table_name.h" |
41 | | |
42 | | #include "yb/common/common.pb.h" |
43 | | #include "yb/common/entity_ids.h" |
44 | | #include "yb/common/ql_name.h" |
45 | | #include "yb/common/ql_type.h" |
46 | | #include "yb/common/schema.h" |
47 | | #include "yb/common/wire_protocol.h" |
48 | | #include "yb/consensus/consensus.h" |
49 | | |
50 | | #include "yb/docdb/consensus_frontier.h" |
51 | | #include "yb/docdb/doc_write_batch.h" |
52 | | |
53 | | #include "yb/gutil/bind.h" |
54 | | #include "yb/gutil/casts.h" |
55 | | #include "yb/gutil/strings/join.h" |
56 | | #include "yb/gutil/strings/substitute.h" |
57 | | #include "yb/master/master_client.pb.h" |
58 | | #include "yb/master/master_ddl.pb.h" |
59 | | #include "yb/master/master_defaults.h" |
60 | | #include "yb/master/master_heartbeat.pb.h" |
61 | | #include "yb/master/master_replication.pb.h" |
62 | | #include "yb/master/master_util.h" |
63 | | #include "yb/master/sys_catalog.h" |
64 | | #include "yb/master/sys_catalog-internal.h" |
65 | | #include "yb/master/async_snapshot_tasks.h" |
66 | | #include "yb/master/async_rpc_tasks.h" |
67 | | #include "yb/master/encryption_manager.h" |
68 | | #include "yb/master/restore_sys_catalog_state.h" |
69 | | #include "yb/master/scoped_leader_shared_lock.h" |
70 | | #include "yb/master/scoped_leader_shared_lock-internal.h" |
71 | | |
72 | | #include "yb/rpc/messenger.h" |
73 | | |
74 | | #include "yb/tablet/operations/snapshot_operation.h" |
75 | | #include "yb/tablet/tablet_metadata.h" |
76 | | #include "yb/tablet/tablet_snapshots.h" |
77 | | |
78 | | #include "yb/tserver/backup.proxy.h" |
79 | | #include "yb/tserver/service_util.h" |
80 | | |
81 | | #include "yb/util/cast.h" |
82 | | #include "yb/util/date_time.h" |
83 | | #include "yb/util/flag_tags.h" |
84 | | #include "yb/util/format.h" |
85 | | #include "yb/util/logging.h" |
86 | | #include "yb/util/random_util.h" |
87 | | #include "yb/util/scope_exit.h" |
88 | | #include "yb/util/service_util.h" |
89 | | #include "yb/util/status.h" |
90 | | #include "yb/util/status_format.h" |
91 | | #include "yb/util/status_log.h" |
92 | | #include "yb/util/tostring.h" |
93 | | #include "yb/util/string_util.h" |
94 | | #include "yb/util/trace.h" |
95 | | |
96 | | #include "yb/yql/cql/ql/util/statement_result.h" |
97 | | |
98 | | using namespace std::literals; |
99 | | using namespace std::placeholders; |
100 | | |
101 | | using std::string; |
102 | | using std::unique_ptr; |
103 | | using std::vector; |
104 | | |
105 | | using google::protobuf::RepeatedPtrField; |
106 | | using google::protobuf::util::MessageDifferencer; |
107 | | using strings::Substitute; |
108 | | |
109 | | DEFINE_int32(cdc_state_table_num_tablets, 0, |
110 | | "Number of tablets to use when creating the CDC state table. " |
111 | | "0 to use the same default num tablets as for regular tables."); |
112 | | |
113 | | DEFINE_int32(cdc_wal_retention_time_secs, 4 * 3600, |
114 | | "WAL retention time in seconds to be used for tables for which a CDC stream was " |
115 | | "created."); |
116 | | DECLARE_int32(master_rpc_timeout_ms); |
117 | | |
118 | | DEFINE_bool(enable_transaction_snapshots, true, |
119 | | "The flag enables usage of transaction aware snapshots."); |
120 | | TAG_FLAG(enable_transaction_snapshots, hidden); |
121 | | TAG_FLAG(enable_transaction_snapshots, advanced); |
122 | | TAG_FLAG(enable_transaction_snapshots, runtime); |
123 | | |
124 | | DEFINE_test_flag(bool, disable_cdc_state_insert_on_setup, false, |
125 | | "Disable inserting new entries into cdc state as part of the setup flow."); |
126 | | |
127 | | DEFINE_bool(allow_consecutive_restore, true, |
128 | | "Is it allowed to restore to a time before the last restoration was done."); |
129 | | TAG_FLAG(allow_consecutive_restore, runtime); |
130 | | |
131 | | namespace yb { |
132 | | |
133 | | using rpc::RpcContext; |
134 | | using pb_util::ParseFromSlice; |
135 | | |
136 | | namespace master { |
137 | | namespace enterprise { |
138 | | |
139 | | namespace { |
140 | | |
141 | 167 | CHECKED_STATUS CheckStatus(const Status& status, const char* action) { |
142 | 167 | if (status.ok()) { |
143 | 167 | return status; |
144 | 167 | } |
145 | | |
146 | 0 | const Status s = status.CloneAndPrepend(std::string("An error occurred while ") + action); |
147 | 0 | LOG(WARNING) << s; |
148 | 0 | return s; |
149 | 0 | } |
150 | | |
151 | 0 | CHECKED_STATUS CheckLeaderStatus(const Status& status, const char* action) { |
152 | 0 | return CheckIfNoLongerLeader(CheckStatus(status, action)); |
153 | 0 | } |
154 | | |
155 | | template<class RespClass> |
156 | | CHECKED_STATUS CheckLeaderStatusAndSetupError( |
157 | 159 | const Status& status, const char* action, RespClass* resp) { |
158 | 159 | return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp); |
159 | 159 | } catalog_manager_ent.cc:_ZN2yb6master10enterprise12_GLOBAL__N_130CheckLeaderStatusAndSetupErrorINS0_25CreateCDCStreamResponsePBEEENS_6StatusERKS5_PKcPT_ Line | Count | Source | 157 | 157 | const Status& status, const char* action, RespClass* resp) { | 158 | 157 | return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp); | 159 | 157 | } |
catalog_manager_ent.cc:_ZN2yb6master10enterprise12_GLOBAL__N_130CheckLeaderStatusAndSetupErrorINS0_34SetupUniverseReplicationResponsePBEEENS_6StatusERKS5_PKcPT_ Line | Count | Source | 157 | 2 | const Status& status, const char* action, RespClass* resp) { | 158 | 2 | return CheckIfNoLongerLeaderAndSetupError(CheckStatus(status, action), resp); | 159 | 2 | } |
|
160 | | |
161 | | } // namespace |
162 | | |
163 | | //////////////////////////////////////////////////////////// |
164 | | // Snapshot Loader |
165 | | //////////////////////////////////////////////////////////// |
166 | | |
167 | | class SnapshotLoader : public Visitor<PersistentSnapshotInfo> { |
168 | | public: |
169 | 2.37k | explicit SnapshotLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {} |
170 | | |
171 | 0 | CHECKED_STATUS Visit(const SnapshotId& snapshot_id, const SysSnapshotEntryPB& metadata) override { |
172 | 0 | if (TryFullyDecodeTxnSnapshotId(snapshot_id)) { |
173 | | // Transaction aware snapshots should be already loaded. |
174 | 0 | return Status::OK(); |
175 | 0 | } |
176 | 0 | return VisitNonTransactionAwareSnapshot(snapshot_id, metadata); |
177 | 0 | } |
178 | | |
179 | | CHECKED_STATUS VisitNonTransactionAwareSnapshot( |
180 | 0 | const SnapshotId& snapshot_id, const SysSnapshotEntryPB& metadata) { |
181 | | |
182 | | // Setup the snapshot info. |
183 | 0 | auto snapshot_info = make_scoped_refptr<SnapshotInfo>(snapshot_id); |
184 | 0 | auto l = snapshot_info->LockForWrite(); |
185 | 0 | l.mutable_data()->pb.CopyFrom(metadata); |
186 | | |
187 | | // Add the snapshot to the IDs map (if the snapshot is not deleted). |
188 | 0 | auto emplace_result = catalog_manager_->non_txn_snapshot_ids_map_.emplace( |
189 | 0 | snapshot_id, std::move(snapshot_info)); |
190 | 0 | CHECK(emplace_result.second) << "Snapshot already exists: " << snapshot_id; |
191 | |
|
192 | 0 | LOG(INFO) << "Loaded metadata for snapshot (id=" << snapshot_id << "): " |
193 | 0 | << emplace_result.first->second->ToString() << ": " << metadata.ShortDebugString(); |
194 | 0 | l.Commit(); |
195 | 0 | return Status::OK(); |
196 | 0 | } |
197 | | |
198 | | private: |
199 | | CatalogManager *catalog_manager_; |
200 | | |
201 | | DISALLOW_COPY_AND_ASSIGN(SnapshotLoader); |
202 | | }; |
203 | | |
204 | | |
205 | | //////////////////////////////////////////////////////////// |
206 | | // CDC Stream Loader |
207 | | //////////////////////////////////////////////////////////// |
208 | | |
209 | | class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> { |
210 | | public: |
211 | 2.37k | explicit CDCStreamLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {} |
212 | | |
213 | | Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata) |
214 | 0 | REQUIRES(catalog_manager_->mutex_) { |
215 | 0 | DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id)) |
216 | 0 | << "CDC stream already exists: " << stream_id; |
217 | |
|
218 | 0 | scoped_refptr<NamespaceInfo> ns; |
219 | 0 | scoped_refptr<TableInfo> table; |
220 | |
|
221 | 0 | if (metadata.has_namespace_id()) { |
222 | 0 | ns = FindPtrOrNull(catalog_manager_->namespace_ids_map_, metadata.namespace_id()); |
223 | |
|
224 | 0 | if (!ns) { |
225 | 0 | LOG(DFATAL) << "Invalid namespace ID " << metadata.namespace_id() << " for stream " |
226 | 0 | << stream_id; |
227 | | // TODO (#2059): Potentially signals a race condition that namesapce got deleted |
228 | | // while stream was being created. |
229 | | // Log error and continue without loading the stream. |
230 | 0 | return Status::OK(); |
231 | 0 | } |
232 | 0 | } else { |
233 | 0 | table = FindPtrOrNull(*catalog_manager_->table_ids_map_, metadata.table_id(0)); |
234 | 0 | if (!table) { |
235 | 0 | LOG(ERROR) << "Invalid table ID " << metadata.table_id(0) << " for stream " << stream_id; |
236 | | // TODO (#2059): Potentially signals a race condition that table got deleted while stream |
237 | | // was being created. |
238 | | // Log error and continue without loading the stream. |
239 | 0 | return Status::OK(); |
240 | 0 | } |
241 | 0 | } |
242 | | |
243 | | // Setup the CDC stream info. |
244 | 0 | auto stream = make_scoped_refptr<CDCStreamInfo>(stream_id); |
245 | 0 | auto l = stream->LockForWrite(); |
246 | 0 | l.mutable_data()->pb.CopyFrom(metadata); |
247 | | |
248 | | // If the table has been deleted, then mark this stream as DELETING so it can be deleted by the |
249 | | // catalog manager background thread. Otherwise if this stream is missing an entry |
250 | | // for state, then mark its state as Active. |
251 | |
|
252 | 0 | if (((table && table->LockForRead()->is_deleting()) || |
253 | 0 | (ns && ns->state() == SysNamespaceEntryPB::DELETING)) && |
254 | 0 | !l.data().is_deleting()) { |
255 | 0 | l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING); |
256 | 0 | } else if (!l.mutable_data()->pb.has_state()) { |
257 | 0 | l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::ACTIVE); |
258 | 0 | } |
259 | | |
260 | | // Add the CDC stream to the CDC stream map. |
261 | 0 | catalog_manager_->cdc_stream_map_[stream->id()] = stream; |
262 | 0 | if (table) { |
263 | 0 | catalog_manager_->cdc_stream_tables_count_map_[metadata.table_id(0)]++; |
264 | 0 | } |
265 | |
|
266 | 0 | l.Commit(); |
267 | |
|
268 | 0 | LOG(INFO) << "Loaded metadata for CDC stream " << stream->ToString() << ": " |
269 | 0 | << metadata.ShortDebugString(); |
270 | |
|
271 | 0 | return Status::OK(); |
272 | 0 | } |
273 | | |
274 | | private: |
275 | | CatalogManager *catalog_manager_; |
276 | | |
277 | | DISALLOW_COPY_AND_ASSIGN(CDCStreamLoader); |
278 | | }; |
279 | | |
280 | | //////////////////////////////////////////////////////////// |
281 | | // Universe Replication Loader |
282 | | //////////////////////////////////////////////////////////// |
283 | | |
284 | | class UniverseReplicationLoader : public Visitor<PersistentUniverseReplicationInfo> { |
285 | | public: |
286 | | explicit UniverseReplicationLoader(CatalogManager* catalog_manager) |
287 | 2.37k | : catalog_manager_(catalog_manager) {} |
288 | | |
289 | | Status Visit(const std::string& producer_id, const SysUniverseReplicationEntryPB& metadata) |
290 | 0 | REQUIRES(catalog_manager_->mutex_) { |
291 | 0 | DCHECK(!ContainsKey(catalog_manager_->universe_replication_map_, producer_id)) |
292 | 0 | << "Producer universe already exists: " << producer_id; |
293 | | |
294 | | // Setup the universe replication info. |
295 | 0 | UniverseReplicationInfo* const ri = new UniverseReplicationInfo(producer_id); |
296 | 0 | { |
297 | 0 | auto l = ri->LockForWrite(); |
298 | 0 | l.mutable_data()->pb.CopyFrom(metadata); |
299 | |
|
300 | 0 | if (!l->is_active() && !l->is_deleted_or_failed()) { |
301 | | // Replication was not fully setup. |
302 | 0 | LOG(WARNING) << "Universe replication in transient state: " << producer_id; |
303 | | |
304 | | // TODO: Should we delete all failed universe replication items? |
305 | 0 | } |
306 | | |
307 | | // Add universe replication info to the universe replication map. |
308 | 0 | catalog_manager_->universe_replication_map_[ri->id()] = ri; |
309 | |
|
310 | 0 | l.Commit(); |
311 | 0 | } |
312 | | |
313 | | // Also keep track of consumer tables. |
314 | 0 | for (const auto& table : metadata.validated_tables()) { |
315 | 0 | CDCStreamId stream_id = FindWithDefault(metadata.table_streams(), table.first, ""); |
316 | 0 | if (stream_id.empty()) { |
317 | 0 | LOG(WARNING) << "Unable to find stream id for table: " << table.first; |
318 | 0 | continue; |
319 | 0 | } |
320 | 0 | catalog_manager_->xcluster_consumer_tables_to_stream_map_[table.second].emplace( |
321 | 0 | metadata.producer_id(), stream_id); |
322 | 0 | } |
323 | |
|
324 | 0 | LOG(INFO) << "Loaded metadata for universe replication " << ri->ToString(); |
325 | 0 | VLOG(1) << "Metadata for universe replication " << ri->ToString() << ": " |
326 | 0 | << metadata.ShortDebugString(); |
327 | |
|
328 | 0 | return Status::OK(); |
329 | 0 | } |
330 | | |
331 | | private: |
332 | | CatalogManager *catalog_manager_; |
333 | | |
334 | | DISALLOW_COPY_AND_ASSIGN(UniverseReplicationLoader); |
335 | | }; |
336 | | |
337 | | //////////////////////////////////////////////////////////// |
338 | | // CatalogManager |
339 | | //////////////////////////////////////////////////////////// |
340 | | |
341 | 92 | CatalogManager::~CatalogManager() { |
342 | 92 | if (StartShutdown()) { |
343 | 7 | CompleteShutdown(); |
344 | 7 | } |
345 | 92 | } |
346 | | |
347 | 92 | void CatalogManager::CompleteShutdown() { |
348 | 92 | snapshot_coordinator_.Shutdown(); |
349 | | // Call shutdown on base class before exiting derived class destructor |
350 | | // because BgTasks is part of base & uses this derived class on Shutdown. |
351 | 92 | super::CompleteShutdown(); |
352 | 92 | } |
353 | | |
354 | 2.37k | Status CatalogManager::RunLoaders(int64_t term) { |
355 | 2.37k | RETURN_NOT_OK(super::RunLoaders(term)); |
356 | | |
357 | | // Clear the snapshots. |
358 | 2.37k | non_txn_snapshot_ids_map_.clear(); |
359 | | |
360 | | // Clear CDC stream map. |
361 | 2.37k | cdc_stream_map_.clear(); |
362 | 2.37k | cdc_stream_tables_count_map_.clear(); |
363 | | |
364 | | // Clear universe replication map. |
365 | 2.37k | universe_replication_map_.clear(); |
366 | 2.37k | xcluster_consumer_tables_to_stream_map_.clear(); |
367 | | |
368 | 2.37k | LOG_WITH_FUNC(INFO) << "Loading snapshots into memory."; |
369 | 2.37k | unique_ptr<SnapshotLoader> snapshot_loader(new SnapshotLoader(this)); |
370 | 2.37k | RETURN_NOT_OK_PREPEND( |
371 | 2.37k | sys_catalog_->Visit(snapshot_loader.get()), |
372 | 2.37k | "Failed while visiting snapshots in sys catalog"); |
373 | | |
374 | 2.37k | LOG_WITH_FUNC(INFO) << "Loading CDC streams into memory."; |
375 | 2.37k | auto cdc_stream_loader = std::make_unique<CDCStreamLoader>(this); |
376 | 2.37k | RETURN_NOT_OK_PREPEND( |
377 | 2.37k | sys_catalog_->Visit(cdc_stream_loader.get()), |
378 | 2.37k | "Failed while visiting CDC streams in sys catalog"); |
379 | | |
380 | 2.37k | LOG_WITH_FUNC(INFO) << "Loading universe replication info into memory."; |
381 | 2.37k | auto universe_replication_loader = std::make_unique<UniverseReplicationLoader>(this); |
382 | 2.37k | RETURN_NOT_OK_PREPEND( |
383 | 2.37k | sys_catalog_->Visit(universe_replication_loader.get()), |
384 | 2.37k | "Failed while visiting universe replication info in sys catalog"); |
385 | | |
386 | 2.37k | return Status::OK(); |
387 | 2.37k | } |
388 | | |
389 | | Status CatalogManager::CreateSnapshot(const CreateSnapshotRequestPB* req, |
390 | | CreateSnapshotResponsePB* resp, |
391 | 0 | RpcContext* rpc) { |
392 | 0 | LOG(INFO) << "Servicing CreateSnapshot request: " << req->ShortDebugString(); |
393 | |
|
394 | 0 | if (FLAGS_enable_transaction_snapshots && req->transaction_aware()) { |
395 | 0 | return CreateTransactionAwareSnapshot(*req, resp, rpc); |
396 | 0 | } |
397 | | |
398 | 0 | if (req->has_schedule_id()) { |
399 | 0 | auto schedule_id = VERIFY_RESULT(FullyDecodeSnapshotScheduleId(req->schedule_id())); |
400 | 0 | auto snapshot_id = snapshot_coordinator_.CreateForSchedule( |
401 | 0 | schedule_id, leader_ready_term(), rpc->GetClientDeadline()); |
402 | 0 | if (!snapshot_id.ok()) { |
403 | 0 | LOG(INFO) << "Create snapshot failed: " << snapshot_id.status(); |
404 | 0 | return snapshot_id.status(); |
405 | 0 | } |
406 | 0 | resp->set_snapshot_id(snapshot_id->data(), snapshot_id->size()); |
407 | 0 | return Status::OK(); |
408 | 0 | } |
409 | | |
410 | 0 | return CreateNonTransactionAwareSnapshot(req, resp, rpc); |
411 | 0 | } |
412 | | |
413 | | Status CatalogManager::CreateNonTransactionAwareSnapshot( |
414 | | const CreateSnapshotRequestPB* req, |
415 | | CreateSnapshotResponsePB* resp, |
416 | 0 | RpcContext* rpc) { |
417 | 0 | SnapshotId snapshot_id; |
418 | 0 | { |
419 | 0 | LockGuard lock(mutex_); |
420 | 0 | TRACE("Acquired catalog manager lock"); |
421 | | |
422 | | // Verify that the system is not in snapshot creating/restoring state. |
423 | 0 | if (!current_snapshot_id_.empty()) { |
424 | 0 | return STATUS(IllegalState, |
425 | 0 | Format( |
426 | 0 | "Current snapshot id: $0. Parallel snapshot operations are not supported" |
427 | 0 | ": $1", current_snapshot_id_, req), |
428 | 0 | MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION)); |
429 | 0 | } |
430 | | |
431 | | // Create a new snapshot UUID. |
432 | 0 | snapshot_id = GenerateIdUnlocked(SysRowEntryType::SNAPSHOT); |
433 | 0 | } |
434 | |
|
435 | 0 | vector<scoped_refptr<TabletInfo>> all_tablets; |
436 | | |
437 | | // Create in memory snapshot data descriptor. |
438 | 0 | scoped_refptr<SnapshotInfo> snapshot(new SnapshotInfo(snapshot_id)); |
439 | 0 | snapshot->mutable_metadata()->StartMutation(); |
440 | 0 | snapshot->mutable_metadata()->mutable_dirty()->pb.set_state(SysSnapshotEntryPB::CREATING); |
441 | |
|
442 | 0 | auto tables = VERIFY_RESULT(CollectTables(req->tables(), |
443 | 0 | req->add_indexes(), |
444 | 0 | true /* include_parent_colocated_table */)); |
445 | 0 | std::unordered_set<NamespaceId> added_namespaces; |
446 | 0 | for (const auto& table : tables) { |
447 | 0 | snapshot->AddEntries(table, &added_namespaces); |
448 | 0 | all_tablets.insert(all_tablets.end(), table.tablet_infos.begin(), table.tablet_infos.end()); |
449 | 0 | } |
450 | |
|
451 | 0 | VLOG(1) << "Snapshot " << snapshot->ToString() |
452 | 0 | << ": PB=" << snapshot->mutable_metadata()->mutable_dirty()->pb.DebugString(); |
453 | | |
454 | | // Write the snapshot data descriptor to the system catalog (in "creating" state). |
455 | 0 | RETURN_NOT_OK(CheckLeaderStatus( |
456 | 0 | sys_catalog_->Upsert(leader_ready_term(), snapshot), |
457 | 0 | "inserting snapshot into sys-catalog")); |
458 | 0 | TRACE("Wrote snapshot to system catalog"); |
459 | | |
460 | | // Commit in memory snapshot data descriptor. |
461 | 0 | snapshot->mutable_metadata()->CommitMutation(); |
462 | | |
463 | | // Put the snapshot data descriptor to the catalog manager. |
464 | 0 | { |
465 | 0 | LockGuard lock(mutex_); |
466 | 0 | TRACE("Acquired catalog manager lock"); |
467 | | |
468 | | // Verify that the snapshot does not exist. |
469 | 0 | auto inserted = non_txn_snapshot_ids_map_.emplace(snapshot_id, snapshot).second; |
470 | 0 | RSTATUS_DCHECK(inserted, IllegalState, Format("Snapshot already exists: $0", snapshot_id)); |
471 | 0 | current_snapshot_id_ = snapshot_id; |
472 | 0 | } |
473 | | |
474 | | // Send CreateSnapshot requests to all TServers (one tablet - one request). |
475 | 0 | for (const scoped_refptr<TabletInfo>& tablet : all_tablets) { |
476 | 0 | TRACE("Locking tablet"); |
477 | 0 | auto l = tablet->LockForRead(); |
478 | |
|
479 | 0 | LOG(INFO) << "Sending CreateTabletSnapshot to tablet: " << tablet->ToString(); |
480 | | |
481 | | // Send Create Tablet Snapshot request to each tablet leader. |
482 | 0 | auto call = CreateAsyncTabletSnapshotOp( |
483 | 0 | tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::CREATE_ON_TABLET, |
484 | 0 | TabletSnapshotOperationCallback()); |
485 | 0 | ScheduleTabletSnapshotOp(call); |
486 | 0 | } |
487 | |
|
488 | 0 | resp->set_snapshot_id(snapshot_id); |
489 | 0 | LOG(INFO) << "Successfully started snapshot " << snapshot_id << " creation"; |
490 | 0 | return Status::OK(); |
491 | 0 | } |
492 | | |
493 | 0 | void CatalogManager::Submit(std::unique_ptr<tablet::Operation> operation, int64_t leader_term) { |
494 | 0 | operation->SetTablet(tablet_peer()->tablet()); |
495 | 0 | tablet_peer()->Submit(std::move(operation), leader_term); |
496 | 0 | } |
497 | | |
498 | | Result<SysRowEntries> CatalogManager::CollectEntries( |
499 | | const google::protobuf::RepeatedPtrField<TableIdentifierPB>& table_identifiers, |
500 | 0 | CollectFlags flags) { |
501 | 0 | RETURN_NOT_OK(CheckIsLeaderAndReady()); |
502 | 0 | SysRowEntries entries; |
503 | 0 | std::unordered_set<NamespaceId> namespaces; |
504 | 0 | auto tables = VERIFY_RESULT(CollectTables(table_identifiers, flags, &namespaces)); |
505 | 0 | if (!namespaces.empty()) { |
506 | 0 | SharedLock lock(mutex_); |
507 | 0 | for (const auto& ns_id : namespaces) { |
508 | 0 | auto ns_info = VERIFY_RESULT(FindNamespaceByIdUnlocked(ns_id)); |
509 | 0 | AddInfoEntry(ns_info.get(), entries.mutable_entries()); |
510 | 0 | } |
511 | 0 | } |
512 | 0 | for (const auto& table : tables) { |
513 | | // TODO(txn_snapshot) use single lock to resolve all tables to tablets |
514 | 0 | SnapshotInfo::AddEntries(table, entries.mutable_entries(), /* tablet_infos= */ nullptr, |
515 | 0 | &namespaces); |
516 | 0 | } |
517 | |
|
518 | 0 | return entries; |
519 | 0 | } |
520 | | |
521 | | Result<SysRowEntries> CatalogManager::CollectEntriesForSnapshot( |
522 | 0 | const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables) { |
523 | 0 | return CollectEntries( |
524 | 0 | tables, |
525 | 0 | CollectFlags{CollectFlag::kAddIndexes, CollectFlag::kIncludeParentColocatedTable, |
526 | 0 | CollectFlag::kSucceedIfCreateInProgress}); |
527 | 0 | } |
528 | | |
529 | 16.5k | server::Clock* CatalogManager::Clock() { |
530 | 16.5k | return master_->clock(); |
531 | 16.5k | } |
532 | | |
533 | | Status CatalogManager::CreateTransactionAwareSnapshot( |
534 | 0 | const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) { |
535 | 0 | CollectFlags flags{CollectFlag::kIncludeParentColocatedTable}; |
536 | 0 | flags.SetIf(CollectFlag::kAddIndexes, req.add_indexes()); |
537 | 0 | SysRowEntries entries = VERIFY_RESULT(CollectEntries(req.tables(), flags)); |
538 | |
|
539 | 0 | auto snapshot_id = VERIFY_RESULT(snapshot_coordinator_.Create( |
540 | 0 | entries, req.imported(), leader_ready_term(), rpc->GetClientDeadline())); |
541 | 0 | resp->set_snapshot_id(snapshot_id.data(), snapshot_id.size()); |
542 | 0 | return Status::OK(); |
543 | 0 | } |
544 | | |
545 | | Status CatalogManager::ListSnapshots(const ListSnapshotsRequestPB* req, |
546 | 0 | ListSnapshotsResponsePB* resp) { |
547 | 0 | auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id()); |
548 | 0 | { |
549 | 0 | SharedLock lock(mutex_); |
550 | 0 | TRACE("Acquired catalog manager lock"); |
551 | |
|
552 | 0 | if (!current_snapshot_id_.empty()) { |
553 | 0 | resp->set_current_snapshot_id(current_snapshot_id_); |
554 | 0 | } |
555 | |
|
556 | 0 | auto setup_snapshot_pb_lambda = [resp](scoped_refptr<SnapshotInfo> snapshot_info) { |
557 | 0 | auto snapshot_lock = snapshot_info->LockForRead(); |
558 | |
|
559 | 0 | SnapshotInfoPB* const snapshot = resp->add_snapshots(); |
560 | 0 | snapshot->set_id(snapshot_info->id()); |
561 | 0 | *snapshot->mutable_entry() = snapshot_info->metadata().state().pb; |
562 | 0 | }; |
563 | |
|
564 | 0 | if (req->has_snapshot_id()) { |
565 | 0 | if (!txn_snapshot_id) { |
566 | 0 | TRACE("Looking up snapshot"); |
567 | 0 | scoped_refptr<SnapshotInfo> snapshot_info = |
568 | 0 | FindPtrOrNull(non_txn_snapshot_ids_map_, req->snapshot_id()); |
569 | 0 | if (snapshot_info == nullptr) { |
570 | 0 | return STATUS(InvalidArgument, "Could not find snapshot", req->snapshot_id(), |
571 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
572 | 0 | } |
573 | | |
574 | 0 | setup_snapshot_pb_lambda(snapshot_info); |
575 | 0 | } |
576 | 0 | } else { |
577 | 0 | for (const SnapshotInfoMap::value_type& entry : non_txn_snapshot_ids_map_) { |
578 | 0 | setup_snapshot_pb_lambda(entry.second); |
579 | 0 | } |
580 | 0 | } |
581 | 0 | } |
582 | |
|
583 | 0 | if (req->prepare_for_backup() && (!req->has_snapshot_id() || !txn_snapshot_id)) { |
584 | 0 | return STATUS( |
585 | 0 | InvalidArgument, "Request must have correct snapshot_id", (req->has_snapshot_id() ? |
586 | 0 | req->snapshot_id() : "None"), MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
587 | 0 | } |
588 | | |
589 | 0 | RETURN_NOT_OK(snapshot_coordinator_.ListSnapshots( |
590 | 0 | txn_snapshot_id, req->list_deleted_snapshots(), resp)); |
591 | |
|
592 | 0 | if (req->prepare_for_backup()) { |
593 | 0 | SharedLock lock(mutex_); |
594 | 0 | TRACE("Acquired catalog manager lock"); |
595 | | |
596 | | // Repack & extend the backup row entries. |
597 | 0 | for (SnapshotInfoPB& snapshot : *resp->mutable_snapshots()) { |
598 | 0 | snapshot.set_format_version(2); |
599 | 0 | SysSnapshotEntryPB& sys_entry = *snapshot.mutable_entry(); |
600 | 0 | snapshot.mutable_backup_entries()->Reserve(sys_entry.entries_size()); |
601 | |
|
602 | 0 | for (SysRowEntry& entry : *sys_entry.mutable_entries()) { |
603 | 0 | BackupRowEntryPB* const backup_entry = snapshot.add_backup_entries(); |
604 | | // Setup BackupRowEntryPB fields. |
605 | | // Set BackupRowEntryPB::pg_schema_name for YSQL table to disambiguate in case tables |
606 | | // in different schema have same name. |
607 | 0 | if (entry.type() == SysRowEntryType::TABLE) { |
608 | 0 | TRACE("Looking up table"); |
609 | 0 | scoped_refptr<TableInfo> table_info = FindPtrOrNull(*table_ids_map_, entry.id()); |
610 | 0 | if (table_info == nullptr) { |
611 | 0 | return STATUS( |
612 | 0 | InvalidArgument, "Table not found by ID", entry.id(), |
613 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
614 | 0 | } |
615 | | |
616 | 0 | TRACE("Locking table"); |
617 | 0 | auto l = table_info->LockForRead(); |
618 | | // PG schema name is available for YSQL table only. |
619 | | // Except '<uuid>.colocated.parent.uuid' table ID. |
620 | 0 | if (l->table_type() == PGSQL_TABLE_TYPE && !IsColocatedParentTableId(entry.id())) { |
621 | 0 | const string pg_schema_name = VERIFY_RESULT(GetPgSchemaName(table_info)); |
622 | 0 | VLOG(1) << "PG Schema: " << pg_schema_name << " for table " << table_info->ToString(); |
623 | 0 | backup_entry->set_pg_schema_name(pg_schema_name); |
624 | 0 | } |
625 | 0 | } |
626 | | |
627 | | // Init BackupRowEntryPB::entry. |
628 | 0 | backup_entry->mutable_entry()->Swap(&entry); |
629 | 0 | } |
630 | |
|
631 | 0 | sys_entry.clear_entries(); |
632 | 0 | } |
633 | 0 | } |
634 | |
|
635 | 0 | return Status::OK(); |
636 | 0 | } |
637 | | |
638 | | Status CatalogManager::ListSnapshotRestorations(const ListSnapshotRestorationsRequestPB* req, |
639 | 0 | ListSnapshotRestorationsResponsePB* resp) { |
640 | 0 | TxnSnapshotRestorationId restoration_id = TxnSnapshotRestorationId::Nil(); |
641 | 0 | if (!req->restoration_id().empty()) { |
642 | 0 | restoration_id = VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId(req->restoration_id())); |
643 | 0 | } |
644 | 0 | TxnSnapshotId snapshot_id = TxnSnapshotId::Nil(); |
645 | 0 | if (!req->snapshot_id().empty()) { |
646 | 0 | snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(req->snapshot_id())); |
647 | 0 | } |
648 | |
|
649 | 0 | return snapshot_coordinator_.ListRestorations(restoration_id, snapshot_id, resp); |
650 | 0 | } |
651 | | |
652 | | Status CatalogManager::RestoreSnapshot(const RestoreSnapshotRequestPB* req, |
653 | 0 | RestoreSnapshotResponsePB* resp) { |
654 | 0 | LOG(INFO) << "Servicing RestoreSnapshot request: " << req->ShortDebugString(); |
655 | |
|
656 | 0 | auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id()); |
657 | 0 | if (txn_snapshot_id) { |
658 | 0 | HybridTime ht; |
659 | 0 | if (req->has_restore_ht()) { |
660 | 0 | ht = HybridTime(req->restore_ht()); |
661 | 0 | } |
662 | 0 | TxnSnapshotRestorationId id = VERIFY_RESULT(snapshot_coordinator_.Restore( |
663 | 0 | txn_snapshot_id, ht, leader_ready_term())); |
664 | 0 | resp->set_restoration_id(id.data(), id.size()); |
665 | 0 | return Status::OK(); |
666 | 0 | } |
667 | | |
668 | 0 | return RestoreNonTransactionAwareSnapshot(req->snapshot_id()); |
669 | 0 | } |
670 | | |
671 | 0 | Status CatalogManager::RestoreNonTransactionAwareSnapshot(const string& snapshot_id) { |
672 | 0 | LockGuard lock(mutex_); |
673 | 0 | TRACE("Acquired catalog manager lock"); |
674 | |
|
675 | 0 | if (!current_snapshot_id_.empty()) { |
676 | 0 | return STATUS( |
677 | 0 | IllegalState, |
678 | 0 | Format( |
679 | 0 | "Current snapshot id: $0. Parallel snapshot operations are not supported: $1", |
680 | 0 | current_snapshot_id_, snapshot_id), |
681 | 0 | MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION)); |
682 | 0 | } |
683 | | |
684 | 0 | TRACE("Looking up snapshot"); |
685 | 0 | scoped_refptr<SnapshotInfo> snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, snapshot_id); |
686 | 0 | if (snapshot == nullptr) { |
687 | 0 | return STATUS(InvalidArgument, "Could not find snapshot", snapshot_id, |
688 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
689 | 0 | } |
690 | | |
691 | 0 | auto snapshot_lock = snapshot->LockForWrite(); |
692 | |
|
693 | 0 | if (snapshot_lock->started_deleting()) { |
694 | 0 | return STATUS(NotFound, "The snapshot was deleted", snapshot_id, |
695 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
696 | 0 | } |
697 | | |
698 | 0 | if (!snapshot_lock->is_complete()) { |
699 | 0 | return STATUS(IllegalState, "The snapshot state is not complete", snapshot_id, |
700 | 0 | MasterError(MasterErrorPB::SNAPSHOT_IS_NOT_READY)); |
701 | 0 | } |
702 | | |
703 | 0 | TRACE("Updating snapshot metadata on disk"); |
704 | 0 | SysSnapshotEntryPB& snapshot_pb = snapshot_lock.mutable_data()->pb; |
705 | 0 | snapshot_pb.set_state(SysSnapshotEntryPB::RESTORING); |
706 | | |
707 | | // Update tablet states. |
708 | 0 | SetTabletSnapshotsState(SysSnapshotEntryPB::RESTORING, &snapshot_pb); |
709 | | |
710 | | // Update sys-catalog with the updated snapshot state. |
711 | | // The mutation will be aborted when 'l' exits the scope on early return. |
712 | 0 | RETURN_NOT_OK(CheckLeaderStatus( |
713 | 0 | sys_catalog_->Upsert(leader_ready_term(), snapshot), |
714 | 0 | "updating snapshot in sys-catalog")); |
715 | | |
716 | | // CataloManager lock 'lock_' is still locked here. |
717 | 0 | current_snapshot_id_ = snapshot_id; |
718 | | |
719 | | // Restore all entries. |
720 | 0 | for (const SysRowEntry& entry : snapshot_pb.entries()) { |
721 | 0 | RETURN_NOT_OK(RestoreEntry(entry, snapshot_id)); |
722 | 0 | } |
723 | | |
724 | | // Commit in memory snapshot data descriptor. |
725 | 0 | TRACE("Committing in-memory snapshot state"); |
726 | 0 | snapshot_lock.Commit(); |
727 | |
|
728 | 0 | LOG(INFO) << "Successfully started snapshot " << snapshot->ToString() << " restoring"; |
729 | 0 | return Status::OK(); |
730 | 0 | } |
731 | | |
732 | 0 | Status CatalogManager::RestoreEntry(const SysRowEntry& entry, const SnapshotId& snapshot_id) { |
733 | 0 | switch (entry.type()) { |
734 | 0 | case SysRowEntryType::NAMESPACE: { // Restore NAMESPACES. |
735 | 0 | TRACE("Looking up namespace"); |
736 | 0 | scoped_refptr<NamespaceInfo> ns = FindPtrOrNull(namespace_ids_map_, entry.id()); |
737 | 0 | if (ns == nullptr) { |
738 | | // Restore Namespace. |
739 | | // TODO: implement |
740 | 0 | LOG(INFO) << "Restoring: NAMESPACE id = " << entry.id(); |
741 | |
|
742 | 0 | return STATUS(NotSupported, Substitute( |
743 | 0 | "Not implemented: restoring namespace: id=$0", entry.type())); |
744 | 0 | } |
745 | 0 | break; |
746 | 0 | } |
747 | 0 | case SysRowEntryType::TABLE: { // Restore TABLES. |
748 | 0 | TRACE("Looking up table"); |
749 | 0 | scoped_refptr<TableInfo> table = FindPtrOrNull(*table_ids_map_, entry.id()); |
750 | 0 | if (table == nullptr) { |
751 | | // Restore Table. |
752 | | // TODO: implement |
753 | 0 | LOG(INFO) << "Restoring: TABLE id = " << entry.id(); |
754 | |
|
755 | 0 | return STATUS(NotSupported, Substitute( |
756 | 0 | "Not implemented: restoring table: id=$0", entry.type())); |
757 | 0 | } |
758 | 0 | break; |
759 | 0 | } |
760 | 0 | case SysRowEntryType::TABLET: { // Restore TABLETS. |
761 | 0 | TRACE("Looking up tablet"); |
762 | 0 | scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id()); |
763 | 0 | if (tablet == nullptr) { |
764 | | // Restore Tablet. |
765 | | // TODO: implement |
766 | 0 | LOG(INFO) << "Restoring: TABLET id = " << entry.id(); |
767 | |
|
768 | 0 | return STATUS(NotSupported, Substitute( |
769 | 0 | "Not implemented: restoring tablet: id=$0", entry.type())); |
770 | 0 | } else { |
771 | 0 | TRACE("Locking tablet"); |
772 | 0 | auto l = tablet->LockForRead(); |
773 | |
|
774 | 0 | LOG(INFO) << "Sending RestoreTabletSnapshot to tablet: " << tablet->ToString(); |
775 | | // Send RestoreSnapshot requests to all TServers (one tablet - one request). |
776 | 0 | auto task = CreateAsyncTabletSnapshotOp( |
777 | 0 | tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::RESTORE_ON_TABLET, |
778 | 0 | TabletSnapshotOperationCallback()); |
779 | 0 | ScheduleTabletSnapshotOp(task); |
780 | 0 | } |
781 | 0 | break; |
782 | 0 | } |
783 | 0 | default: |
784 | 0 | return STATUS_FORMAT( |
785 | 0 | InternalError, "Unexpected entry type in the snapshot: $0", entry.type()); |
786 | 0 | } |
787 | | |
788 | 0 | return Status::OK(); |
789 | 0 | } |
790 | | |
791 | | Status CatalogManager::DeleteSnapshot(const DeleteSnapshotRequestPB* req, |
792 | | DeleteSnapshotResponsePB* resp, |
793 | 0 | RpcContext* rpc) { |
794 | 0 | LOG(INFO) << "Servicing DeleteSnapshot request: " << req->ShortDebugString(); |
795 | |
|
796 | 0 | auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(req->snapshot_id()); |
797 | 0 | if (txn_snapshot_id) { |
798 | 0 | return snapshot_coordinator_.Delete( |
799 | 0 | txn_snapshot_id, leader_ready_term(), rpc->GetClientDeadline()); |
800 | 0 | } |
801 | | |
802 | 0 | return DeleteNonTransactionAwareSnapshot(req->snapshot_id()); |
803 | 0 | } |
804 | | |
805 | 0 | Status CatalogManager::DeleteNonTransactionAwareSnapshot(const SnapshotId& snapshot_id) { |
806 | 0 | LockGuard lock(mutex_); |
807 | 0 | TRACE("Acquired catalog manager lock"); |
808 | |
|
809 | 0 | TRACE("Looking up snapshot"); |
810 | 0 | scoped_refptr<SnapshotInfo> snapshot = FindPtrOrNull( |
811 | 0 | non_txn_snapshot_ids_map_, snapshot_id); |
812 | 0 | if (snapshot == nullptr) { |
813 | 0 | return STATUS(InvalidArgument, "Could not find snapshot", snapshot_id, |
814 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
815 | 0 | } |
816 | | |
817 | 0 | auto snapshot_lock = snapshot->LockForWrite(); |
818 | |
|
819 | 0 | if (snapshot_lock->started_deleting()) { |
820 | 0 | return STATUS(NotFound, "The snapshot was deleted", snapshot_id, |
821 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
822 | 0 | } |
823 | | |
824 | 0 | if (snapshot_lock->is_restoring()) { |
825 | 0 | return STATUS(InvalidArgument, "The snapshot is being restored now", snapshot_id, |
826 | 0 | MasterError(MasterErrorPB::PARALLEL_SNAPSHOT_OPERATION)); |
827 | 0 | } |
828 | | |
829 | 0 | TRACE("Updating snapshot metadata on disk"); |
830 | 0 | SysSnapshotEntryPB& snapshot_pb = snapshot_lock.mutable_data()->pb; |
831 | 0 | snapshot_pb.set_state(SysSnapshotEntryPB::DELETING); |
832 | | |
833 | | // Update tablet states. |
834 | 0 | SetTabletSnapshotsState(SysSnapshotEntryPB::DELETING, &snapshot_pb); |
835 | | |
836 | | // Update sys-catalog with the updated snapshot state. |
837 | | // The mutation will be aborted when 'l' exits the scope on early return. |
838 | 0 | RETURN_NOT_OK(CheckStatus( |
839 | 0 | sys_catalog_->Upsert(leader_ready_term(), snapshot), |
840 | 0 | "updating snapshot in sys-catalog")); |
841 | | |
842 | | // Send DeleteSnapshot requests to all TServers (one tablet - one request). |
843 | 0 | for (const SysRowEntry& entry : snapshot_pb.entries()) { |
844 | 0 | if (entry.type() == SysRowEntryType::TABLET) { |
845 | 0 | TRACE("Looking up tablet"); |
846 | 0 | scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id()); |
847 | 0 | if (tablet == nullptr) { |
848 | 0 | LOG(WARNING) << "Deleting tablet not found " << entry.id(); |
849 | 0 | } else { |
850 | 0 | TRACE("Locking tablet"); |
851 | 0 | auto l = tablet->LockForRead(); |
852 | |
|
853 | 0 | LOG(INFO) << "Sending DeleteTabletSnapshot to tablet: " << tablet->ToString(); |
854 | | // Send DeleteSnapshot requests to all TServers (one tablet - one request). |
855 | 0 | auto task = CreateAsyncTabletSnapshotOp( |
856 | 0 | tablet, snapshot_id, tserver::TabletSnapshotOpRequestPB::DELETE_ON_TABLET, |
857 | 0 | TabletSnapshotOperationCallback()); |
858 | 0 | ScheduleTabletSnapshotOp(task); |
859 | 0 | } |
860 | 0 | } |
861 | 0 | } |
862 | | |
863 | | // Commit in memory snapshot data descriptor. |
864 | 0 | TRACE("Committing in-memory snapshot state"); |
865 | 0 | snapshot_lock.Commit(); |
866 | |
|
867 | 0 | LOG(INFO) << "Successfully started snapshot " << snapshot->ToString() << " deletion"; |
868 | 0 | return Status::OK(); |
869 | 0 | } |
870 | | |
871 | | Status CatalogManager::ImportSnapshotPreprocess(const SnapshotInfoPB& snapshot_pb, |
872 | | ImportSnapshotMetaResponsePB* resp, |
873 | | NamespaceMap* namespace_map, |
874 | 0 | ExternalTableSnapshotDataMap* tables_data) { |
875 | 0 | for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) { |
876 | 0 | const SysRowEntry& entry = backup_entry.entry(); |
877 | 0 | switch (entry.type()) { |
878 | 0 | case SysRowEntryType::NAMESPACE: // Recreate NAMESPACE. |
879 | 0 | RETURN_NOT_OK(ImportNamespaceEntry(entry, namespace_map)); |
880 | 0 | break; |
881 | 0 | case SysRowEntryType::TABLE: { // Create TABLE metadata. |
882 | 0 | LOG_IF(DFATAL, entry.id().empty()) << "Empty entry id"; |
883 | 0 | ExternalTableSnapshotData& data = (*tables_data)[entry.id()]; |
884 | |
|
885 | 0 | if (data.old_table_id.empty()) { |
886 | 0 | data.old_table_id = entry.id(); |
887 | 0 | data.table_meta = resp->mutable_tables_meta()->Add(); |
888 | 0 | data.tablet_id_map = data.table_meta->mutable_tablets_ids(); |
889 | 0 | data.table_entry_pb = VERIFY_RESULT(ParseFromSlice<SysTablesEntryPB>(entry.data())); |
890 | |
|
891 | 0 | if (backup_entry.has_pg_schema_name()) { |
892 | 0 | data.pg_schema_name = backup_entry.pg_schema_name(); |
893 | 0 | } |
894 | 0 | } else { |
895 | 0 | LOG_WITH_FUNC(WARNING) << "Ignoring duplicate table with id " << entry.id(); |
896 | 0 | } |
897 | |
|
898 | 0 | LOG_IF(DFATAL, data.old_table_id.empty()) << "Not initialized table id"; |
899 | 0 | } |
900 | 0 | break; |
901 | 0 | case SysRowEntryType::TABLET: // Preprocess original tablets. |
902 | 0 | RETURN_NOT_OK(PreprocessTabletEntry(entry, tables_data)); |
903 | 0 | break; |
904 | 0 | case SysRowEntryType::CLUSTER_CONFIG: FALLTHROUGH_INTENDED; |
905 | 0 | case SysRowEntryType::REDIS_CONFIG: FALLTHROUGH_INTENDED; |
906 | 0 | case SysRowEntryType::UDTYPE: FALLTHROUGH_INTENDED; |
907 | 0 | case SysRowEntryType::ROLE: FALLTHROUGH_INTENDED; |
908 | 0 | case SysRowEntryType::SYS_CONFIG: FALLTHROUGH_INTENDED; |
909 | 0 | case SysRowEntryType::CDC_STREAM: FALLTHROUGH_INTENDED; |
910 | 0 | case SysRowEntryType::UNIVERSE_REPLICATION: FALLTHROUGH_INTENDED; |
911 | 0 | case SysRowEntryType::SNAPSHOT: FALLTHROUGH_INTENDED; |
912 | 0 | case SysRowEntryType::SNAPSHOT_SCHEDULE: FALLTHROUGH_INTENDED; |
913 | 0 | case SysRowEntryType::DDL_LOG_ENTRY: FALLTHROUGH_INTENDED; |
914 | 0 | case SysRowEntryType::UNKNOWN: |
915 | 0 | FATAL_INVALID_ENUM_VALUE(SysRowEntryType, entry.type()); |
916 | 0 | } |
917 | 0 | } |
918 | |
|
919 | 0 | return Status::OK(); |
920 | 0 | } |
921 | | |
922 | | Status CatalogManager::ImportSnapshotCreateObject(const SnapshotInfoPB& snapshot_pb, |
923 | | ImportSnapshotMetaResponsePB* resp, |
924 | | NamespaceMap* namespace_map, |
925 | | ExternalTableSnapshotDataMap* tables_data, |
926 | 0 | CreateObjects create_objects) { |
927 | | // Create ONLY TABLES or ONLY INDEXES in accordance to the argument. |
928 | 0 | for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) { |
929 | 0 | const SysRowEntry& entry = backup_entry.entry(); |
930 | 0 | if (entry.type() == SysRowEntryType::TABLE) { |
931 | 0 | ExternalTableSnapshotData& data = (*tables_data)[entry.id()]; |
932 | 0 | if ((create_objects == CreateObjects::kOnlyIndexes) == data.is_index()) { |
933 | 0 | RETURN_NOT_OK(ImportTableEntry(*namespace_map, *tables_data, &data)); |
934 | 0 | } |
935 | 0 | } |
936 | 0 | } |
937 | |
|
938 | 0 | return Status::OK(); |
939 | 0 | } |
940 | | |
941 | | Status CatalogManager::ImportSnapshotWaitForTables(const SnapshotInfoPB& snapshot_pb, |
942 | | ImportSnapshotMetaResponsePB* resp, |
943 | 0 | ExternalTableSnapshotDataMap* tables_data) { |
944 | 0 | for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) { |
945 | 0 | const SysRowEntry& entry = backup_entry.entry(); |
946 | 0 | if (entry.type() == SysRowEntryType::TABLE) { |
947 | 0 | ExternalTableSnapshotData& data = (*tables_data)[entry.id()]; |
948 | 0 | if (!data.is_index()) { |
949 | 0 | RETURN_NOT_OK(WaitForCreateTableToFinish(data.new_table_id)); |
950 | 0 | } |
951 | 0 | } |
952 | 0 | } |
953 | |
|
954 | 0 | return Status::OK(); |
955 | 0 | } |
956 | | |
957 | | Status CatalogManager::ImportSnapshotProcessTablets(const SnapshotInfoPB& snapshot_pb, |
958 | | ImportSnapshotMetaResponsePB* resp, |
959 | 0 | ExternalTableSnapshotDataMap* tables_data) { |
960 | 0 | for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) { |
961 | 0 | const SysRowEntry& entry = backup_entry.entry(); |
962 | 0 | if (entry.type() == SysRowEntryType::TABLET) { |
963 | | // Create tablets IDs map. |
964 | 0 | RETURN_NOT_OK(ImportTabletEntry(entry, tables_data)); |
965 | 0 | } |
966 | 0 | } |
967 | |
|
968 | 0 | return Status::OK(); |
969 | 0 | } |
970 | | |
971 | | template <class RespClass> |
972 | | void ProcessDeleteObjectStatus(const string& obj_name, |
973 | | const string& id, |
974 | | const RespClass& resp, |
975 | 0 | const Status& s) { |
976 | 0 | Status result = s; |
977 | 0 | if (result.ok() && resp.has_error()) { |
978 | 0 | result = StatusFromPB(resp.error().status()); |
979 | 0 | LOG_IF(DFATAL, result.ok()) << "Expecting error status"; |
980 | 0 | } |
981 | |
|
982 | 0 | if (!result.ok()) { |
983 | 0 | LOG_WITH_FUNC(WARNING) << "Failed to delete new " << obj_name << " with id=" << id |
984 | 0 | << ": " << result; |
985 | 0 | } |
986 | 0 | } Unexecuted instantiation: _ZN2yb6master10enterprise25ProcessDeleteObjectStatusINS0_21DeleteTableResponsePBEEEvRKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEESC_RKT_RKNS_6StatusE Unexecuted instantiation: _ZN2yb6master10enterprise25ProcessDeleteObjectStatusINS0_25DeleteNamespaceResponsePBEEEvRKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEESC_RKT_RKNS_6StatusE |
987 | | |
988 | | void CatalogManager::DeleteNewSnapshotObjects(const NamespaceMap& namespace_map, |
989 | 0 | const ExternalTableSnapshotDataMap& tables_data) { |
990 | 0 | for (const ExternalTableSnapshotDataMap::value_type& entry : tables_data) { |
991 | 0 | const TableId& old_id = entry.first; |
992 | 0 | const TableId& new_id = entry.second.new_table_id; |
993 | 0 | const TableType type = entry.second.table_entry_pb.table_type(); |
994 | | |
995 | | // Do not delete YSQL objects - it must be deleted via PG API. |
996 | 0 | if (new_id.empty() || new_id == old_id || type == TableType::PGSQL_TABLE_TYPE) { |
997 | 0 | continue; |
998 | 0 | } |
999 | | |
1000 | 0 | LOG_WITH_FUNC(INFO) << "Deleting new table with id=" << new_id << " old id=" << old_id; |
1001 | 0 | DeleteTableRequestPB req; |
1002 | 0 | DeleteTableResponsePB resp; |
1003 | 0 | req.mutable_table()->set_table_id(new_id); |
1004 | 0 | req.set_is_index_table(entry.second.is_index()); |
1005 | 0 | ProcessDeleteObjectStatus("table", new_id, resp, DeleteTable(&req, &resp, nullptr)); |
1006 | 0 | } |
1007 | |
|
1008 | 0 | for (const NamespaceMap::value_type& entry : namespace_map) { |
1009 | 0 | const NamespaceId& old_id = entry.first; |
1010 | 0 | const NamespaceId& new_id = entry.second.first; |
1011 | 0 | const YQLDatabase& db_type = entry.second.second; |
1012 | | |
1013 | | // Do not delete YSQL objects - it must be deleted via PG API. |
1014 | 0 | if (new_id.empty() || new_id == old_id || db_type == YQL_DATABASE_PGSQL) { |
1015 | 0 | continue; |
1016 | 0 | } |
1017 | | |
1018 | 0 | LOG_WITH_FUNC(INFO) << "Deleting new namespace with id=" << new_id << " old id=" << old_id; |
1019 | 0 | DeleteNamespaceRequestPB req; |
1020 | 0 | DeleteNamespaceResponsePB resp; |
1021 | 0 | req.mutable_namespace_()->set_id(new_id); |
1022 | 0 | ProcessDeleteObjectStatus( |
1023 | 0 | "namespace", new_id, resp, DeleteNamespace(&req, &resp, nullptr)); |
1024 | 0 | } |
1025 | 0 | } |
1026 | | |
1027 | | Status CatalogManager::ImportSnapshotMeta(const ImportSnapshotMetaRequestPB* req, |
1028 | 0 | ImportSnapshotMetaResponsePB* resp) { |
1029 | 0 | LOG(INFO) << "Servicing ImportSnapshotMeta request: " << req->ShortDebugString(); |
1030 | |
|
1031 | 0 | NamespaceMap namespace_map; |
1032 | 0 | ExternalTableSnapshotDataMap tables_data; |
1033 | 0 | bool successful_exit = false; |
1034 | |
|
1035 | 0 | auto se = ScopeExit([this, &namespace_map, &tables_data, &successful_exit] { |
1036 | 0 | if (!successful_exit) { |
1037 | 0 | DeleteNewSnapshotObjects(namespace_map, tables_data); |
1038 | 0 | } |
1039 | 0 | }); |
1040 | |
|
1041 | 0 | const SnapshotInfoPB& snapshot_pb = req->snapshot(); |
1042 | |
|
1043 | 0 | if (!snapshot_pb.has_format_version() || snapshot_pb.format_version() != 2) { |
1044 | 0 | return STATUS(InternalError, "Expected snapshot data in format 2", |
1045 | 0 | snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1046 | 0 | } |
1047 | | |
1048 | 0 | if (snapshot_pb.backup_entries_size() == 0) { |
1049 | 0 | return STATUS(InternalError, "Expected snapshot data prepared for backup", |
1050 | 0 | snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1051 | 0 | } |
1052 | | |
1053 | | // PHASE 1: Recreate namespaces, create table's meta data. |
1054 | 0 | RETURN_NOT_OK(ImportSnapshotPreprocess(snapshot_pb, resp, &namespace_map, &tables_data)); |
1055 | | |
1056 | | // PHASE 2: Recreate ONLY tables. |
1057 | 0 | RETURN_NOT_OK(ImportSnapshotCreateObject( |
1058 | 0 | snapshot_pb, resp, &namespace_map, &tables_data, CreateObjects::kOnlyTables)); |
1059 | | |
1060 | | // PHASE 3: Wait for all tables creation complete. |
1061 | 0 | RETURN_NOT_OK(ImportSnapshotWaitForTables(snapshot_pb, resp, &tables_data)); |
1062 | | |
1063 | | // PHASE 4: Recreate ONLY indexes. |
1064 | 0 | RETURN_NOT_OK(ImportSnapshotCreateObject( |
1065 | 0 | snapshot_pb, resp, &namespace_map, &tables_data, CreateObjects::kOnlyIndexes)); |
1066 | | |
1067 | | // PHASE 5: Restore tablets. |
1068 | 0 | RETURN_NOT_OK(ImportSnapshotProcessTablets(snapshot_pb, resp, &tables_data)); |
1069 | |
|
1070 | 0 | successful_exit = true; |
1071 | 0 | return Status::OK(); |
1072 | 0 | } |
1073 | | |
1074 | | Status CatalogManager::ChangeEncryptionInfo(const ChangeEncryptionInfoRequestPB* req, |
1075 | 6 | ChangeEncryptionInfoResponsePB* resp) { |
1076 | 6 | auto l = cluster_config_->LockForWrite(); |
1077 | 6 | auto encryption_info = l.mutable_data()->pb.mutable_encryption_info(); |
1078 | | |
1079 | 6 | RETURN_NOT_OK(encryption_manager_->ChangeEncryptionInfo(req, encryption_info)); |
1080 | | |
1081 | 6 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
1082 | 6 | RETURN_NOT_OK(CheckStatus( |
1083 | 6 | sys_catalog_->Upsert(leader_ready_term(), cluster_config_), |
1084 | 6 | "updating cluster config in sys-catalog")); |
1085 | 6 | l.Commit(); |
1086 | | |
1087 | 6 | std::lock_guard<simple_spinlock> lock(should_send_universe_key_registry_mutex_); |
1088 | 2 | for (auto& entry : should_send_universe_key_registry_) { |
1089 | 2 | entry.second = true; |
1090 | 2 | } |
1091 | | |
1092 | 6 | return Status::OK(); |
1093 | 6 | } |
1094 | | |
1095 | | Status CatalogManager::IsEncryptionEnabled(const IsEncryptionEnabledRequestPB* req, |
1096 | 6 | IsEncryptionEnabledResponsePB* resp) { |
1097 | 6 | return encryption_manager_->IsEncryptionEnabled( |
1098 | 6 | cluster_config_->LockForRead()->pb.encryption_info(), resp); |
1099 | 6 | } |
1100 | | |
1101 | | Status CatalogManager::ImportNamespaceEntry(const SysRowEntry& entry, |
1102 | 0 | NamespaceMap* namespace_map) { |
1103 | 0 | LOG_IF(DFATAL, entry.type() != SysRowEntryType::NAMESPACE) |
1104 | 0 | << "Unexpected entry type: " << entry.type(); |
1105 | |
|
1106 | 0 | SysNamespaceEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysNamespaceEntryPB>(entry.data())); |
1107 | 0 | const YQLDatabase db_type = GetDatabaseType(meta); |
1108 | 0 | NamespaceData& ns_data = (*namespace_map)[entry.id()]; |
1109 | 0 | ns_data.second = db_type; |
1110 | |
|
1111 | 0 | TRACE("Looking up namespace"); |
1112 | | // First of all try to find the namespace by ID. It will work if we are restoring the backup |
1113 | | // on the original cluster where the backup was created. |
1114 | 0 | scoped_refptr<NamespaceInfo> ns; |
1115 | 0 | { |
1116 | 0 | SharedLock lock(mutex_); |
1117 | 0 | ns = FindPtrOrNull(namespace_ids_map_, entry.id()); |
1118 | 0 | } |
1119 | |
|
1120 | 0 | if (ns != nullptr && ns->name() == meta.name() && ns->state() == SysNamespaceEntryPB::RUNNING) { |
1121 | 0 | ns_data.first = entry.id(); |
1122 | 0 | return Status::OK(); |
1123 | 0 | } |
1124 | | |
1125 | | // If the namespace was not found by ID, it's ok on a new cluster OR if the namespace was |
1126 | | // deleted and created again. In both cases the namespace can be found by NAME. |
1127 | 0 | if (db_type == YQL_DATABASE_PGSQL) { |
1128 | | // YSQL database must be created via external call. Find it by name. |
1129 | 0 | { |
1130 | 0 | SharedLock lock(mutex_); |
1131 | 0 | ns = FindPtrOrNull(namespace_names_mapper_[db_type], meta.name()); |
1132 | 0 | } |
1133 | |
|
1134 | 0 | if (ns == nullptr) { |
1135 | 0 | const string msg = Format("YSQL database must exist: $0", meta.name()); |
1136 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1137 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND)); |
1138 | 0 | } |
1139 | 0 | if (ns->state() != SysNamespaceEntryPB::RUNNING) { |
1140 | 0 | const string msg = Format("Found YSQL database must be running: $0", meta.name()); |
1141 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1142 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND)); |
1143 | 0 | } |
1144 | | |
1145 | 0 | auto ns_lock = ns->LockForRead(); |
1146 | 0 | ns_data.first = ns->id(); |
1147 | 0 | } else { |
1148 | 0 | CreateNamespaceRequestPB req; |
1149 | 0 | CreateNamespaceResponsePB resp; |
1150 | 0 | req.set_name(meta.name()); |
1151 | 0 | const Status s = CreateNamespace(&req, &resp, nullptr); |
1152 | |
|
1153 | 0 | if (!s.ok() && !s.IsAlreadyPresent()) { |
1154 | 0 | return s.CloneAndAppend("Failed to create namespace"); |
1155 | 0 | } |
1156 | | |
1157 | 0 | if (s.IsAlreadyPresent()) { |
1158 | 0 | LOG_WITH_FUNC(INFO) << "Using existing namespace '" << meta.name() << "': " << resp.id(); |
1159 | 0 | } |
1160 | |
|
1161 | 0 | ns_data.first = resp.id(); |
1162 | 0 | } |
1163 | 0 | return Status::OK(); |
1164 | 0 | } |
1165 | | |
1166 | | Status CatalogManager::RecreateTable(const NamespaceId& new_namespace_id, |
1167 | | const ExternalTableSnapshotDataMap& table_map, |
1168 | 0 | ExternalTableSnapshotData* table_data) { |
1169 | 0 | const SysTablesEntryPB& meta = DCHECK_NOTNULL(table_data)->table_entry_pb; |
1170 | |
|
1171 | 0 | CreateTableRequestPB req; |
1172 | 0 | CreateTableResponsePB resp; |
1173 | 0 | req.set_name(meta.name()); |
1174 | 0 | req.set_table_type(meta.table_type()); |
1175 | 0 | req.set_num_tablets(narrow_cast<int32_t>(table_data->num_tablets)); |
1176 | 0 | for (const auto& p : table_data->partitions) { |
1177 | 0 | *req.add_partitions() = p; |
1178 | 0 | } |
1179 | 0 | req.mutable_namespace_()->set_id(new_namespace_id); |
1180 | 0 | *req.mutable_partition_schema() = meta.partition_schema(); |
1181 | 0 | *req.mutable_replication_info() = meta.replication_info(); |
1182 | |
|
1183 | 0 | SchemaPB* const schema = req.mutable_schema(); |
1184 | 0 | *schema = meta.schema(); |
1185 | | |
1186 | | // Setup Index info. |
1187 | 0 | if (table_data->is_index()) { |
1188 | 0 | TRACE("Looking up indexed table"); |
1189 | | // First of all try to attach to the new copy of the referenced table, |
1190 | | // because the table restored from the snapshot is preferred. |
1191 | | // For that try to map old indexed table ID into new table ID. |
1192 | 0 | ExternalTableSnapshotDataMap::const_iterator it = table_map.find(meta.indexed_table_id()); |
1193 | 0 | const bool using_existing_table = (it == table_map.end()); |
1194 | |
|
1195 | 0 | if (using_existing_table) { |
1196 | 0 | LOG_WITH_FUNC(INFO) << "Try to use old indexed table id " << meta.indexed_table_id(); |
1197 | 0 | req.set_indexed_table_id(meta.indexed_table_id()); |
1198 | 0 | } else { |
1199 | 0 | LOG_WITH_FUNC(INFO) << "Found new table id " << it->second.new_table_id |
1200 | 0 | << " for old table id " << meta.indexed_table_id() |
1201 | 0 | << " from the snapshot"; |
1202 | 0 | req.set_indexed_table_id(it->second.new_table_id); |
1203 | 0 | } |
1204 | |
|
1205 | 0 | scoped_refptr<TableInfo> indexed_table; |
1206 | 0 | { |
1207 | 0 | SharedLock lock(mutex_); |
1208 | | // Try to find the specified indexed table by id. |
1209 | 0 | indexed_table = FindPtrOrNull(*table_ids_map_, req.indexed_table_id()); |
1210 | 0 | } |
1211 | |
|
1212 | 0 | if (indexed_table == nullptr) { |
1213 | 0 | const string msg = Format("Indexed table not found by id: $0", req.indexed_table_id()); |
1214 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1215 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
1216 | 0 | } |
1217 | | |
1218 | 0 | LOG_WITH_FUNC(INFO) << "Found indexed table by id " << req.indexed_table_id(); |
1219 | | |
1220 | | // Ensure the main table schema (including column ids) was not changed. |
1221 | 0 | if (!using_existing_table) { |
1222 | 0 | Schema new_indexed_schema, src_indexed_schema; |
1223 | 0 | RETURN_NOT_OK(indexed_table->GetSchema(&new_indexed_schema)); |
1224 | 0 | RETURN_NOT_OK(SchemaFromPB(it->second.table_entry_pb.schema(), &src_indexed_schema)); |
1225 | |
|
1226 | 0 | if (!new_indexed_schema.Equals(src_indexed_schema)) { |
1227 | 0 | const string msg = Format( |
1228 | 0 | "Recreated table has changes in schema: new schema={$0}, source schema={$1}", |
1229 | 0 | new_indexed_schema.ToString(), src_indexed_schema.ToString()); |
1230 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1231 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1232 | 0 | } |
1233 | | |
1234 | 0 | if (new_indexed_schema.column_ids() != src_indexed_schema.column_ids()) { |
1235 | 0 | const string msg = Format( |
1236 | 0 | "Recreated table has changes in column ids: new ids=$0, source ids=$1", |
1237 | 0 | ToString(new_indexed_schema.column_ids()), ToString(src_indexed_schema.column_ids())); |
1238 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1239 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1240 | 0 | } |
1241 | 0 | } |
1242 | | |
1243 | 0 | req.set_is_local_index(meta.is_local_index()); |
1244 | 0 | req.set_is_unique_index(meta.is_unique_index()); |
1245 | 0 | req.set_skip_index_backfill(true); |
1246 | | // Setup IndexInfoPB - self descriptor. |
1247 | 0 | IndexInfoPB* const index_info_pb = req.mutable_index_info(); |
1248 | 0 | *index_info_pb = meta.index_info(); |
1249 | 0 | index_info_pb->clear_table_id(); |
1250 | 0 | index_info_pb->set_indexed_table_id(req.indexed_table_id()); |
1251 | | |
1252 | | // Reset column ids. |
1253 | 0 | for (int i = 0; i < index_info_pb->columns_size(); ++i) { |
1254 | 0 | index_info_pb->mutable_columns(i)->clear_column_id(); |
1255 | 0 | } |
1256 | 0 | } |
1257 | |
|
1258 | 0 | RETURN_NOT_OK(CreateTable(&req, &resp, /* RpcContext */nullptr)); |
1259 | 0 | table_data->new_table_id = resp.table_id(); |
1260 | 0 | LOG_WITH_FUNC(INFO) << "New table id " << table_data->new_table_id << " for " |
1261 | 0 | << table_data->old_table_id; |
1262 | 0 | return Status::OK(); |
1263 | 0 | } |
1264 | | |
1265 | | Status CatalogManager::RepartitionTable(const scoped_refptr<TableInfo> table, |
1266 | 0 | const ExternalTableSnapshotData* table_data) { |
1267 | 0 | DCHECK_EQ(table->id(), table_data->new_table_id); |
1268 | 0 | if (table->GetTableType() != PGSQL_TABLE_TYPE) { |
1269 | 0 | return STATUS_FORMAT(InvalidArgument, |
1270 | 0 | "Cannot repartition non-YSQL table: got $0", |
1271 | 0 | TableType_Name(table->GetTableType())); |
1272 | 0 | } |
1273 | 0 | LOG_WITH_FUNC(INFO) << "Repartition table " << table->id() |
1274 | 0 | << " using external snapshot table " << table_data->old_table_id; |
1275 | | |
1276 | | // Get partitions from external snapshot. |
1277 | 0 | size_t i = 0; |
1278 | 0 | vector<Partition> partitions(table_data->partitions.size()); |
1279 | 0 | for (const auto& partition_pb : table_data->partitions) { |
1280 | 0 | Partition::FromPB(partition_pb, &partitions[i++]); |
1281 | 0 | } |
1282 | 0 | VLOG_WITH_FUNC(3) << "Got " << partitions.size() |
1283 | 0 | << " partitions from external snapshot for table " << table->id(); |
1284 | | |
1285 | | // Change TableInfo to point to the new tablets. |
1286 | 0 | string deletion_msg; |
1287 | 0 | vector<scoped_refptr<TabletInfo>> new_tablets; |
1288 | 0 | vector<scoped_refptr<TabletInfo>> old_tablets; |
1289 | 0 | { |
1290 | | // Acquire the TableInfo pb write lock. Although it is not required for some of the individual |
1291 | | // steps, we want to hold it through so that we guarantee the state does not change during the |
1292 | | // whole process. Consequently, we hold it through some steps that require mutex_, but since |
1293 | | // taking mutex_ after TableInfo pb lock is prohibited for deadlock reasons, acquire mutex_ |
1294 | | // first, then release it when it is no longer needed, still holding table pb lock. |
1295 | 0 | TableInfo::WriteLock table_lock; |
1296 | 0 | { |
1297 | 0 | LockGuard lock(mutex_); |
1298 | 0 | TRACE("Acquired catalog manager lock"); |
1299 | | |
1300 | | // Make sure the table is in RUNNING state. |
1301 | | // This by itself doesn't need a pb write lock: just a read lock. However, we want to prevent |
1302 | | // other writers from entering from this point forward, so take the write lock now. |
1303 | 0 | table_lock = table->LockForWrite(); |
1304 | 0 | if (table->old_pb().state() != SysTablesEntryPB::RUNNING) { |
1305 | 0 | return STATUS_FORMAT(IllegalState, |
1306 | 0 | "Table $0 not running: $1", |
1307 | 0 | table->ToString(), |
1308 | 0 | SysTablesEntryPB_State_Name(table->old_pb().state())); |
1309 | 0 | } |
1310 | | // Make sure the table's tablets can be deleted. |
1311 | 0 | RETURN_NOT_OK_PREPEND(CheckIfForbiddenToDeleteTabletOf(table), |
1312 | 0 | Format("Cannot repartition table $0", table->id())); |
1313 | | |
1314 | | // Create and mark new tablets for creation. |
1315 | | |
1316 | | // Use partitions from external snapshot to create new tablets in state PREPARING. The tablets |
1317 | | // will start CREATING once they are committed in memory. |
1318 | 0 | for (const auto& partition : partitions) { |
1319 | 0 | PartitionPB partition_pb; |
1320 | 0 | partition.ToPB(&partition_pb); |
1321 | 0 | new_tablets.push_back(CreateTabletInfo(table.get(), partition_pb)); |
1322 | 0 | } |
1323 | | |
1324 | | // Add tablets to catalog manager tablet_map_. This should be safe to do after creating |
1325 | | // tablets since we're still holding mutex_. |
1326 | 0 | auto tablet_map_checkout = tablet_map_.CheckOut(); |
1327 | 0 | for (auto& new_tablet : new_tablets) { |
1328 | 0 | InsertOrDie(tablet_map_checkout.get_ptr(), new_tablet->tablet_id(), new_tablet); |
1329 | 0 | } |
1330 | 0 | VLOG_WITH_FUNC(3) << "Prepared creation of " << new_tablets.size() |
1331 | 0 | << " new tablets for table " << table->id(); |
1332 | | |
1333 | | // mutex_ is no longer needed, so release by going out of scope. |
1334 | 0 | } |
1335 | | // The table pb write lock is still held, ensuring that the table state does not change. Later |
1336 | | // steps, like GetTablets or AddTablets, will acquire/release the TableInfo lock_, but it's |
1337 | | // probably fine that they are released between the steps since the table pb write lock is held |
1338 | | // throughout. In other words, there should be no risk that TableInfo tablets_ changes between |
1339 | | // GetTablets and RemoveTablets. |
1340 | | |
1341 | | // Abort tablet mutations in case of early returns. |
1342 | 0 | ScopedInfoCommitter<TabletInfo> unlocker_new(&new_tablets); |
1343 | | |
1344 | | // Mark old tablets for deletion. |
1345 | 0 | old_tablets = table->GetTablets(IncludeInactive::kTrue); |
1346 | | // Sort so that locking can be done in a deterministic order. |
1347 | 0 | std::sort(old_tablets.begin(), old_tablets.end(), [](const auto& lhs, const auto& rhs) { |
1348 | 0 | return lhs->tablet_id() < rhs->tablet_id(); |
1349 | 0 | }); |
1350 | 0 | deletion_msg = Format("Old tablets of table $0 deleted at $1", |
1351 | 0 | table->id(), LocalTimeAsString()); |
1352 | 0 | for (auto& old_tablet : old_tablets) { |
1353 | 0 | old_tablet->mutable_metadata()->StartMutation(); |
1354 | 0 | old_tablet->mutable_metadata()->mutable_dirty()->set_state( |
1355 | 0 | SysTabletsEntryPB::DELETED, deletion_msg); |
1356 | 0 | } |
1357 | 0 | VLOG_WITH_FUNC(3) << "Prepared deletion of " << old_tablets.size() << " old tablets for table " |
1358 | 0 | << table->id(); |
1359 | | |
1360 | | // Abort tablet mutations in case of early returns. |
1361 | 0 | ScopedInfoCommitter<TabletInfo> unlocker_old(&old_tablets); |
1362 | | |
1363 | | // Change table's partition schema to the external snapshot's. |
1364 | 0 | auto& table_pb = table_lock.mutable_data()->pb; |
1365 | 0 | table_pb.mutable_partition_schema()->CopyFrom( |
1366 | 0 | table_data->table_entry_pb.partition_schema()); |
1367 | 0 | table_pb.set_partition_list_version(table_pb.partition_list_version() + 1); |
1368 | | |
1369 | | // Remove old tablets from TableInfo. |
1370 | 0 | table->RemoveTablets(old_tablets); |
1371 | | // Add new tablets to TableInfo. This must be done after removing tablets because |
1372 | | // TableInfo::partitions_ has key PartitionKey, which old and new tablets may conflict on. |
1373 | 0 | table->AddTablets(new_tablets); |
1374 | | |
1375 | | // Commit table and tablets to disk. |
1376 | 0 | RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table, new_tablets, old_tablets)); |
1377 | 0 | VLOG_WITH_FUNC(2) << "Committed to disk: table " << table->id() << " repartition from " |
1378 | 0 | << old_tablets.size() << " tablets to " << new_tablets.size() << " tablets"; |
1379 | | |
1380 | | // Commit to memory. Commit new tablets (addition) first since that doesn't break anything. |
1381 | | // Commit table next since new tablets are already committed and ready to be referenced. Commit |
1382 | | // old tablets (deletion) last since the table is not referencing them anymore. |
1383 | 0 | unlocker_new.Commit(); |
1384 | 0 | table_lock.Commit(); |
1385 | 0 | unlocker_old.Commit(); |
1386 | 0 | VLOG_WITH_FUNC(1) << "Committed to memory: table " << table->id() << " repartition from " |
1387 | 0 | << old_tablets.size() << " tablets to " << new_tablets.size() << " tablets"; |
1388 | 0 | } |
1389 | | |
1390 | | // Finally, now that everything is committed, send the delete tablet requests. |
1391 | 0 | for (auto& old_tablet : old_tablets) { |
1392 | 0 | DeleteTabletReplicas(old_tablet.get(), deletion_msg, HideOnly::kFalse); |
1393 | 0 | } |
1394 | 0 | VLOG_WITH_FUNC(2) << "Sent delete tablet requests for " << old_tablets.size() << " old tablets" |
1395 | 0 | << " of table " << table->id(); |
1396 | | // The create tablet requests should be handled by bg tasks which find the PREPARING tablets after |
1397 | | // commit. |
1398 | |
|
1399 | 0 | return Status::OK(); |
1400 | 0 | } |
1401 | | |
1402 | | // Helper function for ImportTableEntry. |
1403 | | // |
1404 | | // Given an internal table and an external table snapshot, do some checks to determine if we should |
1405 | | // move forward with using this internal table for import. |
1406 | | // |
1407 | | // table: internal table's info |
1408 | | // snapshot_data: external table's snapshot data |
1409 | | Result<bool> CatalogManager::CheckTableForImport(scoped_refptr<TableInfo> table, |
1410 | 0 | ExternalTableSnapshotData* snapshot_data) { |
1411 | 0 | auto table_lock = table->LockForRead(); |
1412 | | |
1413 | | // Check if table is live. |
1414 | 0 | if (!table_lock->visible_to_client()) { |
1415 | 0 | VLOG_WITH_FUNC(2) << "Table not visible to client: " << table->ToString(); |
1416 | 0 | return false; |
1417 | 0 | } |
1418 | | // Check if table names match. |
1419 | 0 | const string& external_table_name = snapshot_data->table_entry_pb.name(); |
1420 | 0 | if (table_lock->name() != external_table_name) { |
1421 | 0 | VLOG_WITH_FUNC(2) << "Table names do not match: " |
1422 | 0 | << table_lock->name() << " vs " << external_table_name |
1423 | 0 | << " for " << table->ToString(); |
1424 | 0 | return false; |
1425 | 0 | } |
1426 | | // Check index vs table. |
1427 | 0 | if (snapshot_data->is_index() ? table->indexed_table_id().empty() |
1428 | 0 | : !table->indexed_table_id().empty()) { |
1429 | 0 | VLOG_WITH_FUNC(2) << "External snapshot table is " << (snapshot_data->is_index() ? "" : "not ") |
1430 | 0 | << "index but internal table is the opposite: " << table->ToString(); |
1431 | 0 | return false; |
1432 | 0 | } |
1433 | | // Check if table schemas match (if present in snapshot). |
1434 | 0 | if (!snapshot_data->pg_schema_name.empty()) { |
1435 | 0 | if (table->GetTableType() != PGSQL_TABLE_TYPE) { |
1436 | 0 | LOG_WITH_FUNC(DFATAL) << "ExternalTableSnapshotData.pg_schema_name set when table type is not" |
1437 | 0 | << " PGSQL: schema name: " << snapshot_data->pg_schema_name |
1438 | 0 | << ", table type: " << TableType_Name(table->GetTableType()); |
1439 | | // If not a debug build, ignore pg_schema_name. |
1440 | 0 | } else { |
1441 | 0 | const string internal_schema_name = VERIFY_RESULT(GetPgSchemaName(table)); |
1442 | 0 | const string& external_schema_name = snapshot_data->pg_schema_name; |
1443 | 0 | if (internal_schema_name != external_schema_name) { |
1444 | 0 | LOG_WITH_FUNC(INFO) << "Schema names do not match: " |
1445 | 0 | << internal_schema_name << " vs " << external_schema_name |
1446 | 0 | << " for " << table->ToString(); |
1447 | 0 | return false; |
1448 | 0 | } |
1449 | 0 | } |
1450 | 0 | } |
1451 | | |
1452 | 0 | return true; |
1453 | 0 | } |
1454 | | |
1455 | | Status CatalogManager::ImportTableEntry(const NamespaceMap& namespace_map, |
1456 | | const ExternalTableSnapshotDataMap& table_map, |
1457 | 0 | ExternalTableSnapshotData* table_data) { |
1458 | 0 | const SysTablesEntryPB& meta = DCHECK_NOTNULL(table_data)->table_entry_pb; |
1459 | 0 | bool is_parent_colocated_table = false; |
1460 | |
|
1461 | 0 | table_data->old_namespace_id = meta.namespace_id(); |
1462 | 0 | LOG_IF(DFATAL, table_data->old_namespace_id.empty()) << "No namespace id"; |
1463 | |
|
1464 | 0 | LOG_IF(DFATAL, namespace_map.find(table_data->old_namespace_id) == namespace_map.end()) |
1465 | 0 | << "Namespace not found: " << table_data->old_namespace_id; |
1466 | 0 | const NamespaceId new_namespace_id = |
1467 | 0 | namespace_map.find(table_data->old_namespace_id)->second.first; |
1468 | 0 | LOG_IF(DFATAL, new_namespace_id.empty()) << "No namespace id"; |
1469 | |
|
1470 | 0 | Schema schema; |
1471 | 0 | RETURN_NOT_OK(SchemaFromPB(meta.schema(), &schema)); |
1472 | 0 | const vector<ColumnId>& column_ids = schema.column_ids(); |
1473 | 0 | scoped_refptr<TableInfo> table; |
1474 | | |
1475 | | // First, check if namespace id and table id match. If, in addition, other properties match, we |
1476 | | // found the destination table. |
1477 | 0 | if (new_namespace_id == table_data->old_namespace_id) { |
1478 | 0 | TRACE("Looking up table"); |
1479 | 0 | { |
1480 | 0 | SharedLock lock(mutex_); |
1481 | 0 | table = FindPtrOrNull(*table_ids_map_, table_data->old_table_id); |
1482 | 0 | } |
1483 | |
|
1484 | 0 | if (table != nullptr) { |
1485 | 0 | VLOG_WITH_PREFIX(3) << "Begin first search"; |
1486 | | // At this point, namespace id and table id match. Check other properties, like whether the |
1487 | | // table is active and whether table name matches. |
1488 | 0 | SharedLock lock(mutex_); |
1489 | 0 | if (VERIFY_RESULT(CheckTableForImport(table, table_data))) { |
1490 | 0 | LOG_WITH_FUNC(INFO) << "Found existing table: '" << table->ToString() << "'"; |
1491 | 0 | } else { |
1492 | | // A property did not match, so this search by ids failed. |
1493 | 0 | auto table_lock = table->LockForRead(); |
1494 | 0 | LOG_WITH_FUNC(WARNING) << "Existing table " << table->ToString() << " not suitable: " |
1495 | 0 | << table_lock->pb.ShortDebugString() |
1496 | 0 | << ", name: " << table->name() << " vs " << meta.name(); |
1497 | 0 | table.reset(); |
1498 | 0 | } |
1499 | 0 | } |
1500 | 0 | } |
1501 | | |
1502 | | // Second, if we still didn't find a match... |
1503 | 0 | if (table == nullptr) { |
1504 | 0 | VLOG_WITH_PREFIX(3) << "Begin second search"; |
1505 | 0 | switch (meta.table_type()) { |
1506 | 0 | case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
1507 | 0 | case TableType::REDIS_TABLE_TYPE: { |
1508 | | // For YCQL and YEDIS, simply create the missing table. |
1509 | 0 | RETURN_NOT_OK(RecreateTable(new_namespace_id, table_map, table_data)); |
1510 | 0 | break; |
1511 | 0 | } |
1512 | 0 | case TableType::PGSQL_TABLE_TYPE: { |
1513 | | // For YSQL, the table must be created via external call. Therefore, continue the search for |
1514 | | // the table, this time checking for name matches rather than id matches. |
1515 | |
|
1516 | 0 | if (meta.colocated() && IsColocatedParentTableId(table_data->old_table_id)) { |
1517 | | // For the parent colocated table we need to generate the new_table_id ourselves |
1518 | | // since the names will not match. |
1519 | | // For normal colocated tables, we are still able to follow the normal table flow, so no |
1520 | | // need to generate the new_table_id ourselves. |
1521 | 0 | table_data->new_table_id = new_namespace_id + kColocatedParentTableIdSuffix; |
1522 | 0 | is_parent_colocated_table = true; |
1523 | 0 | } else { |
1524 | 0 | if (!table_data->new_table_id.empty()) { |
1525 | 0 | const string msg = Format( |
1526 | 0 | "$0 expected empty new table id but $1 found", __func__, table_data->new_table_id); |
1527 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1528 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1529 | 0 | } |
1530 | 0 | SharedLock lock(mutex_); |
1531 | |
|
1532 | 0 | for (const auto& entry : *table_ids_map_) { |
1533 | 0 | table = entry.second; |
1534 | |
|
1535 | 0 | if (new_namespace_id != table->namespace_id()) { |
1536 | 0 | VLOG_WITH_FUNC(3) << "Namespace ids do not match: " |
1537 | 0 | << table->namespace_id() << " vs " << new_namespace_id |
1538 | 0 | << " for " << table->ToString(); |
1539 | 0 | continue; |
1540 | 0 | } |
1541 | 0 | if (!VERIFY_RESULT(CheckTableForImport(table, table_data))) { |
1542 | | // Some other check failed. |
1543 | 0 | continue; |
1544 | 0 | } |
1545 | | // Also check if table is user-created. |
1546 | 0 | if (!IsUserCreatedTableUnlocked(*table)) { |
1547 | 0 | VLOG_WITH_FUNC(2) << "Table not user created: " << table->ToString(); |
1548 | 0 | continue; |
1549 | 0 | } |
1550 | | |
1551 | | // Found the new YSQL table by name. |
1552 | 0 | if (table_data->new_table_id.empty()) { |
1553 | 0 | LOG_WITH_FUNC(INFO) << "Found existing table " << entry.first << " for " |
1554 | 0 | << new_namespace_id << "/" << meta.name() << " (old table " |
1555 | 0 | << table_data->old_table_id << ") with schema " |
1556 | 0 | << table_data->pg_schema_name; |
1557 | 0 | table_data->new_table_id = entry.first; |
1558 | 0 | } else if (table_data->new_table_id != entry.first) { |
1559 | 0 | const string msg = Format( |
1560 | 0 | "Found 2 YSQL tables with the same name: $0 - $1, $2", |
1561 | 0 | meta.name(), table_data->new_table_id, entry.first); |
1562 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1563 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1564 | 0 | } |
1565 | 0 | } |
1566 | |
|
1567 | 0 | if (table_data->new_table_id.empty()) { |
1568 | 0 | const string msg = Format("YSQL table not found: $0", meta.name()); |
1569 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1570 | 0 | return STATUS(InvalidArgument, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
1571 | 0 | } |
1572 | 0 | } |
1573 | 0 | break; |
1574 | 0 | } |
1575 | 0 | case TableType::TRANSACTION_STATUS_TABLE_TYPE: { |
1576 | 0 | return STATUS( |
1577 | 0 | InvalidArgument, |
1578 | 0 | Format("Unexpected table type: $0", TableType_Name(meta.table_type())), |
1579 | 0 | MasterError(MasterErrorPB::INVALID_TABLE_TYPE)); |
1580 | 0 | } |
1581 | 0 | } |
1582 | 0 | } else { |
1583 | 0 | table_data->new_table_id = table_data->old_table_id; |
1584 | 0 | LOG_WITH_FUNC(INFO) << "Use existing table " << table_data->new_table_id; |
1585 | 0 | } |
1586 | | |
1587 | | // The destination table should be found or created by now. |
1588 | 0 | TRACE("Looking up new table"); |
1589 | 0 | { |
1590 | 0 | SharedLock lock(mutex_); |
1591 | 0 | table = FindPtrOrNull(*table_ids_map_, table_data->new_table_id); |
1592 | 0 | } |
1593 | 0 | if (table == nullptr) { |
1594 | 0 | const string msg = Format("Created table not found: $0", table_data->new_table_id); |
1595 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1596 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
1597 | 0 | } |
1598 | | |
1599 | | // Don't do schema validation/column updates on the parent colocated table. |
1600 | | // However, still do the validation for regular colocated tables. |
1601 | 0 | if (!is_parent_colocated_table) { |
1602 | 0 | Schema persisted_schema; |
1603 | 0 | size_t new_num_tablets = 0; |
1604 | 0 | { |
1605 | 0 | TRACE("Locking table"); |
1606 | 0 | auto table_lock = table->LockForRead(); |
1607 | 0 | RETURN_NOT_OK(table->GetSchema(&persisted_schema)); |
1608 | 0 | new_num_tablets = table->NumPartitions(); |
1609 | 0 | } |
1610 | | |
1611 | | // Ignore 'nullable' attribute - due to difference in implementation |
1612 | | // of PgCreateTable::AddColumn() and PgAlterTable::AddColumn(). |
1613 | 0 | struct CompareColumnsExceptNullable { |
1614 | 0 | bool operator ()(const ColumnSchema& a, const ColumnSchema& b) { |
1615 | 0 | return ColumnSchema::CompHashKey(a, b) && ColumnSchema::CompSortingType(a, b) && |
1616 | 0 | ColumnSchema::CompTypeInfo(a, b) && ColumnSchema::CompName(a, b); |
1617 | 0 | } |
1618 | 0 | } comparator; |
1619 | | // Schema::Equals() compares only column names & types. It does not compare the column ids. |
1620 | 0 | if (!persisted_schema.Equals(schema, comparator) |
1621 | 0 | || persisted_schema.column_ids().size() != column_ids.size()) { |
1622 | 0 | const string msg = Format( |
1623 | 0 | "Invalid created $0 table '$1' in namespace id $2: schema={$3}, expected={$4}", |
1624 | 0 | TableType_Name(meta.table_type()), meta.name(), new_namespace_id, |
1625 | 0 | persisted_schema, schema); |
1626 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1627 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1628 | 0 | } |
1629 | | |
1630 | 0 | if (table_data->num_tablets > 0) { |
1631 | 0 | if (meta.table_type() == TableType::PGSQL_TABLE_TYPE) { |
1632 | 0 | bool partitions_match = true; |
1633 | 0 | if (new_num_tablets != table_data->num_tablets) { |
1634 | 0 | partitions_match = false; |
1635 | 0 | } else { |
1636 | | // Check if partition boundaries match. Only check the starts; assume the ends are fine. |
1637 | 0 | size_t i = 0; |
1638 | 0 | vector<PartitionKey> partition_starts(table_data->num_tablets); |
1639 | 0 | for (const auto& partition_pb : table_data->partitions) { |
1640 | 0 | partition_starts[i] = partition_pb.partition_key_start(); |
1641 | 0 | LOG_IF(DFATAL, (i == 0) ? partition_starts[i] != "" |
1642 | 0 | : partition_starts[i] <= partition_starts[i-1]) |
1643 | 0 | << "Wrong partition key start: " << b2a_hex(partition_starts[i]); |
1644 | 0 | i++; |
1645 | 0 | } |
1646 | 0 | if (!table->HasPartitions(partition_starts)) { |
1647 | 0 | LOG_WITH_FUNC(INFO) << "Partition boundaries mismatch for table " << table->id(); |
1648 | 0 | partitions_match = false; |
1649 | 0 | } |
1650 | 0 | } |
1651 | |
|
1652 | 0 | if (!partitions_match) { |
1653 | 0 | RETURN_NOT_OK(RepartitionTable(table, table_data)); |
1654 | 0 | } |
1655 | 0 | } else { // not PGSQL_TABLE_TYPE |
1656 | 0 | if (new_num_tablets != table_data->num_tablets) { |
1657 | 0 | const string msg = Format( |
1658 | 0 | "Wrong number of tablets in created $0 table '$1' in namespace id $2:" |
1659 | 0 | " $3 (expected $4)", |
1660 | 0 | TableType_Name(meta.table_type()), meta.name(), new_namespace_id, |
1661 | 0 | new_num_tablets, table_data->num_tablets); |
1662 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1663 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1664 | 0 | } |
1665 | 0 | } |
1666 | 0 | } |
1667 | | |
1668 | | // Update the table column ids if it's not equal to the stored ids. |
1669 | 0 | if (persisted_schema.column_ids() != column_ids) { |
1670 | 0 | if (meta.table_type() != TableType::PGSQL_TABLE_TYPE) { |
1671 | 0 | LOG_WITH_FUNC(WARNING) << "Unexpected wrong column ids in " |
1672 | 0 | << TableType_Name(meta.table_type()) << " table '" << meta.name() |
1673 | 0 | << "' in namespace id " << new_namespace_id; |
1674 | 0 | } |
1675 | |
|
1676 | 0 | LOG_WITH_FUNC(INFO) << "Restoring column ids in " << TableType_Name(meta.table_type()) |
1677 | 0 | << " table '" << meta.name() << "' in namespace id " |
1678 | 0 | << new_namespace_id; |
1679 | 0 | auto l = table->LockForWrite(); |
1680 | 0 | size_t col_idx = 0; |
1681 | 0 | for (auto& column : *l.mutable_data()->pb.mutable_schema()->mutable_columns()) { |
1682 | | // Expecting here correct schema (columns - order, names, types), but with only wrong |
1683 | | // column ids. Checking correct column order and column names below. |
1684 | 0 | if (column.name() != schema.column(col_idx).name()) { |
1685 | 0 | const string msg = Format( |
1686 | 0 | "Unexpected column name for index=$0: name=$1, expected name=$2", |
1687 | 0 | col_idx, schema.column(col_idx).name(), column.name()); |
1688 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1689 | 0 | return STATUS(InternalError, msg, MasterError(MasterErrorPB::SNAPSHOT_FAILED)); |
1690 | 0 | } |
1691 | | // Copy the column id from imported (original) schema. |
1692 | 0 | column.set_id(column_ids[col_idx++]); |
1693 | 0 | } |
1694 | |
|
1695 | 0 | l.mutable_data()->pb.set_next_column_id(schema.max_col_id() + 1); |
1696 | 0 | l.mutable_data()->pb.set_version(l->pb.version() + 1); |
1697 | | // Update sys-catalog with the new table schema. |
1698 | 0 | RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), table)); |
1699 | 0 | l.Commit(); |
1700 | | // Update the new table schema in tablets. |
1701 | 0 | RETURN_NOT_OK(SendAlterTableRequest(table)); |
1702 | 0 | } |
1703 | 0 | } |
1704 | | |
1705 | | // Set the type of the table in the response pb (default is TABLE so only set if colocated). |
1706 | 0 | if (meta.colocated()) { |
1707 | 0 | if (is_parent_colocated_table) { |
1708 | 0 | table_data->table_meta->set_table_type( |
1709 | 0 | ImportSnapshotMetaResponsePB_TableType_PARENT_COLOCATED_TABLE); |
1710 | 0 | } else { |
1711 | 0 | table_data->table_meta->set_table_type( |
1712 | 0 | ImportSnapshotMetaResponsePB_TableType_COLOCATED_TABLE); |
1713 | 0 | } |
1714 | 0 | } |
1715 | |
|
1716 | 0 | TabletInfos new_tablets; |
1717 | 0 | { |
1718 | 0 | TRACE("Locking table"); |
1719 | 0 | auto table_lock = table->LockForRead(); |
1720 | 0 | new_tablets = table->GetTablets(); |
1721 | 0 | } |
1722 | |
|
1723 | 0 | for (const scoped_refptr<TabletInfo>& tablet : new_tablets) { |
1724 | 0 | auto tablet_lock = tablet->LockForRead(); |
1725 | 0 | const PartitionPB& partition_pb = tablet->metadata().state().pb.partition(); |
1726 | 0 | const ExternalTableSnapshotData::PartitionKeys key( |
1727 | 0 | partition_pb.partition_key_start(), partition_pb.partition_key_end()); |
1728 | 0 | table_data->new_tablets_map[key] = tablet->id(); |
1729 | 0 | } |
1730 | |
|
1731 | 0 | IdPairPB* const namespace_ids = table_data->table_meta->mutable_namespace_ids(); |
1732 | 0 | namespace_ids->set_new_id(new_namespace_id); |
1733 | 0 | namespace_ids->set_old_id(table_data->old_namespace_id); |
1734 | |
|
1735 | 0 | IdPairPB* const table_ids = table_data->table_meta->mutable_table_ids(); |
1736 | 0 | table_ids->set_new_id(table_data->new_table_id); |
1737 | 0 | table_ids->set_old_id(table_data->old_table_id); |
1738 | |
|
1739 | 0 | return Status::OK(); |
1740 | 0 | } |
1741 | | |
1742 | | Status CatalogManager::PreprocessTabletEntry(const SysRowEntry& entry, |
1743 | 0 | ExternalTableSnapshotDataMap* table_map) { |
1744 | 0 | LOG_IF(DFATAL, entry.type() != SysRowEntryType::TABLET) |
1745 | 0 | << "Unexpected entry type: " << entry.type(); |
1746 | |
|
1747 | 0 | SysTabletsEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysTabletsEntryPB>(entry.data())); |
1748 | |
|
1749 | 0 | ExternalTableSnapshotData& table_data = (*table_map)[meta.table_id()]; |
1750 | 0 | ++table_data.num_tablets; |
1751 | 0 | if (meta.has_partition()) { |
1752 | 0 | table_data.partitions.push_back(meta.partition()); |
1753 | 0 | } |
1754 | 0 | return Status::OK(); |
1755 | 0 | } |
1756 | | |
1757 | | Status CatalogManager::ImportTabletEntry(const SysRowEntry& entry, |
1758 | 0 | ExternalTableSnapshotDataMap* table_map) { |
1759 | 0 | LOG_IF(DFATAL, entry.type() != SysRowEntryType::TABLET) |
1760 | 0 | << "Unexpected entry type: " << entry.type(); |
1761 | |
|
1762 | 0 | SysTabletsEntryPB meta = VERIFY_RESULT(ParseFromSlice<SysTabletsEntryPB>(entry.data())); |
1763 | |
|
1764 | 0 | LOG_IF(DFATAL, table_map->find(meta.table_id()) == table_map->end()) |
1765 | 0 | << "Table not found: " << meta.table_id(); |
1766 | 0 | ExternalTableSnapshotData& table_data = (*table_map)[meta.table_id()]; |
1767 | |
|
1768 | 0 | if (meta.colocated() && table_data.tablet_id_map->size() >= 1) { |
1769 | 0 | LOG_WITH_FUNC(INFO) << "Already processed this colocated tablet: " << entry.id(); |
1770 | 0 | return Status::OK(); |
1771 | 0 | } |
1772 | | |
1773 | | // Update tablets IDs map. |
1774 | 0 | if (table_data.new_table_id == table_data.old_table_id) { |
1775 | 0 | TRACE("Looking up tablet"); |
1776 | 0 | SharedLock lock(mutex_); |
1777 | 0 | scoped_refptr<TabletInfo> tablet = FindPtrOrNull(*tablet_map_, entry.id()); |
1778 | |
|
1779 | 0 | if (tablet != nullptr) { |
1780 | 0 | IdPairPB* const pair = table_data.tablet_id_map->Add(); |
1781 | 0 | pair->set_old_id(entry.id()); |
1782 | 0 | pair->set_new_id(entry.id()); |
1783 | 0 | return Status::OK(); |
1784 | 0 | } |
1785 | 0 | } |
1786 | | |
1787 | 0 | const PartitionPB& partition_pb = meta.partition(); |
1788 | 0 | const ExternalTableSnapshotData::PartitionKeys key( |
1789 | 0 | partition_pb.partition_key_start(), partition_pb.partition_key_end()); |
1790 | 0 | const ExternalTableSnapshotData::PartitionToIdMap::const_iterator it = |
1791 | 0 | table_data.new_tablets_map.find(key); |
1792 | |
|
1793 | 0 | if (it == table_data.new_tablets_map.end()) { |
1794 | 0 | const string msg = Format( |
1795 | 0 | "For new table $0 (old table $1, expecting $2 tablets) not found new tablet with " |
1796 | 0 | "expected [$3]", table_data.new_table_id, table_data.old_table_id, |
1797 | 0 | table_data.num_tablets, partition_pb); |
1798 | 0 | LOG_WITH_FUNC(WARNING) << msg; |
1799 | 0 | return STATUS(NotFound, msg, MasterError(MasterErrorPB::INTERNAL_ERROR)); |
1800 | 0 | } |
1801 | | |
1802 | 0 | IdPairPB* const pair = table_data.tablet_id_map->Add(); |
1803 | 0 | pair->set_old_id(entry.id()); |
1804 | 0 | pair->set_new_id(it->second); |
1805 | 0 | return Status::OK(); |
1806 | 0 | } |
1807 | | |
1808 | 124 | const Schema& CatalogManager::schema() { |
1809 | 124 | return sys_catalog()->schema(); |
1810 | 124 | } |
1811 | | |
1812 | 0 | TabletInfos CatalogManager::GetTabletInfos(const std::vector<TabletId>& ids) { |
1813 | 0 | TabletInfos result; |
1814 | 0 | result.reserve(ids.size()); |
1815 | 0 | SharedLock lock(mutex_); |
1816 | 0 | for (const auto& id : ids) { |
1817 | 0 | auto it = tablet_map_->find(id); |
1818 | 0 | result.push_back(it != tablet_map_->end() ? it->second : nullptr); |
1819 | 0 | } |
1820 | 0 | return result; |
1821 | 0 | } |
1822 | | |
1823 | | AsyncTabletSnapshotOpPtr CatalogManager::CreateAsyncTabletSnapshotOp( |
1824 | | const TabletInfoPtr& tablet, const std::string& snapshot_id, |
1825 | | tserver::TabletSnapshotOpRequestPB::Operation operation, |
1826 | 0 | TabletSnapshotOperationCallback callback) { |
1827 | 0 | auto result = std::make_shared<AsyncTabletSnapshotOp>( |
1828 | 0 | master_, AsyncTaskPool(), tablet, snapshot_id, operation); |
1829 | 0 | result->SetCallback(std::move(callback)); |
1830 | 0 | tablet->table()->AddTask(result); |
1831 | 0 | return result; |
1832 | 0 | } |
1833 | | |
1834 | 0 | void CatalogManager::ScheduleTabletSnapshotOp(const AsyncTabletSnapshotOpPtr& task) { |
1835 | 0 | WARN_NOT_OK(ScheduleTask(task), "Failed to send create snapshot request"); |
1836 | 0 | } |
1837 | | |
1838 | | Status CatalogManager::RestoreSysCatalog( |
1839 | 0 | SnapshotScheduleRestoration* restoration, tablet::Tablet* tablet, Status* complete_status) { |
1840 | 0 | VLOG_WITH_PREFIX_AND_FUNC(1) << restoration->restoration_id; |
1841 | | // Restore master snapshot and load it to RocksDB. |
1842 | 0 | auto dir = VERIFY_RESULT(tablet->snapshots().RestoreToTemporary( |
1843 | 0 | restoration->snapshot_id, restoration->restore_at)); |
1844 | 0 | rocksdb::Options rocksdb_options; |
1845 | 0 | std::string log_prefix = LogPrefix(); |
1846 | | // Remove ": " to patch suffix. |
1847 | 0 | log_prefix.erase(log_prefix.size() - 2); |
1848 | 0 | tablet->InitRocksDBOptions(&rocksdb_options, log_prefix + " [TMP]: "); |
1849 | 0 | auto db = VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, dir)); |
1850 | |
|
1851 | 0 | auto doc_db = docdb::DocDB::FromRegularUnbounded(db.get()); |
1852 | | |
1853 | | // Load objects to restore and determine obsolete objects. |
1854 | 0 | RestoreSysCatalogState state(restoration); |
1855 | 0 | RETURN_NOT_OK(state.LoadRestoringObjects(schema(), doc_db)); |
1856 | | // Load existing objects from RocksDB because on followers they are NOT present in loaded sys |
1857 | | // catalog state. |
1858 | 0 | RETURN_NOT_OK(state.LoadExistingObjects(schema(), tablet->doc_db())); |
1859 | 0 | RETURN_NOT_OK(state.Process()); |
1860 | |
|
1861 | 0 | docdb::DocWriteBatch write_batch( |
1862 | 0 | tablet->doc_db(), docdb::InitMarkerBehavior::kOptional); |
1863 | |
|
1864 | 0 | if (FLAGS_enable_ysql) { |
1865 | | // Restore the pg_catalog tables. |
1866 | 0 | const auto* meta = tablet->metadata(); |
1867 | 0 | const auto& pg_yb_catalog_version_schema = |
1868 | 0 | *VERIFY_RESULT(meta->GetTableInfo(kPgYbCatalogVersionTableId))->schema; |
1869 | 0 | RETURN_NOT_OK(state.ProcessPgCatalogRestores( |
1870 | 0 | pg_yb_catalog_version_schema, doc_db, tablet->doc_db(), &write_batch)); |
1871 | 0 | } |
1872 | | |
1873 | | // Restore the other tables. |
1874 | 0 | RETURN_NOT_OK(state.PrepareWriteBatch(schema(), &write_batch)); |
1875 | | |
1876 | | // Apply write batch to RocksDB. |
1877 | 0 | state.WriteToRocksDB(&write_batch, restoration->write_time, restoration->op_id, tablet); |
1878 | | |
1879 | | // TODO(pitr) Handle master leader failover. |
1880 | 0 | RETURN_NOT_OK(ElectedAsLeaderCb()); |
1881 | |
|
1882 | 0 | return Status::OK(); |
1883 | 0 | } |
1884 | | |
1885 | 0 | Status CatalogManager::VerifyRestoredObjects(const SnapshotScheduleRestoration& restoration) { |
1886 | 0 | auto entries = VERIFY_RESULT(CollectEntriesForSnapshot( |
1887 | 0 | restoration.schedules[0].second.tables().tables())); |
1888 | 0 | auto objects_to_restore = restoration.non_system_objects_to_restore; |
1889 | 0 | VLOG_WITH_PREFIX(1) << "Objects to restore: " << AsString(objects_to_restore); |
1890 | 0 | for (const auto& entry : entries.entries()) { |
1891 | 0 | VLOG_WITH_PREFIX(1) |
1892 | 0 | << "Alive " << SysRowEntryType_Name(entry.type()) << ": " << entry.id(); |
1893 | 0 | auto it = objects_to_restore.find(entry.id()); |
1894 | 0 | if (it == objects_to_restore.end()) { |
1895 | 0 | return STATUS_FORMAT(IllegalState, "Object $0/$1 present, but should not be restored", |
1896 | 0 | SysRowEntryType_Name(entry.type()), entry.id()); |
1897 | 0 | } |
1898 | 0 | if (it->second != entry.type()) { |
1899 | 0 | return STATUS_FORMAT( |
1900 | 0 | IllegalState, "Restored object $0 has wrong type $1, while $2 expected", |
1901 | 0 | entry.id(), SysRowEntryType_Name(entry.type()), SysRowEntryType_Name(it->second)); |
1902 | 0 | } |
1903 | 0 | objects_to_restore.erase(it); |
1904 | 0 | } |
1905 | 0 | for (const auto& id_and_type : objects_to_restore) { |
1906 | 0 | return STATUS_FORMAT( |
1907 | 0 | IllegalState, "Expected to restore $0/$1, but it is not present after restoration", |
1908 | 0 | SysRowEntryType_Name(id_and_type.second), id_and_type.first); |
1909 | 0 | } |
1910 | 0 | return Status::OK(); |
1911 | 0 | } |
1912 | | |
1913 | 11.1k | void CatalogManager::CleanupHiddenObjects(const ScheduleMinRestoreTime& schedule_min_restore_time) { |
1914 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(schedule_min_restore_time); |
1915 | | |
1916 | 11.1k | std::vector<TabletInfoPtr> hidden_tablets; |
1917 | 11.1k | std::vector<TableInfoPtr> tables; |
1918 | 11.1k | { |
1919 | 11.1k | SharedLock lock(mutex_); |
1920 | 11.1k | hidden_tablets = hidden_tablets_; |
1921 | 11.1k | tables.reserve(table_ids_map_->size()); |
1922 | 1.21M | for (const auto& p : *table_ids_map_) { |
1923 | 1.21M | if (!p.second->is_system()) { |
1924 | 27.7k | tables.push_back(p.second); |
1925 | 27.7k | } |
1926 | 1.21M | } |
1927 | 11.1k | } |
1928 | 11.1k | CleanupHiddenTablets(hidden_tablets, schedule_min_restore_time); |
1929 | 11.1k | CleanupHiddenTables(std::move(tables)); |
1930 | 11.1k | } |
1931 | | |
1932 | | void CatalogManager::CleanupHiddenTablets( |
1933 | | const std::vector<TabletInfoPtr>& hidden_tablets, |
1934 | 11.1k | const ScheduleMinRestoreTime& schedule_min_restore_time) { |
1935 | 11.1k | if (hidden_tablets.empty()) { |
1936 | 11.1k | return; |
1937 | 11.1k | } |
1938 | 0 | std::vector<TabletInfoPtr> tablets_to_delete; |
1939 | 0 | std::vector<TabletInfoPtr> tablets_to_remove_from_hidden; |
1940 | |
|
1941 | 0 | for (const auto& tablet : hidden_tablets) { |
1942 | 0 | auto lock = tablet->LockForRead(); |
1943 | 0 | if (!lock->ListedAsHidden()) { |
1944 | 0 | tablets_to_remove_from_hidden.push_back(tablet); |
1945 | 0 | continue; |
1946 | 0 | } |
1947 | 0 | auto hide_hybrid_time = HybridTime::FromPB(lock->pb.hide_hybrid_time()); |
1948 | 0 | bool cleanup = true; |
1949 | 0 | for (const auto& schedule_id_str : lock->pb.retained_by_snapshot_schedules()) { |
1950 | 0 | auto schedule_id = TryFullyDecodeSnapshotScheduleId(schedule_id_str); |
1951 | 0 | auto it = schedule_min_restore_time.find(schedule_id); |
1952 | | // If schedule is not present in schedule_min_restore_time then it means that schedule |
1953 | | // was deleted, so it should not retain the tablet. |
1954 | 0 | if (it != schedule_min_restore_time.end() && it->second <= hide_hybrid_time) { |
1955 | 0 | VLOG_WITH_PREFIX(1) |
1956 | 0 | << "Retaining tablet: " << tablet->tablet_id() << ", hide hybrid time: " |
1957 | 0 | << hide_hybrid_time << ", because of schedule: " << schedule_id |
1958 | 0 | << ", min restore time: " << it->second; |
1959 | 0 | cleanup = false; |
1960 | 0 | break; |
1961 | 0 | } |
1962 | 0 | } |
1963 | 0 | if (cleanup) { |
1964 | 0 | SharedLock read_lock(mutex_); |
1965 | 0 | if (IsTableCdcProducer(*tablet->table())) { |
1966 | | // We also need to check if this tablet is being kept for xcluster replication. |
1967 | 0 | auto l = tablet->table()->LockForRead(); |
1968 | 0 | cleanup = hide_hybrid_time.AddSeconds(l->pb.wal_retention_secs()) < master_->clock()->Now(); |
1969 | 0 | } |
1970 | 0 | } |
1971 | 0 | if (cleanup) { |
1972 | 0 | tablets_to_delete.push_back(tablet); |
1973 | 0 | } |
1974 | 0 | } |
1975 | 0 | if (!tablets_to_delete.empty()) { |
1976 | 0 | LOG_WITH_PREFIX(INFO) << "Cleanup hidden tablets: " << AsString(tablets_to_delete); |
1977 | 0 | WARN_NOT_OK(DeleteTabletListAndSendRequests(tablets_to_delete, "Cleanup hidden tablets", {}), |
1978 | 0 | "Failed to cleanup hidden tablets"); |
1979 | 0 | } |
1980 | |
|
1981 | 0 | if (!tablets_to_remove_from_hidden.empty()) { |
1982 | 0 | auto it = tablets_to_remove_from_hidden.begin(); |
1983 | 0 | LockGuard lock(mutex_); |
1984 | | // Order of tablets in tablets_to_remove_from_hidden matches order in hidden_tablets_, |
1985 | | // so we could avoid searching in tablets_to_remove_from_hidden. |
1986 | 0 | auto filter = [&it, end = tablets_to_remove_from_hidden.end()](const TabletInfoPtr& tablet) { |
1987 | 0 | if (it != end && tablet.get() == it->get()) { |
1988 | 0 | ++it; |
1989 | 0 | return true; |
1990 | 0 | } |
1991 | 0 | return false; |
1992 | 0 | }; |
1993 | 0 | hidden_tablets_.erase(std::remove_if(hidden_tablets_.begin(), hidden_tablets_.end(), filter), |
1994 | 0 | hidden_tablets_.end()); |
1995 | 0 | } |
1996 | 0 | } |
1997 | | |
1998 | 11.1k | void CatalogManager::CleanupHiddenTables(std::vector<TableInfoPtr> tables) { |
1999 | 11.1k | std::vector<TableInfo::WriteLock> locks; |
2000 | 27.7k | EraseIf([this, &locks](const TableInfoPtr& table) { |
2001 | 27.7k | { |
2002 | 27.7k | auto lock = table->LockForRead(); |
2003 | 27.7k | if (!lock->is_hidden() || lock->started_deleting() || !table->AreAllTabletsDeleted()) { |
2004 | 27.7k | return true; |
2005 | 27.7k | } |
2006 | 0 | } |
2007 | 0 | auto lock = table->LockForWrite(); |
2008 | 0 | if (lock->started_deleting()) { |
2009 | 0 | return true; |
2010 | 0 | } |
2011 | 0 | LOG_WITH_PREFIX(INFO) << "Should delete table: " << AsString(table); |
2012 | 0 | lock.mutable_data()->set_state( |
2013 | 0 | SysTablesEntryPB::DELETED, Format("Cleanup hidden table at $0", LocalTimeAsString())); |
2014 | 0 | locks.push_back(std::move(lock)); |
2015 | 0 | return false; |
2016 | 0 | }, &tables); |
2017 | 11.1k | if (tables.empty()) { |
2018 | 11.1k | return; |
2019 | 11.1k | } |
2020 | | |
2021 | 0 | Status s = sys_catalog_->Upsert(leader_ready_term(), tables); |
2022 | 0 | if (!s.ok()) { |
2023 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to mark tables as deleted: " << s; |
2024 | 0 | return; |
2025 | 0 | } |
2026 | 0 | for (auto& lock : locks) { |
2027 | 0 | lock.Commit(); |
2028 | 0 | } |
2029 | 0 | } |
2030 | | |
2031 | 5.35k | rpc::Scheduler& CatalogManager::Scheduler() { |
2032 | 5.35k | return master_->messenger()->scheduler(); |
2033 | 5.35k | } |
2034 | | |
2035 | 24.1k | int64_t CatalogManager::LeaderTerm() { |
2036 | 24.1k | auto peer = tablet_peer(); |
2037 | 24.1k | if (!peer) { |
2038 | 92 | return false; |
2039 | 92 | } |
2040 | 24.0k | auto consensus = peer->shared_consensus(); |
2041 | 24.0k | if (!consensus) { |
2042 | 2 | return false; |
2043 | 2 | } |
2044 | 24.0k | return consensus->GetLeaderState(/* allow_stale= */ true).term; |
2045 | 24.0k | } |
2046 | | |
2047 | 0 | void CatalogManager::HandleCreateTabletSnapshotResponse(TabletInfo *tablet, bool error) { |
2048 | 0 | LOG(INFO) << "Handling Create Tablet Snapshot Response for tablet " |
2049 | 0 | << DCHECK_NOTNULL(tablet)->ToString() << (error ? " ERROR" : " OK"); |
2050 | | |
2051 | | // Get the snapshot data descriptor from the catalog manager. |
2052 | 0 | scoped_refptr<SnapshotInfo> snapshot; |
2053 | 0 | { |
2054 | 0 | LockGuard lock(mutex_); |
2055 | 0 | TRACE("Acquired catalog manager lock"); |
2056 | |
|
2057 | 0 | if (current_snapshot_id_.empty()) { |
2058 | 0 | LOG(WARNING) << "No active snapshot: " << current_snapshot_id_; |
2059 | 0 | return; |
2060 | 0 | } |
2061 | | |
2062 | 0 | snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, current_snapshot_id_); |
2063 | |
|
2064 | 0 | if (!snapshot) { |
2065 | 0 | LOG(WARNING) << "Snapshot not found: " << current_snapshot_id_; |
2066 | 0 | return; |
2067 | 0 | } |
2068 | 0 | } |
2069 | | |
2070 | 0 | if (!snapshot->IsCreateInProgress()) { |
2071 | 0 | LOG(WARNING) << "Snapshot is not in creating state: " << snapshot->id(); |
2072 | 0 | return; |
2073 | 0 | } |
2074 | | |
2075 | 0 | auto tablet_l = tablet->LockForRead(); |
2076 | 0 | auto l = snapshot->LockForWrite(); |
2077 | 0 | RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots = |
2078 | 0 | l.mutable_data()->pb.mutable_tablet_snapshots(); |
2079 | 0 | int num_tablets_complete = 0; |
2080 | |
|
2081 | 0 | for (int i = 0; i < tablet_snapshots->size(); ++i) { |
2082 | 0 | SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i); |
2083 | |
|
2084 | 0 | if (tablet_info->id() == tablet->id()) { |
2085 | 0 | tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::COMPLETE); |
2086 | 0 | } |
2087 | |
|
2088 | 0 | if (tablet_info->state() == SysSnapshotEntryPB::COMPLETE) { |
2089 | 0 | ++num_tablets_complete; |
2090 | 0 | } |
2091 | 0 | } |
2092 | | |
2093 | | // Finish the snapshot. |
2094 | 0 | bool finished = true; |
2095 | 0 | if (error) { |
2096 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED); |
2097 | 0 | LOG(WARNING) << "Failed snapshot " << snapshot->id() << " on tablet " << tablet->id(); |
2098 | 0 | } else if (num_tablets_complete == tablet_snapshots->size()) { |
2099 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::COMPLETE); |
2100 | 0 | LOG(INFO) << "Completed snapshot " << snapshot->id(); |
2101 | 0 | } else { |
2102 | 0 | finished = false; |
2103 | 0 | } |
2104 | |
|
2105 | 0 | if (finished) { |
2106 | 0 | LockGuard lock(mutex_); |
2107 | 0 | TRACE("Acquired catalog manager lock"); |
2108 | 0 | current_snapshot_id_ = ""; |
2109 | 0 | } |
2110 | |
|
2111 | 0 | VLOG(1) << "Snapshot: " << snapshot->id() |
2112 | 0 | << " PB: " << l.mutable_data()->pb.DebugString() |
2113 | 0 | << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size(); |
2114 | |
|
2115 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot); |
2116 | |
|
2117 | 0 | l.CommitOrWarn(s, "updating snapshot in sys-catalog"); |
2118 | 0 | } |
2119 | | |
2120 | 0 | void CatalogManager::HandleRestoreTabletSnapshotResponse(TabletInfo *tablet, bool error) { |
2121 | 0 | LOG(INFO) << "Handling Restore Tablet Snapshot Response for tablet " |
2122 | 0 | << DCHECK_NOTNULL(tablet)->ToString() << (error ? " ERROR" : " OK"); |
2123 | | |
2124 | | // Get the snapshot data descriptor from the catalog manager. |
2125 | 0 | scoped_refptr<SnapshotInfo> snapshot; |
2126 | 0 | { |
2127 | 0 | LockGuard lock(mutex_); |
2128 | 0 | TRACE("Acquired catalog manager lock"); |
2129 | |
|
2130 | 0 | if (current_snapshot_id_.empty()) { |
2131 | 0 | LOG(WARNING) << "No restoring snapshot: " << current_snapshot_id_; |
2132 | 0 | return; |
2133 | 0 | } |
2134 | | |
2135 | 0 | snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, current_snapshot_id_); |
2136 | |
|
2137 | 0 | if (!snapshot) { |
2138 | 0 | LOG(WARNING) << "Restoring snapshot not found: " << current_snapshot_id_; |
2139 | 0 | return; |
2140 | 0 | } |
2141 | 0 | } |
2142 | | |
2143 | 0 | if (!snapshot->IsRestoreInProgress()) { |
2144 | 0 | LOG(WARNING) << "Snapshot is not in restoring state: " << snapshot->id(); |
2145 | 0 | return; |
2146 | 0 | } |
2147 | | |
2148 | 0 | auto tablet_l = tablet->LockForRead(); |
2149 | 0 | auto l = snapshot->LockForWrite(); |
2150 | 0 | RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots = |
2151 | 0 | l.mutable_data()->pb.mutable_tablet_snapshots(); |
2152 | 0 | int num_tablets_complete = 0; |
2153 | |
|
2154 | 0 | for (int i = 0; i < tablet_snapshots->size(); ++i) { |
2155 | 0 | SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i); |
2156 | |
|
2157 | 0 | if (tablet_info->id() == tablet->id()) { |
2158 | 0 | tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::COMPLETE); |
2159 | 0 | } |
2160 | |
|
2161 | 0 | if (tablet_info->state() == SysSnapshotEntryPB::COMPLETE) { |
2162 | 0 | ++num_tablets_complete; |
2163 | 0 | } |
2164 | 0 | } |
2165 | | |
2166 | | // Finish the snapshot. |
2167 | 0 | if (error || num_tablets_complete == tablet_snapshots->size()) { |
2168 | 0 | if (error) { |
2169 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED); |
2170 | 0 | LOG(WARNING) << "Failed restoring snapshot " << snapshot->id() |
2171 | 0 | << " on tablet " << tablet->id(); |
2172 | 0 | } else { |
2173 | 0 | LOG_IF(DFATAL, num_tablets_complete != tablet_snapshots->size()) |
2174 | 0 | << "Wrong number of tablets"; |
2175 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::COMPLETE); |
2176 | 0 | LOG(INFO) << "Restored snapshot " << snapshot->id(); |
2177 | 0 | } |
2178 | |
|
2179 | 0 | LockGuard lock(mutex_); |
2180 | 0 | TRACE("Acquired catalog manager lock"); |
2181 | 0 | current_snapshot_id_ = ""; |
2182 | 0 | } |
2183 | |
|
2184 | 0 | VLOG(1) << "Snapshot: " << snapshot->id() |
2185 | 0 | << " PB: " << l.mutable_data()->pb.DebugString() |
2186 | 0 | << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size(); |
2187 | |
|
2188 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot); |
2189 | |
|
2190 | 0 | l.CommitOrWarn(s, "updating snapshot in sys-catalog"); |
2191 | 0 | } |
2192 | | |
2193 | | void CatalogManager::HandleDeleteTabletSnapshotResponse( |
2194 | 0 | const SnapshotId& snapshot_id, TabletInfo *tablet, bool error) { |
2195 | 0 | LOG(INFO) << "Handling Delete Tablet Snapshot Response for tablet " |
2196 | 0 | << DCHECK_NOTNULL(tablet)->ToString() << (error ? " ERROR" : " OK"); |
2197 | | |
2198 | | // Get the snapshot data descriptor from the catalog manager. |
2199 | 0 | scoped_refptr<SnapshotInfo> snapshot; |
2200 | 0 | { |
2201 | 0 | LockGuard lock(mutex_); |
2202 | 0 | TRACE("Acquired catalog manager lock"); |
2203 | |
|
2204 | 0 | snapshot = FindPtrOrNull(non_txn_snapshot_ids_map_, snapshot_id); |
2205 | |
|
2206 | 0 | if (!snapshot) { |
2207 | 0 | LOG(WARNING) << __func__ << " Snapshot not found: " << snapshot_id; |
2208 | 0 | return; |
2209 | 0 | } |
2210 | 0 | } |
2211 | | |
2212 | 0 | if (!snapshot->IsDeleteInProgress()) { |
2213 | 0 | LOG(WARNING) << "Snapshot is not in deleting state: " << snapshot->id(); |
2214 | 0 | return; |
2215 | 0 | } |
2216 | | |
2217 | 0 | auto tablet_l = tablet->LockForRead(); |
2218 | 0 | auto l = snapshot->LockForWrite(); |
2219 | 0 | RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots = |
2220 | 0 | l.mutable_data()->pb.mutable_tablet_snapshots(); |
2221 | 0 | int num_tablets_complete = 0; |
2222 | |
|
2223 | 0 | for (int i = 0; i < tablet_snapshots->size(); ++i) { |
2224 | 0 | SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i); |
2225 | |
|
2226 | 0 | if (tablet_info->id() == tablet->id()) { |
2227 | 0 | tablet_info->set_state(error ? SysSnapshotEntryPB::FAILED : SysSnapshotEntryPB::DELETED); |
2228 | 0 | } |
2229 | |
|
2230 | 0 | if (tablet_info->state() != SysSnapshotEntryPB::DELETING) { |
2231 | 0 | ++num_tablets_complete; |
2232 | 0 | } |
2233 | 0 | } |
2234 | |
|
2235 | 0 | if (num_tablets_complete == tablet_snapshots->size()) { |
2236 | | // Delete the snapshot. |
2237 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::DELETED); |
2238 | 0 | LOG(INFO) << "Deleted snapshot " << snapshot->id(); |
2239 | |
|
2240 | 0 | const Status s = sys_catalog_->Delete(leader_ready_term(), snapshot); |
2241 | |
|
2242 | 0 | LockGuard lock(mutex_); |
2243 | 0 | TRACE("Acquired catalog manager lock"); |
2244 | |
|
2245 | 0 | if (current_snapshot_id_ == snapshot_id) { |
2246 | 0 | current_snapshot_id_ = ""; |
2247 | 0 | } |
2248 | | |
2249 | | // Remove it from the maps. |
2250 | 0 | TRACE("Removing from maps"); |
2251 | 0 | if (non_txn_snapshot_ids_map_.erase(snapshot_id) < 1) { |
2252 | 0 | LOG(WARNING) << "Could not remove snapshot " << snapshot_id << " from map"; |
2253 | 0 | } |
2254 | |
|
2255 | 0 | l.CommitOrWarn(s, "deleting snapshot from sys-catalog"); |
2256 | 0 | } else if (error) { |
2257 | 0 | l.mutable_data()->pb.set_state(SysSnapshotEntryPB::FAILED); |
2258 | 0 | LOG(WARNING) << "Failed snapshot " << snapshot->id() << " deletion on tablet " << tablet->id(); |
2259 | |
|
2260 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), snapshot); |
2261 | 0 | l.CommitOrWarn(s, "updating snapshot in sys-catalog"); |
2262 | 0 | } |
2263 | |
|
2264 | 0 | VLOG(1) << "Deleting snapshot: " << snapshot->id() |
2265 | 0 | << " PB: " << l.mutable_data()->pb.DebugString() |
2266 | 0 | << " Complete " << num_tablets_complete << " tablets from " << tablet_snapshots->size(); |
2267 | 0 | } |
2268 | | |
2269 | | Status CatalogManager::CreateSnapshotSchedule(const CreateSnapshotScheduleRequestPB* req, |
2270 | | CreateSnapshotScheduleResponsePB* resp, |
2271 | 0 | rpc::RpcContext* rpc) { |
2272 | 0 | auto id = VERIFY_RESULT(snapshot_coordinator_.CreateSchedule( |
2273 | 0 | *req, leader_ready_term(), rpc->GetClientDeadline())); |
2274 | 0 | resp->set_snapshot_schedule_id(id.data(), id.size()); |
2275 | 0 | return Status::OK(); |
2276 | 0 | } |
2277 | | |
2278 | | Status CatalogManager::ListSnapshotSchedules(const ListSnapshotSchedulesRequestPB* req, |
2279 | | ListSnapshotSchedulesResponsePB* resp, |
2280 | 0 | rpc::RpcContext* rpc) { |
2281 | 0 | auto snapshot_schedule_id = TryFullyDecodeSnapshotScheduleId(req->snapshot_schedule_id()); |
2282 | |
|
2283 | 0 | return snapshot_coordinator_.ListSnapshotSchedules(snapshot_schedule_id, resp); |
2284 | 0 | } |
2285 | | |
2286 | | Status CatalogManager::DeleteSnapshotSchedule(const DeleteSnapshotScheduleRequestPB* req, |
2287 | | DeleteSnapshotScheduleResponsePB* resp, |
2288 | 0 | rpc::RpcContext* rpc) { |
2289 | 0 | auto snapshot_schedule_id = TryFullyDecodeSnapshotScheduleId(req->snapshot_schedule_id()); |
2290 | |
|
2291 | 0 | return snapshot_coordinator_.DeleteSnapshotSchedule( |
2292 | 0 | snapshot_schedule_id, leader_ready_term(), rpc->GetClientDeadline()); |
2293 | 0 | } |
2294 | | |
2295 | 0 | void CatalogManager::DumpState(std::ostream* out, bool on_disk_dump) const { |
2296 | 0 | super::DumpState(out, on_disk_dump); |
2297 | | |
2298 | | // TODO: dump snapshots |
2299 | 0 | } |
2300 | | |
// Decides, under `mutex`, whether the registry must be (re)sent to tablet server `ts_uuid`.
// Returns true if the server has no entry yet, its entry is flagged true, or
// `has_registration` is set. Either way, the server's entry is left (or created) as false.
template <typename Registry, typename Mutex>
bool ShouldResendRegistry(
    const std::string& ts_uuid, bool has_registration, Registry* registry, Mutex* mutex) {
  std::lock_guard<Mutex> guard(*mutex);
  // emplace() inserts only when the key is absent; `.second` reports whether it did.
  auto emplaced = registry->emplace(ts_uuid, false);
  auto entry = emplaced.first;
  const bool resend = emplaced.second || entry->second || has_registration;
  entry->second = false;
  return resend;
}
2317 | | |
2318 | | Status CatalogManager::FillHeartbeatResponse(const TSHeartbeatRequestPB* req, |
2319 | 384k | TSHeartbeatResponsePB* resp) { |
2320 | 384k | SysClusterConfigEntryPB cluster_config; |
2321 | 384k | RETURN_NOT_OK(GetClusterConfig(&cluster_config)); |
2322 | 384k | RETURN_NOT_OK(FillHeartbeatResponseEncryption(cluster_config, req, resp)); |
2323 | 384k | RETURN_NOT_OK(snapshot_coordinator_.FillHeartbeatResponse(resp)); |
2324 | 384k | return FillHeartbeatResponseCDC(cluster_config, req, resp); |
2325 | 384k | } |
2326 | | |
2327 | | Status CatalogManager::FillHeartbeatResponseCDC(const SysClusterConfigEntryPB& cluster_config, |
2328 | | const TSHeartbeatRequestPB* req, |
2329 | 384k | TSHeartbeatResponsePB* resp) { |
2330 | 384k | resp->set_cluster_config_version(cluster_config.version()); |
2331 | 384k | if (!cluster_config.has_consumer_registry() || |
2332 | 384k | req->cluster_config_version() >= cluster_config.version()) { |
2333 | 384k | return Status::OK(); |
2334 | 384k | } |
2335 | 12 | *resp->mutable_consumer_registry() = cluster_config.consumer_registry(); |
2336 | 12 | return Status::OK(); |
2337 | 12 | } |
2338 | | |
2339 | | Status CatalogManager::FillHeartbeatResponseEncryption( |
2340 | | const SysClusterConfigEntryPB& cluster_config, |
2341 | | const TSHeartbeatRequestPB* req, |
2342 | 384k | TSHeartbeatResponsePB* resp) { |
2343 | 384k | const auto& ts_uuid = req->common().ts_instance().permanent_uuid(); |
2344 | 384k | if (!cluster_config.has_encryption_info() || |
2345 | 3 | !ShouldResendRegistry(ts_uuid, req->has_registration(), &should_send_universe_key_registry_, |
2346 | 383k | &should_send_universe_key_registry_mutex_)) { |
2347 | 383k | return Status::OK(); |
2348 | 383k | } |
2349 | | |
2350 | 142 | const auto& encryption_info = cluster_config.encryption_info(); |
2351 | 142 | RETURN_NOT_OK(encryption_manager_->FillHeartbeatResponseEncryption(encryption_info, resp)); |
2352 | | |
2353 | 142 | return Status::OK(); |
2354 | 142 | } |
2355 | | |
2356 | | void CatalogManager::SetTabletSnapshotsState(SysSnapshotEntryPB::State state, |
2357 | 0 | SysSnapshotEntryPB* snapshot_pb) { |
2358 | 0 | RepeatedPtrField<SysSnapshotEntryPB_TabletSnapshotPB>* tablet_snapshots = |
2359 | 0 | snapshot_pb->mutable_tablet_snapshots(); |
2360 | |
|
2361 | 0 | for (int i = 0; i < tablet_snapshots->size(); ++i) { |
2362 | 0 | SysSnapshotEntryPB_TabletSnapshotPB* tablet_info = tablet_snapshots->Mutable(i); |
2363 | 0 | tablet_info->set_state(state); |
2364 | 0 | } |
2365 | 0 | } |
2366 | | |
2367 | 157 | Status CatalogManager::CreateCdcStateTableIfNeeded(rpc::RpcContext *rpc) { |
2368 | | // If CDC state table exists do nothing, otherwise create it. |
2369 | 157 | if (VERIFY_RESULT(TableExists(kSystemNamespaceName, kCdcStateTableName))) { |
2370 | 69 | return Status::OK(); |
2371 | 69 | } |
2372 | | // Set up a CreateTable request internally. |
2373 | 88 | CreateTableRequestPB req; |
2374 | 88 | CreateTableResponsePB resp; |
2375 | 88 | req.set_name(kCdcStateTableName); |
2376 | 88 | req.mutable_namespace_()->set_name(kSystemNamespaceName); |
2377 | 88 | req.set_table_type(TableType::YQL_TABLE_TYPE); |
2378 | | |
2379 | 88 | client::YBSchemaBuilder schema_builder; |
2380 | 88 | schema_builder.AddColumn(master::kCdcTabletId)->HashPrimaryKey()->Type(DataType::STRING); |
2381 | 88 | schema_builder.AddColumn(master::kCdcStreamId)->PrimaryKey()->Type(DataType::STRING); |
2382 | 88 | schema_builder.AddColumn(master::kCdcCheckpoint)->Type(DataType::STRING); |
2383 | 88 | schema_builder.AddColumn(master::kCdcData)->Type(QLType::CreateTypeMap( |
2384 | 88 | DataType::STRING, DataType::STRING)); |
2385 | 88 | schema_builder.AddColumn(master::kCdcLastReplicationTime)->Type(DataType::TIMESTAMP); |
2386 | | |
2387 | 88 | client::YBSchema yb_schema; |
2388 | 88 | CHECK_OK(schema_builder.Build(&yb_schema)); |
2389 | | |
2390 | 88 | auto schema = yb::client::internal::GetSchema(yb_schema); |
2391 | 88 | SchemaToPB(schema, req.mutable_schema()); |
2392 | | // Explicitly set the number tablets if the corresponding flag is set, otherwise CreateTable |
2393 | | // will use the same defaults as for regular tables. |
2394 | 88 | if (FLAGS_cdc_state_table_num_tablets > 0) { |
2395 | 4 | req.mutable_schema()->mutable_table_properties()->set_num_tablets( |
2396 | 4 | FLAGS_cdc_state_table_num_tablets); |
2397 | 4 | } |
2398 | | |
2399 | 88 | Status s = CreateTable(&req, &resp, rpc); |
2400 | | // We do not lock here so it is technically possible that the table was already created. |
2401 | | // If so, there is nothing to do so we just ignore the "AlreadyPresent" error. |
2402 | 88 | if (!s.ok() && !s.IsAlreadyPresent()) { |
2403 | 0 | return s; |
2404 | 0 | } |
2405 | 88 | return Status::OK(); |
2406 | 88 | } |
2407 | | |
2408 | 0 | Status CatalogManager::IsCdcStateTableCreated(IsCreateTableDoneResponsePB* resp) { |
2409 | 0 | IsCreateTableDoneRequestPB req; |
2410 | |
|
2411 | 0 | req.mutable_table()->set_table_name(kCdcStateTableName); |
2412 | 0 | req.mutable_table()->mutable_namespace_()->set_name(kSystemNamespaceName); |
2413 | |
|
2414 | 0 | return IsCreateTableDone(&req, resp); |
2415 | 0 | } |
2416 | | |
2417 | | // Helper class to print a vector of CDCStreamInfo pointers. |
2418 | | namespace { |
2419 | | |
2420 | | template<class CDCStreamInfoPointer> |
2421 | 4 | std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) { |
2422 | 4 | std::vector<CDCStreamId> cdc_stream_ids; |
2423 | 4 | for (const auto& cdc_stream : cdc_streams) { |
2424 | 4 | cdc_stream_ids.push_back(cdc_stream->id()); |
2425 | 4 | } |
2426 | 4 | return JoinCSVLine(cdc_stream_ids); |
2427 | 4 | } catalog_manager_ent.cc:_ZN2yb6master10enterprise12_GLOBAL__N_118JoinStreamsCSVLineI13scoped_refptrINS0_13CDCStreamInfoEEEENSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEENS7_6vectorIT_NSB_ISF_EEEE Line | Count | Source | 2421 | 2 | std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) { | 2422 | 2 | std::vector<CDCStreamId> cdc_stream_ids; | 2423 | 2 | for (const auto& cdc_stream : cdc_streams) { | 2424 | 2 | cdc_stream_ids.push_back(cdc_stream->id()); | 2425 | 2 | } | 2426 | 2 | return JoinCSVLine(cdc_stream_ids); | 2427 | 2 | } |
catalog_manager_ent.cc:_ZN2yb6master10enterprise12_GLOBAL__N_118JoinStreamsCSVLineIPNS0_13CDCStreamInfoEEENSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEENS6_6vectorIT_NSA_ISE_EEEE Line | Count | Source | 2421 | 2 | std::string JoinStreamsCSVLine(std::vector<CDCStreamInfoPointer> cdc_streams) { | 2422 | 2 | std::vector<CDCStreamId> cdc_stream_ids; | 2423 | 2 | for (const auto& cdc_stream : cdc_streams) { | 2424 | 2 | cdc_stream_ids.push_back(cdc_stream->id()); | 2425 | 2 | } | 2426 | 2 | return JoinCSVLine(cdc_stream_ids); | 2427 | 2 | } |
|
2428 | | |
2429 | | } // namespace |
2430 | | |
2431 | 2.46k | Status CatalogManager::DeleteCDCStreamsForTable(const TableId& table_id) { |
2432 | 2.46k | return DeleteCDCStreamsForTables({table_id}); |
2433 | 2.46k | } |
2434 | | |
2435 | 2.50k | Status CatalogManager::DeleteCDCStreamsForTables(const vector<TableId>& table_ids) { |
2436 | 2.50k | std::ostringstream tid_stream; |
2437 | 4.76k | for (const auto& tid : table_ids) { |
2438 | 4.76k | tid_stream << " " << tid; |
2439 | 4.76k | } |
2440 | 2.50k | LOG(INFO) << "Deleting CDC streams for tables:" << tid_stream.str(); |
2441 | | |
2442 | 2.50k | std::vector<scoped_refptr<CDCStreamInfo>> streams; |
2443 | 4.76k | for (const auto& tid : table_ids) { |
2444 | 4.76k | auto newstreams = FindCDCStreamsForTable(tid); |
2445 | 4.76k | streams.insert(streams.end(), newstreams.begin(), newstreams.end()); |
2446 | 4.76k | } |
2447 | | |
2448 | 2.50k | if (streams.empty()) { |
2449 | 2.50k | return Status::OK(); |
2450 | 2.50k | } |
2451 | | |
2452 | | // Do not delete them here, just mark them as DELETING and the catalog manager background thread |
2453 | | // will handle the deletion. |
2454 | 0 | return MarkCDCStreamsAsDeleting(streams); |
2455 | 0 | } |
2456 | | |
2457 | | std::vector<scoped_refptr<CDCStreamInfo>> CatalogManager::FindCDCStreamsForTable( |
2458 | 4.80k | const TableId& table_id) const { |
2459 | 4.80k | std::vector<scoped_refptr<CDCStreamInfo>> streams; |
2460 | 4.80k | SharedLock lock(mutex_); |
2461 | | |
2462 | 0 | for (const auto& entry : cdc_stream_map_) { |
2463 | 0 | auto ltm = entry.second->LockForRead(); |
2464 | | // for xCluster the first entry will be the table_id |
2465 | 0 | if (ltm->table_id().Get(0) == table_id && !ltm->started_deleting()) { |
2466 | 0 | streams.push_back(entry.second); |
2467 | 0 | } |
2468 | 0 | } |
2469 | 4.80k | return streams; |
2470 | 4.80k | } |
2471 | | |
2472 | 0 | void CatalogManager::GetAllCDCStreams(std::vector<scoped_refptr<CDCStreamInfo>>* streams) { |
2473 | 0 | streams->clear(); |
2474 | 0 | SharedLock lock(mutex_); |
2475 | 0 | streams->reserve(cdc_stream_map_.size()); |
2476 | 0 | for (const CDCStreamInfoMap::value_type& e : cdc_stream_map_) { |
2477 | 0 | if (!e.second->LockForRead()->is_deleting()) { |
2478 | 0 | streams->push_back(e.second); |
2479 | 0 | } |
2480 | 0 | } |
2481 | 0 | } |
2482 | | |
2483 | | Status CatalogManager::CreateCDCStream(const CreateCDCStreamRequestPB* req, |
2484 | | CreateCDCStreamResponsePB* resp, |
2485 | 2.69k | rpc::RpcContext* rpc) { |
2486 | 2.69k | LOG(INFO) << "CreateCDCStream from " << RequestorString(rpc) << ": " << req->ShortDebugString(); |
2487 | | |
2488 | 2.69k | std::string id_type_option_value(cdc::kTableId); |
2489 | | |
2490 | 10.9k | for (auto option : req->options()) { |
2491 | 10.9k | if (option.key() == cdc::kIdType) { |
2492 | 153 | id_type_option_value = option.value(); |
2493 | 153 | } |
2494 | 10.9k | } |
2495 | | |
2496 | | |
2497 | 2.69k | if (id_type_option_value != cdc::kNamespaceId) { |
2498 | 2.54k | scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTableById(req->table_id())); |
2499 | 2.54k | { |
2500 | 2.54k | auto l = table->LockForRead(); |
2501 | 2.54k | if (l->started_deleting()) { |
2502 | 0 | return STATUS( |
2503 | 0 | NotFound, "Table does not exist", req->table_id(), |
2504 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2505 | 0 | } |
2506 | 2.54k | } |
2507 | | |
2508 | 2.54k | AlterTableRequestPB alter_table_req; |
2509 | 2.54k | alter_table_req.mutable_table()->set_table_id(req->table_id()); |
2510 | 2.54k | alter_table_req.set_wal_retention_secs(FLAGS_cdc_wal_retention_time_secs); |
2511 | 2.54k | AlterTableResponsePB alter_table_resp; |
2512 | 2.54k | Status s = this->AlterTable(&alter_table_req, &alter_table_resp, rpc); |
2513 | 2.54k | if (!s.ok()) { |
2514 | 0 | return STATUS(InternalError, |
2515 | 0 | "Unable to change the WAL retention time for table", req->table_id(), |
2516 | 0 | MasterError(MasterErrorPB::INTERNAL_ERROR)); |
2517 | 0 | } |
2518 | 2.69k | } |
2519 | | |
2520 | 2.69k | scoped_refptr<CDCStreamInfo> stream; |
2521 | 2.69k | if (!req->has_db_stream_id()) { |
2522 | 157 | { |
2523 | 157 | TRACE("Acquired catalog manager lock"); |
2524 | 157 | LockGuard lock(mutex_); |
2525 | | // Construct the CDC stream if the producer wasn't bootstrapped. |
2526 | 157 | CDCStreamId stream_id; |
2527 | 157 | stream_id = GenerateIdUnlocked(SysRowEntryType::CDC_STREAM); |
2528 | | |
2529 | 157 | stream = make_scoped_refptr<CDCStreamInfo>(stream_id); |
2530 | 157 | stream->mutable_metadata()->StartMutation(); |
2531 | 157 | SysCDCStreamEntryPB* metadata = &stream->mutable_metadata()->mutable_dirty()->pb; |
2532 | 157 | bool create_namespace = id_type_option_value == cdc::kNamespaceId; |
2533 | 157 | if (create_namespace) { |
2534 | 153 | metadata->set_namespace_id(req->table_id()); |
2535 | 4 | } else { |
2536 | 4 | metadata->add_table_id(req->table_id()); |
2537 | 4 | } |
2538 | 157 | metadata->mutable_options()->CopyFrom(req->options()); |
2539 | 157 | metadata->set_state( |
2540 | 153 | req->has_initial_state() ? req->initial_state() : SysCDCStreamEntryPB::ACTIVE); |
2541 | | |
2542 | | // Add the stream to the in-memory map. |
2543 | 157 | cdc_stream_map_[stream->id()] = stream; |
2544 | 157 | if (!create_namespace) { |
2545 | 4 | cdc_stream_tables_count_map_[req->table_id()]++; |
2546 | 4 | } |
2547 | 157 | resp->set_stream_id(stream->id()); |
2548 | 157 | } |
2549 | 157 | TRACE("Inserted new CDC stream into CatalogManager maps"); |
2550 | | |
2551 | | // Update the on-disk system catalog. |
2552 | 157 | RETURN_NOT_OK(CheckLeaderStatusAndSetupError( |
2553 | 157 | sys_catalog_->Upsert(leader_ready_term(), stream), |
2554 | 157 | "inserting CDC stream into sys-catalog", resp)); |
2555 | 157 | TRACE("Wrote CDC stream to sys-catalog"); |
2556 | | |
2557 | | // Commit the in-memory state. |
2558 | 157 | stream->mutable_metadata()->CommitMutation(); |
2559 | 157 | LOG(INFO) << "Created CDC stream " << stream->ToString(); |
2560 | | |
2561 | 157 | RETURN_NOT_OK(CreateCdcStateTableIfNeeded(rpc)); |
2562 | 157 | if (!PREDICT_FALSE(FLAGS_TEST_disable_cdc_state_insert_on_setup) && |
2563 | 153 | (!req->has_initial_state() || |
2564 | 153 | (req->initial_state() == master::SysCDCStreamEntryPB::ACTIVE)) && |
2565 | 153 | (id_type_option_value != cdc::kNamespaceId)) { |
2566 | | // Create the cdc state entries for the tablets in this table from scratch since we have no |
2567 | | // data to bootstrap. If we data to bootstrap, let the BootstrapProducer logic take care of |
2568 | | // populating entries in cdc_state. |
2569 | 0 | auto ybclient = master_->cdc_state_client_initializer().client(); |
2570 | 0 | if (!ybclient) { |
2571 | 0 | return STATUS(IllegalState, "Client not initialized or shutting down"); |
2572 | 0 | } |
2573 | 0 | client::TableHandle cdc_table; |
2574 | 0 | const client::YBTableName cdc_state_table_name( |
2575 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
2576 | 0 | RETURN_NOT_OK(ybclient->WaitForCreateTableToFinish(cdc_state_table_name)); |
2577 | 0 | RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient)); |
2578 | 0 | std::shared_ptr<client::YBSession> session = ybclient->NewSession(); |
2579 | 0 | scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTableById(req->table_id())); |
2580 | 0 | auto tablets = table->GetTablets(); |
2581 | 0 | for (const auto& tablet : tablets) { |
2582 | 0 | const auto op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
2583 | 0 | auto* const req = op->mutable_request(); |
2584 | 0 | QLAddStringHashValue(req, tablet->id()); |
2585 | 0 | QLAddStringRangeValue(req, stream->id()); |
2586 | 0 | cdc_table.AddStringColumnValue(req, master::kCdcCheckpoint, OpId().ToString()); |
2587 | 0 | cdc_table.AddTimestampColumnValue( |
2588 | 0 | req, master::kCdcLastReplicationTime, GetCurrentTimeMicros()); |
2589 | 0 | session->Apply(op); |
2590 | 0 | } |
2591 | 0 | RETURN_NOT_OK(session->Flush()); |
2592 | 0 | } |
2593 | 2.53k | } else { |
2594 | | // Update and add table_id. |
2595 | 2.53k | { |
2596 | 2.53k | SharedLock lock(mutex_); |
2597 | 2.53k | stream = FindPtrOrNull(cdc_stream_map_, req->db_stream_id()); |
2598 | 2.53k | } |
2599 | | |
2600 | 2.53k | if (stream == nullptr) { |
2601 | 0 | return STATUS( |
2602 | 0 | NotFound, "Could not find CDC stream", req->ShortDebugString(), |
2603 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2604 | 0 | } |
2605 | | |
2606 | 2.53k | auto stream_lock = stream->LockForWrite(); |
2607 | 2.53k | if (stream_lock->is_deleting()) { |
2608 | 0 | return STATUS( |
2609 | 0 | NotFound, "CDC stream has been deleted", req->ShortDebugString(), |
2610 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2611 | 0 | } |
2612 | 2.53k | stream_lock.mutable_data()->pb.add_table_id(req->table_id()); |
2613 | | |
2614 | | // Also need to persist changes in sys catalog. |
2615 | 2.53k | RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream)); |
2616 | 2.53k | stream_lock.Commit(); |
2617 | 2.53k | } |
2618 | 2.69k | return Status::OK(); |
2619 | 2.69k | } |
2620 | | |
2621 | | Status CatalogManager::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, |
2622 | | DeleteCDCStreamResponsePB* resp, |
2623 | 2 | rpc::RpcContext* rpc) { |
2624 | 2 | LOG(INFO) << "Servicing DeleteCDCStream request from " << RequestorString(rpc) |
2625 | 2 | << ": " << req->ShortDebugString(); |
2626 | | |
2627 | 2 | if (req->stream_id_size() < 1) { |
2628 | 0 | return STATUS(InvalidArgument, "No CDC Stream ID given", req->ShortDebugString(), |
2629 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
2630 | 0 | } |
2631 | | |
2632 | 2 | std::vector<scoped_refptr<CDCStreamInfo>> streams; |
2633 | 2 | { |
2634 | 2 | SharedLock lock(mutex_); |
2635 | 2 | for (const auto& stream_id : req->stream_id()) { |
2636 | 2 | auto stream = FindPtrOrNull(cdc_stream_map_, stream_id); |
2637 | | |
2638 | 2 | if (stream == nullptr || stream->LockForRead()->is_deleting()) { |
2639 | 0 | resp->add_not_found_stream_ids(stream_id); |
2640 | 0 | LOG(WARNING) << "CDC stream does not exist: " << stream_id; |
2641 | 2 | } else { |
2642 | 2 | auto ltm = stream->LockForRead(); |
2643 | 2 | bool active = (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE); |
2644 | 2 | bool is_WAL = false; |
2645 | 5 | for (const auto& option : ltm->pb.options()) { |
2646 | 5 | if (option.key() == "record_format" && option.value() == "WAL") { |
2647 | 0 | is_WAL = true; |
2648 | 0 | } |
2649 | 5 | } |
2650 | 2 | if (!req->force_delete() && is_WAL && active) { |
2651 | 0 | return STATUS(NotSupported, |
2652 | 0 | "Cannot delete an xCluster Stream in replication. " |
2653 | 0 | "Use 'force_delete' to override", |
2654 | 0 | req->ShortDebugString(), |
2655 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
2656 | 0 | } |
2657 | 2 | streams.push_back(stream); |
2658 | 2 | } |
2659 | 2 | } |
2660 | 2 | } |
2661 | | |
2662 | 2 | if (!resp->not_found_stream_ids().empty() && !req->ignore_errors()) { |
2663 | 0 | string missing_streams = JoinElementsIterator(resp->not_found_stream_ids().begin(), |
2664 | 0 | resp->not_found_stream_ids().end(), |
2665 | 0 | ","); |
2666 | 0 | return STATUS( |
2667 | 0 | NotFound, |
2668 | 0 | Substitute("Did not find all requested CDC streams. Missing streams: [$0]. Request: $1", |
2669 | 0 | missing_streams, req->ShortDebugString()), |
2670 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2671 | 0 | } |
2672 | | |
2673 | | // Do not delete them here, just mark them as DELETING and the catalog manager background thread |
2674 | | // will handle the deletion. |
2675 | 2 | Status s = MarkCDCStreamsAsDeleting(streams); |
2676 | 2 | if (!s.ok()) { |
2677 | 0 | if (s.IsIllegalState()) { |
2678 | 0 | PANIC_RPC(rpc, s.message().ToString()); |
2679 | 0 | } |
2680 | 0 | return CheckIfNoLongerLeaderAndSetupError(s, resp); |
2681 | 0 | } |
2682 | | |
2683 | 2 | LOG(INFO) << "Successfully deleted CDC streams " << JoinStreamsCSVLine(streams) |
2684 | 2 | << " per request from " << RequestorString(rpc); |
2685 | | |
2686 | 2 | return Status::OK(); |
2687 | 2 | } |
2688 | | |
2689 | | Status CatalogManager::MarkCDCStreamsAsDeleting( |
2690 | 2 | const std::vector<scoped_refptr<CDCStreamInfo>>& streams) { |
2691 | 2 | if (streams.empty()) { |
2692 | 0 | return Status::OK(); |
2693 | 0 | } |
2694 | 2 | std::vector<CDCStreamInfo::WriteLock> locks; |
2695 | 2 | std::vector<CDCStreamInfo*> streams_to_mark; |
2696 | 2 | locks.reserve(streams.size()); |
2697 | 2 | for (auto& stream : streams) { |
2698 | 2 | auto l = stream->LockForWrite(); |
2699 | 2 | l.mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING); |
2700 | 2 | locks.push_back(std::move(l)); |
2701 | 2 | streams_to_mark.push_back(stream.get()); |
2702 | 2 | } |
2703 | | // The mutation will be aborted when 'l' exits the scope on early return. |
2704 | 2 | RETURN_NOT_OK(CheckStatus( |
2705 | 2 | sys_catalog_->Upsert(leader_ready_term(), streams_to_mark), |
2706 | 2 | "updating CDC streams in sys-catalog")); |
2707 | 2 | LOG(INFO) << "Successfully marked streams " << JoinStreamsCSVLine(streams_to_mark) |
2708 | 2 | << " as DELETING in sys catalog"; |
2709 | 2 | for (auto& lock : locks) { |
2710 | 2 | lock.Commit(); |
2711 | 2 | } |
2712 | 2 | return Status::OK(); |
2713 | 2 | } |
2714 | | |
2715 | | Status CatalogManager::FindCDCStreamsMarkedAsDeleting( |
2716 | 90.0k | std::vector<scoped_refptr<CDCStreamInfo>>* streams) { |
2717 | 90.0k | TRACE("Acquired catalog manager lock"); |
2718 | 90.0k | SharedLock lock(mutex_); |
2719 | 558 | for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) { |
2720 | 558 | auto ltm = entry.second->LockForRead(); |
2721 | 558 | if (ltm->is_deleting()) { |
2722 | 0 | LOG(INFO) << "Stream " << entry.second->id() << " was marked as DELETING"; |
2723 | 0 | streams->push_back(entry.second); |
2724 | 0 | } |
2725 | 558 | } |
2726 | 90.0k | return Status::OK(); |
2727 | 90.0k | } |
2728 | | |
2729 | | Status CatalogManager::CleanUpDeletedCDCStreams( |
2730 | 0 | const std::vector<scoped_refptr<CDCStreamInfo>>& streams) { |
2731 | 0 | auto ybclient = master_->cdc_state_client_initializer().client(); |
2732 | 0 | if (!ybclient) { |
2733 | 0 | return STATUS(IllegalState, "Client not initialized or shutting down"); |
2734 | 0 | } |
2735 | | |
2736 | | // First. For each deleted stream, delete the cdc state rows. |
2737 | | // Delete all the entries in cdc_state table that contain all the deleted cdc streams. |
2738 | 0 | client::TableHandle cdc_table; |
2739 | 0 | const client::YBTableName cdc_state_table_name( |
2740 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
2741 | 0 | Status s = cdc_table.Open(cdc_state_table_name, ybclient); |
2742 | 0 | if (!s.ok()) { |
2743 | 0 | LOG(WARNING) << "Unable to open table " << master::kCdcStateTableName |
2744 | 0 | << " to delete stream ids entries: " << s; |
2745 | 0 | return s.CloneAndPrepend("Unable to open cdc_state table"); |
2746 | 0 | } |
2747 | | |
2748 | 0 | std::shared_ptr<client::YBSession> session = ybclient->NewSession(); |
2749 | 0 | std::vector<std::pair<CDCStreamId, std::shared_ptr<client::YBqlWriteOp>>> stream_ops; |
2750 | 0 | std::set<CDCStreamId> failed_streams; |
2751 | 0 | for (const auto& stream : streams) { |
2752 | 0 | LOG(INFO) << "Deleting rows for stream " << stream->id(); |
2753 | 0 | for (const auto& table_id : stream->table_id()) { |
2754 | 0 | TabletInfos tablets; |
2755 | 0 | scoped_refptr<TableInfo> table; |
2756 | 0 | { |
2757 | 0 | TRACE("Acquired catalog manager lock"); |
2758 | 0 | SharedLock lock(mutex_); |
2759 | 0 | table = FindPtrOrNull(*table_ids_map_, table_id); |
2760 | 0 | } |
2761 | | // GetTablets locks lock_ in shared mode. |
2762 | 0 | if (table) { |
2763 | 0 | tablets = table->GetTablets(); |
2764 | 0 | } |
2765 | |
|
2766 | 0 | for (const auto& tablet : tablets) { |
2767 | 0 | const auto delete_op = cdc_table.NewDeleteOp(); |
2768 | 0 | auto* delete_req = delete_op->mutable_request(); |
2769 | |
|
2770 | 0 | QLAddStringHashValue(delete_req, tablet->tablet_id()); |
2771 | 0 | QLAddStringRangeValue(delete_req, stream->id()); |
2772 | 0 | session->Apply(delete_op); |
2773 | 0 | stream_ops.push_back(std::make_pair(stream->id(), delete_op)); |
2774 | 0 | LOG(INFO) << "Deleting stream " << stream->id() << " for tablet " << tablet->tablet_id() |
2775 | 0 | << " with request " << delete_req->ShortDebugString(); |
2776 | 0 | } |
2777 | 0 | } |
2778 | 0 | } |
2779 | | // Flush all the delete operations. |
2780 | 0 | s = session->Flush(); |
2781 | 0 | if (!s.ok()) { |
2782 | 0 | LOG(ERROR) << "Unable to flush operations to delete cdc streams: " << s; |
2783 | 0 | return s.CloneAndPrepend("Error deleting cdc stream rows from cdc_state table"); |
2784 | 0 | } |
2785 | | |
2786 | 0 | for (const auto& e : stream_ops) { |
2787 | 0 | if (!e.second->succeeded()) { |
2788 | 0 | LOG(WARNING) << "Error deleting cdc_state row with tablet id " |
2789 | 0 | << e.second->request().hashed_column_values(0).value().string_value() |
2790 | 0 | << " and stream id " |
2791 | 0 | << e.second->request().range_column_values(0).value().string_value() |
2792 | 0 | << ": " << e.second->response().status(); |
2793 | 0 | failed_streams.insert(e.first); |
2794 | 0 | } |
2795 | 0 | } |
2796 | | |
2797 | | // TODO: Read cdc_state table and verify that there are not rows with the specified cdc stream |
2798 | | // and keep those in the map in the DELETED state to retry later. |
2799 | |
|
2800 | 0 | std::vector<CDCStreamInfo::WriteLock> locks; |
2801 | 0 | locks.reserve(streams.size() - failed_streams.size()); |
2802 | 0 | std::vector<CDCStreamInfo*> streams_to_delete; |
2803 | 0 | streams_to_delete.reserve(streams.size() - failed_streams.size()); |
2804 | | |
2805 | | // Delete from sys catalog only those streams that were successfully delete from cdc_state. |
2806 | 0 | for (auto& stream : streams) { |
2807 | 0 | if (failed_streams.find(stream->id()) == failed_streams.end()) { |
2808 | 0 | locks.push_back(stream->LockForWrite()); |
2809 | 0 | streams_to_delete.push_back(stream.get()); |
2810 | 0 | } |
2811 | 0 | } |
2812 | | |
2813 | | // The mutation will be aborted when 'l' exits the scope on early return. |
2814 | 0 | RETURN_NOT_OK(CheckStatus( |
2815 | 0 | sys_catalog_->Delete(leader_ready_term(), streams_to_delete), |
2816 | 0 | "deleting CDC streams from sys-catalog")); |
2817 | 0 | LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete) |
2818 | 0 | << " from sys catalog"; |
2819 | | |
2820 | | // Remove it from the map. |
2821 | 0 | TRACE("Removing from CDC stream maps"); |
2822 | 0 | { |
2823 | 0 | LockGuard lock(mutex_); |
2824 | 0 | for (const auto& stream : streams_to_delete) { |
2825 | 0 | if (cdc_stream_map_.erase(stream->id()) < 1) { |
2826 | 0 | return STATUS(IllegalState, "Could not remove CDC stream from map", stream->id()); |
2827 | 0 | } |
2828 | 0 | for (auto& id : stream->table_id()) { |
2829 | 0 | cdc_stream_tables_count_map_[id]--; |
2830 | 0 | } |
2831 | 0 | } |
2832 | 0 | } |
2833 | 0 | LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete) |
2834 | 0 | << " from stream map"; |
2835 | |
|
2836 | 0 | for (auto& lock : locks) { |
2837 | 0 | lock.Commit(); |
2838 | 0 | } |
2839 | 0 | return Status::OK(); |
2840 | 0 | } |
2841 | | |
2842 | | Status CatalogManager::GetCDCStream(const GetCDCStreamRequestPB* req, |
2843 | | GetCDCStreamResponsePB* resp, |
2844 | 6 | rpc::RpcContext* rpc) { |
2845 | 6 | LOG(INFO) << "GetCDCStream from " << RequestorString(rpc) |
2846 | 6 | << ": " << req->DebugString(); |
2847 | | |
2848 | 6 | if (!req->has_stream_id()) { |
2849 | 0 | return STATUS(InvalidArgument, "CDC Stream ID must be provided", req->ShortDebugString(), |
2850 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
2851 | 0 | } |
2852 | | |
2853 | 6 | scoped_refptr<CDCStreamInfo> stream; |
2854 | 6 | { |
2855 | 6 | SharedLock lock(mutex_); |
2856 | 6 | stream = FindPtrOrNull(cdc_stream_map_, req->stream_id()); |
2857 | 6 | } |
2858 | | |
2859 | 6 | if (stream == nullptr || stream->LockForRead()->is_deleting()) { |
2860 | 2 | return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(), |
2861 | 2 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2862 | 2 | } |
2863 | | |
2864 | 4 | auto stream_lock = stream->LockForRead(); |
2865 | | |
2866 | 4 | CDCStreamInfoPB* stream_info = resp->mutable_stream(); |
2867 | | |
2868 | 4 | stream_info->set_stream_id(stream->id()); |
2869 | 4 | std::string id_type_option_value(cdc::kTableId); |
2870 | | |
2871 | 0 | for (auto option : stream_lock->options()) { |
2872 | 0 | if (option.has_key() && option.key() == cdc::kIdType) |
2873 | 0 | id_type_option_value = option.value(); |
2874 | 0 | } |
2875 | 4 | if (id_type_option_value == cdc::kNamespaceId) { |
2876 | 0 | stream_info->set_namespace_id(stream_lock->namespace_id()); |
2877 | 0 | } |
2878 | | |
2879 | 4 | for (auto& table_id : stream_lock->table_id()) { |
2880 | 4 | stream_info->add_table_id(table_id); |
2881 | 4 | } |
2882 | | |
2883 | 4 | stream_info->mutable_options()->CopyFrom(stream_lock->options()); |
2884 | | |
2885 | 4 | return Status::OK(); |
2886 | 4 | } |
2887 | | |
2888 | | Status CatalogManager::GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req, |
2889 | 11 | GetCDCDBStreamInfoResponsePB* resp) { |
2890 | | |
2891 | 11 | if (!req->has_db_stream_id()) { |
2892 | 0 | return STATUS(InvalidArgument, "CDC DB Stream ID must be provided", req->ShortDebugString(), |
2893 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
2894 | 0 | } |
2895 | | |
2896 | 11 | scoped_refptr<CDCStreamInfo> stream; |
2897 | 11 | { |
2898 | 11 | SharedLock lock(mutex_); |
2899 | 11 | stream = FindPtrOrNull(cdc_stream_map_, req->db_stream_id()); |
2900 | 11 | } |
2901 | | |
2902 | 11 | if (stream == nullptr || stream->LockForRead()->is_deleting()) { |
2903 | 0 | return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(), |
2904 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
2905 | 0 | } |
2906 | | |
2907 | 11 | auto stream_lock = stream->LockForRead(); |
2908 | | |
2909 | 11 | if (!stream->namespace_id().empty()) { |
2910 | 11 | resp->set_namespace_id(stream->namespace_id()); |
2911 | 11 | } |
2912 | | |
2913 | 11 | for (const auto& table_id : stream_lock->table_id()) { |
2914 | 11 | const auto table_info = resp->add_table_info(); |
2915 | 11 | table_info->set_stream_id(req->db_stream_id()); |
2916 | 11 | table_info->set_table_id(table_id); |
2917 | 11 | } |
2918 | | |
2919 | 11 | return Status::OK(); |
2920 | 11 | } |
2921 | | |
2922 | | Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req, |
2923 | 1 | ListCDCStreamsResponsePB* resp) { |
2924 | | |
2925 | 1 | scoped_refptr<TableInfo> table; |
2926 | 1 | bool filter_table = req->has_table_id(); |
2927 | 1 | if (filter_table) { |
2928 | 0 | table = VERIFY_RESULT(FindTableById(req->table_id())); |
2929 | 0 | } |
2930 | | |
2931 | 1 | SharedLock lock(mutex_); |
2932 | | |
2933 | 1 | for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) { |
2934 | 1 | bool skip_stream = false; |
2935 | 1 | bool id_type_option_present = false; |
2936 | | |
2937 | | // if the request is to list the DB streams of a specific namespace then the other namespaces |
2938 | | // should not be considered |
2939 | 1 | if (req->has_namespace_id() && (req->namespace_id() != entry.second->namespace_id())) { |
2940 | 0 | continue; |
2941 | 0 | } |
2942 | | |
2943 | 1 | if (filter_table && table->id() != entry.second->table_id().Get(0)) { |
2944 | 0 | continue; // Skip deleting/deleted streams and streams from other tables. |
2945 | 0 | } |
2946 | | |
2947 | 1 | auto ltm = entry.second->LockForRead(); |
2948 | | |
2949 | 1 | if (ltm->is_deleting()) { |
2950 | 0 | continue; |
2951 | 0 | } |
2952 | | |
2953 | 1 | for (const auto& option : ltm->options()) { |
2954 | 0 | if (option.key() == cdc::kIdType) { |
2955 | 0 | id_type_option_present = true; |
2956 | 0 | if (req->has_id_type()) { |
2957 | 0 | if (req->id_type() == IdTypePB::NAMESPACE_ID && |
2958 | 0 | option.value() != cdc::kNamespaceId) { |
2959 | 0 | skip_stream = true; |
2960 | 0 | break; |
2961 | 0 | } |
2962 | 0 | if (req->id_type() == IdTypePB::TABLE_ID && |
2963 | 0 | option.value() == cdc::kNamespaceId) { |
2964 | 0 | skip_stream = true; |
2965 | 0 | break; |
2966 | 0 | } |
2967 | 0 | } |
2968 | 0 | } |
2969 | 0 | } |
2970 | | |
2971 | 1 | if ((!id_type_option_present && req->id_type() == IdTypePB::NAMESPACE_ID) || |
2972 | 1 | skip_stream) |
2973 | 0 | continue; |
2974 | | |
2975 | 1 | CDCStreamInfoPB* stream = resp->add_streams(); |
2976 | 1 | stream->set_stream_id(entry.second->id()); |
2977 | 1 | for (const auto& table_id : ltm->table_id()) { |
2978 | 1 | stream->add_table_id(table_id); |
2979 | 1 | } |
2980 | 1 | stream->mutable_options()->CopyFrom(ltm->options()); |
2981 | | // Also add an option for the current state. |
2982 | 1 | if (ltm->pb.has_state()) { |
2983 | 1 | auto state_option = stream->add_options(); |
2984 | 1 | state_option->set_key("state"); |
2985 | 1 | state_option->set_value(master::SysCDCStreamEntryPB::State_Name(ltm->pb.state())); |
2986 | 1 | } |
2987 | 1 | } |
2988 | 1 | return Status::OK(); |
2989 | 1 | } |
2990 | | |
2991 | 157 | bool CatalogManager::CDCStreamExistsUnlocked(const CDCStreamId& stream_id) { |
2992 | 157 | scoped_refptr<CDCStreamInfo> stream = FindPtrOrNull(cdc_stream_map_, stream_id); |
2993 | 157 | if (stream == nullptr || stream->LockForRead()->is_deleting()) { |
2994 | 157 | return false; |
2995 | 157 | } |
2996 | 0 | return true; |
2997 | 0 | } |
2998 | | |
2999 | | Status CatalogManager::UpdateCDCStream(const UpdateCDCStreamRequestPB *req, |
3000 | | UpdateCDCStreamResponsePB* resp, |
3001 | 0 | rpc::RpcContext* rpc) { |
3002 | 0 | LOG(INFO) << "UpdateCDCStream from " << RequestorString(rpc) |
3003 | 0 | << ": " << req->DebugString(); |
3004 | | |
3005 | | // Check fields. |
3006 | 0 | if (!req->has_stream_id()) { |
3007 | 0 | return STATUS(InvalidArgument, "Stream ID must be provided", |
3008 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3009 | 0 | } |
3010 | 0 | if (!req->has_entry()) { |
3011 | 0 | return STATUS(InvalidArgument, "CDC Stream Entry must be provided", |
3012 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3013 | 0 | } |
3014 | | |
3015 | 0 | scoped_refptr<CDCStreamInfo> stream; |
3016 | 0 | { |
3017 | 0 | SharedLock lock(mutex_); |
3018 | 0 | stream = FindPtrOrNull(cdc_stream_map_, req->stream_id()); |
3019 | 0 | } |
3020 | 0 | if (stream == nullptr) { |
3021 | 0 | return STATUS(NotFound, "Could not find CDC stream", req->ShortDebugString(), |
3022 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
3023 | 0 | } |
3024 | | |
3025 | 0 | auto stream_lock = stream->LockForWrite(); |
3026 | 0 | if (stream_lock->is_deleting()) { |
3027 | 0 | return STATUS(NotFound, "CDC stream has been deleted", req->ShortDebugString(), |
3028 | 0 | MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
3029 | 0 | } |
3030 | | |
3031 | 0 | stream_lock.mutable_data()->pb.CopyFrom(req->entry()); |
3032 | | // Also need to persist changes in sys catalog. |
3033 | 0 | RETURN_NOT_OK(sys_catalog_->Upsert(leader_ready_term(), stream)); |
3034 | |
|
3035 | 0 | stream_lock.Commit(); |
3036 | |
|
3037 | 0 | return Status::OK(); |
3038 | 0 | } |
3039 | | |
3040 | | /* |
3041 | | * UniverseReplication is setup in 4 stages within the Catalog Manager |
3042 | | * 1. SetupUniverseReplication: Validates user input & requests Producer schema. |
3043 | | * 2. GetTableSchemaCallback: Validates Schema compatibility & requests Producer CDC init. |
3044 | | * 3. AddCDCStreamToUniverseAndInitConsumer: Setup RPC connections for CDC Streaming |
3045 | | * 4. InitCDCConsumer: Initializes the Consumer settings to begin tailing data |
3046 | | */ |
3047 | | Status CatalogManager::SetupUniverseReplication(const SetupUniverseReplicationRequestPB* req, |
3048 | | SetupUniverseReplicationResponsePB* resp, |
3049 | | rpc::RpcContext* rpc) { |
3050 | | LOG(INFO) << "SetupUniverseReplication from " << RequestorString(rpc) |
3051 | | << ": " << req->DebugString(); |
3052 | | |
3053 | | // Sanity checking section. |
3054 | | if (!req->has_producer_id()) { |
3055 | | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
3056 | | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3057 | | } |
3058 | | |
3059 | | if (req->producer_master_addresses_size() <= 0) { |
3060 | | return STATUS(InvalidArgument, "Producer master address must be provided", |
3061 | | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3062 | | } |
3063 | | |
3064 | | if (req->producer_bootstrap_ids().size() > 0 && |
3065 | | req->producer_bootstrap_ids().size() != req->producer_table_ids().size()) { |
3066 | | return STATUS(InvalidArgument, "Number of bootstrap ids must be equal to number of tables", |
3067 | | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3068 | | } |
3069 | | |
3070 | | { |
3071 | | auto l = cluster_config_->LockForRead(); |
3072 | | if (l->pb.cluster_uuid() == req->producer_id()) { |
3073 | | return STATUS(InvalidArgument, "The request UUID and cluster UUID are identical.", |
3074 | | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3075 | | } |
3076 | | } |
3077 | | |
3078 | | { |
3079 | | auto request_master_addresses = req->producer_master_addresses(); |
3080 | | std::vector<ServerEntryPB> cluster_master_addresses; |
3081 | | RETURN_NOT_OK(master_->ListMasters(&cluster_master_addresses)); |
3082 | | for (const auto &req_elem : request_master_addresses) { |
3083 | | for (const auto &cluster_elem : cluster_master_addresses) { |
3084 | | if (cluster_elem.has_registration()) { |
3085 | | auto p_rpc_addresses = cluster_elem.registration().private_rpc_addresses(); |
3086 | | for (const auto &p_rpc_elem : p_rpc_addresses) { |
3087 | | if (req_elem.host() == p_rpc_elem.host() && req_elem.port() == p_rpc_elem.port()) { |
3088 | | return STATUS(InvalidArgument, |
3089 | | "Duplicate between request master addresses and private RPC addresses", |
3090 | | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3091 | | } |
3092 | | } |
3093 | | |
3094 | | auto broadcast_addresses = cluster_elem.registration().broadcast_addresses(); |
3095 | | for (const auto &bc_elem : broadcast_addresses) { |
3096 | | if (req_elem.host() == bc_elem.host() && req_elem.port() == bc_elem.port()) { |
3097 | | return STATUS(InvalidArgument, |
3098 | | "Duplicate between request master addresses and broadcast addresses", |
3099 | | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
3100 | | } |
3101 | | } |
3102 | | } |
3103 | | } |
3104 | | } |
3105 | | } |
3106 | | |
3107 | | std::unordered_map<TableId, std::string> table_id_to_bootstrap_id; |
3108 | | |
3109 | | if (req->producer_bootstrap_ids_size() > 0) { |
3110 | | for (int i = 0; i < req->producer_table_ids().size(); i++) { |
3111 | | table_id_to_bootstrap_id[req->producer_table_ids(i)] = req->producer_bootstrap_ids(i); |
3112 | | } |
3113 | | } |
3114 | | |
3115 | | // We assume that the list of table ids is unique. |
3116 | | if (req->producer_bootstrap_ids().size() > 0 && |
3117 | | implicit_cast<size_t>(req->producer_table_ids().size()) != table_id_to_bootstrap_id.size()) { |
3118 | | return STATUS(InvalidArgument, "When providing bootstrap ids, " |
3119 | | "the list of tables must be unique", req->ShortDebugString(), |
3120 | | MasterError(MasterErrorPB::INVALID_REQUEST)); |
3121 | | } |
3122 | | |
3123 | | scoped_refptr<UniverseReplicationInfo> ri; |
3124 | | { |
3125 | | TRACE("Acquired catalog manager lock"); |
3126 | | SharedLock lock(mutex_); |
3127 | | |
3128 | | if (FindPtrOrNull(universe_replication_map_, req->producer_id()) != nullptr) { |
3129 | | return STATUS(InvalidArgument, "Producer already present", req->producer_id(), |
3130 | | MasterError(MasterErrorPB::INVALID_REQUEST)); |
3131 | | } |
3132 | | } |
3133 | | |
3134 | | // Create an entry in the system catalog DocDB for this new universe replication. |
3135 | | ri = new UniverseReplicationInfo(req->producer_id()); |
3136 | | ri->mutable_metadata()->StartMutation(); |
3137 | | SysUniverseReplicationEntryPB *metadata = &ri->mutable_metadata()->mutable_dirty()->pb; |
3138 | | metadata->set_producer_id(req->producer_id()); |
3139 | | metadata->mutable_producer_master_addresses()->CopyFrom(req->producer_master_addresses()); |
3140 | | metadata->mutable_tables()->CopyFrom(req->producer_table_ids()); |
3141 | | metadata->set_state(SysUniverseReplicationEntryPB::INITIALIZING); |
3142 | | |
3143 | | RETURN_NOT_OK(CheckLeaderStatusAndSetupError( |
3144 | | sys_catalog_->Upsert(leader_ready_term(), ri), |
3145 | | "inserting universe replication info into sys-catalog", resp)); |
3146 | | TRACE("Wrote universe replication info to sys-catalog"); |
3147 | | |
3148 | | // Commit the in-memory state now that it's added to the persistent catalog. |
3149 | | ri->mutable_metadata()->CommitMutation(); |
3150 | | LOG(INFO) << "Setup universe replication from producer " << ri->ToString(); |
3151 | | |
3152 | | { |
3153 | | LockGuard lock(mutex_); |
3154 | | universe_replication_map_[ri->id()] = ri; |
3155 | | } |
3156 | | |
3157 | | // Initialize the CDC Stream by querying the Producer server for RPC sanity checks. |
3158 | | auto result = ri->GetOrCreateCDCRpcTasks(req->producer_master_addresses()); |
3159 | | LOG(INFO) << "GetOrCreateCDCRpcTasks: " << result.ok(); |
3160 | | if (!result.ok()) { |
3161 | | MarkUniverseReplicationFailed(ri, ResultToStatus(result)); |
3162 | | return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, result.status()); |
3163 | | } |
3164 | | std::shared_ptr<CDCRpcTasks> cdc_rpc = *result; |
3165 | | |
3166 | | // For each table, run an async RPC task to verify a sufficient Producer:Consumer schema match. |
3167 | | for (int i = 0; i < req->producer_table_ids_size(); i++) { |
3168 | | |
3169 | | // SETUP CONTINUES after this async call. |
3170 | | Status s; |
3171 | | if (IsColocatedParentTableId(req->producer_table_ids(i))) { |
3172 | | auto tables_info = std::make_shared<std::vector<client::YBTableInfo>>(); |
3173 | | s = cdc_rpc->client()->GetColocatedTabletSchemaById( |
3174 | | req->producer_table_ids(i), tables_info, |
3175 | | Bind(&enterprise::CatalogManager::GetColocatedTabletSchemaCallback, Unretained(this), |
3176 | | ri->id(), tables_info, table_id_to_bootstrap_id)); |
3177 | | } else if (IsTablegroupParentTableId(req->producer_table_ids(i))) { |
3178 | | auto tablegroup_info = std::make_shared<std::vector<client::YBTableInfo>>(); |
3179 | | s = cdc_rpc->client()->GetTablegroupSchemaById( |
3180 | | req->producer_table_ids(i), tablegroup_info, |
3181 | | Bind(&enterprise::CatalogManager::GetTablegroupSchemaCallback, Unretained(this), |
3182 | | ri->id(), tablegroup_info, req->producer_table_ids(i), table_id_to_bootstrap_id)); |
3183 | | } else { |
3184 | | auto table_info = std::make_shared<client::YBTableInfo>(); |
3185 | | s = cdc_rpc->client()->GetTableSchemaById( |
3186 | | req->producer_table_ids(i), table_info, |
3187 | | Bind(&enterprise::CatalogManager::GetTableSchemaCallback, Unretained(this), |
3188 | | ri->id(), table_info, table_id_to_bootstrap_id)); |
3189 | | } |
3190 | | |
3191 | | if (!s.ok()) { |
3192 | | MarkUniverseReplicationFailed(ri, s); |
3193 | | return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); |
3194 | | } |
3195 | | } |
3196 | | |
3197 | | LOG(INFO) << "Started schema validation for universe replication " << ri->ToString(); |
3198 | | return Status::OK(); |
3199 | | } |
3200 | | |
3201 | | void CatalogManager::MarkUniverseReplicationFailed( |
3202 | 0 | scoped_refptr<UniverseReplicationInfo> universe, const Status& failure_status) { |
3203 | 0 | auto l = universe->LockForWrite(); |
3204 | 0 | if (l->pb.state() == SysUniverseReplicationEntryPB::DELETED) { |
3205 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED_ERROR); |
3206 | 0 | } else { |
3207 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); |
3208 | 0 | } |
3209 | |
|
3210 | 0 | universe->SetSetupUniverseReplicationErrorStatus(failure_status); |
3211 | | |
3212 | | // Update sys_catalog. |
3213 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), universe); |
3214 | |
|
3215 | 0 | l.CommitOrWarn(s, "updating universe replication info in sys-catalog"); |
3216 | 0 | } |
3217 | | |
3218 | | Status CatalogManager::ValidateTableSchema( |
3219 | | const std::shared_ptr<client::YBTableInfo>& info, |
3220 | | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, |
3221 | 0 | GetTableSchemaResponsePB* resp) { |
3222 | | // Get corresponding table schema on local universe. |
3223 | 0 | GetTableSchemaRequestPB req; |
3224 | |
|
3225 | 0 | auto* table = req.mutable_table(); |
3226 | 0 | table->set_table_name(info->table_name.table_name()); |
3227 | 0 | table->mutable_namespace_()->set_name(info->table_name.namespace_name()); |
3228 | 0 | table->mutable_namespace_()->set_database_type( |
3229 | 0 | GetDatabaseTypeForTable(client::ClientToPBTableType(info->table_type))); |
3230 | | |
3231 | | // Since YSQL tables are not present in table map, we first need to list tables to get the table |
3232 | | // ID and then get table schema. |
3233 | | // Remove this once table maps are fixed for YSQL. |
3234 | 0 | ListTablesRequestPB list_req; |
3235 | 0 | ListTablesResponsePB list_resp; |
3236 | |
|
3237 | 0 | list_req.set_name_filter(info->table_name.table_name()); |
3238 | 0 | Status status = ListTables(&list_req, &list_resp); |
3239 | 0 | if (!status.ok() || list_resp.has_error()) { |
3240 | 0 | return STATUS(NotFound, Substitute("Error while listing table: $0", status.ToString())); |
3241 | 0 | } |
3242 | | |
3243 | | // TODO: This does not work for situation where tables in different YSQL schemas have the same |
3244 | | // name. This will be fixed as part of #1476. |
3245 | 0 | for (const auto& t : list_resp.tables()) { |
3246 | 0 | if (t.name() == info->table_name.table_name() && |
3247 | 0 | t.namespace_().name() == info->table_name.namespace_name()) { |
3248 | 0 | table->set_table_id(t.id()); |
3249 | 0 | break; |
3250 | 0 | } |
3251 | 0 | } |
3252 | |
|
3253 | 0 | if (!table->has_table_id()) { |
3254 | 0 | return STATUS(NotFound, |
3255 | 0 | Substitute("Could not find matching table for $0", info->table_name.ToString())); |
3256 | 0 | } |
3257 | | |
3258 | | // We have a table match. Now get the table schema and validate. |
3259 | 0 | status = GetTableSchema(&req, resp); |
3260 | 0 | if (!status.ok() || resp->has_error()) { |
3261 | 0 | return STATUS(NotFound, Substitute("Error while getting table schema: $0", status.ToString())); |
3262 | 0 | } |
3263 | | |
3264 | 0 | auto result = info->schema.EquivalentForDataCopy(resp->schema()); |
3265 | 0 | if (!result.ok() || !*result) { |
3266 | 0 | return STATUS(IllegalState, |
3267 | 0 | Substitute("Source and target schemas don't match: " |
3268 | 0 | "Source: $0, Target: $1, Source schema: $2, Target schema: $3", |
3269 | 0 | info->table_id, resp->identifier().table_id(), |
3270 | 0 | info->schema.ToString(), resp->schema().DebugString())); |
3271 | 0 | } |
3272 | | |
3273 | | // Still need to make map of table id to resp table id (to add to validated map) |
3274 | | // For colocated tables, only add the parent table since we only added the parent table to the |
3275 | | // original pb (we use the number of tables in the pb to determine when validation is done). |
3276 | 0 | if (info->colocated) { |
3277 | | // For now we require that colocated tables have the same table oid. |
3278 | 0 | auto source_oid = CHECK_RESULT(GetPgsqlTableOid(info->table_id)); |
3279 | 0 | auto target_oid = CHECK_RESULT(GetPgsqlTableOid(resp->identifier().table_id())); |
3280 | 0 | if (source_oid != target_oid) { |
3281 | 0 | return STATUS(IllegalState, |
3282 | 0 | Substitute("Source and target table oids don't match for colocated table: " |
3283 | 0 | "Source: $0, Target: $1, Source table oid: $2, Target table oid: $3", |
3284 | 0 | info->table_id, resp->identifier().table_id(), source_oid, target_oid)); |
3285 | 0 | } |
3286 | 0 | } |
3287 | | |
3288 | 0 | return Status::OK(); |
3289 | 0 | } |
3290 | | |
3291 | | Status CatalogManager::AddValidatedTableAndCreateCdcStreams( |
3292 | | scoped_refptr<UniverseReplicationInfo> universe, |
3293 | | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, |
3294 | | const TableId& producer_table, |
3295 | 0 | const TableId& consumer_table) { |
3296 | 0 | auto l = universe->LockForWrite(); |
3297 | 0 | auto master_addresses = l->pb.producer_master_addresses(); |
3298 | |
|
3299 | 0 | auto res = universe->GetOrCreateCDCRpcTasks(master_addresses); |
3300 | 0 | if (!res.ok()) { |
3301 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); |
3302 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), universe); |
3303 | 0 | if (!s.ok()) { |
3304 | 0 | return CheckStatus(s, "updating universe replication info in sys-catalog"); |
3305 | 0 | } |
3306 | 0 | l.Commit(); |
3307 | 0 | return STATUS(InternalError, |
3308 | 0 | Substitute("Error while setting up client for producer $0", universe->id())); |
3309 | 0 | } |
3310 | 0 | std::shared_ptr<CDCRpcTasks> cdc_rpc = *res; |
3311 | 0 | vector<TableId> validated_tables; |
3312 | |
|
3313 | 0 | if (l->is_deleted_or_failed()) { |
3314 | | // Nothing to do since universe is being deleted. |
3315 | 0 | return STATUS(Aborted, "Universe is being deleted"); |
3316 | 0 | } |
3317 | | |
3318 | 0 | auto map = l.mutable_data()->pb.mutable_validated_tables(); |
3319 | 0 | (*map)[producer_table] = consumer_table; |
3320 | | |
3321 | | // Now, all tables are validated. |
3322 | 0 | if (l.mutable_data()->pb.validated_tables_size() == l.mutable_data()->pb.tables_size()) { |
3323 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::VALIDATED); |
3324 | 0 | auto tbl_iter = l->pb.tables(); |
3325 | 0 | validated_tables.insert(validated_tables.begin(), tbl_iter.begin(), tbl_iter.end()); |
3326 | 0 | } |
3327 | | |
3328 | | // TODO: end of config validation should be where SetupUniverseReplication exits back to user |
3329 | 0 | LOG(INFO) << "UpdateItem in AddValidatedTable"; |
3330 | | |
3331 | | // Update sys_catalog. |
3332 | 0 | Status status = sys_catalog_->Upsert(leader_ready_term(), universe); |
3333 | 0 | if (!status.ok()) { |
3334 | 0 | LOG(ERROR) << "Error during UpdateItem: " << status; |
3335 | 0 | return CheckStatus(status, "updating universe replication info in sys-catalog"); |
3336 | 0 | } |
3337 | 0 | l.Commit(); |
3338 | | |
3339 | | // Create CDC stream for each validated table, after persisting the replication state change. |
3340 | 0 | if (!validated_tables.empty()) { |
3341 | 0 | std::unordered_map<std::string, std::string> options; |
3342 | 0 | options.reserve(4); |
3343 | 0 | options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); |
3344 | 0 | options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); |
3345 | 0 | options.emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); |
3346 | 0 | options.emplace(cdc::kCheckpointType, CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT)); |
3347 | |
|
3348 | 0 | for (const auto& table : validated_tables) { |
3349 | 0 | string producer_bootstrap_id; |
3350 | 0 | auto it = table_bootstrap_ids.find(table); |
3351 | 0 | if (it != table_bootstrap_ids.end()) { |
3352 | 0 | producer_bootstrap_id = it->second; |
3353 | 0 | } |
3354 | 0 | if (!producer_bootstrap_id.empty()) { |
3355 | 0 | auto table_id = std::make_shared<TableId>(); |
3356 | 0 | auto stream_options = std::make_shared<std::unordered_map<std::string, std::string>>(); |
3357 | 0 | cdc_rpc->client()->GetCDCStream(producer_bootstrap_id, table_id, stream_options, |
3358 | 0 | std::bind(&enterprise::CatalogManager::GetCDCStreamCallback, this, |
3359 | 0 | producer_bootstrap_id, table_id, stream_options, universe->id(), table, cdc_rpc, |
3360 | 0 | std::placeholders::_1)); |
3361 | 0 | } else { |
3362 | 0 | cdc_rpc->client()->CreateCDCStream( |
3363 | 0 | table, options, |
3364 | 0 | std::bind(&enterprise::CatalogManager::AddCDCStreamToUniverseAndInitConsumer, this, |
3365 | 0 | universe->id(), table, std::placeholders::_1, nullptr /* on_success_cb */)); |
3366 | 0 | } |
3367 | 0 | } |
3368 | 0 | } |
3369 | 0 | return Status::OK(); |
3370 | 0 | } |
3371 | | |
3372 | | void CatalogManager::GetTableSchemaCallback( |
3373 | | const std::string& universe_id, const std::shared_ptr<client::YBTableInfo>& info, |
3374 | 0 | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, const Status& s) { |
3375 | | // First get the universe. |
3376 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
3377 | 0 | { |
3378 | 0 | SharedLock lock(mutex_); |
3379 | 0 | TRACE("Acquired catalog manager lock"); |
3380 | |
|
3381 | 0 | universe = FindPtrOrNull(universe_replication_map_, universe_id); |
3382 | 0 | if (universe == nullptr) { |
3383 | 0 | LOG(ERROR) << "Universe not found: " << universe_id; |
3384 | 0 | return; |
3385 | 0 | } |
3386 | 0 | } |
3387 | | |
3388 | 0 | if (!s.ok()) { |
3389 | 0 | MarkUniverseReplicationFailed(universe, s); |
3390 | 0 | LOG(ERROR) << "Error getting schema for table " << info->table_id << ": " << s; |
3391 | 0 | return; |
3392 | 0 | } |
3393 | | |
3394 | | // Validate the table schema. |
3395 | 0 | GetTableSchemaResponsePB resp; |
3396 | 0 | Status status = ValidateTableSchema(info, table_bootstrap_ids, &resp); |
3397 | 0 | if (!status.ok()) { |
3398 | 0 | MarkUniverseReplicationFailed(universe, status); |
3399 | 0 | LOG(ERROR) << "Found error while validating table schema for table " << info->table_id |
3400 | 0 | << ": " << status; |
3401 | 0 | return; |
3402 | 0 | } |
3403 | | |
3404 | 0 | status = AddValidatedTableAndCreateCdcStreams(universe, |
3405 | 0 | table_bootstrap_ids, |
3406 | 0 | info->table_id, |
3407 | 0 | resp.identifier().table_id()); |
3408 | 0 | if (!status.ok()) { |
3409 | 0 | LOG(ERROR) << "Found error while adding validated table to system catalog: " << info->table_id |
3410 | 0 | << ": " << status; |
3411 | 0 | return; |
3412 | 0 | } |
3413 | 0 | } |
3414 | | |
3415 | | void CatalogManager::GetTablegroupSchemaCallback( |
3416 | | const std::string& universe_id, |
3417 | | const std::shared_ptr<std::vector<client::YBTableInfo>>& infos, |
3418 | | const TablegroupId& producer_tablegroup_id, |
3419 | | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, |
3420 | 0 | const Status& s) { |
3421 | | // First get the universe. |
3422 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
3423 | 0 | { |
3424 | 0 | SharedLock lock(mutex_); |
3425 | 0 | TRACE("Acquired catalog manager lock"); |
3426 | |
|
3427 | 0 | universe = FindPtrOrNull(universe_replication_map_, universe_id); |
3428 | 0 | if (universe == nullptr) { |
3429 | 0 | LOG(ERROR) << "Universe not found: " << universe_id; |
3430 | 0 | return; |
3431 | 0 | } |
3432 | 0 | } |
3433 | | |
3434 | 0 | if (!s.ok()) { |
3435 | 0 | MarkUniverseReplicationFailed(universe, s); |
3436 | 0 | std::ostringstream oss; |
3437 | 0 | for (size_t i = 0; i < infos->size(); ++i) { |
3438 | 0 | oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id; |
3439 | 0 | } |
3440 | 0 | LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s; |
3441 | 0 | return; |
3442 | 0 | } |
3443 | | |
3444 | 0 | if (infos->empty()) { |
3445 | 0 | LOG(WARNING) << "Received empty list of tables to validate: " << s; |
3446 | 0 | return; |
3447 | 0 | } |
3448 | | |
3449 | | // validated_consumer_tables contains the table IDs corresponding to that |
3450 | | // from the producer tables. |
3451 | 0 | std::unordered_set<TableId> validated_consumer_tables; |
3452 | 0 | for (const auto& info : *infos) { |
3453 | | // Validate each of the member table in the tablegroup. |
3454 | 0 | GetTableSchemaResponsePB resp; |
3455 | 0 | Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info), |
3456 | 0 | table_bootstrap_ids, |
3457 | 0 | &resp); |
3458 | |
|
3459 | 0 | if (!table_status.ok()) { |
3460 | 0 | MarkUniverseReplicationFailed(universe, table_status); |
3461 | 0 | LOG(ERROR) << "Found error while validating table schema for table " << info.table_id |
3462 | 0 | << ": " << table_status; |
3463 | 0 | return; |
3464 | 0 | } |
3465 | | |
3466 | 0 | validated_consumer_tables.insert(resp.identifier().table_id()); |
3467 | 0 | } |
3468 | | |
3469 | | // Get the consumer tablegroup ID. Since this call is expensive (one needs to reverse lookup |
3470 | | // the tablegroup ID from table ID), we only do this call once and do validation afterward. |
3471 | 0 | TablegroupId consumer_tablegroup_id; |
3472 | 0 | { |
3473 | 0 | const auto& result = FindTablegroupByTableId(*validated_consumer_tables.begin()); |
3474 | 0 | if (!result.has_value()) { |
3475 | 0 | std::string message = |
3476 | 0 | Format("No consumer tablegroup found for producer tablegroup: $0", |
3477 | 0 | producer_tablegroup_id); |
3478 | 0 | MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message)); |
3479 | 0 | LOG(ERROR) << message; |
3480 | 0 | return; |
3481 | 0 | } |
3482 | 0 | consumer_tablegroup_id = result.value(); |
3483 | 0 | } |
3484 | | |
3485 | | // tables_in_consumer_tablegroup are the tables listed within the consumer_tablegroup_id. |
3486 | | // We need validated_consumer_tables and tables_in_consumer_tablegroup to be identical. |
3487 | 0 | std::unordered_set<TableId> tables_in_consumer_tablegroup; |
3488 | 0 | { |
3489 | 0 | GetTablegroupSchemaRequestPB req; |
3490 | 0 | GetTablegroupSchemaResponsePB resp; |
3491 | 0 | req.mutable_parent_tablegroup()->set_id(consumer_tablegroup_id); |
3492 | 0 | Status status = GetTablegroupSchema(&req, &resp); |
3493 | 0 | if (!status.ok() || resp.has_error()) { |
3494 | 0 | std::string message = Format("Error when getting consumer tablegroup schema: $0", |
3495 | 0 | consumer_tablegroup_id); |
3496 | 0 | MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message)); |
3497 | 0 | LOG(ERROR) << message; |
3498 | 0 | return; |
3499 | 0 | } |
3500 | | |
3501 | 0 | for (const auto& info : resp.get_table_schema_response_pbs()) { |
3502 | 0 | tables_in_consumer_tablegroup.insert(info.identifier().table_id()); |
3503 | 0 | } |
3504 | 0 | } |
3505 | |
|
3506 | 0 | if (validated_consumer_tables != tables_in_consumer_tablegroup) { |
3507 | 0 | std::ostringstream validated_tables_oss; |
3508 | 0 | for (auto it = validated_consumer_tables.begin(); |
3509 | 0 | it != validated_consumer_tables.end(); it++) { |
3510 | 0 | validated_tables_oss << (it == validated_consumer_tables.begin() ? "" : ",") << *it; |
3511 | 0 | } |
3512 | 0 | std::ostringstream consumer_tables_oss; |
3513 | 0 | for (auto it = tables_in_consumer_tablegroup.begin(); |
3514 | 0 | it != tables_in_consumer_tablegroup.end(); it++) { |
3515 | 0 | consumer_tables_oss << (it == tables_in_consumer_tablegroup.begin() ? "" : ",") << *it; |
3516 | 0 | } |
3517 | |
|
3518 | 0 | std::string message = |
3519 | 0 | Format("Mismatch between tables associated with producer tablegroup $0 and " |
3520 | 0 | "tables in consumer tablegroup $1: ($2) vs ($3).", |
3521 | 0 | producer_tablegroup_id, consumer_tablegroup_id, |
3522 | 0 | validated_tables_oss.str(), consumer_tables_oss.str()); |
3523 | 0 | MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message)); |
3524 | 0 | LOG(ERROR) << message; |
3525 | 0 | return; |
3526 | 0 | } |
3527 | | |
3528 | 0 | Status status = AddValidatedTableAndCreateCdcStreams(universe, |
3529 | 0 | table_bootstrap_ids, |
3530 | 0 | producer_tablegroup_id, |
3531 | 0 | consumer_tablegroup_id); |
3532 | 0 | if (!status.ok()) { |
3533 | 0 | LOG(ERROR) << "Found error while adding validated table to system catalog: " |
3534 | 0 | << producer_tablegroup_id << ": " << status; |
3535 | 0 | return; |
3536 | 0 | } |
3537 | 0 | } |
3538 | | |
3539 | | void CatalogManager::GetColocatedTabletSchemaCallback( |
3540 | | const std::string& universe_id, const std::shared_ptr<std::vector<client::YBTableInfo>>& infos, |
3541 | 0 | const std::unordered_map<TableId, std::string>& table_bootstrap_ids, const Status& s) { |
3542 | | // First get the universe. |
3543 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
3544 | 0 | { |
3545 | 0 | SharedLock lock(mutex_); |
3546 | 0 | TRACE("Acquired catalog manager lock"); |
3547 | |
|
3548 | 0 | universe = FindPtrOrNull(universe_replication_map_, universe_id); |
3549 | 0 | if (universe == nullptr) { |
3550 | 0 | LOG(ERROR) << "Universe not found: " << universe_id; |
3551 | 0 | return; |
3552 | 0 | } |
3553 | 0 | } |
3554 | | |
3555 | 0 | if (!s.ok()) { |
3556 | 0 | MarkUniverseReplicationFailed(universe, s); |
3557 | 0 | std::ostringstream oss; |
3558 | 0 | for (size_t i = 0; i < infos->size(); ++i) { |
3559 | 0 | oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id; |
3560 | 0 | } |
3561 | 0 | LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s; |
3562 | 0 | return; |
3563 | 0 | } |
3564 | | |
3565 | 0 | if (infos->empty()) { |
3566 | 0 | LOG(WARNING) << "Received empty list of tables to validate: " << s; |
3567 | 0 | return; |
3568 | 0 | } |
3569 | | |
3570 | | // Validate table schemas. |
3571 | 0 | std::unordered_set<TableId> producer_parent_table_ids; |
3572 | 0 | std::unordered_set<TableId> consumer_parent_table_ids; |
3573 | 0 | for (const auto& info : *infos) { |
3574 | | // Verify that we have a colocated table. |
3575 | 0 | if (!info.colocated) { |
3576 | 0 | MarkUniverseReplicationFailed(universe, |
3577 | 0 | STATUS(InvalidArgument, Substitute("Received non-colocated table: $0", info.table_id))); |
3578 | 0 | LOG(ERROR) << "Received non-colocated table: " << info.table_id; |
3579 | 0 | return; |
3580 | 0 | } |
3581 | | // Validate each table, and get the parent colocated table id for the consumer. |
3582 | 0 | GetTableSchemaResponsePB resp; |
3583 | 0 | Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info), |
3584 | 0 | table_bootstrap_ids, |
3585 | 0 | &resp); |
3586 | 0 | if (!table_status.ok()) { |
3587 | 0 | MarkUniverseReplicationFailed(universe, table_status); |
3588 | 0 | LOG(ERROR) << "Found error while validating table schema for table " << info.table_id |
3589 | 0 | << ": " << table_status; |
3590 | 0 | return; |
3591 | 0 | } |
3592 | | // Store the parent table ids. |
3593 | 0 | producer_parent_table_ids.insert( |
3594 | 0 | info.table_name.namespace_id() + kColocatedParentTableIdSuffix); |
3595 | 0 | consumer_parent_table_ids.insert( |
3596 | 0 | resp.identifier().namespace_().id() + kColocatedParentTableIdSuffix); |
3597 | 0 | } |
3598 | | |
3599 | | // Verify that we only found one producer and one consumer colocated parent table id. |
3600 | 0 | if (producer_parent_table_ids.size() != 1) { |
3601 | 0 | std::ostringstream oss; |
3602 | 0 | for (auto it = producer_parent_table_ids.begin(); it != producer_parent_table_ids.end(); ++it) { |
3603 | 0 | oss << ((it == producer_parent_table_ids.begin()) ? "" : ", ") << *it; |
3604 | 0 | } |
3605 | 0 | MarkUniverseReplicationFailed(universe, STATUS(InvalidArgument, |
3606 | 0 | Substitute("Found incorrect number of producer colocated parent table ids. " |
3607 | 0 | "Expected 1, but found: [ $0 ]", oss.str()))); |
3608 | 0 | LOG(ERROR) << "Found incorrect number of producer colocated parent table ids. " |
3609 | 0 | << "Expected 1, but found: [ " << oss.str() << " ]"; |
3610 | 0 | return; |
3611 | 0 | } |
3612 | 0 | if (consumer_parent_table_ids.size() != 1) { |
3613 | 0 | std::ostringstream oss; |
3614 | 0 | for (auto it = consumer_parent_table_ids.begin(); it != consumer_parent_table_ids.end(); ++it) { |
3615 | 0 | oss << ((it == consumer_parent_table_ids.begin()) ? "" : ", ") << *it; |
3616 | 0 | } |
3617 | 0 | MarkUniverseReplicationFailed(universe, STATUS(InvalidArgument, |
3618 | 0 | Substitute("Found incorrect number of consumer colocated parent table ids. " |
3619 | 0 | "Expected 1, but found: [ $0 ]", oss.str()))); |
3620 | 0 | LOG(ERROR) << "Found incorrect number of consumer colocated parent table ids. " |
3621 | 0 | << "Expected 1, but found: [ " << oss.str() << " ]"; |
3622 | 0 | return; |
3623 | 0 | } |
3624 | | |
3625 | 0 | Status status = AddValidatedTableAndCreateCdcStreams(universe, |
3626 | 0 | table_bootstrap_ids, |
3627 | 0 | *producer_parent_table_ids.begin(), |
3628 | 0 | *consumer_parent_table_ids.begin()); |
3629 | 0 | if (!status.ok()) { |
3630 | 0 | LOG(ERROR) << "Found error while adding validated table to system catalog: " |
3631 | 0 | << *producer_parent_table_ids.begin() << ": " << status; |
3632 | 0 | return; |
3633 | 0 | } |
3634 | 0 | } |
3635 | | |
3636 | | void CatalogManager::GetCDCStreamCallback( |
3637 | | const CDCStreamId& bootstrap_id, |
3638 | | std::shared_ptr<TableId> table_id, |
3639 | | std::shared_ptr<std::unordered_map<std::string, std::string>> options, |
3640 | | const std::string& universe_id, |
3641 | | const TableId& table, |
3642 | | std::shared_ptr<CDCRpcTasks> cdc_rpc, |
3643 | 0 | const Status& s) { |
3644 | 0 | if (!s.ok()) { |
3645 | 0 | LOG(ERROR) << "Unable to find bootstrap id " << bootstrap_id; |
3646 | 0 | AddCDCStreamToUniverseAndInitConsumer(universe_id, table, s); |
3647 | 0 | } else { |
3648 | 0 | if (*table_id != table) { |
3649 | 0 | const Status invalid_bootstrap_id_status = STATUS_FORMAT( |
3650 | 0 | InvalidArgument, "Invalid bootstrap id for table $0. Bootstrap id $1 belongs to table $2", |
3651 | 0 | table, bootstrap_id, *table_id); |
3652 | 0 | LOG(ERROR) << invalid_bootstrap_id_status; |
3653 | 0 | AddCDCStreamToUniverseAndInitConsumer(universe_id, table, invalid_bootstrap_id_status); |
3654 | 0 | return; |
3655 | 0 | } |
3656 | | // todo check options |
3657 | 0 | AddCDCStreamToUniverseAndInitConsumer(universe_id, table, bootstrap_id, [&] () { |
3658 | | // Extra callback on universe setup success - update the producer to let it know that |
3659 | | // the bootstrapping is complete. |
3660 | 0 | SysCDCStreamEntryPB new_entry; |
3661 | 0 | new_entry.add_table_id(*table_id); |
3662 | 0 | new_entry.mutable_options()->Reserve(narrow_cast<int>(options->size())); |
3663 | 0 | for (const auto& option : *options) { |
3664 | 0 | auto new_option = new_entry.add_options(); |
3665 | 0 | new_option->set_key(option.first); |
3666 | 0 | new_option->set_value(option.second); |
3667 | 0 | } |
3668 | 0 | new_entry.set_state(master::SysCDCStreamEntryPB::ACTIVE); |
3669 | |
|
3670 | 0 | WARN_NOT_OK(cdc_rpc->client()->UpdateCDCStream(bootstrap_id, new_entry), |
3671 | 0 | "Unable to update CDC stream options"); |
3672 | 0 | }); |
3673 | 0 | } |
3674 | 0 | } |
3675 | | |
3676 | | void CatalogManager::AddCDCStreamToUniverseAndInitConsumer( |
3677 | | const std::string& universe_id, const TableId& table_id, const Result<CDCStreamId>& stream_id, |
3678 | 0 | std::function<void()> on_success_cb) { |
3679 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
3680 | 0 | { |
3681 | 0 | SharedLock lock(mutex_); |
3682 | 0 | TRACE("Acquired catalog manager lock"); |
3683 | |
|
3684 | 0 | universe = FindPtrOrNull(universe_replication_map_, universe_id); |
3685 | 0 | if (universe == nullptr) { |
3686 | 0 | LOG(ERROR) << "Universe not found: " << universe_id; |
3687 | 0 | return; |
3688 | 0 | } |
3689 | 0 | } |
3690 | | |
3691 | 0 | if (!stream_id.ok()) { |
3692 | 0 | LOG(ERROR) << "Error setting up CDC stream for table " << table_id; |
3693 | 0 | MarkUniverseReplicationFailed(universe, ResultToStatus(stream_id)); |
3694 | 0 | return; |
3695 | 0 | } |
3696 | | |
3697 | 0 | bool merge_alter = false; |
3698 | 0 | bool validated_all_tables = false; |
3699 | 0 | { |
3700 | 0 | auto l = universe->LockForWrite(); |
3701 | 0 | if (l->is_deleted_or_failed()) { |
3702 | | // Nothing to do if universe is being deleted. |
3703 | 0 | return; |
3704 | 0 | } |
3705 | | |
3706 | 0 | auto map = l.mutable_data()->pb.mutable_table_streams(); |
3707 | 0 | (*map)[table_id] = *stream_id; |
3708 | | |
3709 | | // This functions as a barrier: waiting for the last RPC call from GetTableSchemaCallback. |
3710 | 0 | if (l.mutable_data()->pb.table_streams_size() == l->pb.tables_size()) { |
3711 | | // All tables successfully validated! Register CDC consumers & start replication. |
3712 | 0 | validated_all_tables = true; |
3713 | 0 | LOG(INFO) << "Registering CDC consumers for universe " << universe->id(); |
3714 | |
|
3715 | 0 | auto& validated_tables = l->pb.validated_tables(); |
3716 | |
|
3717 | 0 | std::vector<CDCConsumerStreamInfo> consumer_info; |
3718 | 0 | consumer_info.reserve(l->pb.tables_size()); |
3719 | 0 | for (const auto& table : validated_tables) { |
3720 | 0 | CDCConsumerStreamInfo info; |
3721 | 0 | info.producer_table_id = table.first; |
3722 | 0 | info.consumer_table_id = table.second; |
3723 | 0 | info.stream_id = (*map)[info.producer_table_id]; |
3724 | 0 | consumer_info.push_back(info); |
3725 | 0 | } |
3726 | |
|
3727 | 0 | std::vector<HostPort> hp; |
3728 | 0 | HostPortsFromPBs(l->pb.producer_master_addresses(), &hp); |
3729 | |
|
3730 | 0 | auto cdc_rpc_tasks_result = |
3731 | 0 | universe->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses()); |
3732 | 0 | if (!cdc_rpc_tasks_result.ok()) { |
3733 | 0 | LOG(WARNING) << "CDC streams won't be created: " << cdc_rpc_tasks_result; |
3734 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); |
3735 | 0 | } else { |
3736 | 0 | auto cdc_rpc_tasks = *cdc_rpc_tasks_result; |
3737 | 0 | Status s = InitCDCConsumer( |
3738 | 0 | consumer_info, HostPort::ToCommaSeparatedString(hp), l->pb.producer_id(), |
3739 | 0 | cdc_rpc_tasks); |
3740 | 0 | if (!s.ok()) { |
3741 | 0 | LOG(ERROR) << "Error registering subscriber: " << s; |
3742 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); |
3743 | 0 | } else { |
3744 | 0 | GStringPiece original_producer_id(universe->id()); |
3745 | 0 | if (original_producer_id.ends_with(".ALTER")) { |
3746 | | // Don't enable ALTER universes, merge them into the main universe instead. |
3747 | 0 | merge_alter = true; |
3748 | 0 | } else { |
3749 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE); |
3750 | 0 | } |
3751 | 0 | } |
3752 | 0 | } |
3753 | 0 | } |
3754 | | |
3755 | | // Update sys_catalog with new producer table id info. |
3756 | 0 | Status status = sys_catalog_->Upsert(leader_ready_term(), universe); |
3757 | | |
3758 | | // Before committing, run any callbacks on success. |
3759 | 0 | if (status.ok() && on_success_cb && |
3760 | 0 | (l.mutable_data()->pb.state() == SysUniverseReplicationEntryPB::ACTIVE || merge_alter)) { |
3761 | 0 | on_success_cb(); |
3762 | 0 | } |
3763 | |
|
3764 | 0 | l.CommitOrWarn(status, "updating universe replication info in sys-catalog"); |
3765 | 0 | } |
3766 | |
|
3767 | 0 | if (validated_all_tables) { |
3768 | | // Also update the set of consumer tables. |
3769 | 0 | LockGuard lock(mutex_); |
3770 | 0 | auto l = universe->LockForRead(); |
3771 | 0 | for (const auto& table : l->pb.validated_tables()) { |
3772 | 0 | xcluster_consumer_tables_to_stream_map_[table.second].emplace(universe->id(), *stream_id); |
3773 | 0 | } |
3774 | 0 | } |
3775 | | |
3776 | | // If this is an 'alter', merge back into primary command now that setup is a success. |
3777 | 0 | if (merge_alter) { |
3778 | 0 | MergeUniverseReplication(universe); |
3779 | 0 | } |
3780 | 0 | } |
3781 | | |
3782 | | /* |
3783 | | * UpdateXClusterConsumerOnTabletSplit updates the consumer -> producer tablet mapping after a local |
3784 | | * tablet split. |
3785 | | */ |
3786 | | Status CatalogManager::UpdateXClusterConsumerOnTabletSplit( |
3787 | | const TableId& consumer_table_id, |
3788 | 42 | const SplitTabletIds& split_tablet_ids) { |
3789 | | // Check if this table is consuming a stream. |
3790 | 42 | XClusterConsumerTableStreamInfoMap stream_infos = |
3791 | 42 | GetXClusterStreamInfoForConsumerTable(consumer_table_id); |
3792 | | |
3793 | 42 | if (stream_infos.empty()) { |
3794 | 42 | return Status::OK(); |
3795 | 42 | } |
3796 | | |
3797 | 0 | auto l = cluster_config_->LockForWrite(); |
3798 | 0 | for (const auto& stream_info : stream_infos) { |
3799 | 0 | std::string universe_id = stream_info.first; |
3800 | 0 | CDCStreamId stream_id = stream_info.second; |
3801 | | // Fetch the stream entry so we can update the mappings. |
3802 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
3803 | 0 | auto producer_entry = FindOrNull(*producer_map, universe_id); |
3804 | 0 | if (!producer_entry) { |
3805 | 0 | return STATUS_FORMAT(NotFound, |
3806 | 0 | "Unable to find the producer entry for universe $0", |
3807 | 0 | universe_id); |
3808 | 0 | } |
3809 | 0 | auto stream_entry = FindOrNull(*producer_entry->mutable_stream_map(), stream_id); |
3810 | 0 | if (!stream_entry) { |
3811 | 0 | return STATUS_FORMAT(NotFound, |
3812 | 0 | "Unable to find the stream entry for universe $0, stream $1", |
3813 | 0 | universe_id, |
3814 | 0 | stream_id); |
3815 | 0 | } |
3816 | | |
3817 | 0 | RETURN_NOT_OK(UpdateTableMappingOnTabletSplit(stream_entry, split_tablet_ids)); |
3818 | | |
3819 | | // We also need to mark this stream as no longer being able to perform 1-1 mappings. |
3820 | 0 | stream_entry->set_same_num_producer_consumer_tablets(false); |
3821 | 0 | } |
3822 | | |
3823 | | // Also bump the cluster_config_ version so that changes are propagated to tservers. |
3824 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
3825 | |
|
3826 | 0 | RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config_), |
3827 | 0 | "Updating cluster config in sys-catalog")); |
3828 | 0 | l.Commit(); |
3829 | |
|
3830 | 0 | return Status::OK(); |
3831 | 0 | } |
3832 | | |
3833 | | Status CatalogManager::UpdateXClusterProducerOnTabletSplit( |
3834 | | const TableId& producer_table_id, |
3835 | 42 | const SplitTabletIds& split_tablet_ids) { |
3836 | | // First check if this table has any streams associated with it. |
3837 | 42 | auto streams = FindCDCStreamsForTable(producer_table_id); |
3838 | | |
3839 | 42 | if (!streams.empty()) { |
3840 | | // For each stream, need to add in the children entries to the cdc_state table. |
3841 | 0 | client::TableHandle cdc_table; |
3842 | 0 | const client::YBTableName cdc_state_table_name( |
3843 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
3844 | 0 | auto ybclient = master_->cdc_state_client_initializer().client(); |
3845 | 0 | if (!ybclient) { |
3846 | 0 | return STATUS(IllegalState, "Client not initialized or shutting down"); |
3847 | 0 | } |
3848 | 0 | RETURN_NOT_OK(cdc_table.Open(cdc_state_table_name, ybclient)); |
3849 | 0 | std::shared_ptr<client::YBSession> session = ybclient->NewSession(); |
3850 | |
|
3851 | 0 | for (const auto& stream : streams) { |
3852 | 0 | for (const auto& child_tablet_id : |
3853 | 0 | {split_tablet_ids.children.first, split_tablet_ids.children.second}) { |
3854 | 0 | const auto insert_op = cdc_table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
3855 | 0 | auto* insert_req = insert_op->mutable_request(); |
3856 | 0 | QLAddStringHashValue(insert_req, child_tablet_id); |
3857 | 0 | QLAddStringRangeValue(insert_req, stream->id()); |
3858 | | // TODO(JHE) set the checkpoint to a different value? OpId of the SPLIT_OP itself perhaps? |
3859 | 0 | cdc_table.AddStringColumnValue(insert_req, master::kCdcCheckpoint, OpId().ToString()); |
3860 | | // TODO(JHE) what to set the time to? |
3861 | | // cdc_table.AddTimestampColumnValue( |
3862 | | // insert_req, master::kCdcLastReplicationTime, GetCurrentTimeMicros()); |
3863 | 0 | session->Apply(insert_op); |
3864 | 0 | } |
3865 | 0 | } |
3866 | 0 | RETURN_NOT_OK(session->Flush()); |
3867 | 0 | } |
3868 | | |
3869 | 42 | return Status::OK(); |
3870 | 42 | } |
3871 | | |
3872 | | Status CatalogManager::InitCDCConsumer( |
3873 | | const std::vector<CDCConsumerStreamInfo>& consumer_info, |
3874 | | const std::string& master_addrs, |
3875 | | const std::string& producer_universe_uuid, |
3876 | 0 | std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks) { |
3877 | |
|
3878 | 0 | std::unordered_set<HostPort, HostPortHash> tserver_addrs; |
3879 | | // Get the tablets in the consumer table. |
3880 | 0 | cdc::ProducerEntryPB producer_entry; |
3881 | 0 | for (const auto& stream_info : consumer_info) { |
3882 | 0 | GetTableLocationsRequestPB consumer_table_req; |
3883 | 0 | consumer_table_req.set_max_returned_locations(std::numeric_limits<int32_t>::max()); |
3884 | 0 | GetTableLocationsResponsePB consumer_table_resp; |
3885 | 0 | TableIdentifierPB table_identifer; |
3886 | 0 | table_identifer.set_table_id(stream_info.consumer_table_id); |
3887 | 0 | *(consumer_table_req.mutable_table()) = table_identifer; |
3888 | 0 | RETURN_NOT_OK(GetTableLocations(&consumer_table_req, &consumer_table_resp)); |
3889 | |
|
3890 | 0 | cdc::StreamEntryPB stream_entry; |
3891 | | // Get producer tablets and map them to the consumer tablets |
3892 | 0 | RETURN_NOT_OK(CreateTabletMapping( |
3893 | 0 | stream_info.producer_table_id, stream_info.consumer_table_id, producer_universe_uuid, |
3894 | 0 | master_addrs, consumer_table_resp, &tserver_addrs, &stream_entry, cdc_rpc_tasks)); |
3895 | 0 | (*producer_entry.mutable_stream_map())[stream_info.stream_id] = std::move(stream_entry); |
3896 | 0 | } |
3897 | | |
3898 | | // Log the Network topology of the Producer Cluster |
3899 | 0 | auto master_addrs_list = StringSplit(master_addrs, ','); |
3900 | 0 | producer_entry.mutable_master_addrs()->Reserve(narrow_cast<int>(master_addrs_list.size())); |
3901 | 0 | for (const auto& addr : master_addrs_list) { |
3902 | 0 | auto hp = VERIFY_RESULT(HostPort::FromString(addr, 0)); |
3903 | 0 | HostPortToPB(hp, producer_entry.add_master_addrs()); |
3904 | 0 | } |
3905 | |
|
3906 | 0 | producer_entry.mutable_tserver_addrs()->Reserve(narrow_cast<int>(tserver_addrs.size())); |
3907 | 0 | for (const auto& addr : tserver_addrs) { |
3908 | 0 | HostPortToPB(addr, producer_entry.add_tserver_addrs()); |
3909 | 0 | } |
3910 | |
|
3911 | 0 | auto l = cluster_config_->LockForWrite(); |
3912 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
3913 | 0 | auto it = producer_map->find(producer_universe_uuid); |
3914 | 0 | if (it != producer_map->end()) { |
3915 | 0 | return STATUS(InvalidArgument, "Already created a consumer for this universe"); |
3916 | 0 | } |
3917 | | |
3918 | | // TServers will use the ClusterConfig to create CDC Consumers for applicable local tablets. |
3919 | 0 | (*producer_map)[producer_universe_uuid] = std::move(producer_entry); |
3920 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
3921 | 0 | RETURN_NOT_OK(CheckStatus( |
3922 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config_), |
3923 | 0 | "updating cluster config in sys-catalog")); |
3924 | 0 | l.Commit(); |
3925 | |
|
3926 | 0 | return Status::OK(); |
3927 | 0 | } |
3928 | | |
3929 | 0 | void CatalogManager::MergeUniverseReplication(scoped_refptr<UniverseReplicationInfo> universe) { |
3930 | | // Merge back into primary command now that setup is a success. |
3931 | 0 | GStringPiece original_producer_id(universe->id()); |
3932 | 0 | if (!original_producer_id.ends_with(".ALTER")) { |
3933 | 0 | return; |
3934 | 0 | } |
3935 | 0 | original_producer_id.remove_suffix(sizeof(".ALTER")-1 /* exclude \0 ending */); |
3936 | 0 | LOG(INFO) << "Merging CDC universe: " << universe->id() |
3937 | 0 | << " into " << original_producer_id.ToString(); |
3938 | |
|
3939 | 0 | scoped_refptr<UniverseReplicationInfo> original_universe; |
3940 | 0 | { |
3941 | 0 | SharedLock lock(mutex_); |
3942 | 0 | TRACE("Acquired catalog manager lock"); |
3943 | |
|
3944 | 0 | original_universe = FindPtrOrNull(universe_replication_map_, original_producer_id.ToString()); |
3945 | 0 | if (original_universe == nullptr) { |
3946 | 0 | LOG(ERROR) << "Universe not found: " << original_producer_id.ToString(); |
3947 | 0 | return; |
3948 | 0 | } |
3949 | 0 | } |
3950 | | // Merge Cluster Config for TServers. |
3951 | 0 | { |
3952 | 0 | auto cl = cluster_config_->LockForWrite(); |
3953 | 0 | auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
3954 | 0 | auto original_producer_entry = pm->find(original_universe->id()); |
3955 | 0 | auto alter_producer_entry = pm->find(universe->id()); |
3956 | 0 | if (original_producer_entry != pm->end() && alter_producer_entry != pm->end()) { |
3957 | | // Merge the Tables from the Alter into the original. |
3958 | 0 | auto as = alter_producer_entry->second.stream_map(); |
3959 | 0 | original_producer_entry->second.mutable_stream_map()->insert(as.begin(), as.end()); |
3960 | | // Delete the Alter |
3961 | 0 | pm->erase(alter_producer_entry); |
3962 | 0 | } else { |
3963 | 0 | LOG(WARNING) << "Could not find both universes in Cluster Config: " << universe->id(); |
3964 | 0 | } |
3965 | 0 | cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1); |
3966 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), cluster_config_); |
3967 | 0 | cl.CommitOrWarn(s, "updating cluster config in sys-catalog"); |
3968 | 0 | } |
3969 | | // Merge Master Config on Consumer. (no need for Producer changes, since it uses stream_id) |
3970 | 0 | { |
3971 | 0 | auto original_lock = original_universe->LockForWrite(); |
3972 | 0 | auto alter_lock = universe->LockForWrite(); |
3973 | | // Merge Table->StreamID mapping. |
3974 | 0 | auto at = alter_lock.mutable_data()->pb.mutable_tables(); |
3975 | 0 | original_lock.mutable_data()->pb.mutable_tables()->MergeFrom(*at); |
3976 | 0 | at->Clear(); |
3977 | 0 | auto as = alter_lock.mutable_data()->pb.mutable_table_streams(); |
3978 | 0 | original_lock.mutable_data()->pb.mutable_table_streams()->insert(as->begin(), as->end()); |
3979 | 0 | as->clear(); |
3980 | 0 | auto av = alter_lock.mutable_data()->pb.mutable_validated_tables(); |
3981 | 0 | original_lock.mutable_data()->pb.mutable_validated_tables()->insert(av->begin(), av->end()); |
3982 | 0 | av->clear(); |
3983 | 0 | alter_lock.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED); |
3984 | |
|
3985 | 0 | vector<UniverseReplicationInfo*> universes{original_universe.get(), universe.get()}; |
3986 | 0 | const Status s = sys_catalog_->Upsert(leader_ready_term(), universes); |
3987 | 0 | alter_lock.CommitOrWarn(s, "updating universe replication entries in sys-catalog"); |
3988 | 0 | if (s.ok()) { |
3989 | 0 | original_lock.Commit(); |
3990 | 0 | } |
3991 | 0 | } |
3992 | | // TODO: universe_replication_map_.erase(universe->id()) at a later time. |
3993 | | // TwoDCTest.AlterUniverseReplicationTables crashes due to undiagnosed race right now. |
3994 | 0 | LOG(INFO) << "Done with Merging " << universe->id() << " into " << original_universe->id(); |
3995 | 0 | } |
3996 | | |
3997 | | Status ReturnErrorOrAddWarning(const Status& s, |
3998 | | const DeleteUniverseReplicationRequestPB* req, |
3999 | 0 | DeleteUniverseReplicationResponsePB* resp) { |
4000 | 0 | if (!s.ok()) { |
4001 | 0 | if (req->ignore_errors()) { |
4002 | | // Continue executing, save the status as a warning. |
4003 | 0 | AppStatusPB* warning = resp->add_warnings(); |
4004 | 0 | StatusToPB(s, warning); |
4005 | 0 | return Status::OK(); |
4006 | 0 | } |
4007 | 0 | return s.CloneAndAppend("\nUse 'ignore-errors' to ignore this error."); |
4008 | 0 | } |
4009 | 0 | return s; |
4010 | 0 | } |
4011 | | |
4012 | | Status CatalogManager::DeleteUniverseReplication(const DeleteUniverseReplicationRequestPB* req, |
4013 | | DeleteUniverseReplicationResponsePB* resp, |
4014 | 0 | rpc::RpcContext* rpc) { |
4015 | 0 | LOG(INFO) << "Servicing DeleteUniverseReplication request from " << RequestorString(rpc) |
4016 | 0 | << ": " << req->ShortDebugString(); |
4017 | |
|
4018 | 0 | if (!req->has_producer_id()) { |
4019 | 0 | return STATUS(InvalidArgument, "Producer universe ID required", req->ShortDebugString(), |
4020 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
4021 | 0 | } |
4022 | | |
4023 | 0 | scoped_refptr<UniverseReplicationInfo> ri; |
4024 | 0 | { |
4025 | 0 | SharedLock lock(mutex_); |
4026 | 0 | TRACE("Acquired catalog manager lock"); |
4027 | |
|
4028 | 0 | ri = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4029 | 0 | if (ri == nullptr) { |
4030 | 0 | return STATUS(NotFound, "Universe replication info does not exist", |
4031 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4032 | 0 | } |
4033 | 0 | } |
4034 | | |
4035 | 0 | auto l = ri->LockForWrite(); |
4036 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DELETED); |
4037 | | |
4038 | | // Delete subscribers on the Consumer Registry (removes from TServers). |
4039 | 0 | LOG(INFO) << "Deleting subscribers for producer " << req->producer_id(); |
4040 | 0 | { |
4041 | 0 | auto cl = cluster_config_->LockForWrite(); |
4042 | 0 | auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4043 | 0 | auto it = producer_map->find(req->producer_id()); |
4044 | 0 | if (it != producer_map->end()) { |
4045 | 0 | producer_map->erase(it); |
4046 | 0 | cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1); |
4047 | 0 | RETURN_NOT_OK(CheckStatus( |
4048 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config_), |
4049 | 0 | "updating cluster config in sys-catalog")); |
4050 | 0 | cl.Commit(); |
4051 | 0 | } |
4052 | 0 | } |
4053 | | |
4054 | | // Delete CDC stream config on the Producer. |
4055 | 0 | if (!l->pb.table_streams().empty()) { |
4056 | 0 | auto result = ri->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses()); |
4057 | 0 | if (!result.ok()) { |
4058 | 0 | LOG(WARNING) << "Unable to create cdc rpc task. CDC streams won't be deleted: " << result; |
4059 | 0 | } else { |
4060 | 0 | auto cdc_rpc = *result; |
4061 | 0 | vector<CDCStreamId> streams; |
4062 | 0 | std::unordered_map<CDCStreamId, TableId> stream_to_producer_table_id; |
4063 | 0 | for (const auto& table : l->pb.table_streams()) { |
4064 | 0 | streams.push_back(table.second); |
4065 | 0 | stream_to_producer_table_id.emplace(table.second, table.first); |
4066 | 0 | } |
4067 | |
|
4068 | 0 | DeleteCDCStreamResponsePB delete_cdc_stream_resp; |
4069 | | // Set force_delete=true since we are deleting active xCluster streams. |
4070 | 0 | auto s = cdc_rpc->client()->DeleteCDCStream(streams, |
4071 | 0 | true, /* force_delete */ |
4072 | 0 | req->ignore_errors(), |
4073 | 0 | &delete_cdc_stream_resp); |
4074 | |
|
4075 | 0 | if (delete_cdc_stream_resp.not_found_stream_ids().size() > 0) { |
4076 | 0 | std::ostringstream missing_streams; |
4077 | 0 | for (auto it = delete_cdc_stream_resp.not_found_stream_ids().begin(); |
4078 | 0 | it != delete_cdc_stream_resp.not_found_stream_ids().end(); |
4079 | 0 | ++it) { |
4080 | 0 | if (it != delete_cdc_stream_resp.not_found_stream_ids().begin()) { |
4081 | 0 | missing_streams << ","; |
4082 | 0 | } |
4083 | 0 | missing_streams << *it << " (table_id: " << stream_to_producer_table_id[*it] << ")"; |
4084 | 0 | } |
4085 | 0 | if (s.ok()) { |
4086 | | // Returned but did not find some streams, so still need to warn the user about those. |
4087 | 0 | s = STATUS(NotFound, |
4088 | 0 | "Could not find the following streams: [" + missing_streams.str() + "]."); |
4089 | 0 | } else { |
4090 | 0 | s = s.CloneAndPrepend( |
4091 | 0 | "Could not find the following streams: [" + missing_streams.str() + "]."); |
4092 | 0 | } |
4093 | 0 | } |
4094 | |
|
4095 | 0 | RETURN_NOT_OK(ReturnErrorOrAddWarning(s, req, resp)); |
4096 | 0 | } |
4097 | 0 | } |
4098 | | |
4099 | | // Delete universe in the Universe Config. |
4100 | 0 | RETURN_NOT_OK(ReturnErrorOrAddWarning(DeleteUniverseReplicationUnlocked(ri), req, resp)); |
4101 | 0 | l.Commit(); |
4102 | |
|
4103 | 0 | LOG(INFO) << "Processed delete universe replication " << ri->ToString() |
4104 | 0 | << " per request from " << RequestorString(rpc); |
4105 | |
|
4106 | 0 | return Status::OK(); |
4107 | 0 | } |
4108 | | |
4109 | | Status CatalogManager::DeleteUniverseReplicationUnlocked( |
4110 | 0 | scoped_refptr<UniverseReplicationInfo> universe) { |
4111 | | // Assumes that caller has locked universe. |
4112 | 0 | RETURN_NOT_OK_PREPEND( |
4113 | 0 | sys_catalog_->Delete(leader_ready_term(), universe), |
4114 | 0 | Substitute("An error occurred while updating sys-catalog, universe_id: $0", universe->id())); |
4115 | | |
4116 | | // Remove it from the map. |
4117 | 0 | LockGuard lock(mutex_); |
4118 | 0 | if (universe_replication_map_.erase(universe->id()) < 1) { |
4119 | 0 | LOG(WARNING) << "Failed to remove replication info from map: universe_id: " << universe->id(); |
4120 | 0 | } |
4121 | | // Also update the mapping of consumer tables. |
4122 | 0 | for (const auto& table : universe->metadata().state().pb.validated_tables()) { |
4123 | 0 | if (xcluster_consumer_tables_to_stream_map_[table.second].erase(universe->id()) < 1) { |
4124 | 0 | LOG(WARNING) << "Failed to remove consumer table from mapping. " |
4125 | 0 | << "table_id: " << table.second << ": universe_id: " << universe->id(); |
4126 | 0 | } |
4127 | 0 | if (xcluster_consumer_tables_to_stream_map_[table.second].empty()) { |
4128 | 0 | xcluster_consumer_tables_to_stream_map_.erase(table.second); |
4129 | 0 | } |
4130 | 0 | } |
4131 | 0 | return Status::OK(); |
4132 | 0 | } |
4133 | | |
4134 | | Status CatalogManager::SetUniverseReplicationEnabled( |
4135 | | const SetUniverseReplicationEnabledRequestPB* req, |
4136 | | SetUniverseReplicationEnabledResponsePB* resp, |
4137 | 0 | rpc::RpcContext* rpc) { |
4138 | 0 | LOG(INFO) << "Servicing SetUniverseReplicationEnabled request from " << RequestorString(rpc) |
4139 | 0 | << ": " << req->ShortDebugString(); |
4140 | | |
4141 | | // Sanity Checking Cluster State and Input. |
4142 | 0 | if (!req->has_producer_id()) { |
4143 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4144 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4145 | 0 | } |
4146 | 0 | if (!req->has_is_enabled()) { |
4147 | 0 | return STATUS(InvalidArgument, "Must explicitly set whether to enable", |
4148 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4149 | 0 | } |
4150 | | |
4151 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
4152 | 0 | { |
4153 | 0 | SharedLock lock(mutex_); |
4154 | |
|
4155 | 0 | universe = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4156 | 0 | if (universe == nullptr) { |
4157 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4158 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4159 | 0 | } |
4160 | 0 | } |
4161 | | |
4162 | | // Update the Master's Universe Config with the new state. |
4163 | 0 | { |
4164 | 0 | auto l = universe->LockForWrite(); |
4165 | 0 | if (l->pb.state() != SysUniverseReplicationEntryPB::DISABLED && |
4166 | 0 | l->pb.state() != SysUniverseReplicationEntryPB::ACTIVE) { |
4167 | 0 | return STATUS( |
4168 | 0 | InvalidArgument, |
4169 | 0 | Format("Universe Replication in invalid state: $0. Retry or Delete.", |
4170 | 0 | SysUniverseReplicationEntryPB::State_Name(l->pb.state())), |
4171 | 0 | req->ShortDebugString(), |
4172 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
4173 | 0 | } |
4174 | 0 | if (req->is_enabled()) { |
4175 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE); |
4176 | 0 | } else { // DISABLE. |
4177 | 0 | l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DISABLED); |
4178 | 0 | } |
4179 | 0 | RETURN_NOT_OK(CheckStatus( |
4180 | 0 | sys_catalog_->Upsert(leader_ready_term(), universe), |
4181 | 0 | "updating universe replication info in sys-catalog")); |
4182 | 0 | l.Commit(); |
4183 | 0 | } |
4184 | | |
4185 | | // Modify the Consumer Registry, which will fan out this info to all TServers on heartbeat. |
4186 | 0 | { |
4187 | 0 | auto l = cluster_config_->LockForWrite(); |
4188 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4189 | 0 | auto it = producer_map->find(req->producer_id()); |
4190 | 0 | if (it == producer_map->end()) { |
4191 | 0 | LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id(); |
4192 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4193 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4194 | 0 | } |
4195 | 0 | (*it).second.set_disable_stream(!req->is_enabled()); |
4196 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
4197 | 0 | RETURN_NOT_OK(CheckStatus( |
4198 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config_), |
4199 | 0 | "updating cluster config in sys-catalog")); |
4200 | 0 | l.Commit(); |
4201 | 0 | } |
4202 | |
|
4203 | 0 | return Status::OK(); |
4204 | 0 | } |
4205 | | |
4206 | | Status CatalogManager::AlterUniverseReplication(const AlterUniverseReplicationRequestPB* req, |
4207 | | AlterUniverseReplicationResponsePB* resp, |
4208 | 0 | rpc::RpcContext* rpc) { |
4209 | 0 | LOG(INFO) << "Servicing AlterUniverseReplication request from " << RequestorString(rpc) |
4210 | 0 | << ": " << req->ShortDebugString(); |
4211 | | |
4212 | | // Sanity Checking Cluster State and Input. |
4213 | 0 | if (!req->has_producer_id()) { |
4214 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4215 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4216 | 0 | } |
4217 | | |
4218 | | // Verify that there is an existing Universe config |
4219 | 0 | scoped_refptr<UniverseReplicationInfo> original_ri; |
4220 | 0 | { |
4221 | 0 | SharedLock lock(mutex_); |
4222 | |
|
4223 | 0 | original_ri = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4224 | 0 | if (original_ri == nullptr) { |
4225 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4226 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4227 | 0 | } |
4228 | 0 | } |
4229 | | |
4230 | | // Currently, config options are mutually exclusive to simplify transactionality. |
4231 | 0 | int config_count = (req->producer_master_addresses_size() > 0 ? 1 : 0) + |
4232 | 0 | (req->producer_table_ids_to_remove_size() > 0 ? 1 : 0) + |
4233 | 0 | (req->producer_table_ids_to_add_size() > 0 ? 1 : 0) + |
4234 | 0 | (req->has_new_producer_universe_id() ? 1 : 0); |
4235 | 0 | if (config_count != 1) { |
4236 | 0 | return STATUS(InvalidArgument, "Only 1 Alter operation per request currently supported", |
4237 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4238 | 0 | } |
4239 | | |
4240 | | // Config logic... |
4241 | 0 | if (req->producer_master_addresses_size() > 0) { |
4242 | | // 'set_master_addresses' |
4243 | | // TODO: Verify the input. Setup an RPC Task, ListTables, ensure same. |
4244 | | |
4245 | | // 1a. Persistent Config: Update the Universe Config for Master. |
4246 | 0 | { |
4247 | 0 | auto l = original_ri->LockForWrite(); |
4248 | 0 | l.mutable_data()->pb.mutable_producer_master_addresses()->CopyFrom( |
4249 | 0 | req->producer_master_addresses()); |
4250 | 0 | RETURN_NOT_OK(CheckStatus( |
4251 | 0 | sys_catalog_->Upsert(leader_ready_term(), original_ri), |
4252 | 0 | "updating universe replication info in sys-catalog")); |
4253 | 0 | l.Commit(); |
4254 | 0 | } |
4255 | | // 1b. Persistent Config: Update the Consumer Registry (updates TServers) |
4256 | 0 | { |
4257 | 0 | auto l = cluster_config_->LockForWrite(); |
4258 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4259 | 0 | auto it = producer_map->find(req->producer_id()); |
4260 | 0 | if (it == producer_map->end()) { |
4261 | 0 | LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id(); |
4262 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4263 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4264 | 0 | } |
4265 | 0 | (*it).second.mutable_master_addrs()->CopyFrom(req->producer_master_addresses()); |
4266 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
4267 | 0 | RETURN_NOT_OK(CheckStatus( |
4268 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config_), |
4269 | 0 | "updating cluster config in sys-catalog")); |
4270 | 0 | l.Commit(); |
4271 | 0 | } |
4272 | | // 2. Memory Update: Change cdc_rpc_tasks (Master cache) |
4273 | 0 | { |
4274 | 0 | auto result = original_ri->GetOrCreateCDCRpcTasks(req->producer_master_addresses()); |
4275 | 0 | if (!result.ok()) { |
4276 | 0 | return SetupError(resp->mutable_error(), MasterErrorPB::INTERNAL_ERROR, result.status()); |
4277 | 0 | } |
4278 | 0 | } |
4279 | 0 | } else if (req->producer_table_ids_to_remove_size() > 0) { |
4280 | | // 'remove_table' |
4281 | 0 | auto it = req->producer_table_ids_to_remove(); |
4282 | 0 | std::set<string> table_ids_to_remove(it.begin(), it.end()); |
4283 | | // Filter out any tables that aren't in the existing replication config. |
4284 | 0 | { |
4285 | 0 | auto l = original_ri->LockForRead(); |
4286 | 0 | auto tbl_iter = l->pb.tables(); |
4287 | 0 | std::set<string> existing_tables(tbl_iter.begin(), tbl_iter.end()), filtered_list; |
4288 | 0 | set_intersection(table_ids_to_remove.begin(), table_ids_to_remove.end(), |
4289 | 0 | existing_tables.begin(), existing_tables.end(), |
4290 | 0 | std::inserter(filtered_list, filtered_list.begin())); |
4291 | 0 | filtered_list.swap(table_ids_to_remove); |
4292 | 0 | } |
4293 | |
|
4294 | 0 | vector<CDCStreamId> streams_to_remove; |
4295 | | // 1. Update the Consumer Registry (removes from TServers). |
4296 | 0 | { |
4297 | 0 | auto cl = cluster_config_->LockForWrite(); |
4298 | 0 | auto pm = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4299 | 0 | auto producer_entry = pm->find(req->producer_id()); |
4300 | 0 | if (producer_entry != pm->end()) { |
4301 | | // Remove the Tables Specified (not part of the key). |
4302 | 0 | auto stream_map = producer_entry->second.mutable_stream_map(); |
4303 | 0 | for (auto& p : *stream_map) { |
4304 | 0 | if (table_ids_to_remove.count(p.second.producer_table_id()) > 0) { |
4305 | 0 | streams_to_remove.push_back(p.first); |
4306 | 0 | } |
4307 | 0 | } |
4308 | 0 | if (streams_to_remove.size() == stream_map->size()) { |
4309 | | // If this ends with an empty Map, disallow and force user to delete. |
4310 | 0 | LOG(WARNING) << "CDC 'remove_table' tried to remove all tables." << req->producer_id(); |
4311 | 0 | return STATUS( |
4312 | 0 | InvalidArgument, |
4313 | 0 | "Cannot remove all tables with alter. Use delete_universe_replication instead.", |
4314 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4315 | 0 | } else if (streams_to_remove.empty()) { |
4316 | | // If this doesn't delete anything, notify the user. |
4317 | 0 | return STATUS(InvalidArgument, "Removal matched no entries.", |
4318 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4319 | 0 | } |
4320 | 0 | for (auto& key : streams_to_remove) { |
4321 | 0 | stream_map->erase(stream_map->find(key)); |
4322 | 0 | } |
4323 | 0 | } |
4324 | 0 | cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1); |
4325 | 0 | RETURN_NOT_OK(CheckStatus( |
4326 | 0 | sys_catalog_->Upsert(leader_ready_term(), cluster_config_), |
4327 | 0 | "updating cluster config in sys-catalog")); |
4328 | 0 | cl.Commit(); |
4329 | 0 | } |
4330 | | // 2. Remove from Master Configs on Producer and Consumer. |
4331 | 0 | { |
4332 | 0 | auto l = original_ri->LockForWrite(); |
4333 | 0 | if (!l->pb.table_streams().empty()) { |
4334 | | // Delete Relevant Table->StreamID mappings on Consumer. |
4335 | 0 | auto table_streams = l.mutable_data()->pb.mutable_table_streams(); |
4336 | 0 | auto validated_tables = l.mutable_data()->pb.mutable_validated_tables(); |
4337 | 0 | for (auto& key : table_ids_to_remove) { |
4338 | 0 | table_streams->erase(table_streams->find(key)); |
4339 | 0 | validated_tables->erase(validated_tables->find(key)); |
4340 | 0 | } |
4341 | 0 | for (int i = 0; i < l.mutable_data()->pb.tables_size(); i++) { |
4342 | 0 | if (table_ids_to_remove.count(l.mutable_data()->pb.tables(i)) > 0) { |
4343 | 0 | l.mutable_data()->pb.mutable_tables()->DeleteSubrange(i, 1); |
4344 | 0 | --i; |
4345 | 0 | } |
4346 | 0 | } |
4347 | | // Delete CDC stream config on the Producer. |
4348 | 0 | auto result = original_ri->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses()); |
4349 | 0 | if (!result.ok()) { |
4350 | 0 | LOG(WARNING) << "Unable to create cdc rpc task. CDC streams won't be deleted: " << result; |
4351 | 0 | } else { |
4352 | 0 | auto s = (*result)->client()->DeleteCDCStream(streams_to_remove, |
4353 | 0 | true /* force_delete */); |
4354 | 0 | if (!s.ok()) { |
4355 | 0 | std::stringstream os; |
4356 | 0 | std::copy(streams_to_remove.begin(), streams_to_remove.end(), |
4357 | 0 | std::ostream_iterator<CDCStreamId>(os, ", ")); |
4358 | 0 | LOG(WARNING) << "Unable to delete CDC streams: " << os.str() << s; |
4359 | 0 | } |
4360 | 0 | } |
4361 | 0 | } |
4362 | 0 | RETURN_NOT_OK(CheckStatus( |
4363 | 0 | sys_catalog_->Upsert(leader_ready_term(), original_ri), |
4364 | 0 | "updating universe replication info in sys-catalog")); |
4365 | 0 | l.Commit(); |
4366 | 0 | } |
4367 | 0 | } else if (req->producer_table_ids_to_add_size() > 0) { |
4368 | | // 'add_table' |
4369 | 0 | string alter_producer_id = req->producer_id() + ".ALTER"; |
4370 | | |
4371 | | // If user passed in bootstrap ids, check that there is a bootstrap id for every table. |
4372 | 0 | if (req->producer_bootstrap_ids_to_add().size() > 0 && |
4373 | 0 | req->producer_table_ids_to_add().size() != req->producer_bootstrap_ids_to_add().size()) { |
4374 | 0 | return STATUS(InvalidArgument, "Number of bootstrap ids must be equal to number of tables", |
4375 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4376 | 0 | } |
4377 | | |
4378 | | // Verify no 'alter' command running. |
4379 | 0 | scoped_refptr<UniverseReplicationInfo> alter_ri; |
4380 | 0 | { |
4381 | 0 | SharedLock lock(mutex_); |
4382 | 0 | alter_ri = FindPtrOrNull(universe_replication_map_, alter_producer_id); |
4383 | 0 | } |
4384 | 0 | { |
4385 | 0 | if (alter_ri != nullptr) { |
4386 | 0 | LOG(INFO) << "Found " << alter_producer_id << "... Removing"; |
4387 | 0 | if (alter_ri->LockForRead()->is_deleted_or_failed()) { |
4388 | | // Delete previous Alter if it's completed but failed. |
4389 | 0 | master::DeleteUniverseReplicationRequestPB delete_req; |
4390 | 0 | delete_req.set_producer_id(alter_ri->id()); |
4391 | 0 | master::DeleteUniverseReplicationResponsePB delete_resp; |
4392 | 0 | Status s = DeleteUniverseReplication(&delete_req, &delete_resp, rpc); |
4393 | 0 | if (!s.ok()) { |
4394 | 0 | if (delete_resp.has_error()) { |
4395 | 0 | resp->mutable_error()->Swap(delete_resp.mutable_error()); |
4396 | 0 | return s; |
4397 | 0 | } |
4398 | 0 | return SetupError(resp->mutable_error(), s); |
4399 | 0 | } |
4400 | 0 | } else { |
4401 | 0 | return STATUS(InvalidArgument, "Alter for CDC producer currently running", |
4402 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4403 | 0 | } |
4404 | 0 | } |
4405 | 0 | } |
4406 | | |
4407 | | // Map each table id to its corresponding bootstrap id. |
4408 | 0 | std::unordered_map<TableId, std::string> table_id_to_bootstrap_id; |
4409 | 0 | if (req->producer_bootstrap_ids_to_add().size() > 0) { |
4410 | 0 | for (int i = 0; i < req->producer_table_ids_to_add().size(); i++) { |
4411 | 0 | table_id_to_bootstrap_id[req->producer_table_ids_to_add(i)] |
4412 | 0 | = req->producer_bootstrap_ids_to_add(i); |
4413 | 0 | } |
4414 | | |
4415 | | // Ensure that table ids are unique. We need to do this here even though |
4416 | | // the same check is performed by SetupUniverseReplication because |
4417 | | // duplicate table ids can cause a bootstrap id entry in table_id_to_bootstrap_id |
4418 | | // to be overwritten. |
4419 | 0 | if (table_id_to_bootstrap_id.size() != |
4420 | 0 | implicit_cast<size_t>(req->producer_table_ids_to_add().size())) { |
4421 | 0 | return STATUS(InvalidArgument, "When providing bootstrap ids, " |
4422 | 0 | "the list of tables must be unique", req->ShortDebugString(), |
4423 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
4424 | 0 | } |
4425 | 0 | } |
4426 | | |
4427 | | // Only add new tables. Ignore tables that are currently being replicated. |
4428 | 0 | auto tid_iter = req->producer_table_ids_to_add(); |
4429 | 0 | std::unordered_set<string> new_tables(tid_iter.begin(), tid_iter.end()); |
4430 | 0 | { |
4431 | 0 | auto l = original_ri->LockForRead(); |
4432 | 0 | for(auto t : l->pb.tables()) { |
4433 | 0 | auto pos = new_tables.find(t); |
4434 | 0 | if (pos != new_tables.end()) { |
4435 | 0 | new_tables.erase(pos); |
4436 | 0 | } |
4437 | 0 | } |
4438 | 0 | } |
4439 | 0 | if (new_tables.empty()) { |
4440 | 0 | return STATUS(InvalidArgument, "CDC producer already contains all requested tables", |
4441 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4442 | 0 | } |
4443 | | |
4444 | | // 1. create an ALTER table request that mirrors the original 'setup_replication'. |
4445 | 0 | master::SetupUniverseReplicationRequestPB setup_req; |
4446 | 0 | master::SetupUniverseReplicationResponsePB setup_resp; |
4447 | 0 | setup_req.set_producer_id(alter_producer_id); |
4448 | 0 | setup_req.mutable_producer_master_addresses()->CopyFrom( |
4449 | 0 | original_ri->LockForRead()->pb.producer_master_addresses()); |
4450 | 0 | for (auto t : new_tables) { |
4451 | 0 | setup_req.add_producer_table_ids(t); |
4452 | | |
4453 | | // Add bootstrap id to request if it exists. |
4454 | 0 | auto bootstrap_id_lookup_result = table_id_to_bootstrap_id.find(t); |
4455 | 0 | if (bootstrap_id_lookup_result != table_id_to_bootstrap_id.end()) { |
4456 | 0 | setup_req.add_producer_bootstrap_ids(bootstrap_id_lookup_result->second); |
4457 | 0 | } |
4458 | 0 | } |
4459 | | |
4460 | | // 2. run the 'setup_replication' pipeline on the ALTER Table |
4461 | 0 | Status s = SetupUniverseReplication(&setup_req, &setup_resp, rpc); |
4462 | 0 | if (!s.ok()) { |
4463 | 0 | if (setup_resp.has_error()) { |
4464 | 0 | resp->mutable_error()->Swap(setup_resp.mutable_error()); |
4465 | 0 | return s; |
4466 | 0 | } |
4467 | 0 | return SetupError(resp->mutable_error(), s); |
4468 | 0 | } |
4469 | | // NOTE: ALTER merges back into original after completion. |
4470 | 0 | } else if (req->has_new_producer_universe_id()) { |
4471 | 0 | Status s = RenameUniverseReplication(original_ri, req, resp, rpc); |
4472 | 0 | if (!s.ok()) { |
4473 | 0 | return SetupError(resp->mutable_error(), s); |
4474 | 0 | } |
4475 | 0 | } |
4476 | | |
4477 | 0 | return Status::OK(); |
4478 | 0 | } |
4479 | | |
4480 | | Status CatalogManager::RenameUniverseReplication( |
4481 | | scoped_refptr<UniverseReplicationInfo> universe, |
4482 | | const AlterUniverseReplicationRequestPB* req, |
4483 | | AlterUniverseReplicationResponsePB* resp, |
4484 | 0 | rpc::RpcContext* rpc) { |
4485 | 0 | const string old_universe_replication_id = universe->id(); |
4486 | 0 | const string new_producer_universe_id = req->new_producer_universe_id(); |
4487 | 0 | if (old_universe_replication_id == new_producer_universe_id) { |
4488 | 0 | return STATUS(InvalidArgument, "Old and new replication ids must be different", |
4489 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4490 | 0 | } |
4491 | | |
4492 | 0 | { |
4493 | 0 | LockGuard lock(mutex_); |
4494 | 0 | auto l = universe->LockForWrite(); |
4495 | 0 | scoped_refptr<UniverseReplicationInfo> new_ri; |
4496 | | |
4497 | | // Assert that new_replication_name isn't already in use. |
4498 | 0 | if (FindPtrOrNull(universe_replication_map_, new_producer_universe_id) != nullptr) { |
4499 | 0 | return STATUS(InvalidArgument, "New replication id is already in use", |
4500 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4501 | 0 | } |
4502 | | |
4503 | | // Since the producer_id is used as the key, we need to create a new UniverseReplicationInfo. |
4504 | 0 | new_ri = new UniverseReplicationInfo(new_producer_universe_id); |
4505 | 0 | new_ri->mutable_metadata()->StartMutation(); |
4506 | 0 | SysUniverseReplicationEntryPB *metadata = &new_ri->mutable_metadata()->mutable_dirty()->pb; |
4507 | 0 | metadata->CopyFrom(l->pb); |
4508 | 0 | metadata->set_producer_id(new_producer_universe_id); |
4509 | | |
4510 | | // Also need to update internal maps. |
4511 | 0 | auto cl = cluster_config_->LockForWrite(); |
4512 | 0 | auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4513 | 0 | (*producer_map)[new_producer_universe_id] = |
4514 | 0 | std::move((*producer_map)[old_universe_replication_id]); |
4515 | 0 | producer_map->erase(old_universe_replication_id); |
4516 | |
|
4517 | 0 | { |
4518 | | // Need both these updates to be atomic. |
4519 | 0 | auto w = sys_catalog_->NewWriter(leader_ready_term()); |
4520 | 0 | RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_DELETE, universe.get())); |
4521 | 0 | RETURN_NOT_OK(w->Mutate(QLWriteRequestPB::QL_STMT_UPDATE, |
4522 | 0 | new_ri.get(), |
4523 | 0 | cluster_config_.get())); |
4524 | 0 | RETURN_NOT_OK(CheckStatus( |
4525 | 0 | sys_catalog_->SyncWrite(w.get()), |
4526 | 0 | "Updating universe replication info and cluster config in sys-catalog")); |
4527 | 0 | } |
4528 | 0 | new_ri->mutable_metadata()->CommitMutation(); |
4529 | 0 | cl.Commit(); |
4530 | | |
4531 | | // Update universe_replication_map after persistent data is saved. |
4532 | 0 | universe_replication_map_[new_producer_universe_id] = new_ri; |
4533 | 0 | universe_replication_map_.erase(old_universe_replication_id); |
4534 | 0 | } |
4535 | |
|
4536 | 0 | return Status::OK(); |
4537 | 0 | } |
4538 | | |
4539 | | Status CatalogManager::GetUniverseReplication(const GetUniverseReplicationRequestPB* req, |
4540 | | GetUniverseReplicationResponsePB* resp, |
4541 | 0 | rpc::RpcContext* rpc) { |
4542 | 0 | LOG(INFO) << "GetUniverseReplication from " << RequestorString(rpc) |
4543 | 0 | << ": " << req->DebugString(); |
4544 | |
|
4545 | 0 | if (!req->has_producer_id()) { |
4546 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4547 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4548 | 0 | } |
4549 | | |
4550 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
4551 | 0 | { |
4552 | 0 | SharedLock lock(mutex_); |
4553 | |
|
4554 | 0 | universe = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4555 | 0 | if (universe == nullptr) { |
4556 | 0 | return STATUS(NotFound, "Could not find CDC producer universe", |
4557 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); |
4558 | 0 | } |
4559 | 0 | } |
4560 | | |
4561 | 0 | resp->mutable_entry()->CopyFrom(universe->LockForRead()->pb); |
4562 | 0 | return Status::OK(); |
4563 | 0 | } |
4564 | | |
4565 | | /* |
4566 | | * Checks if the universe replication setup has completed. |
4567 | | * Returns Status::OK() if this call succeeds, and uses resp->done() to determine if the setup has |
4568 | | * completed (either failed or succeeded). If the setup has failed, then resp->replication_error() |
4569 | | * is also set. |
4570 | | */ |
4571 | | Status CatalogManager::IsSetupUniverseReplicationDone( |
4572 | | const IsSetupUniverseReplicationDoneRequestPB* req, |
4573 | | IsSetupUniverseReplicationDoneResponsePB* resp, |
4574 | 0 | rpc::RpcContext* rpc) { |
4575 | 0 | LOG(INFO) << "IsSetupUniverseReplicationDone from " << RequestorString(rpc) |
4576 | 0 | << ": " << req->DebugString(); |
4577 | |
|
4578 | 0 | if (!req->has_producer_id()) { |
4579 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4580 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4581 | 0 | } |
4582 | | |
4583 | 0 | GetUniverseReplicationRequestPB universe_req; |
4584 | 0 | GetUniverseReplicationResponsePB universe_resp; |
4585 | 0 | universe_req.set_producer_id(req->producer_id()); |
4586 | |
|
4587 | 0 | RETURN_NOT_OK(GetUniverseReplication(&universe_req, &universe_resp, /* RpcContext */ nullptr)); |
4588 | 0 | if (universe_resp.has_error()) { |
4589 | 0 | RETURN_NOT_OK(StatusFromPB(universe_resp.error().status())); |
4590 | 0 | } |
4591 | | |
4592 | | // Two cases for completion: |
4593 | | // - For a regular SetupUniverseReplication, we want to wait for the universe to become ACTIVE. |
4594 | | // - For an AlterUniverseReplication, we need to wait until the .ALTER universe gets merged with |
4595 | | // the main universe - at which point the .ALTER universe is deleted. |
4596 | 0 | bool isAlterRequest = GStringPiece(req->producer_id()).ends_with(".ALTER"); |
4597 | 0 | if ((!isAlterRequest && universe_resp.entry().state() == SysUniverseReplicationEntryPB::ACTIVE) || |
4598 | 0 | (isAlterRequest && universe_resp.entry().state() == SysUniverseReplicationEntryPB::DELETED)) { |
4599 | 0 | resp->set_done(true); |
4600 | 0 | StatusToPB(Status::OK(), resp->mutable_replication_error()); |
4601 | 0 | return Status::OK(); |
4602 | 0 | } |
4603 | | |
4604 | | // Otherwise we have either failed (see MarkUniverseReplicationFailed), or are still working. |
4605 | 0 | if (universe_resp.entry().state() == SysUniverseReplicationEntryPB::DELETED_ERROR || |
4606 | 0 | universe_resp.entry().state() == SysUniverseReplicationEntryPB::FAILED) { |
4607 | 0 | resp->set_done(true); |
4608 | | |
4609 | | // Get the more detailed error. |
4610 | 0 | scoped_refptr<UniverseReplicationInfo> universe; |
4611 | 0 | { |
4612 | 0 | SharedLock lock(mutex_); |
4613 | 0 | universe = FindPtrOrNull(universe_replication_map_, req->producer_id()); |
4614 | 0 | if (universe == nullptr) { |
4615 | 0 | StatusToPB( |
4616 | 0 | STATUS(InternalError, "Could not find CDC producer universe after having failed."), |
4617 | 0 | resp->mutable_replication_error()); |
4618 | 0 | return Status::OK(); |
4619 | 0 | } |
4620 | 0 | } |
4621 | 0 | if (!universe->GetSetupUniverseReplicationErrorStatus().ok()) { |
4622 | 0 | StatusToPB(universe->GetSetupUniverseReplicationErrorStatus(), |
4623 | 0 | resp->mutable_replication_error()); |
4624 | 0 | } else { |
4625 | 0 | LOG(WARNING) << "Did not find setup universe replication error status."; |
4626 | 0 | StatusToPB(STATUS(InternalError, "unknown error"), resp->mutable_replication_error()); |
4627 | 0 | } |
4628 | 0 | return Status::OK(); |
4629 | 0 | } |
4630 | | |
4631 | | // Not done yet. |
4632 | 0 | resp->set_done(false); |
4633 | 0 | return Status::OK(); |
4634 | 0 | } |
4635 | | |
4636 | | Status CatalogManager::UpdateConsumerOnProducerSplit( |
4637 | | const UpdateConsumerOnProducerSplitRequestPB* req, |
4638 | | UpdateConsumerOnProducerSplitResponsePB* resp, |
4639 | 0 | rpc::RpcContext* rpc) { |
4640 | 0 | LOG(INFO) << "UpdateConsumerOnProducerSplit from " << RequestorString(rpc) |
4641 | 0 | << ": " << req->DebugString(); |
4642 | |
|
4643 | 0 | if (!req->has_producer_id()) { |
4644 | 0 | return STATUS(InvalidArgument, "Producer universe ID must be provided", |
4645 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4646 | 0 | } |
4647 | 0 | if (!req->has_stream_id()) { |
4648 | 0 | return STATUS(InvalidArgument, "Stream ID must be provided", |
4649 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4650 | 0 | } |
4651 | 0 | if (!req->has_producer_split_tablet_info()) { |
4652 | 0 | return STATUS(InvalidArgument, "Producer split tablet info must be provided", |
4653 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
4654 | 0 | } |
4655 | | |
4656 | 0 | auto l = cluster_config_->LockForWrite(); |
4657 | 0 | auto producer_map = l.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); |
4658 | 0 | auto producer_entry = FindOrNull(*producer_map, req->producer_id()); |
4659 | 0 | if (!producer_entry) { |
4660 | 0 | return STATUS_FORMAT( |
4661 | 0 | NotFound, "Unable to find the producer entry for universe $0", req->producer_id()); |
4662 | 0 | } |
4663 | 0 | auto stream_entry = FindOrNull(*producer_entry->mutable_stream_map(), req->stream_id()); |
4664 | 0 | if (!stream_entry) { |
4665 | 0 | return STATUS_FORMAT( |
4666 | 0 | NotFound, "Unable to find the stream entry for universe $0, stream $1", |
4667 | 0 | req->producer_id(), req->stream_id()); |
4668 | 0 | } |
4669 | | |
4670 | | // Find the parent tablet in the tablet mapping. |
4671 | 0 | bool found = false; |
4672 | 0 | auto mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); |
4673 | | // Also keep track if we see the split children tablets. |
4674 | 0 | vector<string> split_child_tablet_ids{req->producer_split_tablet_info().new_tablet1_id(), |
4675 | 0 | req->producer_split_tablet_info().new_tablet2_id()}; |
4676 | 0 | for (auto& consumer_tablet_to_producer_tablets : *mutable_map) { |
4677 | 0 | auto& producer_tablets_list = consumer_tablet_to_producer_tablets.second; |
4678 | 0 | auto producer_tablets = producer_tablets_list.mutable_tablets(); |
4679 | 0 | for (auto tablet = producer_tablets->begin(); tablet < producer_tablets->end(); ++tablet) { |
4680 | 0 | if (*tablet == req->producer_split_tablet_info().tablet_id()) { |
4681 | | // Remove the parent tablet id. |
4682 | 0 | producer_tablets->erase(tablet); |
4683 | | // For now we add the children tablets to the same consumer tablet. |
4684 | | // See github issue #10186 for further improvements. |
4685 | 0 | producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet1_id()); |
4686 | 0 | producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet2_id()); |
4687 | | // There should only be one copy of each producer tablet per stream, so can exit early. |
4688 | 0 | found = true; |
4689 | 0 | break; |
4690 | 0 | } |
4691 | | // Check if this is one of the child split tablets. |
4692 | 0 | auto it = std::find(split_child_tablet_ids.begin(), split_child_tablet_ids.end(), *tablet); |
4693 | 0 | if (it != split_child_tablet_ids.end()) { |
4694 | 0 | split_child_tablet_ids.erase(it); |
4695 | 0 | } |
4696 | 0 | } |
4697 | 0 | if (found) { |
4698 | 0 | break; |
4699 | 0 | } |
4700 | 0 | } |
4701 | |
|
4702 | 0 | if (!found) { |
4703 | | // Did not find the source tablet, but did find the children - means that we have already |
4704 | | // processed this SPLIT_OP, so for idempotency, we can return OK. |
4705 | 0 | if (split_child_tablet_ids.empty()) { |
4706 | 0 | LOG(INFO) << "Already processed this tablet split: " << req->DebugString(); |
4707 | 0 | return Status::OK(); |
4708 | 0 | } |
4709 | | |
4710 | | // When there are sequential SPLIT_OPs, we may try to reprocess an older SPLIT_OP. However, if |
4711 | | // one or both of those children have also already been split and processed, then we'll end up |
4712 | | // here (!found && !split_child_tablet_ids.empty()). |
4713 | | // This is alright, we can log a warning, and then continue (to not block later records). |
4714 | 0 | LOG(WARNING) |
4715 | 0 | << "Unable to find matching source tablet " << req->producer_split_tablet_info().tablet_id() |
4716 | 0 | << " for universe " << req->producer_id() << " stream " << req->stream_id(); |
4717 | |
|
4718 | 0 | return Status::OK(); |
4719 | 0 | } |
4720 | | |
4721 | | // Also make sure that we switch off of 1-1 mapping optimizations. |
4722 | 0 | stream_entry->set_same_num_producer_consumer_tablets(false); |
4723 | | |
4724 | | // Also bump the cluster_config_ version so that changes are propagated to tservers (and new |
4725 | | // pollers are created for the new tablets). |
4726 | 0 | l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); |
4727 | |
|
4728 | 0 | RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config_), |
4729 | 0 | "Updating cluster config in sys-catalog")); |
4730 | 0 | l.Commit(); |
4731 | |
|
4732 | 0 | return Status::OK(); |
4733 | 0 | } |
4734 | | |
4735 | 11.3M | bool CatalogManager::IsTableCdcProducer(const TableInfo& table_info) const { |
4736 | 11.3M | auto it = cdc_stream_tables_count_map_.find(table_info.id()); |
4737 | 11.3M | if (it == cdc_stream_tables_count_map_.end()) { |
4738 | 11.3M | return false; |
4739 | 7 | } else { |
4740 | 7 | auto tid = table_info.id(); |
4741 | 7 | std::vector<scoped_refptr<CDCStreamInfo>> streams; |
4742 | 5 | for (const auto& entry : cdc_stream_map_) { |
4743 | 5 | auto s = entry.second->LockForRead(); |
4744 | | // for xCluster the first entry will be the table_id |
4745 | 5 | if (s->table_id().Get(0) == tid) { |
4746 | 5 | if (!(s->is_deleting() || s->is_deleted())) { |
4747 | 5 | return it->second > 0; |
4748 | 5 | } |
4749 | 5 | } |
4750 | 5 | } |
4751 | 7 | } |
4752 | 2 | return false; |
4753 | 11.3M | } |
4754 | | |
4755 | 11.3M | bool CatalogManager::IsTableCdcConsumer(const TableInfo& table_info) const { |
4756 | 11.3M | auto it = xcluster_consumer_tables_to_stream_map_.find(table_info.id()); |
4757 | 11.3M | if (it == xcluster_consumer_tables_to_stream_map_.end()) { |
4758 | 11.3M | return false; |
4759 | 11.3M | } |
4760 | 0 | return !it->second.empty(); |
4761 | 0 | } |
4762 | | |
4763 | | CatalogManager::XClusterConsumerTableStreamInfoMap |
4764 | 42 | CatalogManager::GetXClusterStreamInfoForConsumerTable(const TableId& table_id) const { |
4765 | 42 | SharedLock lock(mutex_); |
4766 | 42 | auto it = xcluster_consumer_tables_to_stream_map_.find(table_id); |
4767 | 42 | if (it == xcluster_consumer_tables_to_stream_map_.end()) { |
4768 | 42 | return {}; |
4769 | 42 | } |
4770 | | |
4771 | 0 | return it->second; |
4772 | 0 | } |
4773 | | |
4774 | | bool CatalogManager::IsCdcEnabled( |
4775 | 11.3M | const TableInfo& table_info) const { |
4776 | 11.3M | SharedLock lock(mutex_); |
4777 | 11.3M | return IsTableCdcProducer(table_info) || IsTableCdcConsumer(table_info); |
4778 | 11.3M | } |
4779 | | |
4780 | 5.35k | void CatalogManager::Started() { |
4781 | 5.35k | snapshot_coordinator_.Start(); |
4782 | 5.35k | } |
4783 | | |
4784 | | Result<SnapshotSchedulesToObjectIdsMap> CatalogManager::MakeSnapshotSchedulesToObjectIdsMap( |
4785 | 7.99k | SysRowEntryType type) { |
4786 | 7.99k | return snapshot_coordinator_.MakeSnapshotSchedulesToObjectIdsMap(type); |
4787 | 7.99k | } |
4788 | | |
4789 | 0 | Result<bool> CatalogManager::IsTablePartOfSomeSnapshotSchedule(const TableInfo& table_info) { |
4790 | 0 | return snapshot_coordinator_.IsTableCoveredBySomeSnapshotSchedule(table_info); |
4791 | 0 | } |
4792 | | |
4793 | 2.00k | void CatalogManager::SysCatalogLoaded(int64_t term) { |
4794 | 2.00k | return snapshot_coordinator_.SysCatalogLoaded(term); |
4795 | 2.00k | } |
4796 | | |
4797 | 0 | size_t CatalogManager::GetNumLiveTServersForActiveCluster() { |
4798 | 0 | BlacklistSet blacklist = BlacklistSetFromPB(); |
4799 | 0 | TSDescriptorVector ts_descs; |
4800 | 0 | master_->ts_manager()->GetAllLiveDescriptorsInCluster(&ts_descs, placement_uuid(), blacklist); |
4801 | 0 | return ts_descs.size(); |
4802 | 0 | } |
4803 | | |
4804 | | } // namespace enterprise |
4805 | | } // namespace master |
4806 | | } // namespace yb |