YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/master/restore_sys_catalog_state.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/restore_sys_catalog_state.h"
15
16
#include "yb/common/entity_ids.h"
17
#include "yb/common/hybrid_time.h"
18
#include "yb/common/index.h"
19
20
#include "yb/common/pgsql_protocol.pb.h"
21
#include "yb/common/ql_expr.h"
22
#include "yb/docdb/consensus_frontier.h"
23
#include "yb/docdb/cql_operation.h"
24
#include "yb/docdb/docdb_rocksdb_util.h"
25
#include "yb/docdb/doc_rowwise_iterator.h"
26
#include "yb/docdb/doc_write_batch.h"
27
#include "yb/docdb/docdb.h"
28
#include "yb/docdb/pgsql_operation.h"
29
#include "yb/docdb/rocksdb_writer.h"
30
31
#include "yb/master/catalog_loaders.h"
32
#include "yb/master/master_backup.pb.h"
33
#include "yb/master/master_snapshot_coordinator.h"
34
#include "yb/master/master_util.h"
35
#include "yb/master/sys_catalog.h"
36
#include "yb/master/sys_catalog_writer.h"
37
38
#include "yb/rocksdb/write_batch.h"
39
#include "yb/tablet/tablet.h"
40
#include "yb/util/format.h"
41
#include "yb/util/logging.h"
42
#include "yb/util/pb_util.h"
43
#include "yb/util/status_format.h"
44
45
using namespace std::placeholders;
46
47
namespace yb {
48
namespace master {
49
50
namespace {
51
52
CHECKED_STATUS ApplyWriteRequest(
53
    const Schema& schema, const QLWriteRequestPB& write_request,
54
51
    docdb::DocWriteBatch* write_batch) {
55
51
  std::shared_ptr<const Schema> schema_ptr(&schema, [](const Schema* schema){});
56
51
  docdb::DocOperationApplyData apply_data{.doc_write_batch = write_batch};
57
51
  IndexMap index_map;
58
51
  docdb::QLWriteOperation operation(
59
51
      write_request, schema_ptr, index_map, nullptr, TransactionOperationContext());
60
51
  QLResponsePB response;
61
51
  RETURN_NOT_OK(operation.Init(&response));
62
51
  return operation.Apply(apply_data);
63
51
}
64
65
8.35k
bool TableDeleted(const SysTablesEntryPB& table) {
66
8.35k
  return table.state() == SysTablesEntryPB::DELETED ||
67
8.35k
         table.state() == SysTablesEntryPB::DELETING ||
68
8.35k
         table.hide_state() == SysTablesEntryPB::HIDING ||
69
8.35k
         table.hide_state() == SysTablesEntryPB::HIDDEN;
70
8.35k
}
71
72
Result<bool> MatchNamespace(
73
    const SnapshotScheduleFilterPB& filter, const NamespaceId& id,
74
132
    const SysNamespaceEntryPB& ns) {
75
132
  VLOG
(1) << __func__ << "(" << id << ", " << ns.ShortDebugString() << ")"0
;
76
132
  for (const auto& table_identifier : filter.tables().tables()) {
77
132
    if (table_identifier.has_namespace_() &&
78
132
        VERIFY_RESULT(master::NamespaceMatchesIdentifier(
79
132
            id, ns.database_type(), ns.name(), table_identifier.namespace_()))) {
80
18
      return true;
81
18
    }
82
132
  }
83
114
  return false;
84
132
}
85
86
Result<bool> MatchTable(
87
    const SnapshotScheduleFilterPB& filter, const TableId& id,
88
8.37k
    const SysTablesEntryPB& table) {
89
8.37k
  VLOG
(1) << __func__ << "(" << id << ", " << table.ShortDebugString() << ")"0
;
90
8.37k
  for (const auto& table_identifier : filter.tables().tables()) {
91
8.37k
    if (VERIFY_RESULT(master::TableMatchesIdentifier(id, table, table_identifier))) {
92
1.32k
      return true;
93
1.32k
    }
94
8.37k
  }
95
7.05k
  return false;
96
8.37k
}
97
98
template <class PB>
99
struct GetEntryType;
100
101
template<> struct GetEntryType<SysNamespaceEntryPB>
102
    : public std::integral_constant<SysRowEntryType, SysRowEntryType::NAMESPACE> {};
103
104
template<> struct GetEntryType<SysTablesEntryPB>
105
    : public std::integral_constant<SysRowEntryType, SysRowEntryType::TABLE> {};
106
107
template<> struct GetEntryType<SysTabletsEntryPB>
108
    : public std::integral_constant<SysRowEntryType, SysRowEntryType::TABLET> {};
109
110
} // namespace
111
112
RestoreSysCatalogState::RestoreSysCatalogState(SnapshotScheduleRestoration* restoration)
113
10
    : restoration_(*restoration) {}
114
115
Result<bool> RestoreSysCatalogState::PatchRestoringEntry(
116
9
    const std::string& id, SysNamespaceEntryPB* pb) {
117
9
  return true;
118
9
}
119
120
Result<bool> RestoreSysCatalogState::PatchRestoringEntry(
121
660
    const std::string& id, SysTablesEntryPB* pb) {
122
660
  if (pb->schema().table_properties().is_ysql_catalog_table()) {
123
648
    LOG(INFO) << "PITR: Adding " << pb->name() << " for restoring. ID: " << id;
124
648
    restoration_.system_tables_to_restore.emplace(id, pb->name());
125
126
648
    return false;
127
648
  }
128
129
12
  auto it = existing_objects_.tables.find(id);
130
12
  if (it == existing_objects_.tables.end()) {
131
0
    return STATUS_FORMAT(NotFound, "Not found restoring table: $0", id);
132
0
  }
133
134
12
  if (pb->version() != it->second.version()) {
135
    // Force schema update after restoration, if schema has changes.
136
3
    pb->set_version(it->second.version() + 1);
137
3
  }
138
139
12
  return true;
140
12
}
141
142
Result<bool> RestoreSysCatalogState::PatchRestoringEntry(
143
30
    const std::string& id, SysTabletsEntryPB* pb) {
144
30
  return true;
145
30
}
146
147
template <class PB>
148
Status RestoreSysCatalogState::AddRestoringEntry(
149
699
    const std::string& id, PB* pb, faststring* buffer) {
150
699
  auto type = GetEntryType<PB>::value;
151
699
  
VLOG_WITH_FUNC0
(1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString()0
;
152
153
699
  if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) {
154
648
    return Status::OK();
155
648
  }
156
51
  auto& entry = *entries_.mutable_entries()->Add();
157
51
  entry.set_type(type);
158
51
  entry.set_id(id);
159
51
  pb_util::SerializeToString(*pb, buffer);
160
51
  entry.set_data(buffer->data(), buffer->size());
161
51
  restoration_.non_system_objects_to_restore.emplace(id, type);
162
163
51
  return Status::OK();
164
699
}
yb::Status yb::master::RestoreSysCatalogState::AddRestoringEntry<yb::master::SysNamespaceEntryPB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::SysNamespaceEntryPB*, yb::faststring*)
Line
Count
Source
149
9
    const std::string& id, PB* pb, faststring* buffer) {
150
9
  auto type = GetEntryType<PB>::value;
151
9
  
VLOG_WITH_FUNC0
(1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString()0
;
152
153
9
  if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) {
154
0
    return Status::OK();
155
0
  }
156
9
  auto& entry = *entries_.mutable_entries()->Add();
157
9
  entry.set_type(type);
158
9
  entry.set_id(id);
159
9
  pb_util::SerializeToString(*pb, buffer);
160
9
  entry.set_data(buffer->data(), buffer->size());
161
9
  restoration_.non_system_objects_to_restore.emplace(id, type);
162
163
9
  return Status::OK();
164
9
}
yb::Status yb::master::RestoreSysCatalogState::AddRestoringEntry<yb::master::SysTablesEntryPB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::SysTablesEntryPB*, yb::faststring*)
Line
Count
Source
149
660
    const std::string& id, PB* pb, faststring* buffer) {
150
660
  auto type = GetEntryType<PB>::value;
151
660
  
VLOG_WITH_FUNC0
(1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString()0
;
152
153
660
  if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) {
154
648
    return Status::OK();
155
648
  }
156
12
  auto& entry = *entries_.mutable_entries()->Add();
157
12
  entry.set_type(type);
158
12
  entry.set_id(id);
159
12
  pb_util::SerializeToString(*pb, buffer);
160
12
  entry.set_data(buffer->data(), buffer->size());
161
12
  restoration_.non_system_objects_to_restore.emplace(id, type);
162
163
12
  return Status::OK();
164
660
}
yb::Status yb::master::RestoreSysCatalogState::AddRestoringEntry<yb::master::SysTabletsEntryPB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::SysTabletsEntryPB*, yb::faststring*)
Line
Count
Source
149
30
    const std::string& id, PB* pb, faststring* buffer) {
150
30
  auto type = GetEntryType<PB>::value;
151
30
  
VLOG_WITH_FUNC0
(1) << SysRowEntryType_Name(type) << ": " << id << ", " << pb->ShortDebugString()0
;
152
153
30
  if (!VERIFY_RESULT(PatchRestoringEntry(id, pb))) {
154
0
    return Status::OK();
155
0
  }
156
30
  auto& entry = *entries_.mutable_entries()->Add();
157
30
  entry.set_type(type);
158
30
  entry.set_id(id);
159
30
  pb_util::SerializeToString(*pb, buffer);
160
30
  entry.set_data(buffer->data(), buffer->size());
161
30
  restoration_.non_system_objects_to_restore.emplace(id, type);
162
163
30
  return Status::OK();
164
30
}
165
166
9
Status RestoreSysCatalogState::Process() {
167
9
  
VLOG_WITH_FUNC0
(1) << "Restoring: " << restoring_objects_.SizesToString() << ", existing: "
168
0
                    << existing_objects_.SizesToString();
169
170
9
  
VLOG_WITH_FUNC0
(2) << "Check restoring objects"0
;
171
9
  
VLOG_WITH_FUNC0
(4) << "Restoring namespaces: " << AsString(restoring_objects_.namespaces)0
;
172
9
  faststring buffer;
173
9
  RETURN_NOT_OK_PREPEND(DetermineEntries(
174
9
      &restoring_objects_, nullptr,
175
9
      [this, &buffer](const auto& id, auto* pb) {
176
9
        return AddRestoringEntry(id, pb, &buffer);
177
9
  }), "Determine restoring entries failed");
178
179
9
  
VLOG_WITH_FUNC0
(2) << "Check existing objects"0
;
180
9
  RETURN_NOT_OK_PREPEND(DetermineEntries(
181
9
      &existing_objects_, &retained_existing_tables_,
182
9
      [this](const auto& id, auto* pb) {
183
9
        return CheckExistingEntry(id, *pb);
184
9
  }), "Determine obsolete entries failed");
185
186
  // Sort generated vectors, so binary search could be used to check whether object is obsolete.
187
9
  auto compare_by_first = [](const auto& lhs, const auto& rhs) 
{ return lhs.first < rhs.first; }0
;
Unexecuted instantiation: restore_sys_catalog_state.cc:auto yb::master::RestoreSysCatalogState::Process()::$_2::operator()<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB>, std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB> >(std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB> const&, std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB> const&) const
Unexecuted instantiation: restore_sys_catalog_state.cc:auto yb::master::RestoreSysCatalogState::Process()::$_2::operator()<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB>, std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB> >(std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB> const&, std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB> const&) const
188
9
  std::sort(restoration_.non_system_obsolete_tablets.begin(),
189
9
            restoration_.non_system_obsolete_tablets.end(),
190
9
            compare_by_first);
191
9
  std::sort(restoration_.non_system_obsolete_tables.begin(),
192
9
            restoration_.non_system_obsolete_tables.end(),
193
9
            compare_by_first);
194
195
9
  return Status::OK();
196
9
}
197
198
template <class ProcessEntry>
199
Status RestoreSysCatalogState::DetermineEntries(
200
    Objects* objects, RetainedExistingTables* retained_existing_tables,
201
18
    const ProcessEntry& process_entry) {
202
18
  std::unordered_set<NamespaceId> namespaces;
203
18
  std::unordered_set<TableId> tables;
204
205
18
  const auto& filter = restoration_.schedules[0].second;
206
207
132
  for (auto& id_and_metadata : objects->namespaces) {
208
132
    if (!VERIFY_RESULT(MatchNamespace(filter, id_and_metadata.first, id_and_metadata.second))) {
209
114
      continue;
210
114
    }
211
18
    if (!namespaces.insert(id_and_metadata.first).second) {
212
0
      continue;
213
0
    }
214
18
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
215
18
  }
216
217
8.35k
  
for (auto& id_and_metadata : objects->tables)18
{
218
8.35k
    
VLOG_WITH_FUNC0
(3) << "Checking: " << id_and_metadata.first << ", "
219
0
                      << id_and_metadata.second.ShortDebugString();
220
221
8.35k
    if (TableDeleted(id_and_metadata.second)) {
222
3
      continue;
223
3
    }
224
8.35k
    auto& root_table_id_and_metadata = VERIFY_RESULT(objects->FindRootTable(id_and_metadata)).get();
225
8.35k
    auto match = VERIFY_RESULT(MatchTable(
226
8.35k
        filter, root_table_id_and_metadata.first, root_table_id_and_metadata.second));
227
8.35k
    if (!match) {
228
7.03k
      continue;
229
7.03k
    }
230
1.31k
    if (retained_existing_tables) {
231
657
      auto& retaining_schedules = retained_existing_tables->emplace(
232
657
          id_and_metadata.first, std::vector<SnapshotScheduleId>()).first->second;
233
657
      retaining_schedules.push_back(restoration_.schedules[0].first);
234
657
      for (size_t i = 1; i != restoration_.schedules.size(); 
++i0
) {
235
0
        if (VERIFY_RESULT(MatchTable(
236
0
            restoration_.schedules[i].second, root_table_id_and_metadata.first,
237
0
            root_table_id_and_metadata.second))) {
238
0
          retaining_schedules.push_back(restoration_.schedules[i].first);
239
0
        }
240
0
      }
241
657
    }
242
    // Process pg_catalog tables that need to be restored.
243
1.31k
    if (namespaces.insert(id_and_metadata.second.namespace_id()).second) {
244
0
      auto namespace_it = objects->namespaces.find(id_and_metadata.second.namespace_id());
245
0
      if (namespace_it == objects->namespaces.end()) {
246
0
        return STATUS_FORMAT(
247
0
            NotFound, "Namespace $0 not found for table $1", id_and_metadata.second.namespace_id(),
248
0
            id_and_metadata.first, id_and_metadata.second.name());
249
0
      }
250
0
      RETURN_NOT_OK(process_entry(namespace_it->first, &namespace_it->second));
251
0
    }
252
1.31k
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
253
1.31k
    tables.insert(id_and_metadata.first);
254
1.31k
    VLOG(2) << "Table to restore: " << id_and_metadata.first << ", "
255
0
            << id_and_metadata.second.ShortDebugString();
256
1.31k
  }
257
402
  
for (auto& id_and_metadata : objects->tablets)18
{
258
402
    auto it = tables.find(id_and_metadata.second.table_id());
259
402
    if (it == tables.end()) {
260
351
      continue;
261
351
    }
262
51
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
263
51
    VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", "
264
0
            << id_and_metadata.second.ShortDebugString();
265
51
  }
266
18
  return Status::OK();
267
18
}
restore_sys_catalog_state.cc:yb::Status yb::master::RestoreSysCatalogState::DetermineEntries<yb::master::RestoreSysCatalogState::Process()::$_0>(yb::master::RestoreSysCatalogState::Objects*, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::allocator<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > >, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, std::__1::vector<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::allocator<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > > > > >*, yb::master::RestoreSysCatalogState::Process()::$_0 const&)
Line
Count
Source
201
9
    const ProcessEntry& process_entry) {
202
9
  std::unordered_set<NamespaceId> namespaces;
203
9
  std::unordered_set<TableId> tables;
204
205
9
  const auto& filter = restoration_.schedules[0].second;
206
207
66
  for (auto& id_and_metadata : objects->namespaces) {
208
66
    if (!VERIFY_RESULT(MatchNamespace(filter, id_and_metadata.first, id_and_metadata.second))) {
209
57
      continue;
210
57
    }
211
9
    if (!namespaces.insert(id_and_metadata.first).second) {
212
0
      continue;
213
0
    }
214
9
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
215
9
  }
216
217
4.17k
  
for (auto& id_and_metadata : objects->tables)9
{
218
4.17k
    
VLOG_WITH_FUNC0
(3) << "Checking: " << id_and_metadata.first << ", "
219
0
                      << id_and_metadata.second.ShortDebugString();
220
221
4.17k
    if (TableDeleted(id_and_metadata.second)) {
222
0
      continue;
223
0
    }
224
4.17k
    auto& root_table_id_and_metadata = VERIFY_RESULT(objects->FindRootTable(id_and_metadata)).get();
225
4.17k
    auto match = VERIFY_RESULT(MatchTable(
226
4.17k
        filter, root_table_id_and_metadata.first, root_table_id_and_metadata.second));
227
4.17k
    if (!match) {
228
3.51k
      continue;
229
3.51k
    }
230
660
    if (retained_existing_tables) {
231
0
      auto& retaining_schedules = retained_existing_tables->emplace(
232
0
          id_and_metadata.first, std::vector<SnapshotScheduleId>()).first->second;
233
0
      retaining_schedules.push_back(restoration_.schedules[0].first);
234
0
      for (size_t i = 1; i != restoration_.schedules.size(); ++i) {
235
0
        if (VERIFY_RESULT(MatchTable(
236
0
            restoration_.schedules[i].second, root_table_id_and_metadata.first,
237
0
            root_table_id_and_metadata.second))) {
238
0
          retaining_schedules.push_back(restoration_.schedules[i].first);
239
0
        }
240
0
      }
241
0
    }
242
    // Process pg_catalog tables that need to be restored.
243
660
    if (namespaces.insert(id_and_metadata.second.namespace_id()).second) {
244
0
      auto namespace_it = objects->namespaces.find(id_and_metadata.second.namespace_id());
245
0
      if (namespace_it == objects->namespaces.end()) {
246
0
        return STATUS_FORMAT(
247
0
            NotFound, "Namespace $0 not found for table $1", id_and_metadata.second.namespace_id(),
248
0
            id_and_metadata.first, id_and_metadata.second.name());
249
0
      }
250
0
      RETURN_NOT_OK(process_entry(namespace_it->first, &namespace_it->second));
251
0
    }
252
660
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
253
660
    tables.insert(id_and_metadata.first);
254
660
    VLOG(2) << "Table to restore: " << id_and_metadata.first << ", "
255
0
            << id_and_metadata.second.ShortDebugString();
256
660
  }
257
201
  
for (auto& id_and_metadata : objects->tablets)9
{
258
201
    auto it = tables.find(id_and_metadata.second.table_id());
259
201
    if (it == tables.end()) {
260
171
      continue;
261
171
    }
262
30
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
263
30
    VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", "
264
0
            << id_and_metadata.second.ShortDebugString();
265
30
  }
266
9
  return Status::OK();
267
9
}
restore_sys_catalog_state.cc:yb::Status yb::master::RestoreSysCatalogState::DetermineEntries<yb::master::RestoreSysCatalogState::Process()::$_1>(yb::master::RestoreSysCatalogState::Objects*, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::allocator<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > >, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, std::__1::vector<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag>, std::__1::allocator<yb::StronglyTypedUuid<yb::SnapshotScheduleId_Tag> > > > > >*, yb::master::RestoreSysCatalogState::Process()::$_1 const&)
Line
Count
Source
201
9
    const ProcessEntry& process_entry) {
202
9
  std::unordered_set<NamespaceId> namespaces;
203
9
  std::unordered_set<TableId> tables;
204
205
9
  const auto& filter = restoration_.schedules[0].second;
206
207
66
  for (auto& id_and_metadata : objects->namespaces) {
208
66
    if (!VERIFY_RESULT(MatchNamespace(filter, id_and_metadata.first, id_and_metadata.second))) {
209
57
      continue;
210
57
    }
211
9
    if (!namespaces.insert(id_and_metadata.first).second) {
212
0
      continue;
213
0
    }
214
9
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
215
9
  }
216
217
4.17k
  
for (auto& id_and_metadata : objects->tables)9
{
218
4.17k
    
VLOG_WITH_FUNC0
(3) << "Checking: " << id_and_metadata.first << ", "
219
0
                      << id_and_metadata.second.ShortDebugString();
220
221
4.17k
    if (TableDeleted(id_and_metadata.second)) {
222
3
      continue;
223
3
    }
224
4.17k
    auto& root_table_id_and_metadata = VERIFY_RESULT(objects->FindRootTable(id_and_metadata)).get();
225
4.17k
    auto match = VERIFY_RESULT(MatchTable(
226
4.17k
        filter, root_table_id_and_metadata.first, root_table_id_and_metadata.second));
227
4.17k
    if (!match) {
228
3.51k
      continue;
229
3.51k
    }
230
657
    if (retained_existing_tables) {
231
657
      auto& retaining_schedules = retained_existing_tables->emplace(
232
657
          id_and_metadata.first, std::vector<SnapshotScheduleId>()).first->second;
233
657
      retaining_schedules.push_back(restoration_.schedules[0].first);
234
657
      for (size_t i = 1; i != restoration_.schedules.size(); 
++i0
) {
235
0
        if (VERIFY_RESULT(MatchTable(
236
0
            restoration_.schedules[i].second, root_table_id_and_metadata.first,
237
0
            root_table_id_and_metadata.second))) {
238
0
          retaining_schedules.push_back(restoration_.schedules[i].first);
239
0
        }
240
0
      }
241
657
    }
242
    // Process pg_catalog tables that need to be restored.
243
657
    if (namespaces.insert(id_and_metadata.second.namespace_id()).second) {
244
0
      auto namespace_it = objects->namespaces.find(id_and_metadata.second.namespace_id());
245
0
      if (namespace_it == objects->namespaces.end()) {
246
0
        return STATUS_FORMAT(
247
0
            NotFound, "Namespace $0 not found for table $1", id_and_metadata.second.namespace_id(),
248
0
            id_and_metadata.first, id_and_metadata.second.name());
249
0
      }
250
0
      RETURN_NOT_OK(process_entry(namespace_it->first, &namespace_it->second));
251
0
    }
252
657
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
253
657
    tables.insert(id_and_metadata.first);
254
657
    VLOG(2) << "Table to restore: " << id_and_metadata.first << ", "
255
0
            << id_and_metadata.second.ShortDebugString();
256
657
  }
257
201
  
for (auto& id_and_metadata : objects->tablets)9
{
258
201
    auto it = tables.find(id_and_metadata.second.table_id());
259
201
    if (it == tables.end()) {
260
180
      continue;
261
180
    }
262
21
    RETURN_NOT_OK(process_entry(id_and_metadata.first, &id_and_metadata.second));
263
21
    VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", "
264
0
            << id_and_metadata.second.ShortDebugString();
265
21
  }
266
9
  return Status::OK();
267
9
}
268
269
Result<const std::pair<const TableId, SysTablesEntryPB>&>
270
    RestoreSysCatalogState::Objects::FindRootTable(
271
8.35k
    const std::pair<const TableId, SysTablesEntryPB>& id_and_metadata) {
272
8.35k
  if (!id_and_metadata.second.has_index_info()) {
273
4.80k
    return id_and_metadata;
274
4.80k
  }
275
276
3.55k
  auto it = tables.find(id_and_metadata.second.index_info().indexed_table_id());
277
3.55k
  if (it == tables.end()) {
278
0
    return STATUS_FORMAT(
279
0
        NotFound, "Indexed table $0 not found for index $1 ($2)",
280
0
        id_and_metadata.second.index_info().indexed_table_id(), id_and_metadata.first,
281
0
        id_and_metadata.second.name());
282
0
  }
283
3.55k
  const auto& ref = *it;
284
3.55k
  return ref;
285
3.55k
}
286
287
Result<const std::pair<const TableId, SysTablesEntryPB>&>
288
    RestoreSysCatalogState::Objects::FindRootTable(
289
0
    const TableId& table_id) {
290
0
  auto it = tables.find(table_id);
291
0
  if (it == tables.end()) {
292
0
    return STATUS_FORMAT(NotFound, "Table $0 not found for index", table_id);
293
0
  }
294
0
  return FindRootTable(*it);
295
0
}
296
297
0
std::string RestoreSysCatalogState::Objects::SizesToString() const {
298
0
  return Format("{ tablets: $0 tables: $1 namespaces: $2 }",
299
0
                tablets.size(), tables.size(), namespaces.size());
300
0
}
301
302
template <class PB>
303
Status RestoreSysCatalogState::IterateSysCatalog(
304
    const Schema& schema, const docdb::DocDB& doc_db, HybridTime read_time,
305
54
    std::unordered_map<std::string, PB>* map) {
306
54
  auto iter = std::make_unique<docdb::DocRowwiseIterator>(
307
54
      schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(),
308
54
      ReadHybridTime::SingleTime(read_time), nullptr);
309
54
  return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map](
310
8.89k
          const Slice& id, const Slice& data) -> Status {
311
8.89k
    auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data));
