YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/master/restore_sys_catalog_state.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/restore_sys_catalog_state.h"
15
16
#include "yb/common/entity_ids.h"
17
#include "yb/common/hybrid_time.h"
18
#include "yb/common/index.h"
19
20
#include "yb/common/pgsql_protocol.pb.h"
21
#include "yb/common/ql_expr.h"
22
#include "yb/docdb/consensus_frontier.h"
23
#include "yb/docdb/cql_operation.h"
24
#include "yb/docdb/docdb_rocksdb_util.h"
25
#include "yb/docdb/doc_rowwise_iterator.h"
26
#include "yb/docdb/doc_write_batch.h"
27
#include "yb/docdb/docdb.h"
28
#include "yb/docdb/pgsql_operation.h"
29
#include "yb/docdb/rocksdb_writer.h"
30
31
#include "yb/master/catalog_loaders.h"
32
#include "yb/master/master_backup.pb.h"
33
#include "yb/master/master_snapshot_coordinator.h"
34
#include "yb/master/master_util.h"
35
#include "yb/master/sys_catalog.h"
36
#include "yb/master/sys_catalog_writer.h"
37
38
#include "yb/rocksdb/write_batch.h"
39
#include "yb/tablet/tablet.h"
40
#include "yb/util/format.h"
41
#include "yb/util/logging.h"
42
#include "yb/util/pb_util.h"
43
#include "yb/util/status_format.h"
44
45
using namespace std::placeholders;
46
47
namespace yb {
48
namespace master {
49
50
namespace {
51
52
// Runs `write_request` as a QL write operation against `schema`, emitting its changes into
// `write_batch`. Returns the status of Init() or Apply().
CHECKED_STATUS ApplyWriteRequest(
    const Schema& schema, const QLWriteRequestPB& write_request,
    docdb::DocWriteBatch* write_batch) {
  // QLWriteOperation takes a shared_ptr, but `schema` is owned by the caller: wrap it in a
  // non-owning shared_ptr whose deleter does nothing.
  std::shared_ptr<const Schema> borrowed_schema(&schema, [](const Schema*) {});
  docdb::DocOperationApplyData apply_data{.doc_write_batch = write_batch};
  IndexMap no_indexes;
  docdb::QLWriteOperation write_op(
      write_request, borrowed_schema, no_indexes, nullptr, TransactionOperationContext());
  QLResponsePB response;
  RETURN_NOT_OK(write_op.Init(&response));
  return write_op.Apply(apply_data);
}
64
65
0
bool TableDeleted(const SysTablesEntryPB& table) {
66
0
  return table.state() == SysTablesEntryPB::DELETED ||
67
0
         table.state() == SysTablesEntryPB::DELETING ||
68
0
         table.hide_state() == SysTablesEntryPB::HIDING ||
69
0
         table.hide_state() == SysTablesEntryPB::HIDDEN;
70
0
}
71
72
// Returns true if any table identifier in `filter` carries a namespace identifier that matches
// the namespace `id` / `ns`. Errors from the matcher are propagated.
Result<bool> MatchNamespace(
    const SnapshotScheduleFilterPB& filter, const NamespaceId& id,
    const SysNamespaceEntryPB& ns) {
  VLOG(1) << __func__ << "(" << id << ", " << ns.ShortDebugString() << ")";
  for (const auto& identifier : filter.tables().tables()) {
    // Identifiers without a namespace part cannot select a namespace.
    if (!identifier.has_namespace_()) {
      continue;
    }
    const bool matches = VERIFY_RESULT(master::NamespaceMatchesIdentifier(
        id, ns.database_type(), ns.name(), identifier.namespace_()));
    if (matches) {
      return true;
    }
  }
  return false;
}
85
86
// Returns true if any table identifier in `filter` matches the table `id` / `table`.
// Errors from the matcher are propagated.
Result<bool> MatchTable(
    const SnapshotScheduleFilterPB& filter, const TableId& id,
    const SysTablesEntryPB& table) {
  VLOG(1) << __func__ << "(" << id << ", " << table.ShortDebugString() << ")";
  for (const auto& identifier : filter.tables().tables()) {
    const bool matches = VERIFY_RESULT(master::TableMatchesIdentifier(id, table, identifier));
    if (matches) {
      return true;
    }
  }
  return false;
}
97
98
template <class PB>
99
struct GetEntryType;
100
101
template<> struct GetEntryType<SysNamespaceEntryPB>
102
    : public std::integral_constant<SysRowEntryType, SysRowEntryType::NAMESPACE> {};
103
104
template<> struct GetEntryType<SysTablesEntryPB>
105
    : public std::integral_constant<SysRowEntryType, SysRowEntryType::TABLE> {};
106
107
template<> struct GetEntryType<SysTabletsEntryPB>
108
    : public std::integral_constant<SysRowEntryType, SysRowEntryType::TABLET> {};
109
110
} // namespace
111
112
RestoreSysCatalogState::RestoreSysCatalogState(SnapshotScheduleRestoration* restoration)
113
1
    : restoration_(*restoration) {}
114
115
// Namespace entries are written back unchanged: always keep them.
Result<bool> RestoreSysCatalogState::PatchRestoringEntry(
    const std::string& id, SysNamespaceEntryPB* pb) {
  const bool keep_entry = true;
  return keep_entry;
}
119
120
// Adjusts a table entry taken from the restore-time snapshot before it is written back.
// Returns false for YSQL catalog tables: they are only recorded in system_tables_to_restore and
// their sys catalog entry is not written. Returns NotFound if a non-catalog table does not
// exist currently.
Result<bool> RestoreSysCatalogState::PatchRestoringEntry(
    const std::string& id, SysTablesEntryPB* pb) {
  const bool is_ysql_catalog_table = pb->schema().table_properties().is_ysql_catalog_table();
  if (is_ysql_catalog_table) {
    LOG(INFO) << "PITR: Adding " << pb->name() << " for restoring. ID: " << id;
    restoration_.system_tables_to_restore.emplace(id, pb->name());
    return false;
  }

  const auto existing = existing_objects_.tables.find(id);
  if (existing == existing_objects_.tables.end()) {
    return STATUS_FORMAT(NotFound, "Not found restoring table: $0", id);
  }

  // When the schema version differs from the current one, move past the current version so
  // that a schema update is forced after restoration.
  const auto current_version = existing->second.version();
  if (pb->version() != current_version) {
    pb->set_version(current_version + 1);
  }

  return true;
}
141
142
// Tablet entries are written back unchanged: always keep them.
Result<bool> RestoreSysCatalogState::PatchRestoringEntry(
    const std::string& id, SysTabletsEntryPB* pb) {
  const bool keep_entry = true;
  return keep_entry;
}
146
147
template <class PB>
148
Status RestoreSysCatalogState::AddRestoringEntry(
149
0
    const std::string& id, PB* pb, faststring* buffer) {
150
0
  auto type = GetEntryType<PB>::value;
151
0
  VLOG_WITH_FUNC(1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString();
152
153
0
  if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) {
154
0
    return Status::OK();
155
0
  }
156
0
  auto& entry = *entries_.mutable_entries()->Add();
157
0
  entry.set_type(type);
158
0
  entry.set_id(id);
159
0
  pb_util::SerializeToString(*pb, buffer);
160
0
  entry.set_data(buffer->data(), buffer->size());
161
0
  restoration_.non_system_objects_to_restore.emplace(id, type);
162
163
0
  return Status::OK();
164
0
}
Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17AddRestoringEntryINS0_19SysNamespaceEntryPBEEENS_6StatusERKNSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPT_PNS_10faststringE
Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17AddRestoringEntryINS0_16SysTablesEntryPBEEENS_6StatusERKNSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPT_PNS_10faststringE
Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17AddRestoringEntryINS0_17SysTabletsEntryPBEEENS_6StatusERKNSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPT_PNS_10faststringE
165
166
0
// Computes what the restoration has to do:
//   1. entries from the restore-time snapshot to write back (AddRestoringEntry), and
//   2. currently existing objects that are absent at the restore time (CheckExistingEntry).
// The obsolete tablet/table lists are then sorted by id.
Status RestoreSysCatalogState::Process() {
  VLOG_WITH_FUNC(1) << "Restoring: " << restoring_objects_.SizesToString() << ", existing: "
                    << existing_objects_.SizesToString();

  VLOG_WITH_FUNC(2) << "Check restoring objects";
  VLOG_WITH_FUNC(4) << "Restoring namespaces: " << AsString(restoring_objects_.namespaces);
  faststring buffer;
  auto add_restoring = [this, &buffer](const auto& id, auto* pb) {
    return AddRestoringEntry(id, pb, &buffer);
  };
  RETURN_NOT_OK_PREPEND(
      DetermineEntries(&restoring_objects_, nullptr, add_restoring),
      "Determine restoring entries failed");

  VLOG_WITH_FUNC(2) << "Check existing objects";
  auto check_existing = [this](const auto& id, auto* pb) {
    return CheckExistingEntry(id, *pb);
  };
  RETURN_NOT_OK_PREPEND(
      DetermineEntries(&existing_objects_, &retained_existing_tables_, check_existing),
      "Determine obsolete entries failed");

  // Sort by id so that binary search can be used to check whether an object is obsolete.
  const auto by_id = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };
  auto& obsolete_tablets = restoration_.non_system_obsolete_tablets;
  std::sort(obsolete_tablets.begin(), obsolete_tablets.end(), by_id);
  auto& obsolete_tables = restoration_.non_system_obsolete_tables;
  std::sort(obsolete_tables.begin(), obsolete_tables.end(), by_id);

