/Users/deen/code/yugabyte-db/ent/src/yb/master/restore_sys_catalog_state.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/restore_sys_catalog_state.h" |
15 | | |
16 | | #include "yb/common/entity_ids.h" |
17 | | #include "yb/common/hybrid_time.h" |
18 | | #include "yb/common/index.h" |
19 | | |
20 | | #include "yb/common/pgsql_protocol.pb.h" |
21 | | #include "yb/common/ql_expr.h" |
22 | | #include "yb/docdb/consensus_frontier.h" |
23 | | #include "yb/docdb/cql_operation.h" |
24 | | #include "yb/docdb/docdb_rocksdb_util.h" |
25 | | #include "yb/docdb/doc_rowwise_iterator.h" |
26 | | #include "yb/docdb/doc_write_batch.h" |
27 | | #include "yb/docdb/docdb.h" |
28 | | #include "yb/docdb/pgsql_operation.h" |
29 | | #include "yb/docdb/rocksdb_writer.h" |
30 | | |
31 | | #include "yb/master/catalog_loaders.h" |
32 | | #include "yb/master/master_backup.pb.h" |
33 | | #include "yb/master/master_snapshot_coordinator.h" |
34 | | #include "yb/master/master_util.h" |
35 | | #include "yb/master/sys_catalog.h" |
36 | | #include "yb/master/sys_catalog_writer.h" |
37 | | |
38 | | #include "yb/rocksdb/write_batch.h" |
39 | | #include "yb/tablet/tablet.h" |
40 | | #include "yb/util/format.h" |
41 | | #include "yb/util/logging.h" |
42 | | #include "yb/util/pb_util.h" |
43 | | #include "yb/util/status_format.h" |
44 | | |
45 | | using namespace std::placeholders; |
46 | | |
47 | | namespace yb { |
48 | | namespace master { |
49 | | |
50 | | namespace { |
51 | | |
52 | | CHECKED_STATUS ApplyWriteRequest( |
53 | | const Schema& schema, const QLWriteRequestPB& write_request, |
54 | 51 | docdb::DocWriteBatch* write_batch) { |
55 | 51 | std::shared_ptr<const Schema> schema_ptr(&schema, [](const Schema* schema){}); |
56 | 51 | docdb::DocOperationApplyData apply_data{.doc_write_batch = write_batch}; |
57 | 51 | IndexMap index_map; |
58 | 51 | docdb::QLWriteOperation operation( |
59 | 51 | write_request, schema_ptr, index_map, nullptr, TransactionOperationContext()); |
60 | 51 | QLResponsePB response; |
61 | 51 | RETURN_NOT_OK(operation.Init(&response)); |
62 | 51 | return operation.Apply(apply_data); |
63 | 51 | } |
64 | | |
65 | 8.35k | bool TableDeleted(const SysTablesEntryPB& table) { |
66 | 8.35k | return table.state() == SysTablesEntryPB::DELETED || |
67 | 8.35k | table.state() == SysTablesEntryPB::DELETING || |
68 | 8.35k | table.hide_state() == SysTablesEntryPB::HIDING || |
69 | 8.35k | table.hide_state() == SysTablesEntryPB::HIDDEN; |
70 | 8.35k | } |
71 | | |
72 | | Result<bool> MatchNamespace( |
73 | | const SnapshotScheduleFilterPB& filter, const NamespaceId& id, |
74 | 132 | const SysNamespaceEntryPB& ns) { |
75 | 132 | VLOG(1) << __func__ << "(" << id << ", " << ns.ShortDebugString() << ")"0 ; |
76 | 132 | for (const auto& table_identifier : filter.tables().tables()) { |
77 | 132 | if (table_identifier.has_namespace_() && |
78 | 132 | VERIFY_RESULT(master::NamespaceMatchesIdentifier( |
79 | 132 | id, ns.database_type(), ns.name(), table_identifier.namespace_()))) { |
80 | 18 | return true; |
81 | 18 | } |
82 | 132 | } |
83 | 114 | return false; |
84 | 132 | } |
85 | | |
86 | | Result<bool> MatchTable( |
87 | | const SnapshotScheduleFilterPB& filter, const TableId& id, |
88 | 8.37k | const SysTablesEntryPB& table) { |
89 | 8.37k | VLOG(1) << __func__ << "(" << id << ", " << table.ShortDebugString() << ")"0 ; |
90 | 8.37k | for (const auto& table_identifier : filter.tables().tables()) { |
91 | 8.37k | if (VERIFY_RESULT(master::TableMatchesIdentifier(id, table, table_identifier))) { |
92 | 1.32k | return true; |
93 | 1.32k | } |
94 | 8.37k | } |
95 | 7.05k | return false; |
96 | 8.37k | } |
97 | | |
98 | | template <class PB> |
99 | | struct GetEntryType; |
100 | | |
101 | | template<> struct GetEntryType<SysNamespaceEntryPB> |
102 | | : public std::integral_constant<SysRowEntryType, SysRowEntryType::NAMESPACE> {}; |
103 | | |
104 | | template<> struct GetEntryType<SysTablesEntryPB> |
105 | | : public std::integral_constant<SysRowEntryType, SysRowEntryType::TABLE> {}; |
106 | | |
107 | | template<> struct GetEntryType<SysTabletsEntryPB> |
108 | | : public std::integral_constant<SysRowEntryType, SysRowEntryType::TABLET> {}; |
109 | | |
110 | | } // namespace |
111 | | |
112 | | RestoreSysCatalogState::RestoreSysCatalogState(SnapshotScheduleRestoration* restoration) |
113 | 10 | : restoration_(*restoration) {} |
114 | | |
115 | | Result<bool> RestoreSysCatalogState::PatchRestoringEntry( |
116 | 9 | const std::string& id, SysNamespaceEntryPB* pb) { |
117 | 9 | return true; |
118 | 9 | } |
119 | | |
120 | | Result<bool> RestoreSysCatalogState::PatchRestoringEntry( |
121 | 660 | const std::string& id, SysTablesEntryPB* pb) { |
122 | 660 | if (pb->schema().table_properties().is_ysql_catalog_table()) { |
123 | 648 | LOG(INFO) << "PITR: Adding " << pb->name() << " for restoring. ID: " << id; |
124 | 648 | restoration_.system_tables_to_restore.emplace(id, pb->name()); |
125 | | |
126 | 648 | return false; |
127 | 648 | } |
128 | | |
129 | 12 | auto it = existing_objects_.tables.find(id); |
130 | 12 | if (it == existing_objects_.tables.end()) { |
131 | 0 | return STATUS_FORMAT(NotFound, "Not found restoring table: $0", id); |
132 | 0 | } |
133 | | |
134 | 12 | if (pb->version() != it->second.version()) { |
135 | | // Force schema update after restoration, if schema has changes. |
136 | 3 | pb->set_version(it->second.version() + 1); |
137 | 3 | } |
138 | | |
139 | 12 | return true; |
140 | 12 | } |
141 | | |
142 | | Result<bool> RestoreSysCatalogState::PatchRestoringEntry( |
143 | 30 | const std::string& id, SysTabletsEntryPB* pb) { |
144 | 30 | return true; |
145 | 30 | } |
146 | | |
147 | | template <class PB> |
148 | | Status RestoreSysCatalogState::AddRestoringEntry( |
149 | 699 | const std::string& id, PB* pb, faststring* buffer) { |
150 | 699 | auto type = GetEntryType<PB>::value; |
151 | 699 | VLOG_WITH_FUNC0 (1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString()0 ; |
152 | | |
153 | 699 | if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) { |
154 | 648 | return Status::OK(); |
155 | 648 | } |
156 | 51 | auto& entry = *entries_.mutable_entries()->Add(); |
157 | 51 | entry.set_type(type); |
158 | 51 | entry.set_id(id); |
159 | 51 | pb_util::SerializeToString(*pb, buffer); |
160 | 51 | entry.set_data(buffer->data(), buffer->size()); |
161 | 51 | restoration_.non_system_objects_to_restore.emplace(id, type); |
162 | | |
163 | 51 | return Status::OK(); |
164 | 699 | } yb::Status yb::master::RestoreSysCatalogState::AddRestoringEntry<yb::master::SysNamespaceEntryPB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::SysNamespaceEntryPB*, yb::faststring*) Line | Count | Source | 149 | 9 | const std::string& id, PB* pb, faststring* buffer) { | 150 | 9 | auto type = GetEntryType<PB>::value; | 151 | 9 | VLOG_WITH_FUNC0 (1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString()0 ; | 152 | | | 153 | 9 | if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) { | 154 | 0 | return Status::OK(); | 155 | 0 | } | 156 | 9 | auto& entry = *entries_.mutable_entries()->Add(); | 157 | 9 | entry.set_type(type); | 158 | 9 | entry.set_id(id); | 159 | 9 | pb_util::SerializeToString(*pb, buffer); | 160 | 9 | entry.set_data(buffer->data(), buffer->size()); | 161 | 9 | restoration_.non_system_objects_to_restore.emplace(id, type); | 162 | | | 163 | 9 | return Status::OK(); | 164 | 9 | } |
yb::Status yb::master::RestoreSysCatalogState::AddRestoringEntry<yb::master::SysTablesEntryPB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::SysTablesEntryPB*, yb::faststring*) Line | Count | Source | 149 | 660 | const std::string& id, PB* pb, faststring* buffer) { | 150 | 660 | auto type = GetEntryType<PB>::value; | 151 | 660 | VLOG_WITH_FUNC0 (1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString()0 ; | 152 | | | 153 | 660 | if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) { | 154 | 648 | return Status::OK(); | 155 | 648 | } | 156 | 12 | auto& entry = *entries_.mutable_entries()->Add(); | 157 | 12 | entry.set_type(type); | 158 | 12 | entry.set_id(id); | 159 | 12 | pb_util::SerializeToString(*pb, buffer); | 160 | 12 | entry.set_data(buffer->data(), buffer->size()); | 161 | 12 | restoration_.non_system_objects_to_restore.emplace(id, type); | 162 | | | 163 | 12 | return Status::OK(); | 164 | 660 | } |
yb::Status yb::master::RestoreSysCatalogState::AddRestoringEntry<yb::master::SysTabletsEntryPB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::SysTabletsEntryPB*, yb::faststring*) Line | Count | Source | 149 | 30 | const std::string& id, PB* pb, faststring* buffer) { | 150 | 30 | auto type = GetEntryType<PB>::value; | 151 | 30 | VLOG_WITH_FUNC0 (1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString()0 ; | 152 | | | 153 | 30 | if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) { | 154 | 0 | return Status::OK(); | 155 | 0 | } | 156 | 30 | auto& entry = *entries_.mutable_entries()->Add(); | 157 | 30 | entry.set_type(type); | 158 | 30 | entry.set_id(id); | 159 | 30 | pb_util::SerializeToString(*pb, buffer); | 160 | 30 | entry.set_data(buffer->data(), buffer->size()); | 161 | 30 | restoration_.non_system_objects_to_restore.emplace(id, type); | 162 | | | 163 | 30 | return Status::OK(); | 164 | 30 | } |
|
165 | | |
166 | 9 | Status RestoreSysCatalogState::Process() { |
167 | 9 | VLOG_WITH_FUNC0 (1) << "Restoring: " << restoring_objects_.SizesToString() << ", existing: " |
168 | 0 | << existing_objects_.SizesToString(); |
169 | | |
170 | 9 | VLOG_WITH_FUNC0 (2) << "Check restoring objects"0 ; |
171 | 9 | VLOG_WITH_FUNC0 (4) << "Restoring namespaces: " << AsString(restoring_objects_.namespaces)0 ; |
172 | 9 | faststring buffer; |
173 | 9 | RETURN_NOT_OK_PREPEND(DetermineEntries( |
174 | 9 | &restoring_objects_, nullptr, |
175 | 9 | [this, &buffer](const auto& id, auto* pb) { |
176 | 9 | return AddRestoringEntry(id, pb, &buffer); |
177 | 9 | }), "Determine restoring entries failed"); |
178 | | |
179 | 9 | VLOG_WITH_FUNC0 (2) << "Check existing objects"0 ; |
180 | 9 | RETURN_NOT_OK_PREPEND(DetermineEntries( |
181 | 9 | &existing_objects_, &retained_existing_tables_, |
182 | 9 | [this](const auto& id, auto* pb) { |
183 | 9 | return CheckExistingEntry(id, *pb); |
184 | 9 | }), "Determine obsolete entries failed"); |
185 | | |
186 | | // Sort generated vectors, so binary search could be used to check whether object is obsolete. |
187 | 9 | auto compare_by_first = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }0 ; Unexecuted instantiation: restore_sys_catalog_state.cc:auto yb::master::RestoreSysCatalogState::Process()::$_2::operator()<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB>, std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB> >(std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB> const&, std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB> const&) const Unexecuted instantiation: restore_sys_catalog_state.cc:auto yb::master::RestoreSysCatalogState::Process()::$_2::operator()<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB>, std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB> >(std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB> const&, std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB> const&) const |
188 | 9 | std::sort(restoration_.non_system_obsolete_tablets.begin(), |
189 | 9 | restoration_.non_system_obsolete_tablets.end(), |
190 | 9 | compare_by_first); |
191 | 9 | std::sort(restoration_.non_system_obsolete_tables.begin(), |
192 | 9 | restoration_.non_system_obsolete_tables.end(), |
193 | 9 | compare_by_first); |
194 | | |
195 | 9 | return Status::OK(); |
196 | 9 | } |
197 | | |
198 | | template <class ProcessEntry> |
199 | | Status RestoreSysCatalogState::DetermineEntries( |
200 | | Objects* objects, RetainedExistingTables* retained_existing_tables, |
201 | 18 | const ProcessEntry& process_entry) { |
202 | 18 | std::unordered_set<NamespaceId> namespaces; |
203 | 18 | std::unordered_set<TableId> tables; |
204 | | |
205 | 18 | const auto& filter = restoration_.schedules[0].second; |
206 | | |
207 | 132 | for (auto& id_and_metadata : objects->namespaces) { |
208 | 132 | if (!VERIFY_RESULT(MatchNamespace(filter, id_and_metadata.first, id_and_metadata.second))) { |
209 | 114 | continue; |
210 | 114 | } |
211 | 18 | if (!namespaces.insert(id_and_metadata.first).second) { |
212 | 0 | continue; |
213 | 0 | } |
214 | 18 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); |
215 | 18 | } |
216 | | |
217 | 8.35k | for (auto& id_and_metadata : objects->tables)18 { |
218 | 8.35k | VLOG_WITH_FUNC0 (3) << "Checking: " << id_and_metadata.first << ", " |
219 | 0 | << id_and_metadata.second.ShortDebugString(); |
220 | | |
221 | 8.35k | if (TableDeleted(id_and_metadata.second)) { |
222 | 3 | continue; |
223 | 3 | } |
224 | 8.35k | auto& root_table_id_and_metadata = VERIFY_RESULT(objects->FindRootTable(id_and_metadata)).get(); |
225 | 8.35k | auto match = VERIFY_RESULT(MatchTable( |
226 | 8.35k | filter, root_table_id_and_metadata.first, root_table_id_and_metadata.second)); |
227 | 8.35k | if (!match) { |
228 | 7.03k | continue; |
229 | 7.03k | } |
230 | 1.31k | if (retained_existing_tables) { |
231 | 657 | auto& retaining_schedules = retained_existing_tables->emplace( |
232 | 657 | id_and_metadata.first, std::vector<SnapshotScheduleId>()).first->second; |
233 | 657 | retaining_schedules.push_back(restoration_.schedules[0].first); |
234 | 657 | for (size_t i = 1; i != restoration_.schedules.size(); ++i0 ) { |
235 | 0 | if (VERIFY_RESULT(MatchTable( |
236 | 0 | restoration_.schedules[i].second, root_table_id_and_metadata.first, |
237 | 0 | root_table_id_and_metadata.second))) { |
238 | 0 | retaining_schedules.push_back(restoration_.schedules[i].first); |
239 | 0 | } |
240 | 0 | } |
241 | 657 | } |
242 | | // Process pg_catalog tables that need to be restored. |
243 | 1.31k | if (namespaces.insert(id_and_metadata.second.namespace_id()).second) { |
244 | 0 | auto namespace_it = objects->namespaces.find(id_and_metadata.second.namespace_id()); |
245 | 0 | if (namespace_it == objects->namespaces.end()) { |
246 | 0 | return STATUS_FORMAT( |
247 | 0 | NotFound, "Namespace $0 not found for table $1", id_and_metadata.second.namespace_id(), |
248 | 0 | id_and_metadata.first, id_and_metadata.second.name()); |
249 | 0 | } |
250 | 0 | RETURN_NOT_OK(process_entry(namespace_it->first, &namespace_it->second)); |
251 | 0 | } |
252 | 1.31k | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); |
253 | 1.31k | tables.insert(id_and_metadata.first); |
254 | 1.31k | VLOG(2) << "Table to restore: " << id_and_metadata.first << ", " |
255 | 0 | << id_and_metadata.second.ShortDebugString(); |
256 | 1.31k | } |
257 | 402 | for (auto& id_and_metadata : objects->tablets)18 { |
258 | 402 | auto it = tables.find(id_and_metadata.second.table_id()); |
259 | 402 | if (it == tables.end()) { |
260 | 351 | continue; |
261 | 351 | } |
262 | 51 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); |
263 | 51 | VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", " |
264 | 0 | << id_and_metadata.second.ShortDebugString(); |
265 | 51 | } |
266 | 18 | return Status::OK(); |
267 | 18 | } restore_sys_catalog_state.cc:yb::Status yb::master::RestoreSysCatalogState::DetermineEntries<yb::master::RestoreSysCatalogState::Process()::$_0>(yb::master::RestoreSysCatalogState::Objects*, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::allocator<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > >, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, std::__1::vector<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::allocator<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > > > > >*, yb::master::RestoreSysCatalogState::Process()::$_0 const&) Line | Count | Source | 201 | 9 | const ProcessEntry& process_entry) { | 202 | 9 | std::unordered_set<NamespaceId> namespaces; | 203 | 9 | std::unordered_set<TableId> tables; | 204 | | | 205 | 9 | const auto& filter = restoration_.schedules[0].second; | 206 | | | 207 | 66 | for (auto& id_and_metadata : objects->namespaces) { | 208 | 66 | if (!VERIFY_RESULT(MatchNamespace(filter, id_and_metadata.first, id_and_metadata.second))) { | 209 | 57 | continue; | 210 | 57 | } | 211 | 9 | if (!namespaces.insert(id_and_metadata.first).second) { | 212 | 0 | continue; | 213 | 0 | } | 214 | 9 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); | 215 | 9 | } | 216 | | | 217 | 4.17k | for (auto& id_and_metadata : objects->tables)9 { | 218 | 4.17k | VLOG_WITH_FUNC0 (3) << "Checking: " << id_and_metadata.first << ", " | 219 | 0 | << id_and_metadata.second.ShortDebugString(); | 220 | | | 221 | 4.17k | if (TableDeleted(id_and_metadata.second)) { | 222 | 0 | continue; | 223 | 0 | } | 224 | 4.17k | auto& root_table_id_and_metadata = VERIFY_RESULT(objects->FindRootTable(id_and_metadata)).get(); | 225 | 4.17k | auto match = VERIFY_RESULT(MatchTable( | 226 | 4.17k | filter, root_table_id_and_metadata.first, root_table_id_and_metadata.second)); | 227 | 4.17k | if (!match) { | 228 | 3.51k | continue; | 229 | 3.51k | } | 230 | 660 | if (retained_existing_tables) { | 231 | 0 | auto& retaining_schedules = retained_existing_tables->emplace( | 232 | 0 | id_and_metadata.first, std::vector<SnapshotScheduleId>()).first->second; | 233 | 0 | retaining_schedules.push_back(restoration_.schedules[0].first); | 234 | 0 | for (size_t i = 1; i != restoration_.schedules.size(); ++i) { | 235 | 0 | if (VERIFY_RESULT(MatchTable( | 236 | 0 | restoration_.schedules[i].second, root_table_id_and_metadata.first, | 237 | 0 | root_table_id_and_metadata.second))) { | 238 | 0 | retaining_schedules.push_back(restoration_.schedules[i].first); | 239 | 0 | } | 240 | 0 | } | 241 | 0 | } | 242 | | // Process pg_catalog tables that need to be restored. | 243 | 660 | if (namespaces.insert(id_and_metadata.second.namespace_id()).second) { | 244 | 0 | auto namespace_it = objects->namespaces.find(id_and_metadata.second.namespace_id()); | 245 | 0 | if (namespace_it == objects->namespaces.end()) { | 246 | 0 | return STATUS_FORMAT( | 247 | 0 | NotFound, "Namespace $0 not found for table $1", id_and_metadata.second.namespace_id(), | 248 | 0 | id_and_metadata.first, id_and_metadata.second.name()); | 249 | 0 | } | 250 | 0 | RETURN_NOT_OK(process_entry(namespace_it->first, &namespace_it->second)); | 251 | 0 | } | 252 | 660 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); | 253 | 660 | tables.insert(id_and_metadata.first); | 254 | 660 | VLOG(2) << "Table to restore: " << id_and_metadata.first << ", " | 255 | 0 | << id_and_metadata.second.ShortDebugString(); | 256 | 660 | } | 257 | 201 | for (auto& id_and_metadata : objects->tablets)9 { | 258 | 201 | auto it = tables.find(id_and_metadata.second.table_id()); | 259 | 201 | if (it == tables.end()) { | 260 | 171 | continue; | 261 | 171 | } | 262 | 30 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); | 263 | 30 | VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", " | 264 | 0 | << id_and_metadata.second.ShortDebugString(); | 265 | 30 | } | 266 | 9 | return Status::OK(); | 267 | 9 | } |
restore_sys_catalog_state.cc:yb::Status yb::master::RestoreSysCatalogState::DetermineEntries<yb::master::RestoreSysCatalogState::Process()::$_1>(yb::master::RestoreSysCatalogState::Objects*, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::allocator<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > >, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, std::__1::vector<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::allocator<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > > > > >*, yb::master::RestoreSysCatalogState::Process()::$_1 const&) Line | Count | Source | 201 | 9 | const ProcessEntry& process_entry) { | 202 | 9 | std::unordered_set<NamespaceId> namespaces; | 203 | 9 | std::unordered_set<TableId> tables; | 204 | | | 205 | 9 | const auto& filter = restoration_.schedules[0].second; | 206 | | | 207 | 66 | for (auto& id_and_metadata : objects->namespaces) { | 208 | 66 | if (!VERIFY_RESULT(MatchNamespace(filter, id_and_metadata.first, id_and_metadata.second))) { | 209 | 57 | continue; | 210 | 57 | } | 211 | 9 | if (!namespaces.insert(id_and_metadata.first).second) { | 212 | 0 | continue; | 213 | 0 | } | 214 | 9 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); | 215 | 9 | } | 216 | | | 217 | 4.17k | for (auto& id_and_metadata : objects->tables)9 { | 218 | 4.17k | VLOG_WITH_FUNC0 (3) << "Checking: " << id_and_metadata.first << ", " | 219 | 0 | << id_and_metadata.second.ShortDebugString(); | 220 | | | 221 | 4.17k | if (TableDeleted(id_and_metadata.second)) { | 222 | 3 | continue; | 223 | 3 | } | 224 | 4.17k | auto& root_table_id_and_metadata = VERIFY_RESULT(objects->FindRootTable(id_and_metadata)).get(); | 225 | 4.17k | auto match = VERIFY_RESULT(MatchTable( | 226 | 4.17k | filter, root_table_id_and_metadata.first, root_table_id_and_metadata.second)); | 227 | 4.17k | if (!match) { | 228 | 3.51k | continue; | 229 | 3.51k | } | 230 | 657 | if (retained_existing_tables) { | 231 | 657 | auto& retaining_schedules = retained_existing_tables->emplace( | 232 | 657 | id_and_metadata.first, std::vector<SnapshotScheduleId>()).first->second; | 233 | 657 | retaining_schedules.push_back(restoration_.schedules[0].first); | 234 | 657 | for (size_t i = 1; i != restoration_.schedules.size(); ++i0 ) { | 235 | 0 | if (VERIFY_RESULT(MatchTable( | 236 | 0 | restoration_.schedules[i].second, root_table_id_and_metadata.first, | 237 | 0 | root_table_id_and_metadata.second))) { | 238 | 0 | retaining_schedules.push_back(restoration_.schedules[i].first); | 239 | 0 | } | 240 | 0 | } | 241 | 657 | } | 242 | | // Process pg_catalog tables that need to be restored. | 243 | 657 | if (namespaces.insert(id_and_metadata.second.namespace_id()).second) { | 244 | 0 | auto namespace_it = objects->namespaces.find(id_and_metadata.second.namespace_id()); | 245 | 0 | if (namespace_it == objects->namespaces.end()) { | 246 | 0 | return STATUS_FORMAT( | 247 | 0 | NotFound, "Namespace $0 not found for table $1", id_and_metadata.second.namespace_id(), | 248 | 0 | id_and_metadata.first, id_and_metadata.second.name()); | 249 | 0 | } | 250 | 0 | RETURN_NOT_OK(process_entry(namespace_it->first, &namespace_it->second)); | 251 | 0 | } | 252 | 657 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); | 253 | 657 | tables.insert(id_and_metadata.first); | 254 | 657 | VLOG(2) << "Table to restore: " << id_and_metadata.first << ", " | 255 | 0 | << id_and_metadata.second.ShortDebugString(); | 256 | 657 | } | 257 | 201 | for (auto& id_and_metadata : objects->tablets)9 { | 258 | 201 | auto it = tables.find(id_and_metadata.second.table_id()); | 259 | 201 | if (it == tables.end()) { | 260 | 180 | continue; | 261 | 180 | } | 262 | 21 | RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second)); | 263 | 21 | VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", " | 264 | 0 | << id_and_metadata.second.ShortDebugString(); | 265 | 21 | } | 266 | 9 | return Status::OK(); | 267 | 9 | } |
|
268 | | |
269 | | Result<const std::pair<const TableId, SysTablesEntryPB>&> |
270 | | RestoreSysCatalogState::Objects::FindRootTable( |
271 | 8.35k | const std::pair<const TableId, SysTablesEntryPB>& id_and_metadata) { |
272 | 8.35k | if (!id_and_metadata.second.has_index_info()) { |
273 | 4.80k | return id_and_metadata; |
274 | 4.80k | } |
275 | | |
276 | 3.55k | auto it = tables.find(id_and_metadata.second.index_info().indexed_table_id()); |
277 | 3.55k | if (it == tables.end()) { |
278 | 0 | return STATUS_FORMAT( |
279 | 0 | NotFound, "Indexed table $0 not found for index $1 ($2)", |
280 | 0 | id_and_metadata.second.index_info().indexed_table_id(), id_and_metadata.first, |
281 | 0 | id_and_metadata.second.name()); |
282 | 0 | } |
283 | 3.55k | const auto& ref = *it; |
284 | 3.55k | return ref; |
285 | 3.55k | } |
286 | | |
287 | | Result<const std::pair<const TableId, SysTablesEntryPB>&> |
288 | | RestoreSysCatalogState::Objects::FindRootTable( |
289 | 0 | const TableId& table_id) { |
290 | 0 | auto it = tables.find(table_id); |
291 | 0 | if (it == tables.end()) { |
292 | 0 | return STATUS_FORMAT(NotFound, "Table $0 not found for index", table_id); |
293 | 0 | } |
294 | 0 | return FindRootTable(*it); |
295 | 0 | } |
296 | | |
297 | 0 | std::string RestoreSysCatalogState::Objects::SizesToString() const { |
298 | 0 | return Format("{ tablets: $0 tables: $1 namespaces: $2 }", |
299 | 0 | tablets.size(), tables.size(), namespaces.size()); |
300 | 0 | } |
301 | | |
302 | | template <class PB> |
303 | | Status RestoreSysCatalogState::IterateSysCatalog( |
304 | | const Schema& schema, const docdb::DocDB& doc_db, HybridTime read_time, |
305 | 54 | std::unordered_map<std::string, PB>* map) { |
306 | 54 | auto iter = std::make_unique<docdb::DocRowwiseIterator>( |
307 | 54 | schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(), |
308 | 54 | ReadHybridTime::SingleTime(read_time), nullptr); |
309 | 54 | return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map]( |
310 | 8.89k | const Slice& id, const Slice& data) -> Status { |
311 | 8.89k | auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data)); |
312 | 8.89k | if (!ShouldLoadObject(pb)) { |
313 | 0 | return Status::OK(); |
314 | 0 | } |
315 | 8.89k | if (!map->emplace(id.ToBuffer(), std::move(pb)).second) { |
316 | 0 | return STATUS_FORMAT(IllegalState, "Duplicate $0: $1", |
317 | 0 | SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer()); |
318 | 0 | } |
319 | 8.89k | return Status::OK(); |
320 | 8.89k | }); yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysNamespaceEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysNamespaceEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysNamespaceEntryPB> > >*)::'lambda'(yb::Slice const&, yb::Slice const&)::operator()(yb::Slice const&, yb::Slice const&) const Line | Count | Source | 310 | 132 | const Slice& id, const Slice& data) -> Status { | 311 | 132 | auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data)); | 312 | 132 | if (!ShouldLoadObject(pb)) { | 313 | 0 | return Status::OK(); | 314 | 0 | } | 315 | 132 | if (!map->emplace(id.ToBuffer(), std::move(pb)).second) { | 316 | 0 | return STATUS_FORMAT(IllegalState, "Duplicate $0: $1", | 317 | 0 | SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer()); | 318 | 0 | } | 319 | 132 | return Status::OK(); | 320 | 132 | }); |
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysTablesEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysTablesEntryPB> > >*)::'lambda'(yb::Slice const&, yb::Slice const&)::operator()(yb::Slice const&, yb::Slice const&) const Line | Count | Source | 310 | 8.35k | const Slice& id, const Slice& data) -> Status { | 311 | 8.35k | auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data)); | 312 | 8.35k | if (!ShouldLoadObject(pb)) { | 313 | 0 | return Status::OK(); | 314 | 0 | } | 315 | 8.35k | if (!map->emplace(id.ToBuffer(), std::move(pb)).second) { | 316 | 0 | return STATUS_FORMAT(IllegalState, "Duplicate $0: $1", | 317 | 0 | SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer()); | 318 | 0 | } | 319 | 8.35k | return Status::OK(); | 320 | 8.35k | }); |
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysTabletsEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysTabletsEntryPB> > >*)::'lambda'(yb::Slice const&, yb::Slice const&)::operator()(yb::Slice const&, yb::Slice const&) const Line | Count | Source | 310 | 402 | const Slice& id, const Slice& data) -> Status { | 311 | 402 | auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data)); | 312 | 402 | if (!ShouldLoadObject(pb)) { | 313 | 0 | return Status::OK(); | 314 | 0 | } | 315 | 402 | if (!map->emplace(id.ToBuffer(), std::move(pb)).second) { | 316 | 0 | return STATUS_FORMAT(IllegalState, "Duplicate $0: $1", | 317 | 0 | SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer()); | 318 | 0 | } | 319 | 402 | return Status::OK(); | 320 | 402 | }); |
|
321 | 54 | } yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysNamespaceEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysNamespaceEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysNamespaceEntryPB> > >*) Line | Count | Source | 305 | 18 | std::unordered_map<std::string, PB>* map) { | 306 | 18 | auto iter = std::make_unique<docdb::DocRowwiseIterator>( | 307 | 18 | schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(), | 308 | 18 | ReadHybridTime::SingleTime(read_time), nullptr); | 309 | 18 | return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map]( | 310 | 18 | const Slice& id, const Slice& data) -> Status { | 311 | 18 | auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data)); | 312 | 18 | if (!ShouldLoadObject(pb)) { | 313 | 18 | return Status::OK(); | 314 | 18 | } | 315 | 18 | if (!map->emplace(id.ToBuffer(), std::move(pb)).second) { | 316 | 18 | return STATUS_FORMAT(IllegalState, "Duplicate $0: $1", | 317 | 18 | SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer()); | 318 | 18 | } | 319 | 18 | return Status::OK(); | 320 | 18 | }); | 321 | 18 | } |
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysTablesEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysTablesEntryPB> > >*) Line | Count | Source | 305 | 18 | std::unordered_map<std::string, PB>* map) { | 306 | 18 | auto iter = std::make_unique<docdb::DocRowwiseIterator>( | 307 | 18 | schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(), | 308 | 18 | ReadHybridTime::SingleTime(read_time), nullptr); | 309 | 18 | return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map]( | 310 | 18 | const Slice& id, const Slice& data) -> Status { | 311 | 18 | auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data)); | 312 | 18 | if (!ShouldLoadObject(pb)) { | 313 | 18 | return Status::OK(); | 314 | 18 | } | 315 | 18 | if (!map->emplace(id.ToBuffer(), std::move(pb)).second) { | 316 | 18 | return STATUS_FORMAT(IllegalState, "Duplicate $0: $1", | 317 | 18 | SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer()); | 318 | 18 | } | 319 | 18 | return Status::OK(); | 320 | 18 | }); | 321 | 18 | } |
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysTabletsEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysTabletsEntryPB> > >*) Line | Count | Source | 305 | 18 | std::unordered_map<std::string, PB>* map) { | 306 | 18 | auto iter = std::make_unique<docdb::DocRowwiseIterator>( | 307 | 18 | schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(), | 308 | 18 | ReadHybridTime::SingleTime(read_time), nullptr); | 309 | 18 | return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map]( | 310 | 18 | const Slice& id, const Slice& data) -> Status { | 311 | 18 | auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data)); | 312 | 18 | if (!ShouldLoadObject(pb)) { | 313 | 18 | return Status::OK(); | 314 | 18 | } | 315 | 18 | if (!map->emplace(id.ToBuffer(), std::move(pb)).second) { | 316 | 18 | return STATUS_FORMAT(IllegalState, "Duplicate $0: $1", | 317 | 18 | SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer()); | 318 | 18 | } | 319 | 18 | return Status::OK(); | 320 | 18 | }); | 321 | 18 | } |
|
322 | | |
323 | | Status RestoreSysCatalogState::LoadObjects( |
324 | 18 | const Schema& schema, const docdb::DocDB& doc_db, HybridTime read_time, Objects* objects) { |
325 | 18 | RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->namespaces)); |
326 | 18 | RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->tables)); |
327 | 18 | RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->tablets)); |
328 | 18 | return Status::OK(); |
329 | 18 | } |
330 | | |
331 | | Status RestoreSysCatalogState::LoadRestoringObjects( |
332 | 9 | const Schema& schema, const docdb::DocDB& doc_db) { |
333 | 9 | return LoadObjects(schema, doc_db, restoration_.restore_at, &restoring_objects_); |
334 | 9 | } |
335 | | |
336 | | Status RestoreSysCatalogState::LoadExistingObjects( |
337 | 9 | const Schema& schema, const docdb::DocDB& doc_db) { |
338 | 9 | return LoadObjects(schema, doc_db, HybridTime::kMax, &existing_objects_); |
339 | 9 | } |
340 | | |
341 | | Status RestoreSysCatalogState::CheckExistingEntry( |
342 | 21 | const std::string& id, const SysTabletsEntryPB& pb) { |
343 | 21 | VLOG_WITH_FUNC0 (4) << "Tablet: " << id << ", " << pb.ShortDebugString()0 ; |
344 | 21 | if (restoring_objects_.tablets.count(id)) { |
345 | 21 | return Status::OK(); |
346 | 21 | } |
347 | 0 | LOG(INFO) << "PITR: Will remove tablet: " << id; |
348 | 0 | restoration_.non_system_obsolete_tablets.emplace_back(id, pb); |
349 | 0 | return Status::OK(); |
350 | 21 | } |
351 | | |
352 | | Status RestoreSysCatalogState::CheckExistingEntry( |
353 | 657 | const std::string& id, const SysTablesEntryPB& pb) { |
354 | 657 | if (pb.schema().table_properties().is_ysql_catalog_table()) { |
355 | 648 | if (restoration_.system_tables_to_restore.count(id) == 0) { |
356 | 0 | return STATUS_FORMAT( |
357 | 0 | NotFound, |
358 | 0 | "PG Catalog table $0 not found in the present set of tables" |
359 | 0 | " but found in the objects to restore.", |
360 | 0 | pb.name()); |
361 | 0 | } |
362 | 648 | return Status::OK(); |
363 | 648 | } |
364 | | |
365 | 9 | VLOG_WITH_FUNC0 (4) << "Table: " << id << ", " << pb.ShortDebugString()0 ; |
366 | 9 | if (restoring_objects_.tables.count(id)) { |
367 | 9 | return Status::OK(); |
368 | 9 | } |
369 | 0 | LOG(INFO) << "PITR: Will remove table: " << id; |
370 | 0 | restoration_.non_system_obsolete_tables.emplace_back(id, pb); |
371 | |
|
372 | 0 | return Status::OK(); |
373 | 9 | } |
374 | | |
375 | | // We don't delete newly created namespaces, because our filters namespace based. |
376 | | Status RestoreSysCatalogState::CheckExistingEntry( |
377 | 9 | const std::string& id, const SysNamespaceEntryPB& pb) { |
378 | 9 | return Status::OK(); |
379 | 9 | } |
380 | | |
381 | | Status RestoreSysCatalogState::PrepareWriteBatch( |
382 | 9 | const Schema& schema, docdb::DocWriteBatch* write_batch) { |
383 | 51 | for (const auto& entry : entries_.entries()) { |
384 | 51 | QLWriteRequestPB write_request; |
385 | 51 | RETURN_NOT_OK(FillSysCatalogWriteRequest( |
386 | 51 | entry.type(), entry.id(), entry.data(), QLWriteRequestPB::QL_STMT_INSERT, schema, |
387 | 51 | &write_request)); |
388 | 51 | RETURN_NOT_OK(ApplyWriteRequest(schema, write_request, write_batch)); |
389 | 51 | } |
390 | | |
391 | 9 | for (const auto& tablet_id_and_pb : restoration_.non_system_obsolete_tablets) { |
392 | 0 | RETURN_NOT_OK(PrepareTabletCleanup( |
393 | 0 | tablet_id_and_pb.first, tablet_id_and_pb.second, schema, write_batch)); |
394 | 0 | } |
395 | 9 | for (const auto& table_id_and_pb : restoration_.non_system_obsolete_tables) { |
396 | 0 | RETURN_NOT_OK(PrepareTableCleanup( |
397 | 0 | table_id_and_pb.first, table_id_and_pb.second, schema, write_batch)); |
398 | 0 | } |
399 | | |
400 | 9 | return Status::OK(); |
401 | 9 | } |
402 | | |
403 | | Status RestoreSysCatalogState::PrepareTabletCleanup( |
404 | | const TabletId& id, SysTabletsEntryPB pb, const Schema& schema, |
405 | 0 | docdb::DocWriteBatch* write_batch) { |
406 | 0 | VLOG_WITH_FUNC(4) << id; |
407 | |
|
408 | 0 | QLWriteRequestPB write_request; |
409 | |
|
410 | 0 | auto it = retained_existing_tables_.find(pb.table_id()); |
411 | 0 | if (it != retained_existing_tables_.end()) { |
412 | 0 | pb.set_hide_hybrid_time(restoration_.write_time.ToUint64()); |
413 | 0 | auto& out_schedules = *pb.mutable_retained_by_snapshot_schedules(); |
414 | 0 | for (const auto& schedule_id : it->second) { |
415 | 0 | out_schedules.Add()->assign(schedule_id.AsSlice().cdata(), schedule_id.size()); |
416 | 0 | } |
417 | 0 | } |
418 | 0 | RETURN_NOT_OK(FillSysCatalogWriteRequest( |
419 | 0 | SysRowEntryType::TABLET, id, pb.SerializeAsString(), |
420 | 0 | QLWriteRequestPB::QL_STMT_UPDATE, schema, &write_request)); |
421 | 0 | return ApplyWriteRequest(schema, write_request, write_batch); |
422 | 0 | } |
423 | | |
424 | | Status RestoreSysCatalogState::PrepareTableCleanup( |
425 | | const TableId& id, SysTablesEntryPB pb, const Schema& schema, |
426 | 0 | docdb::DocWriteBatch* write_batch) { |
427 | 0 | VLOG_WITH_FUNC(4) << id; |
428 | |
|
429 | 0 | QLWriteRequestPB write_request; |
430 | 0 | pb.set_hide_state(SysTablesEntryPB::HIDING); |
431 | 0 | pb.set_version(pb.version() + 1); |
432 | 0 | RETURN_NOT_OK(FillSysCatalogWriteRequest( |
433 | 0 | SysRowEntryType::TABLE, id, pb.SerializeAsString(), |
434 | 0 | QLWriteRequestPB::QL_STMT_UPDATE, schema, &write_request)); |
435 | 0 | return ApplyWriteRequest(schema, write_request, write_batch); |
436 | 0 | } |
437 | | |
438 | | Result<bool> RestoreSysCatalogState::TEST_MatchTable( |
439 | 18 | const TableId& id, const SysTablesEntryPB& table) { |
440 | 18 | return MatchTable(restoration_.schedules[0].second, id, table); |
441 | 18 | } |
442 | | |
443 | | void RestoreSysCatalogState::WriteToRocksDB( |
444 | | docdb::DocWriteBatch* write_batch, const HybridTime& write_time, const OpId& op_id, |
445 | 9 | tablet::Tablet* tablet) { |
446 | 9 | docdb::KeyValueWriteBatchPB kv_write_batch; |
447 | 9 | write_batch->MoveToWriteBatchPB(&kv_write_batch); |
448 | | |
449 | 9 | docdb::NonTransactionalWriter writer(kv_write_batch, write_time); |
450 | 9 | rocksdb::WriteBatch rocksdb_write_batch; |
451 | 9 | rocksdb_write_batch.SetDirectWriter(&writer); |
452 | 9 | docdb::ConsensusFrontiers frontiers; |
453 | 9 | set_op_id(op_id, &frontiers); |
454 | 9 | set_hybrid_time(write_time, &frontiers); |
455 | | |
456 | 9 | tablet->WriteToRocksDB( |
457 | 9 | &frontiers, &rocksdb_write_batch, docdb::StorageDbType::kRegular); |
458 | 9 | } |
459 | | |
460 | | class FetchState { |
461 | | public: |
462 | | explicit FetchState(const docdb::DocDB& doc_db, const ReadHybridTime& read_time) |
463 | | : iterator_(CreateIntentAwareIterator( |
464 | | doc_db, |
465 | | docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, |
466 | | boost::none, |
467 | | rocksdb::kDefaultQueryId, |
468 | | TransactionOperationContext(), |
469 | | CoarseTimePoint::max(), |
470 | 12 | read_time)) { |
471 | 12 | } |
472 | | |
473 | 1.30k | CHECKED_STATUS SetPrefix(const Slice& prefix) { |
474 | 1.30k | if (prefix_.empty()) { |
475 | 12 | iterator_->Seek(prefix); |
476 | 1.29k | } else { |
477 | 1.29k | iterator_->SeekForward(prefix); |
478 | 1.29k | } |
479 | 1.30k | prefix_ = prefix; |
480 | 1.30k | finished_ = false; |
481 | 1.30k | last_deleted_key_bytes_.clear(); |
482 | 1.30k | last_deleted_key_write_time_ = DocHybridTime::kInvalid; |
483 | 1.30k | RETURN_NOT_OK(Update()); |
484 | 1.30k | return NextNonDeletedEntry(); |
485 | 1.30k | } |
486 | | |
487 | 7.51M | bool finished() const { |
488 | 7.51M | return finished_; |
489 | 7.51M | } |
490 | | |
491 | 18.9M | Slice key() const { |
492 | 18.9M | return key_.key; |
493 | 18.9M | } |
494 | | |
495 | 11.2M | Slice value() const { |
496 | 11.2M | return iterator_->value(); |
497 | 11.2M | } |
498 | | |
499 | 408 | docdb::FetchKeyResult FullKey() const { |
500 | 408 | return key_; |
501 | 408 | } |
502 | | |
503 | 3.75M | CHECKED_STATUS NextEntry() { |
504 | 3.75M | iterator_->SeekPastSubKey(key_.key); |
505 | 3.75M | return Update(); |
506 | 3.75M | } |
507 | | |
508 | 3.75M | CHECKED_STATUS Next() { |
509 | 3.75M | RETURN_NOT_OK(NextEntry()); |
510 | 3.75M | return NextNonDeletedEntry(); |
511 | 3.75M | } |
512 | | |
513 | | // Returns true if the entry corresponds to a deleted row |
514 | | // in rocksdb. |
515 | 7.51M | Result<bool> IsDeletedRowEntry() { |
516 | 7.51M | bool is_tombstoned = false; |
517 | 7.51M | is_tombstoned = VERIFY_RESULT(docdb::Value::IsTombstoned(value())); |
518 | | |
519 | | // Because Postgres doesn't have a concept of frozen types, kGroupEnd will only demarcate the |
520 | | // end of hashed and range components. It is reasonable to assume then that if the last byte |
521 | | // is kGroupEnd then it does not have any subkeys. |
522 | 0 | bool no_subkey = |
523 | 7.51M | key()[key().size() - 1] == docdb::ValueTypeAsChar::kGroupEnd; |
524 | | |
525 | 7.51M | return no_subkey && is_tombstoned162 ; |
526 | 7.51M | } |
527 | | |
528 | | // Returns true if it has been deleted since the time it was inserted. |
529 | 3.75M | bool IsDeletedSinceInsertion() { |
530 | 3.75M | if (last_deleted_key_bytes_.size() == 0) { |
531 | 3.59M | return false; |
532 | 3.59M | } |
533 | 157k | return key().starts_with(last_deleted_key_bytes_.AsSlice()) && |
534 | 157k | FullKey().write_time < last_deleted_key_write_time_408 ; |
535 | 3.75M | } |
536 | | |
537 | | private: |
538 | 3.75M | CHECKED_STATUS Update() { |
539 | 3.75M | if (!iterator_->valid()) { |
540 | 24 | finished_ = true; |
541 | 24 | return Status::OK(); |
542 | 24 | } |
543 | 3.75M | key_ = VERIFY_RESULT(iterator_->FetchKey()); |
544 | 3.75M | if (VERIFY_RESULT(IsDeletedRowEntry())) { |
545 | 105 | last_deleted_key_write_time_ = key_.write_time; |
546 | 105 | last_deleted_key_bytes_ = key_.key; |
547 | 105 | } |
548 | 3.75M | if (!key_.key.starts_with(prefix_)) { |
549 | 1.28k | finished_ = true; |
550 | 1.28k | return Status::OK(); |
551 | 1.28k | } |
552 | | |
553 | 3.75M | return Status::OK(); |
554 | 3.75M | } |
555 | | |
556 | 3.75M | CHECKED_STATUS NextNonDeletedEntry() { |
557 | 3.75M | while (!finished()) { |
558 | 3.75M | if (VERIFY_RESULT(IsDeletedRowEntry()) || |
559 | 3.75M | IsDeletedSinceInsertion()3.75M ) { |
560 | 459 | RETURN_NOT_OK(NextEntry()); |
561 | 459 | continue; |
562 | 459 | } |
563 | 3.75M | break; |
564 | 3.75M | } |
565 | 3.75M | return Status::OK(); |
566 | 3.75M | } |
567 | | |
568 | | std::unique_ptr<docdb::IntentAwareIterator> iterator_; |
569 | | Slice prefix_; |
570 | | docdb::FetchKeyResult key_; |
571 | | KeyBuffer last_deleted_key_bytes_; |
572 | | DocHybridTime last_deleted_key_write_time_; |
573 | | bool finished_ = false; |
574 | | }; |
575 | | |
576 | 474 | void AddKeyValue(const Slice& key, const Slice& value, docdb::DocWriteBatch* write_batch) { |
577 | 474 | auto& pair = write_batch->AddRaw(); |
578 | 474 | pair.first.assign(key.cdata(), key.size()); |
579 | 474 | pair.second.assign(value.cdata(), value.size()); |
580 | 474 | } |
581 | | |
582 | | struct PgCatalogTableData { |
583 | | std::array<uint8_t, kUuidSize + 1> prefix; |
584 | | const TableName* name; |
585 | | |
586 | 654 | CHECKED_STATUS SetTableId(const TableId& table_id) { |
587 | 654 | Uuid cotable_id; |
588 | 654 | RETURN_NOT_OK(cotable_id.FromHexString(table_id)); |
589 | 654 | prefix[0] = docdb::ValueTypeAsChar::kTableId; |
590 | 654 | cotable_id.EncodeToComparable(&prefix[1]); |
591 | 654 | return Status::OK(); |
592 | 654 | } |
593 | | }; |
594 | | |
595 | | Status RestoreSysCatalogState::ProcessPgCatalogRestores( |
596 | | const Schema& pg_yb_catalog_version_schema, |
597 | | const docdb::DocDB& restoring_db, |
598 | | const docdb::DocDB& existing_db, |
599 | 6 | docdb::DocWriteBatch* write_batch) { |
600 | 6 | if (restoration_.system_tables_to_restore.empty()) { |
601 | 0 | return Status::OK(); |
602 | 0 | } |
603 | | |
604 | 6 | FetchState restoring_state(restoring_db, ReadHybridTime::SingleTime(restoration_.restore_at)); |
605 | 6 | FetchState existing_state(existing_db, ReadHybridTime::Max()); |
606 | 6 | char tombstone_char = docdb::ValueTypeAsChar::kTombstone; |
607 | 6 | Slice tombstone(&tombstone_char, 1); |
608 | | |
609 | 6 | std::vector<PgCatalogTableData> tables(restoration_.system_tables_to_restore.size() + 1); |
610 | 6 | size_t idx = 0; |
611 | 6 | RETURN_NOT_OK(tables[0].SetTableId(kPgYbCatalogVersionTableId)); |
612 | 6 | tables[0].name = nullptr; |
613 | 6 | ++idx; |
614 | 648 | for (auto& id_and_name : restoration_.system_tables_to_restore) { |
615 | 648 | auto& table = tables[idx]; |
616 | 648 | RETURN_NOT_OK(table.SetTableId(id_and_name.first)); |
617 | 648 | table.name = &id_and_name.second; |
618 | 648 | ++idx; |
619 | 648 | } |
620 | | |
621 | | |
622 | 5.51k | std::sort(tables.begin(), tables.end(), [](const auto& lhs, const auto& rhs) 6 { |
623 | 5.51k | return Slice(lhs.prefix).compare(Slice(rhs.prefix)) < 0; |
624 | 5.51k | }); |
625 | | |
626 | 654 | for (auto& table : tables) { |
627 | 654 | size_t num_updates = 0; |
628 | 654 | size_t num_inserts = 0; |
629 | 654 | size_t num_deletes = 0; |
630 | 654 | Slice prefix(table.prefix); |
631 | | |
632 | 654 | RETURN_NOT_OK(restoring_state.SetPrefix(prefix)); |
633 | 654 | RETURN_NOT_OK(existing_state.SetPrefix(prefix)); |
634 | | |
635 | 1.87M | while (654 !restoring_state.finished() && !existing_state.finished()1.87M ) { |
636 | 1.87M | auto compare_result = restoring_state.key().compare(existing_state.key()); |
637 | 1.87M | if (compare_result == 0) { |
638 | 1.87M | if (table.name != nullptr) { |
639 | 1.87M | if (restoring_state.value().compare(existing_state.value())) { |
640 | 72 | ++num_updates; |
641 | 72 | AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch); |
642 | 72 | } |
643 | 1.87M | } else { |
644 | 18 | docdb::SubDocKey sub_doc_key; |
645 | 18 | RETURN_NOT_OK(sub_doc_key.FullyDecodeFrom( |
646 | 18 | restoring_state.key(), docdb::HybridTimeRequired::kFalse)); |
647 | 18 | SCHECK_EQ(sub_doc_key.subkeys().size(), 1U, Corruption, "Wrong number of subdoc keys"); |
648 | 18 | if (sub_doc_key.subkeys()[0].value_type() == docdb::ValueType::kColumnId) { |
649 | 12 | auto column_id = sub_doc_key.subkeys()[0].GetColumnId(); |
650 | 12 | const ColumnSchema& column = VERIFY_RESULT(pg_yb_catalog_version_schema.column_by_id( |
651 | 12 | column_id)); |
652 | 12 | if (column.name() == "current_version") { |
653 | 6 | docdb::Value value; |
654 | 6 | RETURN_NOT_OK(value.Decode(existing_state.value())); |
655 | 6 | docdb::DocPath path(sub_doc_key.doc_key().Encode(), sub_doc_key.subkeys()); |
656 | 6 | QLValuePB value_pb; |
657 | 6 | value_pb.set_int64_value(value.primitive_value().GetInt64() + 1); |
658 | 6 | RETURN_NOT_OK(write_batch->SetPrimitive( |
659 | 6 | path, docdb::ValueRef(value_pb, SortingType::kNotSpecified))); |
660 | 6 | } |
661 | 12 | } |
662 | 18 | } |
663 | 1.87M | RETURN_NOT_OK(restoring_state.Next()); |
664 | 1.87M | RETURN_NOT_OK(existing_state.Next()); |
665 | 1.87M | } else if (75 compare_result < 075 ) { |
666 | 75 | ++num_inserts; |
667 | 75 | AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch); |
668 | 75 | RETURN_NOT_OK(restoring_state.Next()); |
669 | 75 | } else { |
670 | 0 | ++num_deletes; |
671 | 0 | AddKeyValue(existing_state.key(), tombstone, write_batch); |
672 | 0 | RETURN_NOT_OK(existing_state.Next()); |
673 | 0 | } |
674 | 1.87M | } |
675 | | |
676 | 981 | while (654 !restoring_state.finished()) { |
677 | 327 | ++num_inserts; |
678 | 327 | AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch); |
679 | 327 | RETURN_NOT_OK(restoring_state.Next()); |
680 | 327 | } |
681 | | |
682 | 654 | while (!existing_state.finished()) { |
683 | 0 | ++num_deletes; |
684 | 0 | AddKeyValue(existing_state.key(), tombstone, write_batch); |
685 | 0 | RETURN_NOT_OK(existing_state.Next()); |
686 | 0 | } |
687 | | |
688 | 654 | if (num_updates + num_inserts + num_deletes != 0 || VLOG_IS_ON(3)) { |
689 | 51 | LOG(INFO) << "PITR: Pg system table: " << AsString(table.name) << ", updates: " << num_updates |
690 | 51 | << ", inserts: " << num_inserts << ", deletes: " << num_deletes; |
691 | 51 | } |
692 | 654 | } |
693 | | |
694 | 6 | return Status::OK(); |
695 | 6 | } |
696 | | |
697 | | } // namespace master |
698 | | } // namespace yb |