312
8.89k
    if (!ShouldLoadObject(pb)) {
313
0
      return Status::OK();
314
0
    }
315
8.89k
    if (!map->emplace(id.ToBuffer(), std::move(pb)).second) {
316
0
      return STATUS_FORMAT(IllegalState, "Duplicate $0: $1",
317
0
                           SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer());
318
0
    }
319
8.89k
    return Status::OK();
320
8.89k
  });
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysNamespaceEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysNamespaceEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysNamespaceEntryPB> > >*)::'lambda'(yb::Slice const&, yb::Slice const&)::operator()(yb::Slice const&, yb::Slice const&) const
Line
Count
Source
310
132
          const Slice& id, const Slice& data) -> Status {
311
132
    auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data));
312
132
    if (!ShouldLoadObject(pb)) {
313
0
      return Status::OK();
314
0
    }
315
132
    if (!map->emplace(id.ToBuffer(), std::move(pb)).second) {
316
0
      return STATUS_FORMAT(IllegalState, "Duplicate $0: $1",
317
0
                           SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer());
318
0
    }
319
132
    return Status::OK();
320
132
  });
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysTablesEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysTablesEntryPB> > >*)::'lambda'(yb::Slice const&, yb::Slice const&)::operator()(yb::Slice const&, yb::Slice const&) const
Line
Count
Source
310
8.35k
          const Slice& id, const Slice& data) -> Status {
311
8.35k
    auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data));