  return Status::OK();
}
197
198
template <class ProcessEntry>
199
Status RestoreSysCatalogState::DetermineEntries(
200
    Objects* objects, RetainedExistingTables* retained_existing_tables,
201
0
    const ProcessEntry& process_entry) {
202
0
  std::unordered_set<NamespaceId> namespaces;
203
0
  std::unordered_set<TableId> tables;
204
205
0
  const auto& filter = restoration_.schedules[0].second;
206
207
0
  for (auto& id_and_metadata : objects->namespaces) {
208
0
    if (!VERIFY_RESULT(MatchNamespace(filter, id_and_metadata.first, id_and_metadata.second))) {
209
0
      continue;
210
0
    }
211
0
    if (!namespaces.insert(id_and_metadata.first).second) {
212
0
      continue;
213
0
    }
214
0
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
215
0
  }
216
217
0
  for (auto& id_and_metadata : objects->tables) {
218
0
    VLOG_WITH_FUNC(3) << "Checking: " << id_and_metadata.first << ", "
219
0
                      << id_and_metadata.second.ShortDebugString();
220
221
0
    if (TableDeleted(id_and_metadata.second)) {
222
0
      continue;
223
0
    }
224
0
    auto& root_table_id_and_metadata = VERIFY_RESULT(objects->FindRootTable(id_and_metadata)).get();
225
0
    auto match = VERIFY_RESULT(MatchTable(
226
0
        filter, root_table_id_and_metadata.first, root_table_id_and_metadata.second));
227
0
    if (!match) {
228
0
      continue;
229
0
    }
230
0
    if (retained_existing_tables) {
231
0
      auto& retaining_schedules = retained_existing_tables->emplace(
232
0
          id_and_metadata.first, std::vector<SnapshotScheduleId>()).first->second;
233
0
      retaining_schedules.push_back(restoration_.schedules[0].first);
234
0
      for (size_t i = 1; i != restoration_.schedules.size(); ++i) {
235
0
        if (VERIFY_RESULT(MatchTable(
236
0
            restoration_.schedules[i].second, root_table_id_and_metadata.first,
237
0
            root_table_id_and_metadata.second))) {
238
0
          retaining_schedules.push_back(restoration_.schedules[i].first);
239
0
        }
240
0
      }
241
0
    }
242
    // Process pg_catalog tables that need to be restored.
243
0
    if (namespaces.insert(id_and_metadata.second.namespace_id()).second) {
244
0
      auto namespace_it = objects->namespaces.find(id_and_metadata.second.namespace_id());
245
0
      if (namespace_it == objects->namespaces.end()) {
246
0
        return STATUS_FORMAT(
247
0
            NotFound, "Namespace $0 not found for table $1", id_and_metadata.second.namespace_id(),
248
0
            id_and_metadata.first, id_and_metadata.second.name());
249
0
      }
250
0
      RETURN_NOT_OK(process_entry(namespace_it->first, &namespace_it->second));
251
0
    }
252
0
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
253
0
    tables.insert(id_and_metadata.first);
254
0
    VLOG(2) << "Table to restore: " << id_and_metadata.first << ", "
255
0
            << id_and_metadata.second.ShortDebugString();
256
0
  }
257
0
  for (auto& id_and_metadata : objects->tablets) {
258
0
    auto it = tables.find(id_and_metadata.second.table_id());
259
0
    if (it == tables.end()) {
260
0
      continue;
261
0
    }
262
0
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
263
0
    VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", "
264
0
            << id_and_metadata.second.ShortDebugString();
265
0
  }
266
0
  return Status::OK();
267
0
}
Unexecuted instantiation: restore_sys_catalog_state.cc:_ZN2yb6master22RestoreSysCatalogState16DetermineEntriesIZNS1_7ProcessEvE3$_0EENS_6StatusEPNS1_7ObjectsEPNSt3__113unordered_mapINS7_12basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEENS7_6vectorINS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEENSC_ISI_EEEENS7_4hashISE_EENS7_8equal_toISE_EENSC_INS7_4pairIKSE_SK_EEEEEERKT_
Unexecuted instantiation: restore_sys_catalog_state.cc:_ZN2yb6master22RestoreSysCatalogState16DetermineEntriesIZNS1_7ProcessEvE3$_1EENS_6StatusEPNS1_7ObjectsEPNSt3__113unordered_mapINS7_12basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEENS7_6vectorINS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEENSC_ISI_EEEENS7_4hashISE_EENS7_8equal_toISE_EENSC_INS7_4pairIKSE_SK_EEEEEERKT_
268
269
// Returns the table an entry is rooted at: the entry itself for a regular table, or the indexed
// table (looked up in `tables`) for an index. Returns NotFound if the indexed table is missing.
Result<const std::pair<const TableId, SysTablesEntryPB>&>
    RestoreSysCatalogState::Objects::FindRootTable(
    const std::pair<const TableId, SysTablesEntryPB>& id_and_metadata) {
  const auto& metadata = id_and_metadata.second;
  if (!metadata.has_index_info()) {
    return id_and_metadata;
  }

  const auto& indexed_table_id = metadata.index_info().indexed_table_id();
  const auto indexed = tables.find(indexed_table_id);
  if (indexed != tables.end()) {
    const auto& root = *indexed;
    return root;
  }
  return STATUS_FORMAT(
      NotFound, "Indexed table $0 not found for index $1 ($2)",
      indexed_table_id, id_and_metadata.first, metadata.name());
}
286
287
// Looks up `table_id` in `tables` and resolves it to its root table (see the pair overload).
// Returns NotFound if `table_id` is not present.
Result<const std::pair<const TableId, SysTablesEntryPB>&>
    RestoreSysCatalogState::Objects::FindRootTable(
    const TableId& table_id) {
  const auto found = tables.find(table_id);
  if (found != tables.end()) {
    return FindRootTable(*found);
  }
  return STATUS_FORMAT(NotFound, "Table $0 not found for index", table_id);
}
296
297
0
std::string RestoreSysCatalogState::Objects::SizesToString() const {
298
0
  return Format("{ tablets: $0 tables: $1 namespaces: $2 }",
299
0
                tablets.size(), tables.size(), namespaces.size());
300
0
}
301
302
template <class PB>
303
Status RestoreSysCatalogState::IterateSysCatalog(
304
    const Schema& schema, const docdb::DocDB& doc_db, HybridTime read_time,
305
0
    std::unordered_map<std::string, PB>* map) {
306
0
  auto iter = std::make_unique<docdb::DocRowwiseIterator>(
307
0
      schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(),
308
0
      ReadHybridTime::SingleTime(read_time), nullptr);
309
0
  return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map](
310
0
          const Slice& id, const Slice& data) -> Status {
311
0
    auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data));
