/Users/deen/code/yugabyte-db/ent/src/yb/master/restore_sys_catalog_state.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/restore_sys_catalog_state.h" |
15 | | |
16 | | #include "yb/common/entity_ids.h" |
17 | | #include "yb/common/hybrid_time.h" |
18 | | #include "yb/common/index.h" |
19 | | |
20 | | #include "yb/common/pgsql_protocol.pb.h" |
21 | | #include "yb/common/ql_expr.h" |
22 | | #include "yb/docdb/consensus_frontier.h" |
23 | | #include "yb/docdb/cql_operation.h" |
24 | | #include "yb/docdb/docdb_rocksdb_util.h" |
25 | | #include "yb/docdb/doc_rowwise_iterator.h" |
26 | | #include "yb/docdb/doc_write_batch.h" |
27 | | #include "yb/docdb/docdb.h" |
28 | | #include "yb/docdb/pgsql_operation.h" |
29 | | #include "yb/docdb/rocksdb_writer.h" |
30 | | |
31 | | #include "yb/master/catalog_loaders.h" |
32 | | #include "yb/master/master_backup.pb.h" |
33 | | #include "yb/master/master_snapshot_coordinator.h" |
34 | | #include "yb/master/master_util.h" |
35 | | #include "yb/master/sys_catalog.h" |
36 | | #include "yb/master/sys_catalog_writer.h" |
37 | | |
38 | | #include "yb/rocksdb/write_batch.h" |
39 | | #include "yb/tablet/tablet.h" |
40 | | #include "yb/util/format.h" |
41 | | #include "yb/util/logging.h" |
42 | | #include "yb/util/pb_util.h" |
43 | | #include "yb/util/status_format.h" |
44 | | |
45 | | using namespace std::placeholders; |
46 | | |
47 | | namespace yb { |
48 | | namespace master { |
49 | | |
50 | | namespace { |
51 | | |
52 | | CHECKED_STATUS ApplyWriteRequest( |
53 | | const Schema& schema, const QLWriteRequestPB& write_request, |
54 | 0 | docdb::DocWriteBatch* write_batch) { |
55 | 0 | std::shared_ptr<const Schema> schema_ptr(&schema, [](const Schema* schema){}); |
56 | 0 | docdb::DocOperationApplyData apply_data{.doc_write_batch = write_batch}; |
57 | 0 | IndexMap index_map; |
58 | 0 | docdb::QLWriteOperation operation( |
59 | 0 | write_request, schema_ptr, index_map, nullptr, TransactionOperationContext()); |
60 | 0 | QLResponsePB response; |
61 | 0 | RETURN_NOT_OK(operation.Init(&response)); |
62 | 0 | return operation.Apply(apply_data); |
63 | 0 | } |
64 | | |
65 | 0 | bool TableDeleted(const SysTablesEntryPB& table) { |
66 | 0 | return table.state() == SysTablesEntryPB::DELETED || |
67 | 0 | table.state() == SysTablesEntryPB::DELETING || |
68 | 0 | table.hide_state() == SysTablesEntryPB::HIDING || |
69 | 0 | table.hide_state() == SysTablesEntryPB::HIDDEN; |
70 | 0 | } |
71 | | |
72 | | Result<bool> MatchNamespace( |
73 | | const SnapshotScheduleFilterPB& filter, const NamespaceId& id, |
74 | 0 | const SysNamespaceEntryPB& ns) { |
75 | 0 | VLOG(1) << __func__ << "(" << id << ", " << ns.ShortDebugString() << ")"; |
76 | 0 | for (const auto& table_identifier : filter.tables().tables()) { |
77 | 0 | if (table_identifier.has_namespace_() && |
78 | 0 | VERIFY_RESULT(master::NamespaceMatchesIdentifier( |
79 | 0 | id, ns.database_type(), ns.name(), table_identifier.namespace_()))) { |
80 | 0 | return true; |
81 | 0 | } |
82 | 0 | } |
83 | 0 | return false; |
84 | 0 | } |
85 | | |
86 | | Result<bool> MatchTable( |
87 | | const SnapshotScheduleFilterPB& filter, const TableId& id, |
88 | 18 | const SysTablesEntryPB& table) { |
89 | 0 | VLOG(1) << __func__ << "(" << id << ", " << table.ShortDebugString() << ")"; |
90 | 18 | for (const auto& table_identifier : filter.tables().tables()) { |
91 | 18 | if (VERIFY_RESULT(master::TableMatchesIdentifier(id, table, table_identifier))) { |
92 | 4 | return true; |
93 | 4 | } |
94 | 18 | } |
95 | 14 | return false; |
96 | 18 | } |
97 | | |
98 | | template <class PB> |
99 | | struct GetEntryType; |
100 | | |
101 | | template<> struct GetEntryType<SysNamespaceEntryPB> |
102 | | : public std::integral_constant<SysRowEntryType, SysRowEntryType::NAMESPACE> {}; |
103 | | |
104 | | template<> struct GetEntryType<SysTablesEntryPB> |
105 | | : public std::integral_constant<SysRowEntryType, SysRowEntryType::TABLE> {}; |
106 | | |
107 | | template<> struct GetEntryType<SysTabletsEntryPB> |
108 | | : public std::integral_constant<SysRowEntryType, SysRowEntryType::TABLET> {}; |
109 | | |
110 | | } // namespace |
111 | | |
112 | | RestoreSysCatalogState::RestoreSysCatalogState(SnapshotScheduleRestoration* restoration) |
113 | 1 | : restoration_(*restoration) {} |
114 | | |
115 | | Result<bool> RestoreSysCatalogState::PatchRestoringEntry( |
116 | 0 | const std::string& id, SysNamespaceEntryPB* pb) { |
117 | 0 | return true; |
118 | 0 | } |
119 | | |
120 | | Result<bool> RestoreSysCatalogState::PatchRestoringEntry( |
121 | 0 | const std::string& id, SysTablesEntryPB* pb) { |
122 | 0 | if (pb->schema().table_properties().is_ysql_catalog_table()) { |
123 | 0 | LOG(INFO) << "PITR: Adding " << pb->name() << " for restoring. ID: " << id; |
124 | 0 | restoration_.system_tables_to_restore.emplace(id, pb->name()); |
125 | |
|
126 | 0 | return false; |
127 | 0 | } |
128 | | |
129 | 0 | auto it = existing_objects_.tables.find(id); |
130 | 0 | if (it == existing_objects_.tables.end()) { |
131 | 0 | return STATUS_FORMAT(NotFound, "Not found restoring table: $0", id); |
132 | 0 | } |
133 | | |
134 | 0 | if (pb->version() != it->second.version()) { |
135 | | // Force schema update after restoration, if schema has changes. |
136 | 0 | pb->set_version(it->second.version() + 1); |
137 | 0 | } |
138 | |
|
139 | 0 | return true; |
140 | 0 | } |
141 | | |
142 | | Result<bool> RestoreSysCatalogState::PatchRestoringEntry( |
143 | 0 | const std::string& id, SysTabletsEntryPB* pb) { |
144 | 0 | return true; |
145 | 0 | } |
146 | | |
147 | | template <class PB> |
148 | | Status RestoreSysCatalogState::AddRestoringEntry( |
149 | 0 | const std::string& id, PB* pb, faststring* buffer) { |
150 | 0 | auto type = GetEntryType<PB>::value; |
151 | 0 | VLOG_WITH_FUNC(1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString(); |
152 | |
|
153 | 0 | if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) { |
154 | 0 | return Status::OK(); |
155 | 0 | } |
156 | 0 | auto& entry = *entries_.mutable_entries()->Add(); |
157 | 0 | entry.set_type(type); |
158 | 0 | entry.set_id(id); |
159 | 0 | pb_util::SerializeToString(*pb, buffer); |
160 | 0 | entry.set_data(buffer->data(), buffer->size()); |
161 | 0 | restoration_.non_system_objects_to_restore.emplace(id, type); |
162 | |
|
163 | 0 | return Status::OK(); |
164 | 0 | } Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17AddRestoringEntryINS0_19SysNamespaceEntryPBEEENS_6StatusERKNSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPT_PNS_10faststringE Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17AddRestoringEntryINS0_16SysTablesEntryPBEEENS_6StatusERKNSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPT_PNS_10faststringE Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17AddRestoringEntryINS0_17SysTabletsEntryPBEEENS_6StatusERKNSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPT_PNS_10faststringE |
165 | | |
166 | 0 | Status RestoreSysCatalogState::Process() { |
167 | 0 | VLOG_WITH_FUNC(1) << "Restoring: " << restoring_objects_.SizesToString() << ", existing: " |
168 | 0 | << existing_objects_.SizesToString(); |
169 | |
|
170 | 0 | VLOG_WITH_FUNC(2) << "Check restoring objects"; |
171 | 0 | VLOG_WITH_FUNC(4) << "Restoring namespaces: " << AsString(restoring_objects_.namespaces); |
172 | 0 | faststring buffer; |
173 | 0 | RETURN_NOT_OK_PREPEND(DetermineEntries( |
174 | 0 | &restoring_objects_, nullptr, |
175 | 0 | [this, &buffer](const auto& id, auto* pb) { |
176 | 0 | return AddRestoringEntry(id, pb, &buffer); |
177 | 0 | }), "Determine restoring entries failed"); |
178 | |
|
179 | 0 | VLOG_WITH_FUNC(2) << "Check existing objects"; |
180 | 0 | RETURN_NOT_OK_PREPEND(DetermineEntries( |
181 | 0 | &existing_objects_, &retained_existing_tables_, |
182 | 0 | [this](const auto& id, auto* pb) { |
183 | 0 | return CheckExistingEntry(id, *pb); |
184 | 0 | }), "Determine obsolete entries failed"); |
185 | | |
186 | | // Sort generated vectors, so binary search could be used to check whether object is obsolete. |
187 | 0 | auto compare_by_first = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }; Unexecuted instantiation: restore_sys_catalog_state.cc:_ZZN2yb6master22RestoreSysCatalogState7ProcessEvENK3$_2clINSt3__14pairINS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEENS0_17SysTabletsEntryPBEEESD_EEDaRKT_RKT0_ Unexecuted instantiation: restore_sys_catalog_state.cc:_ZZN2yb6master22RestoreSysCatalogState7ProcessEvENK3$_2clINSt3__14pairINS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEENS0_16SysTablesEntryPBEEESD_EEDaRKT_RKT0_ |
188 | 0 | std::sort(restoration_.non_system_obsolete_tablets.begin(), |
189 | 0 | restoration_.non_system_obsolete_tablets.end(), |
190 | 0 | compare_by_first); |
191 | 0 | std::sort(restoration_.non_system_obsolete_tables.begin(), |
192 | 0 | restoration_.non_system_obsolete_tables.end(), |
193 | 0 | compare_by_first); |
194 | |
|
195 | 0 | return Status::OK(); |
196 | 0 | } |
197 | | |
198 | | template <class ProcessEntry> |
199 | | Status RestoreSysCatalogState::DetermineEntries( |
200 | | Objects* objects, RetainedExistingTables* retained_existing_tables, |
201 | 0 | const ProcessEntry& process_entry) { |
202 | 0 | std::unordered_set<NamespaceId> namespaces; |
203 | 0 | std::unordered_set<TableId> tables; |
204 | |
|
205 | 0 | const auto& filter = restoration_.schedules[0].second; |
206 | |
|
207 | 0 | for (auto& id_and_metadata : objects->namespaces) { |
208 | 0 | if (!VERIFY_RESULT(MatchNamespace(filter, id_and_metadata.first, id_and_metadata.second))) { |
209 | 0 | continue; |
210 | 0 | } |
211 | 0 | if (!namespaces.insert(id_and_metadata.first).second) { |
212 | 0 | continue; |
213 | 0 | } |
214 | 0 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); |
215 | 0 | } |
216 | |
|
217 | 0 | for (auto& id_and_metadata : objects->tables) { |
218 | 0 | VLOG_WITH_FUNC(3) << "Checking: " << id_and_metadata.first << ", " |
219 | 0 | << id_and_metadata.second.ShortDebugString(); |
220 | |
|
221 | 0 | if (TableDeleted(id_and_metadata.second)) { |
222 | 0 | continue; |
223 | 0 | } |
224 | 0 | auto& root_table_id_and_metadata = VERIFY_RESULT(objects->FindRootTable(id_and_metadata)).get(); |
225 | 0 | auto match = VERIFY_RESULT(MatchTable( |
226 | 0 | filter, root_table_id_and_metadata.first, root_table_id_and_metadata.second)); |
227 | 0 | if (!match) { |
228 | 0 | continue; |
229 | 0 | } |
230 | 0 | if (retained_existing_tables) { |
231 | 0 | auto& retaining_schedules = retained_existing_tables->emplace( |
232 | 0 | id_and_metadata.first, std::vector<SnapshotScheduleId>()).first->second; |
233 | 0 | retaining_schedules.push_back(restoration_.schedules[0].first); |
234 | 0 | for (size_t i = 1; i != restoration_.schedules.size(); ++i) { |
235 | 0 | if (VERIFY_RESULT(MatchTable( |
236 | 0 | restoration_.schedules[i].second, root_table_id_and_metadata.first, |
237 | 0 | root_table_id_and_metadata.second))) { |
238 | 0 | retaining_schedules.push_back(restoration_.schedules[i].first); |
239 | 0 | } |
240 | 0 | } |
241 | 0 | } |
242 | | // Process pg_catalog tables that need to be restored. |
243 | 0 | if (namespaces.insert(id_and_metadata.second.namespace_id()).second) { |
244 | 0 | auto namespace_it = objects->namespaces.find(id_and_metadata.second.namespace_id()); |
245 | 0 | if (namespace_it == objects->namespaces.end()) { |
246 | 0 | return STATUS_FORMAT( |
247 | 0 | NotFound, "Namespace $0 not found for table $1", id_and_metadata.second.namespace_id(), |
248 | 0 | id_and_metadata.first, id_and_metadata.second.name()); |
249 | 0 | } |
250 | 0 | RETURN_NOT_OK(process_entry(namespace_it->first, &namespace_it->second)); |
251 | 0 | } |
252 | 0 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); |
253 | 0 | tables.insert(id_and_metadata.first); |
254 | 0 | VLOG(2) << "Table to restore: " << id_and_metadata.first << ", " |
255 | 0 | << id_and_metadata.second.ShortDebugString(); |
256 | 0 | } |
257 | 0 | for (auto& id_and_metadata : objects->tablets) { |
258 | 0 | auto it = tables.find(id_and_metadata.second.table_id()); |
259 | 0 | if (it == tables.end()) { |
260 | 0 | continue; |
261 | 0 | } |
262 | 0 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); |
263 | 0 | VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", " |
264 | 0 | << id_and_metadata.second.ShortDebugString(); |
265 | 0 | } |
266 | 0 | return Status::OK(); |
267 | 0 | } Unexecuted instantiation: restore_sys_catalog_state.cc:_ZN2yb6master22RestoreSysCatalogState16DetermineEntriesIZNS1_7ProcessEvE3$_0EENS_6StatusEPNS1_7ObjectsEPNSt3__113unordered_mapINS7_12basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEENS7_6vectorINS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEENSC_ISI_EEEENS7_4hashISE_EENS7_8equal_toISE_EENSC_INS7_4pairIKSE_SK_EEEEEERKT_ Unexecuted instantiation: restore_sys_catalog_state.cc:_ZN2yb6master22RestoreSysCatalogState16DetermineEntriesIZNS1_7ProcessEvE3$_1EENS_6StatusEPNS1_7ObjectsEPNSt3__113unordered_mapINS7_12basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEENS7_6vectorINS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEENSC_ISI_EEEENS7_4hashISE_EENS7_8equal_toISE_EENSC_INS7_4pairIKSE_SK_EEEEEERKT_ |
268 | | |
269 | | Result<const std::pair<const TableId, SysTablesEntryPB>&> |
270 | | RestoreSysCatalogState::Objects::FindRootTable( |
271 | 0 | const std::pair<const TableId, SysTablesEntryPB>& id_and_metadata) { |
272 | 0 | if (!id_and_metadata.second.has_index_info()) { |
273 | 0 | return id_and_metadata; |
274 | 0 | } |
275 | | |
276 | 0 | auto it = tables.find(id_and_metadata.second.index_info().indexed_table_id()); |
277 | 0 | if (it == tables.end()) { |
278 | 0 | return STATUS_FORMAT( |
279 | 0 | NotFound, "Indexed table $0 not found for index $1 ($2)", |
280 | 0 | id_and_metadata.second.index_info().indexed_table_id(), id_and_metadata.first, |
281 | 0 | id_and_metadata.second.name()); |
282 | 0 | } |
283 | 0 | const auto& ref = *it; |
284 | 0 | return ref; |
285 | 0 | } |
286 | | |
287 | | Result<const std::pair<const TableId, SysTablesEntryPB>&> |
288 | | RestoreSysCatalogState::Objects::FindRootTable( |
289 | 0 | const TableId& table_id) { |
290 | 0 | auto it = tables.find(table_id); |
291 | 0 | if (it == tables.end()) { |
292 | 0 | return STATUS_FORMAT(NotFound, "Table $0 not found for index", table_id); |
293 | 0 | } |
294 | 0 | return FindRootTable(*it); |
295 | 0 | } |
296 | | |
297 | 0 | std::string RestoreSysCatalogState::Objects::SizesToString() const { |
298 | 0 | return Format("{ tablets: $0 tables: $1 namespaces: $2 }", |
299 | 0 | tablets.size(), tables.size(), namespaces.size()); |
300 | 0 | } |
301 | | |
302 | | template <class PB> |
303 | | Status RestoreSysCatalogState::IterateSysCatalog( |
304 | | const Schema& schema, const docdb::DocDB& doc_db, HybridTime read_time, |
305 | 0 | std::unordered_map<std::string, PB>* map) { |
306 | 0 | auto iter = std::make_unique<docdb::DocRowwiseIterator>( |
307 | 0 | schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(), |
308 | 0 | ReadHybridTime::SingleTime(read_time), nullptr); |
309 | 0 | return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map]( |
310 | 0 | const Slice& id, const Slice& data) -> Status { |
311 | 0 | auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data)); |
312 | 0 | if (!ShouldLoadObject(pb)) { |
313 | 0 | return Status::OK(); |
314 | 0 | } |
315 | 0 | if (!map->emplace(id.ToBuffer(), std::move(pb)).second) { |
316 | 0 | return STATUS_FORMAT(IllegalState, "Duplicate $0: $1", |
317 | 0 | SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer()); |
318 | 0 | } |
319 | 0 | return Status::OK(); |
320 | 0 | }); Unexecuted instantiation: _ZZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_19SysNamespaceEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEEENKUlRKNS_5SliceESY_E_clESY_SY_ Unexecuted instantiation: _ZZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_16SysTablesEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEEENKUlRKNS_5SliceESY_E_clESY_SY_ Unexecuted instantiation: _ZZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_17SysTabletsEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEEENKUlRKNS_5SliceESY_E_clESY_SY_ |
321 | 0 | } Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_19SysNamespaceEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEE Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_16SysTablesEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEE Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_17SysTabletsEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEE |
322 | | |
323 | | Status RestoreSysCatalogState::LoadObjects( |
324 | 0 | const Schema& schema, const docdb::DocDB& doc_db, HybridTime read_time, Objects* objects) { |
325 | 0 | RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->namespaces)); |
326 | 0 | RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->tables)); |
327 | 0 | RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->tablets)); |
328 | 0 | return Status::OK(); |
329 | 0 | } |
330 | | |
331 | | Status RestoreSysCatalogState::LoadRestoringObjects( |
332 | 0 | const Schema& schema, const docdb::DocDB& doc_db) { |
333 | 0 | return LoadObjects(schema, doc_db, restoration_.restore_at, &restoring_objects_); |
334 | 0 | } |
335 | | |
336 | | Status RestoreSysCatalogState::LoadExistingObjects( |
337 | 0 | const Schema& schema, const docdb::DocDB& doc_db) { |
338 | 0 | return LoadObjects(schema, doc_db, HybridTime::kMax, &existing_objects_); |
339 | 0 | } |
340 | | |
341 | | Status RestoreSysCatalogState::CheckExistingEntry( |
342 | 0 | const std::string& id, const SysTabletsEntryPB& pb) { |
343 | 0 | VLOG_WITH_FUNC(4) << "Tablet: " << id << ", " << pb.ShortDebugString(); |
344 | 0 | if (restoring_objects_.tablets.count(id)) { |
345 | 0 | return Status::OK(); |
346 | 0 | } |
347 | 0 | LOG(INFO) << "PITR: Will remove tablet: " << id; |
348 | 0 | restoration_.non_system_obsolete_tablets.emplace_back(id, pb); |
349 | 0 | return Status::OK(); |
350 | 0 | } |
351 | | |
352 | | Status RestoreSysCatalogState::CheckExistingEntry( |
353 | 0 | const std::string& id, const SysTablesEntryPB& pb) { |
354 | 0 | if (pb.schema().table_properties().is_ysql_catalog_table()) { |
355 | 0 | if (restoration_.system_tables_to_restore.count(id) == 0) { |
356 | 0 | return STATUS_FORMAT( |
357 | 0 | NotFound, |
358 | 0 | "PG Catalog table $0 not found in the present set of tables" |
359 | 0 | " but found in the objects to restore.", |
360 | 0 | pb.name()); |
361 | 0 | } |
362 | 0 | return Status::OK(); |
363 | 0 | } |
364 | | |
365 | 0 | VLOG_WITH_FUNC(4) << "Table: " << id << ", " << pb.ShortDebugString(); |
366 | 0 | if (restoring_objects_.tables.count(id)) { |
367 | 0 | return Status::OK(); |
368 | 0 | } |
369 | 0 | LOG(INFO) << "PITR: Will remove table: " << id; |
370 | 0 | restoration_.non_system_obsolete_tables.emplace_back(id, pb); |
371 | |
|
372 | 0 | return Status::OK(); |
373 | 0 | } |
374 | | |
375 | | // We don't delete newly created namespaces, because our filters namespace based. |
376 | | Status RestoreSysCatalogState::CheckExistingEntry( |
377 | 0 | const std::string& id, const SysNamespaceEntryPB& pb) { |
378 | 0 | return Status::OK(); |
379 | 0 | } |
380 | | |
381 | | Status RestoreSysCatalogState::PrepareWriteBatch( |
382 | 0 | const Schema& schema, docdb::DocWriteBatch* write_batch) { |
383 | 0 | for (const auto& entry : entries_.entries()) { |
384 | 0 | QLWriteRequestPB write_request; |
385 | 0 | RETURN_NOT_OK(FillSysCatalogWriteRequest( |
386 | 0 | entry.type(), entry.id(), entry.data(), QLWriteRequestPB::QL_STMT_INSERT, schema, |
387 | 0 | &write_request)); |
388 | 0 | RETURN_NOT_OK(ApplyWriteRequest(schema, write_request, write_batch)); |
389 | 0 | } |
390 | |
|
391 | 0 | for (const auto& tablet_id_and_pb : restoration_.non_system_obsolete_tablets) { |
392 | 0 | RETURN_NOT_OK(PrepareTabletCleanup( |
393 | 0 | tablet_id_and_pb.first, tablet_id_and_pb.second, schema, write_batch)); |
394 | 0 | } |
395 | 0 | for (const auto& table_id_and_pb : restoration_.non_system_obsolete_tables) { |
396 | 0 | RETURN_NOT_OK(PrepareTableCleanup( |
397 | 0 | table_id_and_pb.first, table_id_and_pb.second, schema, write_batch)); |
398 | 0 | } |
399 | |
|
400 | 0 | return Status::OK(); |
401 | 0 | } |
402 | | |
403 | | Status RestoreSysCatalogState::PrepareTabletCleanup( |
404 | | const TabletId& id, SysTabletsEntryPB pb, const Schema& schema, |
405 | 0 | docdb::DocWriteBatch* write_batch) { |
406 | 0 | VLOG_WITH_FUNC(4) << id; |
407 | |
|
408 | 0 | QLWriteRequestPB write_request; |
409 | |
|
410 | 0 | auto it = retained_existing_tables_.find(pb.table_id()); |
411 | 0 | if (it != retained_existing_tables_.end()) { |
412 | 0 | pb.set_hide_hybrid_time(restoration_.write_time.ToUint64()); |
413 | 0 | auto& out_schedules = *pb.mutable_retained_by_snapshot_schedules(); |
414 | 0 | for (const auto& schedule_id : it->second) { |
415 | 0 | out_schedules.Add()->assign(schedule_id.AsSlice().cdata(), schedule_id.size()); |
416 | 0 | } |
417 | 0 | } |
418 | 0 | RETURN_NOT_OK(FillSysCatalogWriteRequest( |
419 | 0 | SysRowEntryType::TABLET, id, pb.SerializeAsString(), |
420 | 0 | QLWriteRequestPB::QL_STMT_UPDATE, schema, &write_request)); |
421 | 0 | return ApplyWriteRequest(schema, write_request, write_batch); |
422 | 0 | } |
423 | | |
424 | | Status RestoreSysCatalogState::PrepareTableCleanup( |
425 | | const TableId& id, SysTablesEntryPB pb, const Schema& schema, |
426 | 0 | docdb::DocWriteBatch* write_batch) { |
427 | 0 | VLOG_WITH_FUNC(4) << id; |
428 | |
|
429 | 0 | QLWriteRequestPB write_request; |
430 | 0 | pb.set_hide_state(SysTablesEntryPB::HIDING); |
431 | 0 | pb.set_version(pb.version() + 1); |
432 | 0 | RETURN_NOT_OK(FillSysCatalogWriteRequest( |
433 | 0 | SysRowEntryType::TABLE, id, pb.SerializeAsString(), |
434 | 0 | QLWriteRequestPB::QL_STMT_UPDATE, schema, &write_request)); |
435 | 0 | return ApplyWriteRequest(schema, write_request, write_batch); |
436 | 0 | } |
437 | | |
438 | | Result<bool> RestoreSysCatalogState::TEST_MatchTable( |
439 | 18 | const TableId& id, const SysTablesEntryPB& table) { |
440 | 18 | return MatchTable(restoration_.schedules[0].second, id, table); |
441 | 18 | } |
442 | | |
443 | | void RestoreSysCatalogState::WriteToRocksDB( |
444 | | docdb::DocWriteBatch* write_batch, const HybridTime& write_time, const OpId& op_id, |
445 | 0 | tablet::Tablet* tablet) { |
446 | 0 | docdb::KeyValueWriteBatchPB kv_write_batch; |
447 | 0 | write_batch->MoveToWriteBatchPB(&kv_write_batch); |
448 | |
|
449 | 0 | docdb::NonTransactionalWriter writer(kv_write_batch, write_time); |
450 | 0 | rocksdb::WriteBatch rocksdb_write_batch; |
451 | 0 | rocksdb_write_batch.SetDirectWriter(&writer); |
452 | 0 | docdb::ConsensusFrontiers frontiers; |
453 | 0 | set_op_id(op_id, &frontiers); |
454 | 0 | set_hybrid_time(write_time, &frontiers); |
455 | |
|
456 | 0 | tablet->WriteToRocksDB( |
457 | 0 | &frontiers, &rocksdb_write_batch, docdb::StorageDbType::kRegular); |
458 | 0 | } |
459 | | |
460 | | class FetchState { |
461 | | public: |
462 | | explicit FetchState(const docdb::DocDB& doc_db, const ReadHybridTime& read_time) |
463 | | : iterator_(CreateIntentAwareIterator( |
464 | | doc_db, |
465 | | docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, |
466 | | boost::none, |
467 | | rocksdb::kDefaultQueryId, |
468 | | TransactionOperationContext(), |
469 | | CoarseTimePoint::max(), |
470 | 0 | read_time)) { |
471 | 0 | } |
472 | | |
473 | 0 | CHECKED_STATUS SetPrefix(const Slice& prefix) { |
474 | 0 | if (prefix_.empty()) { |
475 | 0 | iterator_->Seek(prefix); |
476 | 0 | } else { |
477 | 0 | iterator_->SeekForward(prefix); |
478 | 0 | } |
479 | 0 | prefix_ = prefix; |
480 | 0 | finished_ = false; |
481 | 0 | last_deleted_key_bytes_.clear(); |
482 | 0 | last_deleted_key_write_time_ = DocHybridTime::kInvalid; |
483 | 0 | RETURN_NOT_OK(Update()); |
484 | 0 | return NextNonDeletedEntry(); |
485 | 0 | } |
486 | | |
487 | 0 | bool finished() const { |
488 | 0 | return finished_; |
489 | 0 | } |
490 | | |
491 | 0 | Slice key() const { |
492 | 0 | return key_.key; |
493 | 0 | } |
494 | | |
495 | 0 | Slice value() const { |
496 | 0 | return iterator_->value(); |
497 | 0 | } |
498 | | |
499 | 0 | docdb::FetchKeyResult FullKey() const { |
500 | 0 | return key_; |
501 | 0 | } |
502 | | |
503 | 0 | CHECKED_STATUS NextEntry() { |
504 | 0 | iterator_->SeekPastSubKey(key_.key); |
505 | 0 | return Update(); |
506 | 0 | } |
507 | | |
508 | 0 | CHECKED_STATUS Next() { |
509 | 0 | RETURN_NOT_OK(NextEntry()); |
510 | 0 | return NextNonDeletedEntry(); |
511 | 0 | } |
512 | | |
513 | | // Returns true if the entry corresponds to a deleted row |
514 | | // in rocksdb. |
515 | 0 | Result<bool> IsDeletedRowEntry() { |
516 | 0 | bool is_tombstoned = false; |
517 | 0 | is_tombstoned = VERIFY_RESULT(docdb::Value::IsTombstoned(value())); |
518 | | |
519 | | // Because Postgres doesn't have a concept of frozen types, kGroupEnd will only demarcate the |
520 | | // end of hashed and range components. It is reasonable to assume then that if the last byte |
521 | | // is kGroupEnd then it does not have any subkeys. |
522 | 0 | bool no_subkey = |
523 | 0 | key()[key().size() - 1] == docdb::ValueTypeAsChar::kGroupEnd; |
524 | |
|
525 | 0 | return no_subkey && is_tombstoned; |
526 | 0 | } |
527 | | |
528 | | // Returns true if it has been deleted since the time it was inserted. |
529 | 0 | bool IsDeletedSinceInsertion() { |
530 | 0 | if (last_deleted_key_bytes_.size() == 0) { |
531 | 0 | return false; |
532 | 0 | } |
533 | 0 | return key().starts_with(last_deleted_key_bytes_.AsSlice()) && |
534 | 0 | FullKey().write_time < last_deleted_key_write_time_; |
535 | 0 | } |
536 | | |
537 | | private: |
538 | 0 | CHECKED_STATUS Update() { |
539 | 0 | if (!iterator_->valid()) { |
540 | 0 | finished_ = true; |
541 | 0 | return Status::OK(); |
542 | 0 | } |
543 | 0 | key_ = VERIFY_RESULT(iterator_->FetchKey()); |
544 | 0 | if (VERIFY_RESULT(IsDeletedRowEntry())) { |
545 | 0 | last_deleted_key_write_time_ = key_.write_time; |
546 | 0 | last_deleted_key_bytes_ = key_.key; |
547 | 0 | } |
548 | 0 | if (!key_.key.starts_with(prefix_)) { |
549 | 0 | finished_ = true; |
550 | 0 | return Status::OK(); |
551 | 0 | } |
552 | | |
553 | 0 | return Status::OK(); |
554 | 0 | } |
555 | | |
556 | 0 | CHECKED_STATUS NextNonDeletedEntry() { |
557 | 0 | while (!finished()) { |
558 | 0 | if (VERIFY_RESULT(IsDeletedRowEntry()) || |
559 | 0 | IsDeletedSinceInsertion()) { |
560 | 0 | RETURN_NOT_OK(NextEntry()); |
561 | 0 | continue; |
562 | 0 | } |
563 | 0 | break; |
564 | 0 | } |
565 | 0 | return Status::OK(); |
566 | 0 | } |
567 | | |
568 | | std::unique_ptr<docdb::IntentAwareIterator> iterator_; |
569 | | Slice prefix_; |
570 | | docdb::FetchKeyResult key_; |
571 | | KeyBuffer last_deleted_key_bytes_; |
572 | | DocHybridTime last_deleted_key_write_time_; |
573 | | bool finished_ = false; |
574 | | }; |
575 | | |
576 | 0 | void AddKeyValue(const Slice& key, const Slice& value, docdb::DocWriteBatch* write_batch) { |
577 | 0 | auto& pair = write_batch->AddRaw(); |
578 | 0 | pair.first.assign(key.cdata(), key.size()); |
579 | 0 | pair.second.assign(value.cdata(), value.size()); |
580 | 0 | } |
581 | | |
582 | | struct PgCatalogTableData { |
583 | | std::array<uint8_t, kUuidSize + 1> prefix; |
584 | | const TableName* name; |
585 | | |
586 | 0 | CHECKED_STATUS SetTableId(const TableId& table_id) { |
587 | 0 | Uuid cotable_id; |
588 | 0 | RETURN_NOT_OK(cotable_id.FromHexString(table_id)); |
589 | 0 | prefix[0] = docdb::ValueTypeAsChar::kTableId; |
590 | 0 | cotable_id.EncodeToComparable(&prefix[1]); |
591 | 0 | return Status::OK(); |
592 | 0 | } |
593 | | }; |
594 | | |
595 | | Status RestoreSysCatalogState::ProcessPgCatalogRestores( |
596 | | const Schema& pg_yb_catalog_version_schema, |
597 | | const docdb::DocDB& restoring_db, |
598 | | const docdb::DocDB& existing_db, |
599 | 0 | docdb::DocWriteBatch* write_batch) { |
600 | 0 | if (restoration_.system_tables_to_restore.empty()) { |
601 | 0 | return Status::OK(); |
602 | 0 | } |
603 | | |
604 | 0 | FetchState restoring_state(restoring_db, ReadHybridTime::SingleTime(restoration_.restore_at)); |
605 | 0 | FetchState existing_state(existing_db, ReadHybridTime::Max()); |
606 | 0 | char tombstone_char = docdb::ValueTypeAsChar::kTombstone; |
607 | 0 | Slice tombstone(&tombstone_char, 1); |
608 | |
|
609 | 0 | std::vector<PgCatalogTableData> tables(restoration_.system_tables_to_restore.size() + 1); |
610 | 0 | size_t idx = 0; |
611 | 0 | RETURN_NOT_OK(tables[0].SetTableId(kPgYbCatalogVersionTableId)); |
612 | 0 | tables[0].name = nullptr; |
613 | 0 | ++idx; |
614 | 0 | for (auto& id_and_name : restoration_.system_tables_to_restore) { |
615 | 0 | auto& table = tables[idx]; |
616 | 0 | RETURN_NOT_OK(table.SetTableId(id_and_name.first)); |
617 | 0 | table.name = &id_and_name.second; |
618 | 0 | ++idx; |
619 | 0 | } |
620 | | |
621 | |
|
622 | 0 | std::sort(tables.begin(), tables.end(), [](const auto& lhs, const auto& rhs) { |
623 | 0 | return Slice(lhs.prefix).compare(Slice(rhs.prefix)) < 0; |
624 | 0 | }); |
625 | |
|
626 | 0 | for (auto& table : tables) { |
627 | 0 | size_t num_updates = 0; |
628 | 0 | size_t num_inserts = 0; |
629 | 0 | size_t num_deletes = 0; |
630 | 0 | Slice prefix(table.prefix); |
631 | |
|
632 | 0 | RETURN_NOT_OK(restoring_state.SetPrefix(prefix)); |
633 | 0 | RETURN_NOT_OK(existing_state.SetPrefix(prefix)); |
634 | |
|
635 | 0 | while (!restoring_state.finished() && !existing_state.finished()) { |
636 | 0 | auto compare_result = restoring_state.key().compare(existing_state.key()); |
637 | 0 | if (compare_result == 0) { |
638 | 0 | if (table.name != nullptr) { |
639 | 0 | if (restoring_state.value().compare(existing_state.value())) { |
640 | 0 | ++num_updates; |
641 | 0 | AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch); |
642 | 0 | } |
643 | 0 | } else { |
644 | 0 | docdb::SubDocKey sub_doc_key; |
645 | 0 | RETURN_NOT_OK(sub_doc_key.FullyDecodeFrom( |
646 | 0 | restoring_state.key(), docdb::HybridTimeRequired::kFalse)); |
647 | 0 | SCHECK_EQ(sub_doc_key.subkeys().size(), 1U, Corruption, "Wrong number of subdoc keys"); |
648 | 0 | if (sub_doc_key.subkeys()[0].value_type() == docdb::ValueType::kColumnId) { |
649 | 0 | auto column_id = sub_doc_key.subkeys()[0].GetColumnId(); |
650 | 0 | const ColumnSchema& column = VERIFY_RESULT(pg_yb_catalog_version_schema.column_by_id( |
651 | 0 | column_id)); |
652 | 0 | if (column.name() == "current_version") { |
653 | 0 | docdb::Value value; |
654 | 0 | RETURN_NOT_OK(value.Decode(existing_state.value())); |
655 | 0 | docdb::DocPath path(sub_doc_key.doc_key().Encode(), sub_doc_key.subkeys()); |
656 | 0 | RETURN_NOT_OK(write_batch->SetPrimitive( |
657 | 0 | path, docdb::PrimitiveValue(value.primitive_value().GetInt64() + 1))); |
658 | 0 | } |
659 | 0 | } |
660 | 0 | } |
661 | 0 | RETURN_NOT_OK(restoring_state.Next()); |
662 | 0 | RETURN_NOT_OK(existing_state.Next()); |
663 | 0 | } else if (compare_result < 0) { |
664 | 0 | ++num_inserts; |
665 | 0 | AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch); |
666 | 0 | RETURN_NOT_OK(restoring_state.Next()); |
667 | 0 | } else { |
668 | 0 | ++num_deletes; |
669 | 0 | AddKeyValue(existing_state.key(), tombstone, write_batch); |
670 | 0 | RETURN_NOT_OK(existing_state.Next()); |
671 | 0 | } |
672 | 0 | } |
673 | |
|
674 | 0 | while (!restoring_state.finished()) { |
675 | 0 | ++num_inserts; |
676 | 0 | AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch); |
677 | 0 | RETURN_NOT_OK(restoring_state.Next()); |
678 | 0 | } |
679 | |
|
680 | 0 | while (!existing_state.finished()) { |
681 | 0 | ++num_deletes; |
682 | 0 | AddKeyValue(existing_state.key(), tombstone, write_batch); |
683 | 0 | RETURN_NOT_OK(existing_state.Next()); |
684 | 0 | } |
685 | |
|
686 | 0 | if (num_updates + num_inserts + num_deletes != 0 || VLOG_IS_ON(3)) { |
687 | 0 | LOG(INFO) << "PITR: Pg system table: " << AsString(table.name) << ", updates: " << num_updates |
688 | 0 | << ", inserts: " << num_inserts << ", deletes: " << num_deletes; |
689 | 0 | } |
690 | 0 | } |
691 | |
|
692 | 0 | return Status::OK(); |
693 | 0 | } |
694 | | |
695 | | } // namespace master |
696 | | } // namespace yb |