312
8.35k
    if (!ShouldLoadObject(pb)) {
313
0
      return Status::OK();
314
0
    }
315
8.35k
    if (!map->emplace(id.ToBuffer(), std::move(pb)).second) {
316
0
      return STATUS_FORMAT(IllegalState, "Duplicate $0: $1",
317
0
                           SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer());
318
0
    }
319
8.35k
    return Status::OK();
320
8.35k
  });
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysTabletsEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysTabletsEntryPB> > >*)::'lambda'(yb::Slice const&, yb::Slice const&)::operator()(yb::Slice const&, yb::Slice const&) const
Line
Count
Source
310
402
          const Slice& id, const Slice& data) -> Status {
311
402
    auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data));
312
402
    if (!ShouldLoadObject(pb)) {
313
0
      return Status::OK();
314
0
    }
315
402
    if (!map->emplace(id.ToBuffer(), std::move(pb)).second) {
316
0
      return STATUS_FORMAT(IllegalState, "Duplicate $0: $1",
317
0
                           SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer());
318
0
    }
319
402
    return Status::OK();
320
402
  });
321
54
}
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysNamespaceEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysNamespaceEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysNamespaceEntryPB> > >*)
Line
Count
Source
305
18
    std::unordered_map<std::string, PB>* map) {
306
18
  auto iter = std::make_unique<docdb::DocRowwiseIterator>(
307
18
      schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(),
308
18
      ReadHybridTime::SingleTime(read_time), nullptr);
309
18
  return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map](
310
18
          const Slice& id, const Slice& data) -> Status {
311
18
    auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data));