312
0
    if (!ShouldLoadObject(pb)) {
313
0
      return Status::OK();
314
0
    }
315
0
    if (!map->emplace(id.ToBuffer(), std::move(pb)).second) {
316
0
      return STATUS_FORMAT(IllegalState, "Duplicate $0: $1",
317
0
                           SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer());
318
0
    }
319
0
    return Status::OK();
320
0
  });
Unexecuted instantiation: _ZZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_19SysNamespaceEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEEENKUlRKNS_5SliceESY_E_clESY_SY_
Unexecuted instantiation: _ZZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_16SysTablesEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEEENKUlRKNS_5SliceESY_E_clESY_SY_
Unexecuted instantiation: _ZZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_17SysTabletsEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEEENKUlRKNS_5SliceESY_E_clESY_SY_
321
0
}
Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_19SysNamespaceEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEE
Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_16SysTablesEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEE
Unexecuted instantiation: _ZN2yb6master22RestoreSysCatalogState17IterateSysCatalogINS0_17SysTabletsEntryPBEEENS_6StatusERKNS_6SchemaERKNS_5docdb5DocDBENS_10HybridTimeEPNSt3__113unordered_mapINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEET_NSD_4hashISK_EENSD_8equal_toISK_EENSI_INSD_4pairIKSK_SL_EEEEEE
322
323
// Loads namespaces, tables and tablets (in that order) as of `read_time` into `objects`,
// stopping at the first failure.
Status RestoreSysCatalogState::LoadObjects(
    const Schema& schema, const docdb::DocDB& doc_db, HybridTime read_time, Objects* objects) {
  RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->namespaces));
  RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->tables));
  return IterateSysCatalog(schema, doc_db, read_time, &objects->tablets);
}
330
331
// Loads the sys catalog objects as they were at the restoration point (restore_at).
Status RestoreSysCatalogState::LoadRestoringObjects(
    const Schema& schema, const docdb::DocDB& doc_db) {
  const auto& restore_time = restoration_.restore_at;
  return LoadObjects(schema, doc_db, restore_time, &restoring_objects_);
}
335
336
// Loads the current sys catalog objects (read at HybridTime::kMax).
Status RestoreSysCatalogState::LoadExistingObjects(
    const Schema& schema, const docdb::DocDB& doc_db) {
  const HybridTime latest = HybridTime::kMax;
  return LoadObjects(schema, doc_db, latest, &existing_objects_);
}
340
341
// Records a currently existing tablet as obsolete when it is absent from the restore-time
// objects. Always returns OK.
Status RestoreSysCatalogState::CheckExistingEntry(
    const std::string& id, const SysTabletsEntryPB& pb) {
  VLOG_WITH_FUNC(4) << "Tablet: " << id << ", " << pb.ShortDebugString();
  const bool existed_at_restore_time = restoring_objects_.tablets.count(id) != 0;
  if (!existed_at_restore_time) {
    LOG(INFO) << "PITR: Will remove tablet: " << id;
    restoration_.non_system_obsolete_tablets.emplace_back(id, pb);
  }
  return Status::OK();
}
351
352
// Checks a currently existing table against the restore-time objects.
//  - YSQL catalog tables must also be present among the tables to restore (collected in
//    system_tables_to_restore while processing the restoring objects); otherwise NotFound.
//  - Any other table that is absent from the restore-time objects is recorded as obsolete.
Status RestoreSysCatalogState::CheckExistingEntry(
    const std::string& id, const SysTablesEntryPB& pb) {
  if (pb.schema().table_properties().is_ysql_catalog_table()) {
    // This table exists now (we are iterating existing objects) but was not collected from the
    // restore-time snapshot, so it is present now yet missing from the objects to restore.
    if (restoration_.system_tables_to_restore.count(id) == 0) {
      return STATUS_FORMAT(
          NotFound,
          "PG Catalog table $0 found in the present set of tables"
          " but not found in the objects to restore.",
          pb.name());
    }
    return Status::OK();
  }

  VLOG_WITH_FUNC(4) << "Table: " << id << ", " << pb.ShortDebugString();
  if (restoring_objects_.tables.count(id)) {
    return Status::OK();
  }
  LOG(INFO) << "PITR: Will remove table: " << id;
  restoration_.non_system_obsolete_tables.emplace_back(id, pb);

  return Status::OK();
}
374
375
// We don't delete newly created namespaces, because our filters are namespace based.
376
// Existing namespaces are never marked obsolete; nothing to check.
Status RestoreSysCatalogState::CheckExistingEntry(
    const std::string& id, const SysNamespaceEntryPB& pb) {
  const Status nothing_to_do = Status::OK();
  return nothing_to_do;
}
380
381
// Fills `write_batch` with:
//   - an INSERT for every entry collected from the restore-time snapshot, then
//   - cleanup UPDATEs for obsolete tablets, then for obsolete tables.
Status RestoreSysCatalogState::PrepareWriteBatch(
    const Schema& schema, docdb::DocWriteBatch* write_batch) {
  for (const auto& entry : entries_.entries()) {
    QLWriteRequestPB insert_request;
    RETURN_NOT_OK(FillSysCatalogWriteRequest(
        entry.type(), entry.id(), entry.data(), QLWriteRequestPB::QL_STMT_INSERT, schema,
        &insert_request));
    RETURN_NOT_OK(ApplyWriteRequest(schema, insert_request, write_batch));
  }

  for (const auto& [tablet_id, tablet_pb] : restoration_.non_system_obsolete_tablets) {
    RETURN_NOT_OK(PrepareTabletCleanup(tablet_id, tablet_pb, schema, write_batch));
  }
  for (const auto& [table_id, table_pb] : restoration_.non_system_obsolete_tables) {
    RETURN_NOT_OK(PrepareTableCleanup(table_id, table_pb, schema, write_batch));
  }

  return Status::OK();
}
402
403
// Writes an UPDATE for an obsolete tablet. If its table is in retained_existing_tables_, the
// tablet gets a hide hybrid time (restoration write time) and the list of retaining schedules.
Status RestoreSysCatalogState::PrepareTabletCleanup(
    const TabletId& id, SysTabletsEntryPB pb, const Schema& schema,
    docdb::DocWriteBatch* write_batch) {
  VLOG_WITH_FUNC(4) << id;

  QLWriteRequestPB write_request;

  const auto retained = retained_existing_tables_.find(pb.table_id());
  if (retained != retained_existing_tables_.end()) {
    pb.set_hide_hybrid_time(restoration_.write_time.ToUint64());
    auto* schedules_pb = pb.mutable_retained_by_snapshot_schedules();
    for (const auto& schedule_id : retained->second) {
      schedules_pb->Add()->assign(schedule_id.AsSlice().cdata(), schedule_id.size());
    }
  }

  RETURN_NOT_OK(FillSysCatalogWriteRequest(
      SysRowEntryType::TABLET, id, pb.SerializeAsString(),
      QLWriteRequestPB::QL_STMT_UPDATE, schema, &write_request));
  return ApplyWriteRequest(schema, write_request, write_batch);
}
423
424
// Writes an UPDATE that puts an obsolete table into the HIDING state and bumps its version.
Status RestoreSysCatalogState::PrepareTableCleanup(
    const TableId& id, SysTablesEntryPB pb, const Schema& schema,
    docdb::DocWriteBatch* write_batch) {
  VLOG_WITH_FUNC(4) << id;

  QLWriteRequestPB update_request;
  pb.set_hide_state(SysTablesEntryPB::HIDING);
  const auto next_version = pb.version() + 1;
  pb.set_version(next_version);

  RETURN_NOT_OK(FillSysCatalogWriteRequest(
      SysRowEntryType::TABLE, id, pb.SerializeAsString(),
      QLWriteRequestPB::QL_STMT_UPDATE, schema, &update_request));
  return ApplyWriteRequest(schema, update_request, write_batch);
}
437
438
// Test hook: matches a table against the first schedule's filter (as DetermineEntries does).
Result<bool> RestoreSysCatalogState::TEST_MatchTable(
    const TableId& id, const SysTablesEntryPB& table) {
  const auto& first_schedule_filter = restoration_.schedules[0].second;
  return MatchTable(first_schedule_filter, id, table);
}
442
443
void RestoreSysCatalogState::WriteToRocksDB(
444
    docdb::DocWriteBatch* write_batch, const HybridTime& write_time, const OpId& op_id,
445
0
    tablet::Tablet* tablet) {
446
0
  docdb::KeyValueWriteBatchPB kv_write_batch;
447
0
  write_batch->MoveToWriteBatchPB(&kv_write_batch);
448
449
0
  docdb::NonTransactionalWriter writer(kv_write_batch, write_time);
450
0
  rocksdb::WriteBatch rocksdb_write_batch;
451
0
  rocksdb_write_batch.SetDirectWriter(&writer);
452
0
  docdb::ConsensusFrontiers frontiers;
453
0
  set_op_id(op_id, &frontiers);
454
0
  set_hybrid_time(write_time, &frontiers);
455
456
0
  tablet->WriteToRocksDB(
457
0
      &frontiers, &rocksdb_write_batch, docdb::StorageDbType::kRegular);
458
0
}
459
460
class FetchState {
461
 public:
462
  explicit FetchState(const docdb::DocDB& doc_db, const ReadHybridTime& read_time)
463
      : iterator_(CreateIntentAwareIterator(
464
          doc_db,
465
          docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
466
          boost::none,
467
          rocksdb::kDefaultQueryId,
468
          TransactionOperationContext(),
469
          CoarseTimePoint::max(),
470
0
          read_time)) {
471
0
  }
472
473
0
  CHECKED_STATUS SetPrefix(const Slice& prefix) {
474
0
    if (prefix_.empty()) {
475
0
      iterator_->Seek(prefix);
476
0
    } else {
477
0
      iterator_->SeekForward(prefix);
478
0
    }
479
0
    prefix_ = prefix;
480
0
    finished_ = false;
481
0
    last_deleted_key_bytes_.clear();
482
0
    last_deleted_key_write_time_ = DocHybridTime::kInvalid;
483
0
    RETURN_NOT_OK(Update());
484
0
    return NextNonDeletedEntry();
485
0
  }
486
487
0
  bool finished() const {
488
0
    return finished_;
489
0
  }
490
491
0
  Slice key() const {
492
0
    return key_.key;
493
0
  }
494
495
0
  Slice value() const {
496
0
    return iterator_->value();
497
0
  }
498
499
0
  docdb::FetchKeyResult FullKey() const {
500
0
    return key_;
501
0
  }
502
503
0
  CHECKED_STATUS NextEntry() {
504
0
    iterator_->SeekPastSubKey(key_.key);
505
0
    return Update();
506
0
  }
507
508
0
  CHECKED_STATUS Next() {
509
0
    RETURN_NOT_OK(NextEntry());
510
0
    return NextNonDeletedEntry();
511
0
  }
512
513
  // Returns true if the entry corresponds to a deleted row
514
  // in rocksdb.
515
0
  Result<bool> IsDeletedRowEntry() {
516
0
    bool is_tombstoned = false;
517
0
    is_tombstoned = VERIFY_RESULT(docdb::Value::IsTombstoned(value()));
518
519
    // Because Postgres doesn't have a concept of frozen types, kGroupEnd will only demarcate the
520
    // end of hashed and range components. It is reasonable to assume then that if the last byte
521
    // is kGroupEnd then it does not have any subkeys.
522
0
    bool no_subkey =
523
0
        key()[key().size() - 1] == docdb::ValueTypeAsChar::kGroupEnd;
524
525
0
    return no_subkey && is_tombstoned;
526
0
  }
527
528
  // Returns true if it has been deleted since the time it was inserted.
529
0
  bool IsDeletedSinceInsertion() {
530
0
    if (last_deleted_key_bytes_.size() == 0) {
531
0
      return false;
532
0
    }
533
0
    return key().starts_with(last_deleted_key_bytes_.AsSlice()) &&
534
0
           FullKey().write_time < last_deleted_key_write_time_;
535
0
  }
536
537
 private:
538
0
  CHECKED_STATUS Update() {
539
0
    if (!iterator_->valid()) {
540
0
      finished_ = true;
541
0
      return Status::OK();
542
0
    }
543
0
    key_ = VERIFY_RESULT(iterator_->FetchKey());
544
0
    if (VERIFY_RESULT(IsDeletedRowEntry())) {
545
0
      last_deleted_key_write_time_ = key_.write_time;
546
0
      last_deleted_key_bytes_ = key_.key;
547
0
    }
548
0
    if (!key_.key.starts_with(prefix_)) {
549
0
      finished_ = true;
550
0
      return Status::OK();
551
0
    }
552
553
0
    return Status::OK();
554
0
  }
555
556
0
  CHECKED_STATUS NextNonDeletedEntry() {
557
0
    while (!finished()) {
558
0
      if (VERIFY_RESULT(IsDeletedRowEntry()) ||
559
0
          IsDeletedSinceInsertion()) {
560
0
        RETURN_NOT_OK(NextEntry());
561
0
        continue;
562
0
      }
563
0
      break;
564
0
    }
565
0
    return Status::OK();
566
0
  }
567
568
  std::unique_ptr<docdb::IntentAwareIterator> iterator_;
569
  Slice prefix_;
570
  docdb::FetchKeyResult key_;
571
  KeyBuffer last_deleted_key_bytes_;
572
  DocHybridTime last_deleted_key_write_time_;
573
  bool finished_ = false;
574
};
575
576
0
void AddKeyValue(const Slice& key, const Slice& value, docdb::DocWriteBatch* write_batch) {
577
0
  auto& pair = write_batch->AddRaw();
578
0
  pair.first.assign(key.cdata(), key.size());
579
0
  pair.second.assign(value.cdata(), value.size());
580
0
}
581
582
struct PgCatalogTableData {
583
  std::array<uint8_t, kUuidSize + 1> prefix;
584
  const TableName* name;
585
586
0
  CHECKED_STATUS SetTableId(const TableId& table_id) {
587
0
    Uuid cotable_id;
588
0
    RETURN_NOT_OK(cotable_id.FromHexString(table_id));
589
0
    prefix[0] = docdb::ValueTypeAsChar::kTableId;
590
0
    cotable_id.EncodeToComparable(&prefix[1]);
591
0
    return Status::OK();
592
0
  }
593
};
594
595
// Restores the rows of the YSQL catalog tables in system_tables_to_restore by diffing the data
// as of restore_at against the current data and writing the differences into `write_batch`:
//   - keys only present at restore time are re-inserted,
//   - keys only present now are tombstoned,
//   - keys present in both are overwritten when their values differ.
// pg_yb_catalog_version is always processed too; for it, matching current_version columns are
// set to the existing value + 1 instead of being restored.
Status RestoreSysCatalogState::ProcessPgCatalogRestores(
    const Schema& pg_yb_catalog_version_schema,
    const docdb::DocDB& restoring_db,
    const docdb::DocDB& existing_db,
    docdb::DocWriteBatch* write_batch) {
  const auto& catalog_tables = restoration_.system_tables_to_restore;
  if (catalog_tables.empty()) {
    return Status::OK();
  }

  FetchState restoring_state(restoring_db, ReadHybridTime::SingleTime(restoration_.restore_at));
  FetchState existing_state(existing_db, ReadHybridTime::Max());
  char tombstone_char = docdb::ValueTypeAsChar::kTombstone;
  Slice tombstone(&tombstone_char, 1);

  // Slot 0 is pg_yb_catalog_version, marked by a null name; the others follow in map order.
  std::vector<PgCatalogTableData> tables(catalog_tables.size() + 1);
  RETURN_NOT_OK(tables[0].SetTableId(kPgYbCatalogVersionTableId));
  tables[0].name = nullptr;
  auto slot = tables.begin();
  for (auto& [table_id, table_name] : catalog_tables) {
    ++slot;
    RETURN_NOT_OK(slot->SetTableId(table_id));
    slot->name = &table_name;
  }

  // FetchState only seeks forward after its first prefix, so visit tables in key order.
  std::sort(tables.begin(), tables.end(), [](const auto& lhs, const auto& rhs) {
    return Slice(lhs.prefix).compare(Slice(rhs.prefix)) < 0;
  });

  for (auto& table : tables) {
    size_t num_updates = 0;
    size_t num_inserts = 0;
    size_t num_deletes = 0;
    Slice prefix(table.prefix);

    RETURN_NOT_OK(restoring_state.SetPrefix(prefix));
    RETURN_NOT_OK(existing_state.SetPrefix(prefix));

    // Merge-join the two sorted key streams.
    while (!restoring_state.finished() && !existing_state.finished()) {
      const auto order = restoring_state.key().compare(existing_state.key());
      if (order < 0) {
        // Key exists at the restore time but not now: write it back.
        ++num_inserts;
        AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch);
        RETURN_NOT_OK(restoring_state.Next());
        continue;
      }
      if (order > 0) {
        // Key exists now but not at the restore time: tombstone it.
        ++num_deletes;
        AddKeyValue(existing_state.key(), tombstone, write_batch);
        RETURN_NOT_OK(existing_state.Next());
        continue;
      }

      // Same key on both sides.
      if (table.name == nullptr) {
        // pg_yb_catalog_version: set current_version to the existing value + 1.
        docdb::SubDocKey sub_doc_key;
        RETURN_NOT_OK(sub_doc_key.FullyDecodeFrom(
            restoring_state.key(), docdb::HybridTimeRequired::kFalse));
        SCHECK_EQ(sub_doc_key.subkeys().size(), 1U, Corruption, "Wrong number of subdoc keys");
        if (sub_doc_key.subkeys()[0].value_type() == docdb::ValueType::kColumnId) {
          const auto column_id = sub_doc_key.subkeys()[0].GetColumnId();
          const ColumnSchema& column = VERIFY_RESULT(pg_yb_catalog_version_schema.column_by_id(
              column_id));
          if (column.name() == "current_version") {
            docdb::Value existing_value;
            RETURN_NOT_OK(existing_value.Decode(existing_state.value()));
            docdb::DocPath path(sub_doc_key.doc_key().Encode(), sub_doc_key.subkeys());
            RETURN_NOT_OK(write_batch->SetPrimitive(
                path, docdb::PrimitiveValue(existing_value.primitive_value().GetInt64() + 1)));
          }
        }
      } else if (restoring_state.value().compare(existing_state.value()) != 0) {
        ++num_updates;
        AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch);
      }
      RETURN_NOT_OK(restoring_state.Next());
      RETURN_NOT_OK(existing_state.Next());
    }

    // Whatever remains on the restoring side is re-inserted...
    while (!restoring_state.finished()) {
      ++num_inserts;
      AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch);
      RETURN_NOT_OK(restoring_state.Next());
    }

    // ...and whatever remains on the existing side is tombstoned.
    while (!existing_state.finished()) {
      ++num_deletes;
      AddKeyValue(existing_state.key(), tombstone, write_batch);
      RETURN_NOT_OK(existing_state.Next());
    }

    const bool changed = num_updates + num_inserts + num_deletes != 0;
    if (changed || VLOG_IS_ON(3)) {
      LOG(INFO) << "PITR: Pg system table: " << AsString(table.name) << ", updates: " << num_updates
                << ", inserts: " << num_inserts << ", deletes: " << num_deletes;
    }
  }

  return Status::OK();
}
694
695
}  // namespace master
696
}  // namespace yb