312
18
    if (!ShouldLoadObject(pb)) {
313
18
      return Status::OK();
314
18
    }
315
18
    if (!map->emplace(id.ToBuffer(), std::move(pb)).second) {
316
18
      return STATUS_FORMAT(IllegalState, "Duplicate $0: $1",
317
18
                           SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer());
318
18
    }
319
18
    return Status::OK();
320
18
  });
321
18
}
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysTablesEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTablesEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysTablesEntryPB> > >*)
Line
Count
Source
305
18
    std::unordered_map<std::string, PB>* map) {
306
18
  auto iter = std::make_unique<docdb::DocRowwiseIterator>(
307
18
      schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(),
308
18
      ReadHybridTime::SingleTime(read_time), nullptr);
309
18
  return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map](
310
18
          const Slice& id, const Slice& data) -> Status {
311
18
    auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data));
312
18
    if (!ShouldLoadObject(pb)) {
313
18
      return Status::OK();
314
18
    }
315
18
    if (!map->emplace(id.ToBuffer(), std::move(pb)).second) {
316
18
      return STATUS_FORMAT(IllegalState, "Duplicate $0: $1",
317
18
                           SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer());
318
18
    }
319
18
    return Status::OK();
320
18
  });
321
18
}
yb::Status yb::master::RestoreSysCatalogState::IterateSysCatalog<yb::master::SysTabletsEntryPB>(yb::Schema const&, yb::docdb::DocDB const&, yb::HybridTime, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::master::SysTabletsEntryPB, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, yb::master::SysTabletsEntryPB> > >*)
Line
Count
Source
305
18
    std::unordered_map<std::string, PB>* map) {
306
18
  auto iter = std::make_unique<docdb::DocRowwiseIterator>(
307
18
      schema, schema, TransactionOperationContext(), doc_db, CoarseTimePoint::max(),
308
18
      ReadHybridTime::SingleTime(read_time), nullptr);
309
18
  return EnumerateSysCatalog(iter.get(), schema, GetEntryType<PB>::value, [map](
310
18
          const Slice& id, const Slice& data) -> Status {
311
18
    auto pb = VERIFY_RESULT(pb_util::ParseFromSlice<PB>(data));
312
18
    if (!ShouldLoadObject(pb)) {
313
18
      return Status::OK();
314
18
    }
315
18
    if (!map->emplace(id.ToBuffer(), std::move(pb)).second) {
316
18
      return STATUS_FORMAT(IllegalState, "Duplicate $0: $1",
317
18
                           SysRowEntryType_Name(GetEntryType<PB>::value), id.ToBuffer());
318
18
    }
319
18
    return Status::OK();
320
18
  });
321
18
}
322
323
Status RestoreSysCatalogState::LoadObjects(
324
18
    const Schema& schema, const docdb::DocDB& doc_db, HybridTime read_time, Objects* objects) {
325
18
  RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->namespaces));
326
18
  RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->tables));
327
18
  RETURN_NOT_OK(IterateSysCatalog(schema, doc_db, read_time, &objects->tablets));
328
18
  return Status::OK();
329
18
}
330
331
Status RestoreSysCatalogState::LoadRestoringObjects(
332
9
    const Schema& schema, const docdb::DocDB& doc_db) {
333
9
  return LoadObjects(schema, doc_db, restoration_.restore_at, &restoring_objects_);
334
9
}
335
336
Status RestoreSysCatalogState::LoadExistingObjects(
337
9
    const Schema& schema, const docdb::DocDB& doc_db) {
338
9
  return LoadObjects(schema, doc_db, HybridTime::kMax, &existing_objects_);
339
9
}
340
341
Status RestoreSysCatalogState::CheckExistingEntry(
342
21
    const std::string& id, const SysTabletsEntryPB& pb) {
343
21
  
VLOG_WITH_FUNC0
(4) << "Tablet: " << id << ", " << pb.ShortDebugString()0
;
344
21
  if (restoring_objects_.tablets.count(id)) {
345
21
    return Status::OK();
346
21
  }
347
0
  LOG(INFO) << "PITR: Will remove tablet: " << id;
348
0
  restoration_.non_system_obsolete_tablets.emplace_back(id, pb);
349
0
  return Status::OK();
350
21
}
351
352
Status RestoreSysCatalogState::CheckExistingEntry(
353
657
    const std::string& id, const SysTablesEntryPB& pb) {
354
657
  if (pb.schema().table_properties().is_ysql_catalog_table()) {
355
648
    if (restoration_.system_tables_to_restore.count(id) == 0) {
356
0
      return STATUS_FORMAT(
357
0
          NotFound,
358
0
          "PG Catalog table $0 not found in the present set of tables"
359
0
          " but found in the objects to restore.",
360
0
          pb.name());
361
0
    }
362
648
    return Status::OK();
363
648
  }
364
365
9
  
VLOG_WITH_FUNC0
(4) << "Table: " << id << ", " << pb.ShortDebugString()0
;
366
9
  if (restoring_objects_.tables.count(id)) {
367
9
    return Status::OK();
368
9
  }
369
0
  LOG(INFO) << "PITR: Will remove table: " << id;
370
0
  restoration_.non_system_obsolete_tables.emplace_back(id, pb);
371
372
0
  return Status::OK();
373
9
}
374
375
// We don't delete newly created namespaces, because our filters are namespace based.
376
Status RestoreSysCatalogState::CheckExistingEntry(
377
9
    const std::string& id, const SysNamespaceEntryPB& pb) {
378
9
  return Status::OK();
379
9
}
380
381
Status RestoreSysCatalogState::PrepareWriteBatch(
382
9
    const Schema& schema, docdb::DocWriteBatch* write_batch) {
383
51
  for (const auto& entry : entries_.entries()) {
384
51
    QLWriteRequestPB write_request;
385
51
    RETURN_NOT_OK(FillSysCatalogWriteRequest(
386
51
        entry.type(), entry.id(), entry.data(), QLWriteRequestPB::QL_STMT_INSERT, schema,
387
51
        &write_request));
388
51
    RETURN_NOT_OK(ApplyWriteRequest(schema, write_request, write_batch));
389
51
  }
390
391
9
  for (const auto& tablet_id_and_pb : restoration_.non_system_obsolete_tablets) {
392
0
    RETURN_NOT_OK(PrepareTabletCleanup(
393
0
        tablet_id_and_pb.first, tablet_id_and_pb.second, schema, write_batch));
394
0
  }
395
9
  for (const auto& table_id_and_pb : restoration_.non_system_obsolete_tables) {
396
0
    RETURN_NOT_OK(PrepareTableCleanup(
397
0
        table_id_and_pb.first, table_id_and_pb.second, schema, write_batch));
398
0
  }
399
400
9
  return Status::OK();
401
9
}
402
403
Status RestoreSysCatalogState::PrepareTabletCleanup(
404
    const TabletId& id, SysTabletsEntryPB pb, const Schema& schema,
405
0
    docdb::DocWriteBatch* write_batch) {
406
0
  VLOG_WITH_FUNC(4) << id;
407
408
0
  QLWriteRequestPB write_request;
409
410
0
  auto it = retained_existing_tables_.find(pb.table_id());
411
0
  if (it != retained_existing_tables_.end()) {
412
0
    pb.set_hide_hybrid_time(restoration_.write_time.ToUint64());
413
0
    auto& out_schedules = *pb.mutable_retained_by_snapshot_schedules();
414
0
    for (const auto& schedule_id : it->second) {
415
0
      out_schedules.Add()->assign(schedule_id.AsSlice().cdata(), schedule_id.size());
416
0
    }
417
0
  }
418
0
  RETURN_NOT_OK(FillSysCatalogWriteRequest(
419
0
      SysRowEntryType::TABLET, id, pb.SerializeAsString(),
420
0
      QLWriteRequestPB::QL_STMT_UPDATE, schema, &write_request));
421
0
  return ApplyWriteRequest(schema, write_request, write_batch);
422
0
}
423
424
Status RestoreSysCatalogState::PrepareTableCleanup(
425
    const TableId& id, SysTablesEntryPB pb, const Schema& schema,
426
0
    docdb::DocWriteBatch* write_batch) {
427
0
  VLOG_WITH_FUNC(4) << id;
428
429
0
  QLWriteRequestPB write_request;
430
0
  pb.set_hide_state(SysTablesEntryPB::HIDING);
431
0
  pb.set_version(pb.version() + 1);
432
0
  RETURN_NOT_OK(FillSysCatalogWriteRequest(
433
0
      SysRowEntryType::TABLE, id, pb.SerializeAsString(),
434
0
      QLWriteRequestPB::QL_STMT_UPDATE, schema, &write_request));
435
0
  return ApplyWriteRequest(schema, write_request, write_batch);
436
0
}
437
438
Result<bool> RestoreSysCatalogState::TEST_MatchTable(
439
18
    const TableId& id, const SysTablesEntryPB& table) {
440
18
  return MatchTable(restoration_.schedules[0].second, id, table);
441
18
}
442
443
void RestoreSysCatalogState::WriteToRocksDB(
444
    docdb::DocWriteBatch* write_batch, const HybridTime& write_time, const OpId& op_id,
445
9
    tablet::Tablet* tablet) {
446
9
  docdb::KeyValueWriteBatchPB kv_write_batch;
447
9
  write_batch->MoveToWriteBatchPB(&kv_write_batch);
448
449
9
  docdb::NonTransactionalWriter writer(kv_write_batch, write_time);
450
9
  rocksdb::WriteBatch rocksdb_write_batch;
451
9
  rocksdb_write_batch.SetDirectWriter(&writer);
452
9
  docdb::ConsensusFrontiers frontiers;
453
9
  set_op_id(op_id, &frontiers);
454
9
  set_hybrid_time(write_time, &frontiers);
455
456
9
  tablet->WriteToRocksDB(
457
9
      &frontiers, &rocksdb_write_batch, docdb::StorageDbType::kRegular);
458
9
}
459
460
class FetchState {
461
 public:
462
  explicit FetchState(const docdb::DocDB& doc_db, const ReadHybridTime& read_time)
463
      : iterator_(CreateIntentAwareIterator(
464
          doc_db,
465
          docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
466
          boost::none,
467
          rocksdb::kDefaultQueryId,
468
          TransactionOperationContext(),
469
          CoarseTimePoint::max(),
470
12
          read_time)) {
471
12
  }
472
473
1.30k
  CHECKED_STATUS SetPrefix(const Slice& prefix) {
474
1.30k
    if (prefix_.empty()) {
475
12
      iterator_->Seek(prefix);
476
1.29k
    } else {
477
1.29k
      iterator_->SeekForward(prefix);
478
1.29k
    }
479
1.30k
    prefix_ = prefix;
480
1.30k
    finished_ = false;
481
1.30k
    last_deleted_key_bytes_.clear();
482
1.30k
    last_deleted_key_write_time_ = DocHybridTime::kInvalid;
483
1.30k
    RETURN_NOT_OK(Update());
484
1.30k
    return NextNonDeletedEntry();
485
1.30k
  }
486
487
7.51M
  bool finished() const {
488
7.51M
    return finished_;
489
7.51M
  }
490
491
18.9M
  Slice key() const {
492
18.9M
    return key_.key;
493
18.9M
  }
494
495
11.2M
  Slice value() const {
496
11.2M
    return iterator_->value();
497
11.2M
  }
498
499
408
  docdb::FetchKeyResult FullKey() const {
500
408
    return key_;
501
408
  }
502
503
3.75M
  CHECKED_STATUS NextEntry() {
504
3.75M
    iterator_->SeekPastSubKey(key_.key);
505
3.75M
    return Update();
506
3.75M
  }
507
508
3.75M
  CHECKED_STATUS Next() {
509
3.75M
    RETURN_NOT_OK(NextEntry());
510
3.75M
    return NextNonDeletedEntry();
511
3.75M
  }
512
513
  // Returns true if the entry corresponds to a deleted row
514
  // in rocksdb.
515
7.51M
  Result<bool> IsDeletedRowEntry() {
516
7.51M
    bool is_tombstoned = false;
517
7.51M
    is_tombstoned = VERIFY_RESULT(docdb::Value::IsTombstoned(value()));
518
519
    // Because Postgres doesn't have a concept of frozen types, kGroupEnd will only demarcate the
520
    // end of hashed and range components. It is reasonable to assume then that if the last byte
521
    // is kGroupEnd then it does not have any subkeys.
522
0
    bool no_subkey =
523
7.51M
        key()[key().size() - 1] == docdb::ValueTypeAsChar::kGroupEnd;
524
525
7.51M
    return no_subkey && 
is_tombstoned162
;
526
7.51M
  }
527
528
  // Returns true if the current key is under the last deleted row key and was written before that delete.
529
3.75M
  bool IsDeletedSinceInsertion() {
530
3.75M
    if (last_deleted_key_bytes_.size() == 0) {
531
3.59M
      return false;
532
3.59M
    }
533
157k
    return key().starts_with(last_deleted_key_bytes_.AsSlice()) &&
534
157k
           
FullKey().write_time < last_deleted_key_write_time_408
;
535
3.75M
  }
536
537
 private:
538
3.75M
  CHECKED_STATUS Update() {
539
3.75M
    if (!iterator_->valid()) {
540
24
      finished_ = true;
541
24
      return Status::OK();
542
24
    }
543
3.75M
    key_ = VERIFY_RESULT(iterator_->FetchKey());
544
3.75M
    if (VERIFY_RESULT(IsDeletedRowEntry())) {
545
105
      last_deleted_key_write_time_ = key_.write_time;
546
105
      last_deleted_key_bytes_ = key_.key;
547
105
    }
548
3.75M
    if (!key_.key.starts_with(prefix_)) {
549
1.28k
      finished_ = true;
550
1.28k
      return Status::OK();
551
1.28k
    }
552
553
3.75M
    return Status::OK();
554
3.75M
  }
555
556
3.75M
  CHECKED_STATUS NextNonDeletedEntry() {
557
3.75M
    while (!finished()) {
558
3.75M
      if (VERIFY_RESULT(IsDeletedRowEntry()) ||
559
3.75M
          
IsDeletedSinceInsertion()3.75M
) {
560
459
        RETURN_NOT_OK(NextEntry());
561
459
        continue;
562
459
      }
563
3.75M
      break;
564
3.75M
    }
565
3.75M
    return Status::OK();
566
3.75M
  }
567
568
  std::unique_ptr<docdb::IntentAwareIterator> iterator_;
569
  Slice prefix_;
570
  docdb::FetchKeyResult key_;
571
  KeyBuffer last_deleted_key_bytes_;
572
  DocHybridTime last_deleted_key_write_time_;
573
  bool finished_ = false;
574
};
575
576
474
void AddKeyValue(const Slice& key, const Slice& value, docdb::DocWriteBatch* write_batch) {
577
474
  auto& pair = write_batch->AddRaw();
578
474
  pair.first.assign(key.cdata(), key.size());
579
474
  pair.second.assign(value.cdata(), value.size());
580
474
}
581
582
struct PgCatalogTableData {
583
  std::array<uint8_t, kUuidSize + 1> prefix;
584
  const TableName* name;
585
586
654
  CHECKED_STATUS SetTableId(const TableId& table_id) {
587
654
    Uuid cotable_id;
588
654
    RETURN_NOT_OK(cotable_id.FromHexString(table_id));
589
654
    prefix[0] = docdb::ValueTypeAsChar::kTableId;
590
654
    cotable_id.EncodeToComparable(&prefix[1]);
591
654
    return Status::OK();
592
654
  }
593
};
594
595
Status RestoreSysCatalogState::ProcessPgCatalogRestores(
596
    const Schema& pg_yb_catalog_version_schema,
597
    const docdb::DocDB& restoring_db,
598
    const docdb::DocDB& existing_db,
599
6
    docdb::DocWriteBatch* write_batch) {
600
6
  if (restoration_.system_tables_to_restore.empty()) {
601
0
    return Status::OK();
602
0
  }
603
604
6
  FetchState restoring_state(restoring_db, ReadHybridTime::SingleTime(restoration_.restore_at));
605
6
  FetchState existing_state(existing_db, ReadHybridTime::Max());
606
6
  char tombstone_char = docdb::ValueTypeAsChar::kTombstone;
607
6
  Slice tombstone(&tombstone_char, 1);
608
609
6
  std::vector<PgCatalogTableData> tables(restoration_.system_tables_to_restore.size() + 1);
610
6
  size_t idx = 0;
611
6
  RETURN_NOT_OK(tables[0].SetTableId(kPgYbCatalogVersionTableId));
612
6
  tables[0].name = nullptr;
613
6
  ++idx;
614
648
  for (auto& id_and_name : restoration_.system_tables_to_restore) {
615
648
    auto& table = tables[idx];
616
648
    RETURN_NOT_OK(table.SetTableId(id_and_name.first));
617
648
    table.name = &id_and_name.second;
618
648
    ++idx;
619
648
  }
620
621
622
5.51k
  
std::sort(tables.begin(), tables.end(), [](const auto& lhs, const auto& rhs) 6
{
623
5.51k
    return Slice(lhs.prefix).compare(Slice(rhs.prefix)) < 0;
624
5.51k
  });
625
626
654
  for (auto& table : tables) {
627
654
    size_t num_updates = 0;
628
654
    size_t num_inserts = 0;
629
654
    size_t num_deletes = 0;
630
654
    Slice prefix(table.prefix);
631
632
654
    RETURN_NOT_OK(restoring_state.SetPrefix(prefix));
633
654
    RETURN_NOT_OK(existing_state.SetPrefix(prefix));
634
635
1.87M
    
while (654
!restoring_state.finished() &&
!existing_state.finished()1.87M
) {
636
1.87M
      auto compare_result = restoring_state.key().compare(existing_state.key());
637
1.87M
      if (compare_result == 0) {
638
1.87M
        if (table.name != nullptr) {
639
1.87M
          if (restoring_state.value().compare(existing_state.value())) {
640
72
            ++num_updates;
641
72
            AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch);
642
72
          }
643
1.87M
        } else {
644
18
          docdb::SubDocKey sub_doc_key;
645
18
          RETURN_NOT_OK(sub_doc_key.FullyDecodeFrom(
646
18
              restoring_state.key(), docdb::HybridTimeRequired::kFalse));
647
18
          SCHECK_EQ(sub_doc_key.subkeys().size(), 1U, Corruption, "Wrong number of subdoc keys");
648
18
          if (sub_doc_key.subkeys()[0].value_type() == docdb::ValueType::kColumnId) {
649
12
            auto column_id = sub_doc_key.subkeys()[0].GetColumnId();
650
12
            const ColumnSchema& column = VERIFY_RESULT(pg_yb_catalog_version_schema.column_by_id(
651
12
                column_id));
652
12
            if (column.name() == "current_version") {
653
6
              docdb::Value value;
654
6
              RETURN_NOT_OK(value.Decode(existing_state.value()));
655
6
              docdb::DocPath path(sub_doc_key.doc_key().Encode(), sub_doc_key.subkeys());
656
6
              QLValuePB value_pb;
657
6
              value_pb.set_int64_value(value.primitive_value().GetInt64() + 1);
658
6
              RETURN_NOT_OK(write_batch->SetPrimitive(
659
6
                  path, docdb::ValueRef(value_pb, SortingType::kNotSpecified)));
660
6
            }
661
12
          }
662
18
        }
663
1.87M
        RETURN_NOT_OK(restoring_state.Next());
664
1.87M
        RETURN_NOT_OK(existing_state.Next());
665
1.87M
      } else 
if (75
compare_result < 075
) {
666
75
        ++num_inserts;
667
75
        AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch);
668
75
        RETURN_NOT_OK(restoring_state.Next());
669
75
      } else {
670
0
        ++num_deletes;
671
0
        AddKeyValue(existing_state.key(), tombstone, write_batch);
672
0
        RETURN_NOT_OK(existing_state.Next());
673
0
      }
674
1.87M
    }
675
676
981
    
while (654
!restoring_state.finished()) {
677
327
      ++num_inserts;
678
327
      AddKeyValue(restoring_state.key(), restoring_state.value(), write_batch);
679
327
      RETURN_NOT_OK(restoring_state.Next());
680
327
    }
681
682
654
    while (!existing_state.finished()) {
683
0
      ++num_deletes;
684
0
      AddKeyValue(existing_state.key(), tombstone, write_batch);
685
0
      RETURN_NOT_OK(existing_state.Next());
686
0
    }
687
688
654
    if (num_updates + num_inserts + num_deletes != 0 || VLOG_IS_ON(3)) {
689
51
      LOG(INFO) << "PITR: Pg system table: " << AsString(table.name) << ", updates: " << num_updates
690
51
                << ", inserts: " << num_inserts << ", deletes: " << num_deletes;
691
51
    }
692
654
  }
693
694
6
  return Status::OK();
695
6
}
696
697
}  // namespace master
698
}  // namespace yb