YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/master_snapshot_coordinator.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/master_snapshot_coordinator.h"
15
16
#include <unordered_map>
17
18
#include <boost/multi_index/composite_key.hpp>
19
#include <boost/multi_index/mem_fun.hpp>
20
#include <boost/asio/io_context.hpp>
21
22
#include "yb/common/snapshot.h"
23
24
#include "yb/docdb/doc_key.h"
25
#include "yb/docdb/value.h"
26
#include "yb/docdb/value_type.h"
27
28
#include "yb/master/async_snapshot_tasks.h"
29
#include "yb/master/catalog_entity_info.h"
30
#include "yb/master/master_error.h"
31
#include "yb/master/master_heartbeat.pb.h"
32
#include "yb/master/master_util.h"
33
#include "yb/master/restoration_state.h"
34
#include "yb/master/snapshot_coordinator_context.h"
35
#include "yb/master/snapshot_schedule_state.h"
36
#include "yb/master/snapshot_state.h"
37
#include "yb/master/sys_catalog_writer.h"
38
39
#include "yb/rpc/poller.h"
40
#include "yb/rpc/scheduler.h"
41
42
#include "yb/tablet/operations/snapshot_operation.h"
43
#include "yb/tablet/operations/write_operation.h"
44
#include "yb/tablet/tablet.h"
45
#include "yb/tablet/tablet_snapshots.h"
46
#include "yb/tablet/write_query.h"
47
48
#include "yb/util/async_util.h"
49
#include "yb/util/flag_tags.h"
50
#include "yb/util/pb_util.h"
51
#include "yb/util/status_format.h"
52
#include "yb/util/status_log.h"
53
#include "yb/util/stopwatch.h"
54
55
using namespace std::literals;
56
using namespace std::placeholders;
57
58
DECLARE_int32(sys_catalog_write_timeout_ms);
59
60
DEFINE_uint64(snapshot_coordinator_poll_interval_ms, 5000,
61
              "Poll interval for snapshot coordinator in milliseconds.");
62
63
DEFINE_test_flag(bool, skip_sending_restore_finished, false,
64
                 "Whether we should skip sending RESTORE_FINISHED to tablets.");
65
66
DEFINE_bool(schedule_snapshot_rpcs_out_of_band, false,
67
            "Should tablet snapshot RPCs be scheduled out of band from the periodic"
68
            " background thread.");
69
TAG_FLAG(schedule_snapshot_rpcs_out_of_band, runtime);
70
71
DECLARE_bool(allow_consecutive_restore);
72
73
namespace yb {
74
namespace master {
75
76
namespace {
77
78
YB_DEFINE_ENUM(Bound, (kFirst)(kLast));
79
YB_DEFINE_ENUM(RestorePhase, (kInitial)(kPostSysCatalogLoad));
80
81
void SubmitWrite(
82
    docdb::KeyValueWriteBatchPB&& write_batch, int64_t leader_term,
83
    SnapshotCoordinatorContext* context,
84
0
    const std::shared_ptr<Synchronizer>& synchronizer = nullptr) {
85
0
  auto query = std::make_unique<tablet::WriteQuery>(
86
0
      leader_term, CoarseMonoClock::now() + FLAGS_sys_catalog_write_timeout_ms * 1ms,
87
0
      /* context */ nullptr, /* tablet= */ nullptr);
88
0
  if (synchronizer) {
89
0
    query->set_callback(
90
0
        tablet::MakeWeakSynchronizerOperationCompletionCallback(synchronizer));
91
0
  }
92
0
  *query->operation().AllocateRequest()->mutable_write_batch() = std::move(write_batch);
93
0
  context->Submit(query.release()->PrepareSubmit(), leader_term);
94
0
}
95
96
CHECKED_STATUS SynchronizedWrite(
97
    docdb::KeyValueWriteBatchPB&& write_batch, int64_t leader_term, CoarseTimePoint deadline,
98
0
    SnapshotCoordinatorContext* context) {
99
0
  auto synchronizer = std::make_shared<Synchronizer>();
100
0
  SubmitWrite(std::move(write_batch), leader_term, context, synchronizer);
101
0
  return synchronizer->WaitUntil(ToSteady(deadline));
102
0
}
103
104
struct NoOp {
105
  template <class... Args>
106
  void operator()(Args&&... args) const {}
107
};
108
109
// Utility to create callback that is invoked when operation done.
110
// Finds appropriate entry in passed collection and invokes Done on it.
111
template <class Collection, class PostProcess = NoOp>
112
auto MakeDoneCallback(
113
    std::mutex* mutex, const Collection& collection, const typename Collection::key_type& key,
114
0
    const TabletId& tablet_id, const PostProcess& post_process = PostProcess()) {
115
0
  struct DoneFunctor {
116
0
    std::mutex& mutex;
117
0
    const Collection& collection;
118
0
    typename Collection::key_type key;
119
0
    TabletId tablet_id;
120
0
    PostProcess post_process;
121
122
0
    void operator()(Result<const tserver::TabletSnapshotOpResponsePB&> resp) const {
123
0
      std::unique_lock<std::mutex> lock(mutex);
124
0
      auto it = collection.find(key);
125
0
      if (it == collection.end()) {
126
0
        LOG(DFATAL) << "Received reply for unknown " << key;
127
0
        return;
128
0
      }
129
130
0
      (**it).Done(tablet_id, ResultToStatus(resp));
131
0
      post_process(it->get(), &lock);
132
0
    }
Unexecuted instantiation: master_snapshot_coordinator.cc:_ZZN2yb6master12_GLOBAL__N_116MakeDoneCallbackIN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_13SnapshotStateENS6_14default_deleteIS8_EEEENS4_10indexed_byINS4_13hashed_uniqueINS4_13const_mem_funIS8_RKNS_17StronglyTypedUuidINS_17TxnSnapshotId_TagEEEXadL_ZNKS8_2idEvEEEEN4mpl_2naESM_SM_EENS4_18ordered_non_uniqueINS4_3tagINS0_25MasterSnapshotCoordinator4Impl11ScheduleTagESM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_EENS4_13composite_keyIS8_NSE_IS8_RKNSF_INS_22SnapshotScheduleId_TagEEEXadL_ZNKS8_11schedule_idEvEEEENSE_IS8_NS_10HybridTimeEXadL_ZNKS8_20snapshot_hybrid_timeEvEEEENS3_6tuples9null_typeES13_S13_S13_S13_S13_S13_S13_EESM_EESM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_EENS6_9allocatorISB_EEEENS6_6__bindIMSR_FvPS8_xPNS6_11unique_lockINS6_5mutexEEEEJPSR_RKNS6_12placeholders4__phILi1EEERxRKNS1K_ILi2EEEEEEEEDaPS1D_RKT_RKNS1V_8key_typeERKNS6_12basic_stringIcNS6_11char_traitsIcEENS17_IcEEEERKT0_ENK11DoneFunctorclENS_6ResultIRKNS_7tserver26TabletSnapshotOpResponsePBEEE
Unexecuted instantiation: master_snapshot_coordinator.cc:_ZZN2yb6master12_GLOBAL__N_116MakeDoneCallbackIN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_16RestorationStateENS6_14default_deleteIS8_EEEENS4_10indexed_byINS4_13hashed_uniqueINS4_13const_mem_funIS8_RKNS_17StronglyTypedUuidINS_28TxnSnapshotRestorationId_TagEEEXadL_ZNKS8_14restoration_idEvEEEEN4mpl_2naESM_SM_EESM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_EENS6_9allocatorISB_EEEENS6_6__bindIMNS0_25MasterSnapshotCoordinator4ImplEFvPS8_xEJPSU_RKNS6_12placeholders4__phILi1EEERxEEEEEDaPNS6_5mutexERKT_RKNS19_8key_typeERKNS6_12basic_stringIcNS6_11char_traitsIcEENSP_IcEEEERKT0_ENK11DoneFunctorclENS_6ResultIRKNS_7tserver26TabletSnapshotOpResponsePBEEE
133
0
  };
134
135
0
  return DoneFunctor {
136
0
    .mutex = *mutex,
137
0
    .collection = collection,
138
0
    .key = key,
139
0
    .tablet_id = tablet_id,
140
0
    .post_process = post_process,
141
0
  };
142
0
}
Unexecuted instantiation: master_snapshot_coordinator.cc:_ZN2yb6master12_GLOBAL__N_116MakeDoneCallbackIN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_13SnapshotStateENS6_14default_deleteIS8_EEEENS4_10indexed_byINS4_13hashed_uniqueINS4_13const_mem_funIS8_RKNS_17StronglyTypedUuidINS_17TxnSnapshotId_TagEEEXadL_ZNKS8_2idEvEEEEN4mpl_2naESM_SM_EENS4_18ordered_non_uniqueINS4_3tagINS0_25MasterSnapshotCoordinator4Impl11ScheduleTagESM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_EENS4_13composite_keyIS8_NSE_IS8_RKNSF_INS_22SnapshotScheduleId_TagEEEXadL_ZNKS8_11schedule_idEvEEEENSE_IS8_NS_10HybridTimeEXadL_ZNKS8_20snapshot_hybrid_timeEvEEEENS3_6tuples9null_typeES13_S13_S13_S13_S13_S13_S13_EESM_EESM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_EENS6_9allocatorISB_EEEENS6_6__bindIMSR_FvPS8_xPNS6_11unique_lockINS6_5mutexEEEEJPSR_RKNS6_12placeholders4__phILi1EEERxRKNS1K_ILi2EEEEEEEEDaPS1D_RKT_RKNS1V_8key_typeERKNS6_12basic_stringIcNS6_11char_traitsIcEENS17_IcEEEERKT0_
Unexecuted instantiation: master_snapshot_coordinator.cc:_ZN2yb6master12_GLOBAL__N_116MakeDoneCallbackIN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_16RestorationStateENS6_14default_deleteIS8_EEEENS4_10indexed_byINS4_13hashed_uniqueINS4_13const_mem_funIS8_RKNS_17StronglyTypedUuidINS_28TxnSnapshotRestorationId_TagEEEXadL_ZNKS8_14restoration_idEvEEEEN4mpl_2naESM_SM_EESM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_SM_EENS6_9allocatorISB_EEEENS6_6__bindIMNS0_25MasterSnapshotCoordinator4ImplEFvPS8_xEJPSU_RKNS6_12placeholders4__phILi1EEERxEEEEEDaPNS6_5mutexERKT_RKNS19_8key_typeERKNS6_12basic_stringIcNS6_11char_traitsIcEENSP_IcEEEERKT0_
143
144
} // namespace
145
146
class MasterSnapshotCoordinator::Impl {
147
 public:
148
  explicit Impl(SnapshotCoordinatorContext* context)
149
5.45k
      : context_(*context), poller_(std::bind(&Impl::Poll, this)) {}
150
151
  // Creates a snapshot of `entries` that belongs to no schedule, then blocks until the create
  // operation completes or `deadline` passes. Returns the freshly generated snapshot id.
  Result<TxnSnapshotId> Create(
      const SysRowEntries& entries, bool imported, int64_t leader_term, CoarseTimePoint deadline) {
    auto new_snapshot_id = TxnSnapshotId::GenerateRandom();
    auto sync = std::make_shared<Synchronizer>();
    auto on_complete = tablet::MakeWeakSynchronizerOperationCompletionCallback(sync);

    SubmitCreate(entries, imported, SnapshotScheduleId::Nil(), HybridTime::kInvalid,
                 new_snapshot_id, leader_term, std::move(on_complete));
    RETURN_NOT_OK(sync->WaitUntil(ToSteady(deadline)));
    return new_snapshot_id;
  }
163
164
  // Forces creation of a new snapshot for schedule `schedule_id` and waits for the resulting
  // operation until `deadline`. Returns NotFound when the schedule is unknown.
  // If the schedule reports a snapshot still being created, this is logged at VLOG(2);
  // an unknown in-flight snapshot older than 30s is reported with LOG(DFATAL).
  Result<TxnSnapshotId> CreateForSchedule(
      const SnapshotScheduleId& schedule_id, int64_t leader_term, CoarseTimePoint deadline) {
    boost::optional<SnapshotScheduleOperation> schedule_op;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto schedule_it = schedules_.find(schedule_id);
      if (schedule_it == schedules_.end()) {
        return STATUS_FORMAT(NotFound, "Unknown snapshot schedule: $0", schedule_id);
      }
      auto& schedule = **schedule_it;

      HybridTime latest_time = HybridTime::kInvalid;
      auto* latest = BoundingSnapshot(schedule.id(), Bound::kLast);
      if (latest) {
        latest_time = latest->snapshot_hybrid_time();
      }

      auto in_progress = schedule.creating_snapshot_data();
      if (in_progress.snapshot_id) {
        auto known_it = snapshots_.find(in_progress.snapshot_id);
        if (known_it == snapshots_.end()) {
          auto elapsed = CoarseMonoClock::now() - in_progress.start_time;
          auto message = Format(
              "$0 for $1 while creating unknown snapshot: $2 (passed $3)",
              __func__, schedule_id, in_progress.snapshot_id, elapsed);
          if (elapsed > 30s) {
            LOG(DFATAL) << message;
          } else {
            VLOG(2) << message;
          }
        } else {
          VLOG(2) << __func__ << " for " << schedule_id << " while creating snapshot: "
                  << (**known_it).ToString();
        }
      }

      schedule_op = VERIFY_RESULT(schedule.ForceCreateSnapshot(latest_time));
    }

    auto sync = std::make_shared<Synchronizer>();
    RETURN_NOT_OK(ExecuteScheduleOperation(*schedule_op, leader_term, sync));
    RETURN_NOT_OK(sync->WaitUntil(ToSteady(deadline)));
    return schedule_op->snapshot_id;
  }
203
204
  // Applies a replicated CREATE snapshot operation. It:
  //   1. registers a new SnapshotState (IllegalState if the id is already known),
  //   2. applies the serialized state to the sys catalog tablet,
  //   3. creates the sys catalog tablet snapshot when SysCatalogSnapshotData provides one
  //      (an Uninitialized status means "no sys catalog snapshot needed"),
  //   4. on the leader (leader_term >= 0), executes the prepared per-tablet operations.
  CHECKED_STATUS CreateReplicated(
      int64_t leader_term, const tablet::SnapshotOperation& operation) {
    // TODO(txn_backup) retain logs with this operation while doing snapshot
    auto id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(operation.request()->snapshot_id()));

    VLOG(1) << __func__ << "(" << id << ", " << operation.ToString() << ")";

    auto new_snapshot = std::make_unique<SnapshotState>(
        &context_, id, *operation.request(),
        GetRpcLimit(FLAGS_max_concurrent_snapshot_rpcs,
                    FLAGS_max_concurrent_snapshot_rpcs_per_tserver, leader_term));

    docdb::KeyValueWriteBatchPB write_batch;
    RETURN_NOT_OK(new_snapshot->StoreToWriteBatch(&write_batch));

    TabletSnapshotOperations tablet_ops;
    boost::optional<tablet::CreateSnapshotData> sys_catalog_snapshot_data;
    bool is_empty = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto [it, inserted] = snapshots_.emplace(std::move(new_snapshot));
      if (!inserted) {
        return STATUS_FORMAT(IllegalState, "Duplicate snapshot id: $0", id);
      }
      auto& state = **it;

      if (leader_term >= 0) {
        state.PrepareOperations(&tablet_ops);
      }

      auto sys_catalog_data = state.SysCatalogSnapshotData(operation);
      if (sys_catalog_data.ok()) {
        sys_catalog_snapshot_data = *sys_catalog_data;
      } else if (!sys_catalog_data.status().IsUninitialized()) {
        return sys_catalog_data.status();
      }

      is_empty = state.Empty();
    }

    auto* sys_catalog_tablet = operation.tablet();
    RETURN_NOT_OK(sys_catalog_tablet->ApplyOperation(operation, /* batch_idx= */ -1, write_batch));
    if (sys_catalog_snapshot_data) {
      RETURN_NOT_OK(sys_catalog_tablet->snapshots().Create(*sys_catalog_snapshot_data));
    }

    ExecuteOperations(tablet_ops, leader_term);

    // A snapshot may cover zero tables; such a snapshot is marked complete right after creation.
    if (leader_term >= 0 && is_empty) {
      UpdateSnapshotIfPresent(id, leader_term);
    }

    return Status::OK();
  }
255
256
  void UpdateSnapshotIfPresent(const TxnSnapshotId& id, int64_t leader_term)
257
0
      NO_THREAD_SAFETY_ANALYSIS EXCLUDES(mutex_) {
258
0
    std::unique_lock<std::mutex> lock(mutex_);
259
0
    auto it = snapshots_.find(id);
260
0
    if (it != snapshots_.end()) {
261
0
      UpdateSnapshot(it->get(), leader_term, &lock);
262
0
    }
263
0
  }
264
265
62
  // Loads SNAPSHOT rows into snapshots_ and then SNAPSHOT_SCHEDULE rows into schedules_ from
  // the sys catalog stored in `tablet`, holding mutex_ throughout. Returns early if loading
  // snapshots fails.
  CHECKED_STATUS Load(tablet::Tablet* tablet) {
    std::lock_guard<std::mutex> lock(mutex_);

    auto load_snapshot = [this](const Slice& id, const Slice& data)
        NO_THREAD_SAFETY_ANALYSIS -> Status {
      return LoadEntry<SysSnapshotEntryPB>(id, data, &snapshots_);
    };
    RETURN_NOT_OK(EnumerateSysCatalog(
        tablet, context_.schema(), SysRowEntryType::SNAPSHOT, load_snapshot));

    auto load_schedule = [this](const Slice& id, const Slice& data)
        NO_THREAD_SAFETY_ANALYSIS -> Status {
      return LoadEntry<SnapshotScheduleOptionsPB>(id, data, &schedules_);
    };
    return EnumerateSysCatalog(
        tablet, context_.schema(), SysRowEntryType::SNAPSHOT_SCHEDULE, load_schedule);
  }
276
277
20.3M
  // Replays one sys catalog key/value write against the in-memory snapshot/schedule state.
  // Keys with a cotable id, and keys whose entry type is neither SNAPSHOT nor
  // SNAPSHOT_SCHEDULE, are ignored. Malformed keys (range group size != 2, or a first range
  // component that is not kInt32) are reported with LOG(DFATAL) and skipped.
  CHECKED_STATUS ApplyWritePair(Slice key, const Slice& value) {
    docdb::SubDocKey sub_doc_key;
    RETURN_NOT_OK(sub_doc_key.FullyDecodeFrom(key, docdb::HybridTimeRequired::kFalse));

    // Keys carrying a cotable id are not snapshot/schedule entries (presumably rows of other
    // sys catalog tables), so they are ignored.
    if (sub_doc_key.doc_key().has_cotable_id()) {
      return Status::OK();
    }

    const auto& range_group = sub_doc_key.doc_key().range_group();
    if (range_group.size() != 2) {
      LOG(DFATAL) << "Unexpected size of range group in sys catalog entry (2 expected): "
                  << AsString(range_group) << "(" << sub_doc_key.ToString() << ")";
      return Status::OK();
    }

    const auto& first_key = range_group.front();
    if (first_key.value_type() != docdb::ValueType::kInt32) {
      LOG(DFATAL) << "Unexpected value type for the first range component of sys catalog entry "
                  << "(kInt32 expected): "
                  << AsString(range_group);
      // In non-debug builds LOG(DFATAL) does not abort; skip the entry instead of calling
      // GetInt32() on a value of a different type.
      return Status::OK();
    }

    const auto entry_type = first_key.GetInt32();
    if (entry_type == SysRowEntryType::SNAPSHOT) {
      return DoApplyWrite<SysSnapshotEntryPB>(range_group[1].GetString(), value, &snapshots_);
    }

    if (entry_type == SysRowEntryType::SNAPSHOT_SCHEDULE) {
      return DoApplyWrite<SnapshotScheduleOptionsPB>(
          range_group[1].GetString(), value, &schedules_);
    }

    return Status::OK();
  }
311
312
  template <class Pb, class Map>
313
0
  CHECKED_STATUS DoApplyWrite(const std::string& id_str, const Slice& value, Map* map) {
314
0
    docdb::Value decoded_value;
315
0
    RETURN_NOT_OK(decoded_value.Decode(value));
316
317
0
    auto value_type = decoded_value.primitive_value().value_type();
318
319
0
    if (value_type == docdb::ValueType::kTombstone) {
320
0
      std::lock_guard<std::mutex> lock(mutex_);
321
0
      auto id = Uuid::TryFullyDecode(id_str);
322
0
      if (id.IsNil()) {
323
0
        LOG(WARNING) << "Unable to decode id: " << id_str;
324
0
        return Status::OK();
325
0
      }
326
0
      bool erased = map->erase(typename Map::key_type(id)) != 0;
327
0
      LOG_IF(DFATAL, !erased) << "Unknown entry tombstoned: " << id.ToString();
328
0
      return Status::OK();
329
0
    }
330
331
0
    if (value_type != docdb::ValueType::kString) {
332
0
      return STATUS_FORMAT(
333
0
          Corruption,
334
0
          "Bad value type: $0, expected kString while replaying write for sys catalog",
335
0
          decoded_value.primitive_value().value_type());
336
0
    }
337
338
0
    std::lock_guard<std::mutex> lock(mutex_);
339
0
    return LoadEntry<Pb>(id_str, decoded_value.primitive_value().GetString(), map);
340
0
  }
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl12DoApplyWriteINS0_18SysSnapshotEntryPBEN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_13SnapshotStateENS8_14default_deleteISA_EEEENS6_10indexed_byINS6_13hashed_uniqueINS6_13const_mem_funISA_RKNS_17StronglyTypedUuidINS_17TxnSnapshotId_TagEEEXadL_ZNKSA_2idEvEEEEN4mpl_2naESO_SO_EENS6_18ordered_non_uniqueINS6_3tagINS2_11ScheduleTagESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS6_13composite_keyISA_NSG_ISA_RKNSH_INS_22SnapshotScheduleId_TagEEEXadL_ZNKSA_11schedule_idEvEEEENSG_ISA_NS_10HybridTimeEXadL_ZNKSA_20snapshot_hybrid_timeEvEEEENS5_6tuples9null_typeES13_S13_S13_S13_S13_S13_S13_EESO_EESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS8_9allocatorISD_EEEEEENS_6StatusERKNS8_12basic_stringIcNS8_11char_traitsIcEENS17_IcEEEERKNS_5SliceEPT0_
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl12DoApplyWriteINS0_25SnapshotScheduleOptionsPBEN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_21SnapshotScheduleStateENS8_14default_deleteISA_EEEENS6_10indexed_byINS6_13hashed_uniqueINS6_13const_mem_funISA_RKNS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEEXadL_ZNKSA_2idEvEEEEN4mpl_2naESO_SO_EESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS8_9allocatorISD_EEEEEENS_6StatusERKNS8_12basic_stringIcNS8_11char_traitsIcEENSR_IcEEEERKNS_5SliceEPT0_
341
342
  // Fills `resp` with snapshot descriptions. For a specific `snapshot_id` only that snapshot is
  // returned (error if unknown). For a nil id, all snapshots are listed in ScheduleTag index
  // order; snapshots whose aggregated state is DELETED are skipped unless `list_deleted`.
  CHECKED_STATUS ListSnapshots(
      const TxnSnapshotId& snapshot_id, bool list_deleted, ListSnapshotsResponsePB* resp) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!snapshot_id.IsNil()) {
      SnapshotState& requested = VERIFY_RESULT(FindSnapshot(snapshot_id));
      return requested.ToPB(resp->add_snapshots());
    }

    for (const auto& state : snapshots_.get<ScheduleTag>()) {
      if (!list_deleted) {
        auto aggregated_state = state->AggregatedState();
        const bool is_deleted =
            aggregated_state.ok() && *aggregated_state == SysSnapshotEntryPB::DELETED;
        if (is_deleted) {
          continue;
        }
      }
      RETURN_NOT_OK(state->ToPB(resp->add_snapshots()));
    }
    return Status::OK();
  }
361
362
  // Starts deletion of snapshot `snapshot_id`: under mutex_ the snapshot is looked up and
  // TryStartDelete() is called (its error, if any, is returned and nothing is submitted).
  // Then the delete is submitted and awaited until `deadline`.
  CHECKED_STATUS Delete(
      const TxnSnapshotId& snapshot_id, int64_t leader_term, CoarseTimePoint deadline) {
    VLOG_WITH_FUNC(4) << snapshot_id << ", " << leader_term;

    {
      std::lock_guard<std::mutex> lock(mutex_);
      SnapshotState& target = VERIFY_RESULT(FindSnapshot(snapshot_id));
      RETURN_NOT_OK(target.TryStartDelete());
    }

    auto done = std::make_shared<Synchronizer>();
    SubmitDelete(snapshot_id, leader_term, done);
    return done->WaitUntil(ToSteady(deadline));
  }
376
377
  // Applies a replicated DELETE snapshot operation. It:
  //   1. sets the snapshot's tablets to DELETING and serializes the new state,
  //   2. for snapshots that belong to a schedule, deletes the sys catalog tablet snapshot,
  //   3. applies the serialized state to the sys catalog tablet,
  //   4. on the leader (leader_term >= 0), executes the prepared per-tablet operations.
  CHECKED_STATUS DeleteReplicated(
      int64_t leader_term, const tablet::SnapshotOperation& operation) {
    auto snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(operation.request()->snapshot_id()));
    VLOG_WITH_FUNC(4) << leader_term << ", " << snapshot_id;

    docdb::KeyValueWriteBatchPB write_batch;
    TabletSnapshotOperations operations;
    // Must start as false: it is only set for scheduled snapshots, and previously an
    // uninitialized (indeterminate) value was read for non-scheduled ones.
    bool delete_sys_catalog_snapshot = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      SnapshotState& snapshot = VERIFY_RESULT(FindSnapshot(snapshot_id));
      if (snapshot.schedule_id()) {
        delete_sys_catalog_snapshot = true;
      }
      snapshot.SetInitialTabletsState(SysSnapshotEntryPB::DELETING);
      RETURN_NOT_OK(snapshot.StoreToWriteBatch(&write_batch));
      if (leader_term >= 0) {
        snapshot.PrepareOperations(&operations);
      }
    }

    if (delete_sys_catalog_snapshot) {
      RETURN_NOT_OK(operation.tablet()->snapshots().Delete(operation));
    }

    RETURN_NOT_OK(operation.tablet()->ApplyOperation(operation, /* batch_idx= */ -1, write_batch));

    ExecuteOperations(operations, leader_term);

    return Status::OK();
  }
408
409
  // Applies a replicated "restore sys catalog" operation for a scheduled snapshot.
  // Builds a SnapshotScheduleRestoration from the request, lists every known schedule in it
  // (the schedule owning the snapshot is swapped to index 0), queues it in postponed_restores_
  // when leader_term >= 0, and then asks the context to restore the sys catalog tablet.
  // Fails if the snapshot or its schedule is unknown.
  CHECKED_STATUS RestoreSysCatalogReplicated(
      int64_t leader_term, const tablet::SnapshotOperation& operation, Status* complete_status) {
    auto snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(operation.request()->snapshot_id()));
    auto restoration_id = VERIFY_RESULT(
        FullyDecodeTxnSnapshotRestorationId(operation.request()->restoration_id()));
    auto restoration = std::make_shared<SnapshotScheduleRestoration>(SnapshotScheduleRestoration {
      .snapshot_id = snapshot_id,
      .restore_at = HybridTime::FromPB(operation.request()->snapshot_hybrid_time()),
      .restoration_id = restoration_id,
      .op_id = operation.op_id(),
      .write_time = operation.hybrid_time(),
      .term = leader_term,
    });

    {
      std::lock_guard<std::mutex> lock(mutex_);
      SnapshotState& snapshot = VERIFY_RESULT(FindSnapshot(restoration->snapshot_id));
      SnapshotScheduleState& owning_schedule = VERIFY_RESULT(
          FindSnapshotSchedule(snapshot.schedule_id()));
      LOG(INFO) << "Restore sys catalog from snapshot: " << snapshot.ToString() << ", schedule: "
                << owning_schedule.ToString() << " at " << restoration->restore_at << ", op id: "
                << restoration->op_id;

      constexpr size_t kNotFound = std::numeric_limits<size_t>::max();
      size_t owning_idx = kNotFound;
      auto& restored_schedules = restoration->schedules;
      for (const auto& schedule : schedules_) {
        if (schedule->id() == snapshot.schedule_id()) {
          owning_idx = restored_schedules.size();
        }
        restored_schedules.emplace_back(schedule->id(), schedule->options().filter());
      }
      if (owning_idx == kNotFound) {
        return STATUS_FORMAT(IllegalState, "Cannot find schedule for restoration: $0",
                             snapshot.schedule_id());
      }
      // Move the schedule that owns the snapshot to the front of the list.
      std::swap(restored_schedules[0], restored_schedules[owning_idx]);

      if (leader_term >= 0) {
        postponed_restores_.push_back(restoration);
      }
    }

    LOG_SLOW_EXECUTION(INFO, 1000, "Restore sys catalog took") {
      RETURN_NOT_OK_PREPEND(
          context_.RestoreSysCatalog(restoration.get(), operation.tablet(), complete_status),
          "Restore sys catalog failed");
    }
    return Status::OK();
  }
452
453
  // Fills `resp` with restoration descriptions. A set `restoration_id` selects exactly that
  // restoration (error if unknown). Otherwise all restorations are listed, filtered by
  // `snapshot_id` when it is set.
  CHECKED_STATUS ListRestorations(
      const TxnSnapshotRestorationId& restoration_id, const TxnSnapshotId& snapshot_id,
      ListSnapshotRestorationsResponsePB* resp) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (restoration_id) {
      RestorationState& requested = VERIFY_RESULT(FindRestoration(restoration_id));
      return requested.ToPB(resp->add_restorations());
    }

    for (const auto& state : restorations_) {
      const bool matches = !snapshot_id || state->snapshot_id() == snapshot_id;
      if (!matches) {
        continue;
      }
      RETURN_NOT_OK(state->ToPB(resp->add_restorations()));
    }
    return Status::OK();
  }
469
470
  // Starts the initial phase of restoring snapshot `snapshot_id` to `restore_at` under a
  // freshly generated restoration id, which is returned on success.
  Result<TxnSnapshotRestorationId> Restore(
      const TxnSnapshotId& snapshot_id, HybridTime restore_at, int64_t leader_term) {
    auto new_restoration_id = TxnSnapshotRestorationId::GenerateRandom();
    RETURN_NOT_OK(DoRestore(snapshot_id, restore_at, new_restoration_id, {},
                            RestorePhase::kInitial, leader_term));
    return new_restoration_id;
  }
477
478
  // Builds a new snapshot schedule from `req`, writes it to the sys catalog and waits for the
  // write until `deadline`. Returns the id of the new schedule.
  Result<SnapshotScheduleId> CreateSchedule(
      const CreateSnapshotScheduleRequestPB& req, int64_t leader_term, CoarseTimePoint deadline) {
    SnapshotScheduleState new_schedule(&context_, req);

    docdb::KeyValueWriteBatchPB batch;
    RETURN_NOT_OK(new_schedule.StoreToWriteBatch(&batch));
    RETURN_NOT_OK(SynchronizedWrite(std::move(batch), leader_term, deadline, &context_));

    return new_schedule.id();
  }
489
490
  // Fills `resp` with schedule descriptions: the single requested schedule (error if unknown),
  // or every known schedule when `snapshot_schedule_id` is nil.
  CHECKED_STATUS ListSnapshotSchedules(
      const SnapshotScheduleId& snapshot_schedule_id, ListSnapshotSchedulesResponsePB* resp) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!snapshot_schedule_id.IsNil()) {
      SnapshotScheduleState& requested = VERIFY_RESULT(FindSnapshotSchedule(snapshot_schedule_id));
      return FillSchedule(requested, resp->add_schedules());
    }

    for (const auto& schedule : schedules_) {
      RETURN_NOT_OK(FillSchedule(*schedule, resp->add_schedules()));
    }
    return Status::OK();
  }
503
504
  // Marks schedule `snapshot_schedule_id` as deleted and waits for the write until `deadline`.
  // The deletion is written as a new value of the schedule's row (its options with delete_time
  // set to the current hybrid time), not as a tombstone.
  CHECKED_STATUS DeleteSnapshotSchedule(
      const SnapshotScheduleId& snapshot_schedule_id, int64_t leader_term,
      CoarseTimePoint deadline) {
    docdb::KeyValueWriteBatchPB write_batch;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      SnapshotScheduleState& schedule = VERIFY_RESULT(FindSnapshotSchedule(snapshot_schedule_id));
      auto row_key = VERIFY_RESULT(schedule.EncodedKey());

      auto* write_pair = write_batch.add_write_pairs();
      write_pair->set_key(row_key.AsSlice().cdata(), row_key.size());

      auto updated_options = schedule.options();
      updated_options.set_delete_time(context_.Clock()->Now().ToUint64());

      auto* encoded_value = write_pair->mutable_value();
      encoded_value->push_back(docdb::ValueTypeAsChar::kString);
      pb_util::AppendPartialToString(updated_options, encoded_value);
    }

    return SynchronizedWrite(std::move(write_batch), leader_term, deadline, &context_);
  }
523
524
383k
  // Populates resp->snapshots_info with:
  //   * every non-deleted schedule (and its last snapshot hybrid time, when there is one),
  //   * last_restorations_update_ht_,
  //   * every known restoration (and its complete time, when set).
  CHECKED_STATUS FillHeartbeatResponse(TSHeartbeatResponsePB* resp) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto* snapshots_info = resp->mutable_snapshots_info();

    for (const auto& schedule : schedules_) {
      // Don't send deleted schedules.
      if (schedule->deleted()) {
        continue;
      }
      const auto& schedule_id = schedule->id();
      auto* schedule_pb = snapshots_info->add_schedules();
      schedule_pb->set_id(schedule_id.data(), schedule_id.size());
      auto last_snapshot_time = LastSnapshotTime(schedule_id);
      if (last_snapshot_time) {
        schedule_pb->set_last_snapshot_hybrid_time(last_snapshot_time.ToUint64());
      }
    }

    snapshots_info->set_last_restorations_update_ht(last_restorations_update_ht_.ToUint64());

    for (const auto& restoration : restorations_) {
      auto* restoration_pb = snapshots_info->add_restorations();
      const auto& restoration_id = restoration->restoration_id();
      restoration_pb->set_id(restoration_id.data(), restoration_id.size());
      auto finished_at = restoration->complete_time();
      if (finished_at) {
        restoration_pb->set_complete_time_ht(finished_at.ToUint64());
      }
    }

    return Status::OK();
  }
552
553
2.00k
  void SysCatalogLoaded(int64_t term) {
554
2.00k
    if (term == OpId::kUnknownTerm) {
555
      // Do nothing on follower.
556
0
      return;
557
0
    }
558
2.00k
    decltype(postponed_restores_) postponed_restores;
559
2.00k
    {
560
2.00k
      std::lock_guard<std::mutex> lock(mutex_);
561
0
      auto filter = [term, &postponed_restores](const auto& restoration) {
562
0
        if (restoration->term == term) {
563
0
          postponed_restores.push_back(restoration);
564
0
        }
565
        // TODO(pitr) cancel restorations
566
0
        return restoration->term <= term;
567
0
      };
568
2.00k
      postponed_restores_.erase(
569
2.00k
          std::remove_if(postponed_restores_.begin(), postponed_restores_.end(), filter),
570
2.00k
          postponed_restores_.end());
571
2.00k
    }
572
0
    for (const auto& restoration : postponed_restores) {
573
      // TODO(pitr) Notify user about failures.
574
0
      auto status = context_.VerifyRestoredObjects(*restoration);
575
0
      LOG_IF(DFATAL, !status.ok()) << "Verify restoration failed: " << status;
576
0
      std::vector<TabletId> restore_tablets;
577
0
      for (const auto& id_and_type : restoration->non_system_objects_to_restore) {
578
0
        if (id_and_type.second == SysRowEntryType::TABLET) {
579
0
          restore_tablets.push_back(id_and_type.first);
580
0
        }
581
0
      }
582
0
      status = DoRestore(restoration->snapshot_id, restoration->restore_at,
583
0
                         restoration->restoration_id, restore_tablets,
584
0
                         RestorePhase::kPostSysCatalogLoad, term);
585
0
      LOG_IF(DFATAL, !status.ok())
586
0
          << "Failed to restore tablets for restoration "
587
0
          << restoration->restoration_id << ": " << status;
588
0
    }
589
2.00k
  }
590
591
  // For every non-deleted schedule, collects the ids of entries of the given `type` covered by
  // the schedule's filter, sorted ascending. Every such schedule gets a map entry, possibly
  // with an empty id list. mutex_ is held only while copying the schedule filters.
  Result<SnapshotSchedulesToObjectIdsMap> MakeSnapshotSchedulesToObjectIdsMap(
      SysRowEntryType type) {
    std::vector<std::pair<SnapshotScheduleId, SnapshotScheduleFilterPB>> active_schedules;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      for (const auto& schedule : schedules_) {
        if (schedule->deleted()) {
          continue;
        }
        active_schedules.emplace_back(schedule->id(), schedule->options().filter());
      }
    }

    SnapshotSchedulesToObjectIdsMap result;
    for (const auto& [schedule_id, filter] : active_schedules) {
      auto covered = VERIFY_RESULT(CollectEntries(filter));
      auto& ids = result[schedule_id];
      for (const auto& entry : covered.entries()) {
        if (entry.type() != type) {
          continue;
        }
        ids.push_back(entry.id());
      }
      std::sort(ids.begin(), ids.end());
    }
    return result;
  }
615
616
0
  // Returns true when `table_info` matches at least one table identifier in the filter of a
  // snapshot schedule that has not been deleted.
  // Deleted schedules are skipped, consistent with MakeSnapshotSchedulesToObjectIdsMap(), which
  // also ignores them. Previously a table matched only by a deleted schedule was reported as
  // covered.
  // Lock order: the table's read lock is taken first, then mutex_; both are held for the scan.
  // Errors from TableMatchesIdentifier() are propagated.
  Result<bool> IsTableCoveredBySomeSnapshotSchedule(const TableInfo& table_info) {
    auto lock = table_info.LockForRead();
    std::lock_guard<std::mutex> l(mutex_);
    for (const auto& schedule : schedules_) {
      if (schedule->deleted()) {
        continue;
      }
      for (const auto& table_identifier : schedule->options().filter().tables().tables()) {
        if (VERIFY_RESULT(TableMatchesIdentifier(table_info.id(),
                                                 lock->pb,
                                                 table_identifier))) {
          return true;
        }
      }
    }
    return false;
  }
632
633
5.35k
  void Start() {
634
5.35k
    {
635
5.35k
      std::lock_guard<std::mutex> lock(mutex_);
636
5.35k
      last_restorations_update_ht_ = context_.Clock()->Now();
637
5.35k
    }
638
5.35k
    poller_.Start(&context_.Scheduler(), FLAGS_snapshot_coordinator_poll_interval_ms * 1ms);
639
5.35k
  }
640
641
92
  void Shutdown() {
642
92
    poller_.Shutdown();
643
92
  }
644
645
 private:
646
  template <class Pb, class Map>
647
0
  CHECKED_STATUS LoadEntry(const Slice& id_slice, const Slice& data, Map* map) REQUIRES(mutex_) {
648
0
    VLOG(2) << __func__ << "(" << id_slice.ToDebugString() << ", " << data.ToDebugString() << ")";
649
650
0
    auto id = Uuid::TryFullyDecode(id_slice);
651
0
    if (id.IsNil()) {
652
0
      return Status::OK();
653
0
    }
654
0
    auto metadata = VERIFY_RESULT(pb_util::ParseFromSlice<Pb>(data));
655
0
    return LoadEntry(typename Map::key_type(id), metadata, map);
656
0
  }
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl9LoadEntryINS0_18SysSnapshotEntryPBEN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_13SnapshotStateENS8_14default_deleteISA_EEEENS6_10indexed_byINS6_13hashed_uniqueINS6_13const_mem_funISA_RKNS_17StronglyTypedUuidINS_17TxnSnapshotId_TagEEEXadL_ZNKSA_2idEvEEEEN4mpl_2naESO_SO_EENS6_18ordered_non_uniqueINS6_3tagINS2_11ScheduleTagESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS6_13composite_keyISA_NSG_ISA_RKNSH_INS_22SnapshotScheduleId_TagEEEXadL_ZNKSA_11schedule_idEvEEEENSG_ISA_NS_10HybridTimeEXadL_ZNKSA_20snapshot_hybrid_timeEvEEEENS5_6tuples9null_typeES13_S13_S13_S13_S13_S13_S13_EESO_EESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS8_9allocatorISD_EEEEEENS_6StatusERKNS_5SliceES1D_PT0_
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl9LoadEntryINS0_25SnapshotScheduleOptionsPBEN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_21SnapshotScheduleStateENS8_14default_deleteISA_EEEENS6_10indexed_byINS6_13hashed_uniqueINS6_13const_mem_funISA_RKNS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEEXadL_ZNKSA_2idEvEEEEN4mpl_2naESO_SO_EESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS8_9allocatorISD_EEEEEENS_6StatusERKNS_5SliceESX_PT0_
657
658
  template <class Pb, class Map>
659
  CHECKED_STATUS LoadEntry(
660
      const typename Map::key_type& id, const Pb& data, Map* map)
661
0
      REQUIRES(mutex_) {
662
0
    VLOG(1) << __func__ << "(" << id << ", " << data.ShortDebugString() << ")";
663
664
0
    auto new_entry = std::make_unique<typename Map::value_type::element_type>(&context_, id, data);
665
666
0
    auto it = map->find(id);
667
0
    if (it == map->end()) {
668
0
      map->emplace(std::move(new_entry));
669
0
    } else if ((**it).ShouldUpdate(*new_entry)) {
670
0
      map->replace(it, std::move(new_entry));
671
0
    } else {
672
0
      VLOG_WITH_FUNC(1) << "Ignore because of version check, existing: " << (**it).ToString()
673
0
                        << ", loaded: " << new_entry->ToString();
674
0
    }
675
676
0
    return Status::OK();
677
0
  }
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl9LoadEntryINS0_18SysSnapshotEntryPBEN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_13SnapshotStateENS8_14default_deleteISA_EEEENS6_10indexed_byINS6_13hashed_uniqueINS6_13const_mem_funISA_RKNS_17StronglyTypedUuidINS_17TxnSnapshotId_TagEEEXadL_ZNKSA_2idEvEEEEN4mpl_2naESO_SO_EENS6_18ordered_non_uniqueINS6_3tagINS2_11ScheduleTagESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS6_13composite_keyISA_NSG_ISA_RKNSH_INS_22SnapshotScheduleId_TagEEEXadL_ZNKSA_11schedule_idEvEEEENSG_ISA_NS_10HybridTimeEXadL_ZNKSA_20snapshot_hybrid_timeEvEEEENS5_6tuples9null_typeES13_S13_S13_S13_S13_S13_S13_EESO_EESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS8_9allocatorISD_EEEEEENS_6StatusERKNT0_8key_typeERKT_PS1B_
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl9LoadEntryINS0_25SnapshotScheduleOptionsPBEN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_21SnapshotScheduleStateENS8_14default_deleteISA_EEEENS6_10indexed_byINS6_13hashed_uniqueINS6_13const_mem_funISA_RKNS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEEXadL_ZNKSA_2idEvEEEEN4mpl_2naESO_SO_EESO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_SO_EENS8_9allocatorISD_EEEEEENS_6StatusERKNT0_8key_typeERKT_PSV_
678
679
0
  // Returns the tracked state of snapshot `snapshot_id`. If it is not in snapshots_, returns
  // NotFound tagged with MasterErrorPB::SNAPSHOT_NOT_FOUND.
  Result<SnapshotState&> FindSnapshot(const TxnSnapshotId& snapshot_id) REQUIRES(mutex_) {
    auto found = snapshots_.find(snapshot_id);
    if (found != snapshots_.end()) {
      return **found;
    }
    return STATUS(NotFound, "Could not find snapshot", snapshot_id.ToString(),
                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
  }
687
688
  // Returns the tracked state of restoration `restoration_id`. If it is not in restorations_,
  // returns NotFound tagged with MasterErrorPB::OBJECT_NOT_FOUND.
  Result<RestorationState&> FindRestoration(
      const TxnSnapshotRestorationId& restoration_id) REQUIRES(mutex_) {
    auto found = restorations_.find(restoration_id);
    if (found != restorations_.end()) {
      return **found;
    }
    return STATUS(NotFound, "Could not find restoration", restoration_id.ToString(),
                  MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
  }
697
698
  // Returns the tracked state of snapshot schedule `id`. If it is not in schedules_, returns
  // NotFound tagged with MasterErrorPB::SNAPSHOT_NOT_FOUND.
  Result<SnapshotScheduleState&> FindSnapshotSchedule(
      const SnapshotScheduleId& id) REQUIRES(mutex_) {
    auto found = schedules_.find(id);
    if (found != schedules_.end()) {
      return **found;
    }
    return STATUS(NotFound, "Could not find snapshot schedule", id.ToString(),
                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
  }
707
708
11.1k
  void ExecuteOperations(const TabletSnapshotOperations& operations, int64_t leader_term) {
709
11.1k
    if (operations.empty()) {
710
11.1k
      return;
711
11.1k
    }
712
0
    VLOG(4) << __func__ << "(" << AsString(operations) << ")";
713
714
0
    size_t num_operations = operations.size();
715
0
    LOG(INFO) << "Number of snapshot operations to be executed " << num_operations;
716
0
    std::vector<TabletId> tablet_ids;
717
0
    tablet_ids.reserve(num_operations);
718
0
    for (const auto& operation : operations) {
719
0
      tablet_ids.push_back(operation.tablet_id);
720
0
    }
721
0
    auto tablet_infos = context_.GetTabletInfos(tablet_ids);
722
0
    for (size_t i = 0; i != num_operations; ++i) {
723
0
      ExecuteOperation(operations[i], tablet_infos[i], leader_term);
724
0
    }
725
0
  }
726
727
  void ExecuteOperation(
728
      const TabletSnapshotOperation& operation, const TabletInfoPtr& tablet_info,
729
0
      int64_t leader_term) {
730
0
    auto callback = MakeDoneCallback(
731
0
        &mutex_, snapshots_, operation.snapshot_id, operation.tablet_id,
732
0
        std::bind(&Impl::UpdateSnapshot, this, _1, leader_term, _2));
733
0
    if (!tablet_info) {
734
0
      callback(STATUS_EC_FORMAT(NotFound, MasterError(MasterErrorPB::TABLET_NOT_RUNNING),
735
0
                                "Tablet info not found for $0", operation.tablet_id));
736
0
      return;
737
0
    }
738
0
    auto snapshot_id_str = operation.snapshot_id.AsSlice().ToBuffer();
739
740
0
    if (operation.state == SysSnapshotEntryPB::DELETING) {
741
0
      auto task = context_.CreateAsyncTabletSnapshotOp(
742
0
          tablet_info, snapshot_id_str, tserver::TabletSnapshotOpRequestPB::DELETE_ON_TABLET,
743
0
          callback);
744
0
      context_.ScheduleTabletSnapshotOp(task);
745
0
    } else if (operation.state == SysSnapshotEntryPB::CREATING) {
746
0
      auto task = context_.CreateAsyncTabletSnapshotOp(
747
0
          tablet_info, snapshot_id_str, tserver::TabletSnapshotOpRequestPB::CREATE_ON_TABLET,
748
0
          callback);
749
0
      task->SetSnapshotScheduleId(operation.schedule_id);
750
0
      task->SetSnapshotHybridTime(operation.snapshot_hybrid_time);
751
0
      context_.ScheduleTabletSnapshotOp(task);
752
0
    } else {
753
0
      LOG(DFATAL) << "Unsupported snapshot operation: " << operation.ToString();
754
0
    }
755
0
  }
756
757
  struct PollSchedulesData {
758
    std::vector<TxnSnapshotId> delete_snapshots;
759
    SnapshotScheduleOperations schedule_operations;
760
    ScheduleMinRestoreTime schedule_min_restore_time;
761
  };
762
763
24.1k
  void Poll() {
764
24.1k
    auto leader_term = context_.LeaderTerm();
765
24.1k
    if (leader_term < 0) {
766
12.9k
      return;
767
12.9k
    }
768
0
    VLOG(4) << __func__ << "()";
769
11.1k
    std::vector<TxnSnapshotId> cleanup_snapshots;
770
11.1k
    TabletSnapshotOperations operations;
771
11.1k
    PollSchedulesData schedules_data;
772
11.1k
    {
773
11.1k
      std::lock_guard<std::mutex> lock(mutex_);
774
0
      for (const auto& p : snapshots_) {
775
0
        if (p->NeedCleanup()) {
776
0
          LOG(INFO) << "Cleanup of snapshot " << p->id() << " started.";
777
0
          if (!p->CleanupTracker().Start().ok()) {
778
0
            LOG(DFATAL) << "Cleanup of snapshot " << p->id() << " was already started.";
779
0
          }
780
0
          cleanup_snapshots.push_back(p->id());
781
0
        } else {
782
          // Refresh the throttle limit.
783
0
          p->Throttler().RefreshLimit(
784
0
              GetRpcLimit(FLAGS_max_concurrent_snapshot_rpcs,
785
0
                          FLAGS_max_concurrent_snapshot_rpcs_per_tserver, leader_term));
786
0
          p->PrepareOperations(&operations);
787
0
        }
788
0
      }
789
11.1k
      PollSchedulesPrepare(&schedules_data);
790
11.1k
    }
791
0
    for (const auto& id : cleanup_snapshots) {
792
0
      CleanupObject(leader_term, id, snapshots_, EncodedSnapshotKey(id, &context_));
793
0
    }
794
11.1k
    ExecuteOperations(operations, leader_term);
795
11.1k
    PollSchedulesComplete(schedules_data, leader_term);
796
11.1k
  }
797
798
0
  // Tries to move `snapshot` into deletion via TryStartDelete().
  // On success the snapshot id is appended to data->delete_snapshots. On failure the reason is
  // only logged at VLOG(1).
  void TryDeleteSnapshot(SnapshotState* snapshot, PollSchedulesData* data) {
    auto start_status = snapshot->TryStartDelete();
    if (start_status.ok()) {
      VLOG(1) << "Cleanup snapshot: " << snapshot->id() << "/" << snapshot->schedule_id();
      data->delete_snapshots.push_back(snapshot->id());
      return;
    }
    VLOG(1) << "Unable to delete snapshot " << snapshot->id() << "/" << snapshot->schedule_id()
            << ": " << start_status << ", state: " << snapshot->ToString();
  }
809
810
11.1k
  // Runs with mutex_ held and fills `data` with per-schedule work.
  //  - Deleted schedule: every snapshot of the schedule is offered to TryDeleteSnapshot().
  //  - Live schedule with snapshots, walked in ScheduleTag index order:
  //      * Skips leading snapshots whose initial state is DELETING, stopping at the last one.
  //      * Records a minimum restore time from the first remaining snapshot (its previous
  //        snapshot hybrid time if set, else its own).
  //      * Offers snapshots older than now - retention_duration_sec to TryDeleteSnapshot().
  //      * Uses the last snapshot's time as the last snapshot time. That snapshot is never
  //        offered for deletion.
  //  - Every schedule's PrepareOperations() then receives the last snapshot time (a
  //    default-constructed HybridTime when none was found) and the current time.
  void PollSchedulesPrepare(PollSchedulesData* data) REQUIRES(mutex_) {
    auto now = context_.Clock()->Now();
    auto& by_schedule = snapshots_.get<ScheduleTag>();
    for (const auto& schedule : schedules_) {
      HybridTime last_snapshot_time;
      auto range = by_schedule.equal_range(schedule->id());
      if (schedule->deleted()) {
        for (auto it = range.first; it != range.second; ++it) {
          TryDeleteSnapshot(it->get(), data);
        }
      } else if (range.first != range.second) {
        auto cursor = range.first;
        auto newest = range.second;
        --newest;
        while (cursor != newest &&
               (**cursor).initial_state() == SysSnapshotEntryPB::DELETING) {
          ++cursor;
        }
        auto& first_snapshot = **cursor;
        data->schedule_min_restore_time[schedule->id()] =
            first_snapshot.previous_snapshot_hybrid_time()
                ? first_snapshot.previous_snapshot_hybrid_time()
                : first_snapshot.snapshot_hybrid_time();
        auto gc_limit = now.AddSeconds(-schedule->options().retention_duration_sec());
        VLOG_WITH_FUNC(4) << "Gc limit: " << gc_limit;
        while (cursor != newest && (**cursor).snapshot_hybrid_time() < gc_limit) {
          TryDeleteSnapshot(cursor->get(), data);
          ++cursor;
        }
        last_snapshot_time = (**newest).snapshot_hybrid_time();
      }
      schedule->PrepareOperations(last_snapshot_time, now, &data->schedule_operations);
    }
  }
848
849
11.1k
  // Runs the work gathered by PollSchedulesPrepare() without holding mutex_:
  //  - submits a delete for every queued snapshot;
  //  - executes each schedule operation: failed snapshot creations are only warned about, and
  //    kCleanup writes a tombstone for the schedule;
  //  - finally calls CleanupHiddenObjects() with the collected minimum restore times.
  void PollSchedulesComplete(const PollSchedulesData& data, int64_t leader_term) EXCLUDES(mutex_) {
    for (const auto& snapshot_id : data.delete_snapshots) {
      SubmitDelete(snapshot_id, leader_term, /* synchronizer= */ nullptr);
    }
    for (const auto& op : data.schedule_operations) {
      if (op.type == SnapshotScheduleOperationType::kCreateSnapshot) {
        WARN_NOT_OK(ExecuteScheduleOperation(op, leader_term),
                    Format("Failed to execute operation on $0", op.schedule_id));
      } else if (op.type == SnapshotScheduleOperationType::kCleanup) {
        CleanupObject(leader_term, op.schedule_id, schedules_,
                      SnapshotScheduleState::EncodedKey(op.schedule_id, &context_));
      } else {
        LOG(DFATAL) << "Unexpected operation type: " << op.type;
      }
    }
    context_.CleanupHiddenObjects(data.schedule_min_restore_time);
  }
871
872
  // Returns the first (Bound::kFirst) or otherwise the last snapshot of `schedule_id`, in the
  // order of the ScheduleTag index (schedule_id, snapshot_hybrid_time). Returns nullptr when
  // the schedule has no snapshots.
  SnapshotState* BoundingSnapshot(const SnapshotScheduleId& schedule_id, Bound bound)
      REQUIRES(mutex_) {
    auto& by_schedule = snapshots_.get<ScheduleTag>();
    auto candidate = by_schedule.end();
    if (bound == Bound::kFirst) {
      candidate = by_schedule.lower_bound(schedule_id);
    } else {
      auto past_last = by_schedule.upper_bound(schedule_id);
      if (past_last != by_schedule.begin()) {
        --past_last;
        candidate = past_last;
      }
    }
    if (candidate == by_schedule.end()) {
      return nullptr;
    }
    SnapshotState* snapshot = candidate->get();
    return snapshot->schedule_id() == schedule_id ? snapshot : nullptr;
  }
890
891
0
  // Hybrid time of the last snapshot of `schedule_id` (per BoundingSnapshot(Bound::kLast)).
  // Returns HybridTime::kInvalid when the schedule has no snapshots.
  HybridTime LastSnapshotTime(const SnapshotScheduleId& schedule_id) REQUIRES(mutex_) {
    auto* newest = BoundingSnapshot(schedule_id, Bound::kLast);
    if (!newest) {
      return HybridTime::kInvalid;
    }
    return newest->snapshot_hybrid_time();
  }
895
896
  template <typename Id, typename Map>
897
0
  void CleanupObjectAborted(Id id, const Map& map) {
898
0
    LOG(INFO) << "Aborting cleanup of object " << id;
899
0
    std::lock_guard<std::mutex> l(mutex_);
900
0
    auto it = map.find(id);
901
0
    if (it == map.end()) {
902
0
      return;
903
0
    }
904
0
    (**it).CleanupTracker().Abort();
905
0
  }
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl20CleanupObjectAbortedINS_17StronglyTypedUuidINS_17TxnSnapshotId_TagEEEN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_13SnapshotStateENSA_14default_deleteISC_EEEENS8_10indexed_byINS8_13hashed_uniqueINS8_13const_mem_funISC_RKS6_XadL_ZNKSC_2idEvEEEEN4mpl_2naESN_SN_EENS8_18ordered_non_uniqueINS8_3tagINS2_11ScheduleTagESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENS8_13composite_keyISC_NSI_ISC_RKNS4_INS_22SnapshotScheduleId_TagEEEXadL_ZNKSC_11schedule_idEvEEEENSI_ISC_NS_10HybridTimeEXadL_ZNKSC_20snapshot_hybrid_timeEvEEEENS7_6tuples9null_typeES12_S12_S12_S12_S12_S12_S12_EESN_EESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENSA_9allocatorISF_EEEEEEvT_RKT0_
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl20CleanupObjectAbortedINS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEEN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_21SnapshotScheduleStateENSA_14default_deleteISC_EEEENS8_10indexed_byINS8_13hashed_uniqueINS8_13const_mem_funISC_RKS6_XadL_ZNKSC_2idEvEEEEN4mpl_2naESN_SN_EESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENSA_9allocatorISF_EEEEEEvT_RKT0_
906
907
  template <typename Map, typename Id>
908
  void CleanupObject(int64_t leader_term, Id id, const Map& map,
909
0
                     const Result<docdb::KeyBytes>& encoded_key) {
910
0
    if (!encoded_key.ok()) {
911
0
      LOG(DFATAL) << "Failed to encode id for deletion: " << encoded_key.status();
912
0
      return;
913
0
    }
914
915
0
    auto query = std::make_unique<tablet::WriteQuery>(
916
0
        leader_term, CoarseMonoClock::Now() + FLAGS_sys_catalog_write_timeout_ms * 1ms,
917
0
        nullptr /* context */, nullptr /* tablet */);
918
919
0
    auto* write_batch = query->operation().AllocateRequest()->mutable_write_batch();
920
0
    auto pair = write_batch->add_write_pairs();
921
0
    pair->set_key((*encoded_key).AsSlice().cdata(), (*encoded_key).size());
922
0
    char value = { docdb::ValueTypeAsChar::kTombstone };
923
0
    pair->set_value(&value, 1);
924
925
0
    query->set_callback([this, id, &map](const Status& s) {
926
0
      if (s.ok()) {
927
0
        LOG(INFO) << "Finished cleanup of object " << id;
928
0
        return;
929
0
      }
930
0
      CleanupObjectAborted(id, map);
931
0
    });
Unexecuted instantiation: _ZZN2yb6master25MasterSnapshotCoordinator4Impl13CleanupObjectIN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_13SnapshotStateENS7_14default_deleteIS9_EEEENS5_10indexed_byINS5_13hashed_uniqueINS5_13const_mem_funIS9_RKNS_17StronglyTypedUuidINS_17TxnSnapshotId_TagEEEXadL_ZNKS9_2idEvEEEEN4mpl_2naESN_SN_EENS5_18ordered_non_uniqueINS5_3tagINS2_11ScheduleTagESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENS5_13composite_keyIS9_NSF_IS9_RKNSG_INS_22SnapshotScheduleId_TagEEEXadL_ZNKS9_11schedule_idEvEEEENSF_IS9_NS_10HybridTimeEXadL_ZNKS9_20snapshot_hybrid_timeEvEEEENS4_6tuples9null_typeES12_S12_S12_S12_S12_S12_S12_EESN_EESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENS7_9allocatorISC_EEEESI_EEvxT0_RKT_RKNS_6ResultINS_5docdb8KeyBytesEEEENKUlRKNS_6StatusEE_clES1L_
Unexecuted instantiation: _ZZN2yb6master25MasterSnapshotCoordinator4Impl13CleanupObjectIN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_21SnapshotScheduleStateENS7_14default_deleteIS9_EEEENS5_10indexed_byINS5_13hashed_uniqueINS5_13const_mem_funIS9_RKNS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEEXadL_ZNKS9_2idEvEEEEN4mpl_2naESN_SN_EESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENS7_9allocatorISC_EEEESI_EEvxT0_RKT_RKNS_6ResultINS_5docdb8KeyBytesEEEENKUlRKNS_6StatusEE_clES15_
932
933
0
    context_.Submit(query.release()->PrepareSubmit(), leader_term);
934
0
  }
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl13CleanupObjectIN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_13SnapshotStateENS7_14default_deleteIS9_EEEENS5_10indexed_byINS5_13hashed_uniqueINS5_13const_mem_funIS9_RKNS_17StronglyTypedUuidINS_17TxnSnapshotId_TagEEEXadL_ZNKS9_2idEvEEEEN4mpl_2naESN_SN_EENS5_18ordered_non_uniqueINS5_3tagINS2_11ScheduleTagESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENS5_13composite_keyIS9_NSF_IS9_RKNSG_INS_22SnapshotScheduleId_TagEEEXadL_ZNKS9_11schedule_idEvEEEENSF_IS9_NS_10HybridTimeEXadL_ZNKS9_20snapshot_hybrid_timeEvEEEENS4_6tuples9null_typeES12_S12_S12_S12_S12_S12_S12_EESN_EESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENS7_9allocatorISC_EEEESI_EEvxT0_RKT_RKNS_6ResultINS_5docdb8KeyBytesEEE
Unexecuted instantiation: _ZN2yb6master25MasterSnapshotCoordinator4Impl13CleanupObjectIN5boost11multi_index21multi_index_containerINSt3__110unique_ptrINS0_21SnapshotScheduleStateENS7_14default_deleteIS9_EEEENS5_10indexed_byINS5_13hashed_uniqueINS5_13const_mem_funIS9_RKNS_17StronglyTypedUuidINS_22SnapshotScheduleId_TagEEEXadL_ZNKS9_2idEvEEEEN4mpl_2naESN_SN_EESN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_SN_EENS7_9allocatorISC_EEEESI_EEvxT0_RKT_RKNS_6ResultINS_5docdb8KeyBytesEEE
935
936
  // Creates the snapshot described by a schedule operation.
  //  - Collects the catalog entries selected by `operation.filter` and submits snapshot
  //    creation for them.
  //  - If collection fails, or creation later completes with an error, CreateSnapshotAborted()
  //    is notified.
  //  - The optional `synchronizer` (held weakly) receives the creation status.
  // Returns the collection error, if any; otherwise OK once the create has been submitted.
  CHECKED_STATUS ExecuteScheduleOperation(
      const SnapshotScheduleOperation& operation, int64_t leader_term,
      const std::weak_ptr<Synchronizer>& synchronizer = std::weak_ptr<Synchronizer>()) {
    auto entries = CollectEntries(operation.filter);
    VLOG(2) << __func__ << "(" << AsString(operation) << ", " << leader_term << "), entries: "
            << AsString(entries);
    if (!entries.ok()) {
      CreateSnapshotAborted(entries.status(), operation.schedule_id, operation.snapshot_id);
      return entries.status();
    }

    auto on_complete = [this, schedule_id = operation.schedule_id,
                        snapshot_id = operation.snapshot_id,
                        synchronizer](const Status& status) {
      if (!status.ok()) {
        CreateSnapshotAborted(status, schedule_id, snapshot_id);
      }
      if (auto locked = synchronizer.lock()) {
        locked->StatusCB(status);
      }
    };
    SubmitCreate(*entries, /* imported= */ false, operation.schedule_id,
                 operation.previous_snapshot_hybrid_time, operation.snapshot_id, leader_term,
                 std::move(on_complete));
    return Status::OK();
  }
962
963
  void CreateSnapshotAborted(
964
      const Status& status, const SnapshotScheduleId& schedule_id,
965
0
      const TxnSnapshotId& snapshot_id) {
966
0
    LOG(INFO) << __func__ << " for " << schedule_id << ", snapshot: " << snapshot_id
967
0
              << ", status: " << status;
968
0
    std::lock_guard<std::mutex> lock(mutex_);
969
0
    auto it = schedules_.find(schedule_id);
970
0
    if (it == schedules_.end()) {
971
0
      return;
972
0
    }
973
0
    (**it).SnapshotFinished(snapshot_id, status);
974
0
  }
975
976
  void SubmitCreate(
977
      const SysRowEntries& entries, bool imported, const SnapshotScheduleId& schedule_id,
978
      HybridTime previous_snapshot_hybrid_time, TxnSnapshotId snapshot_id, int64_t leader_term,
979
0
      tablet::OperationCompletionCallback completion_clbk) {
980
0
    auto operation = std::make_unique<tablet::SnapshotOperation>(/* tablet= */ nullptr);
981
0
    auto request = operation->AllocateRequest();
982
983
0
    VLOG(1) << __func__ << "(" << AsString(entries) << ", " << imported << ", " << schedule_id
984
0
            << ", " << snapshot_id << ")";
985
0
    for (const auto& entry : entries.entries()) {
986
0
      if (entry.type() == SysRowEntryType::TABLET) {
987
0
        request->add_tablet_id(entry.id());
988
0
      }
989
0
    }
990
991
0
    request->set_snapshot_hybrid_time(context_.Clock()->MaxGlobalNow().ToUint64());
992
0
    request->set_operation(tserver::TabletSnapshotOpRequestPB::CREATE_ON_MASTER);
993
0
    request->set_snapshot_id(snapshot_id.data(), snapshot_id.size());
994
0
    request->set_imported(imported);
995
0
    if (schedule_id) {
996
0
      request->set_schedule_id(schedule_id.data(), schedule_id.size());
997
0
    }
998
0
    if (previous_snapshot_hybrid_time) {
999
0
      request->set_previous_snapshot_hybrid_time(previous_snapshot_hybrid_time.ToUint64());
1000
0
    }
1001
1002
0
    request->mutable_extra_data()->PackFrom(entries);
1003
1004
0
    operation->set_completion_callback(std::move(completion_clbk));
1005
1006
0
    context_.Submit(std::move(operation), leader_term);
1007
0
  }
1008
1009
  void SubmitDelete(const TxnSnapshotId& snapshot_id, int64_t leader_term,
1010
0
                    const std::shared_ptr<Synchronizer>& synchronizer) {
1011
0
    auto operation = std::make_unique<tablet::SnapshotOperation>(nullptr);
1012
0
    auto request = operation->AllocateRequest();
1013
1014
0
    request->set_operation(tserver::TabletSnapshotOpRequestPB::DELETE_ON_MASTER);
1015
0
    request->set_snapshot_id(snapshot_id.data(), snapshot_id.size());
1016
1017
0
    operation->set_completion_callback(
1018
0
        [this, wsynchronizer = std::weak_ptr<Synchronizer>(synchronizer), snapshot_id]
1019
0
        (const Status& status) {
1020
0
          auto synchronizer = wsynchronizer.lock();
1021
0
          if (synchronizer) {
1022
0
            synchronizer->StatusCB(status);
1023
0
          }
1024
0
          if (!status.ok()) {
1025
0
            DeleteSnapshotAborted(status, snapshot_id);
1026
0
          }
1027
0
        });
1028
1029
0
    context_.Submit(std::move(operation), leader_term);
1030
0
  }
1031
1032
  // Submits a RESTORE_SYS_CATALOG operation for `snapshot_id` as of `restore_at`, tagged with
  // `restoration_id` when it is set. Waits on a Synchronizer for the operation to complete and
  // returns its status.
  CHECKED_STATUS SubmitRestore(
      const TxnSnapshotId& snapshot_id, HybridTime restore_at,
      const TxnSnapshotRestorationId& restoration_id, int64_t leader_term) {
    auto operation = std::make_unique<tablet::SnapshotOperation>(/* tablet= */ nullptr);
    auto request = operation->AllocateRequest();
    request->set_operation(tserver::TabletSnapshotOpRequestPB::RESTORE_SYS_CATALOG);
    request->set_snapshot_id(snapshot_id.data(), snapshot_id.size());
    request->set_snapshot_hybrid_time(restore_at.ToUint64());
    if (restoration_id) {
      request->set_restoration_id(restoration_id.data(), restoration_id.size());
    }

    auto done = std::make_shared<Synchronizer>();
    operation->set_completion_callback(
        tablet::MakeWeakSynchronizerOperationCompletionCallback(done));
    context_.Submit(std::move(operation), leader_term);
    return done->Wait();
  }
1054
1055
  void DeleteSnapshotAborted(
1056
0
      const Status& status, const TxnSnapshotId& snapshot_id) {
1057
0
    LOG(INFO) << __func__ << ", snapshot: " << snapshot_id << ", status: " << status;
1058
0
    std::lock_guard<std::mutex> lock(mutex_);
1059
0
    auto it = snapshots_.find(snapshot_id);
1060
0
    if (it == snapshots_.end()) {
1061
0
      return;
1062
0
    }
1063
0
    (**it).DeleteAborted(status);
1064
0
  }
1065
1066
  void UpdateSnapshot(
1067
      SnapshotState* snapshot, int64_t leader_term, std::unique_lock<std::mutex>* lock)
1068
0
      REQUIRES(mutex_) {
1069
0
    bool batch_done = false;
1070
0
    bool is_empty = snapshot->Empty();
1071
1072
0
    if (!is_empty) {
1073
0
      batch_done = snapshot->Throttler().RemoveOutstandingTask();
1074
0
    }
1075
0
    if (!snapshot->AllTabletsDone()) {
1076
0
      if (FLAGS_schedule_snapshot_rpcs_out_of_band && batch_done && !is_empty) {
1077
        // Send another batch. This prevents having to wait for the regular cycle
1078
        // of master snapshot coordinator which can be too slow.
1079
0
        context_.Scheduler().io_service().post([this]() {
1080
0
          LOG(INFO) << "Rescheduling Snapshot RPCs out of band.";
1081
0
          Poll();
1082
0
        });
1083
0
      }
1084
0
      return;
1085
0
    }
1086
1087
0
    if (snapshot->schedule_id()) {
1088
0
      UpdateSchedule(*snapshot);
1089
0
    }
1090
1091
0
    docdb::KeyValueWriteBatchPB write_batch;
1092
0
    auto status = snapshot->StoreToWriteBatch(&write_batch);
1093
0
    if (!status.ok()) {
1094
0
      LOG(DFATAL) << "Failed to prepare write batch for snapshot: " << status;
1095
0
      return;
1096
0
    }
1097
0
    lock->unlock();
1098
1099
0
    SubmitWrite(std::move(write_batch), leader_term, &context_);
1100
0
  };
1101
1102
0
  // Finalizes `restoration` once all of its tablets are done. Called with mutex_ held.
  //  - Sets last_restorations_update_ht_ to now and uses it as the restoration's complete time.
  //  - If the restoration belongs to a schedule, appends that time to the schedule's
  //    restoration_times and persists the schedule.
  //  - Unless FLAGS_TEST_skip_sending_restore_finished is set, schedules a RESTORE_FINISHED task
  //    for every tablet of the restoration.
  // Fixes:
  //  - GetTabletInfos() may yield null entries (ExecuteOperation guards against the same case).
  //    Such tablets are now skipped with a warning instead of being passed to
  //    CreateAsyncTabletSnapshotOp().
  //  - The status of a failed schedule write-batch preparation is now logged instead of dropped.
  void FinishRestoration(RestorationState* restoration, int64_t leader_term) REQUIRES(mutex_) {
    if (!restoration->AllTabletsDone()) {
      return;
    }

    last_restorations_update_ht_ = context_.Clock()->Now();
    restoration->set_complete_time(last_restorations_update_ht_);

    LOG(INFO) << "Setting restore complete time to " << last_restorations_update_ht_;

    if (restoration->schedule_id()) {
      auto schedule = FindSnapshotSchedule(restoration->schedule_id());
      if (schedule.ok()) {
        docdb::KeyValueWriteBatchPB write_batch;
        schedule->mutable_options().add_restoration_times(
            last_restorations_update_ht_.ToUint64());
        Status s = schedule->StoreToWriteBatch(&write_batch);
        if (s.ok()) {
          SubmitWrite(std::move(write_batch), leader_term, &context_);
        } else {
          LOG(WARNING) << "Unable to prepare write batch for schedule "
                       << schedule->id() << ": " << s;
        }
      } else {
        LOG(INFO) << "Snapshot Schedule with id " << restoration->schedule_id()
                  << " not found";
      }
    }

    if (FLAGS_TEST_skip_sending_restore_finished) {
      return;
    }

    auto temp_ids = restoration->tablet_ids();
    std::vector<TabletId> tablet_ids(temp_ids.begin(), temp_ids.end());
    // GetTabletInfos() returns one (possibly null) entry per requested id, in request order.
    auto tablets = context_.GetTabletInfos(tablet_ids);
    for (size_t i = 0; i != tablets.size(); ++i) {
      const auto& tablet = tablets[i];
      if (!tablet) {
        LOG(WARNING) << "Tablet info not found for " << tablet_ids[i]
                     << ", not sending RESTORE_FINISHED for restoration "
                     << restoration->restoration_id();
        continue;
      }
      auto task = context_.CreateAsyncTabletSnapshotOp(
          tablet, std::string(), tserver::TabletSnapshotOpRequestPB::RESTORE_FINISHED,
          /* callback= */ nullptr);
      task->SetRestorationId(restoration->restoration_id());
      task->SetRestorationTime(restoration->complete_time());
      context_.ScheduleTabletSnapshotOp(task);
    }
  }
1147
1148
0
  // Reports a finished `snapshot` to its schedule via SnapshotFinished(), if the schedule is
  // still tracked. The reported status depends on the aggregated snapshot state:
  //  - COMPLETE: OK.
  //  - FAILED: AnyFailure().
  //  - Error while aggregating: that error.
  //  - DELETED: not reported.
  //  - Any other state: DFATAL, not reported.
  void UpdateSchedule(const SnapshotState& snapshot) REQUIRES(mutex_) {
    auto schedule_it = schedules_.find(snapshot.schedule_id());
    if (schedule_it == schedules_.end()) {
      return;
    }

    auto aggregated = snapshot.AggregatedState();
    Status outcome;
    if (aggregated.ok()) {
      const auto state = *aggregated;
      if (state == SysSnapshotEntryPB::COMPLETE) {
        outcome = Status::OK();
      } else if (state == SysSnapshotEntryPB::FAILED) {
        outcome = snapshot.AnyFailure();
      } else if (state == SysSnapshotEntryPB::DELETED) {
        return;
      } else {
        LOG(DFATAL) << "Unexpected snapshot state: " << state << " for " << snapshot.id();
        return;
      }
    } else {
      outcome = aggregated.status();
    }
    (**schedule_it).SnapshotFinished(snapshot.id(), outcome);
  }
1175
1176
  // Serializes `schedule` into `out`. Then appends every snapshot of that schedule held in
  // snapshots_, in ScheduleTag index order. Stops at the first serialization error.
  CHECKED_STATUS FillSchedule(const SnapshotScheduleState& schedule, SnapshotScheduleInfoPB* out)
      REQUIRES(mutex_) {
    RETURN_NOT_OK(schedule.ToPB(out));
    const auto& by_schedule = snapshots_.get<ScheduleTag>();
    const auto range = by_schedule.equal_range(boost::make_tuple(schedule.id()));
    for (const auto& snapshot : boost::make_iterator_range(range.first, range.second)) {
      RETURN_NOT_OK(snapshot->ToPB(out->add_snapshots()));
    }
    return Status::OK();
  }
1186
1187
0
  // Asks the context for the sys catalog entries selected by the table identifiers listed in
  // `filter`.
  Result<SysRowEntries> CollectEntries(const SnapshotScheduleFilterPB& filter) {
    const auto& table_identifiers = filter.tables().tables();
    return context_.CollectEntriesForSnapshot(table_identifiers);
  }
1190
1191
  // Restores snapshot `snapshot_id` to `restore_at` as restoration `restoration_id`.
  //
  // In the initial phase of a restore from a scheduled snapshot, only the sys catalog is restored
  // here (via SubmitRestore). Tablet data is restored afterwards through postponed restores.
  // Otherwise a RESTORE_ON_TABLET operation is scheduled for every tablet returned by the
  // restoration's PrepareOperations(). If that list is empty, the restoration is finished
  // immediately.
  //
  // Parameters:
  //   restore_tablets - tablet ids taken from the current catalog. Only used in the
  //                     kPostSysCatalogLoad phase.
  //   phase           - kInitial creates a new RestorationState. Other phases look up an
  //                     existing one by `restoration_id`.
  //   leader_term     - forwarded to SubmitRestore / FinishRestoration.
  //
  // Returns IllegalState (SNAPSHOT_IS_NOT_READY) if the snapshot is not complete. Returns
  // NotSupported if FLAGS_allow_consecutive_restore is false and `restore_at` is not later than
  // the latest restoration time recorded in the schedule. Lookup errors from
  // FindSnapshot / FindSnapshotSchedule / FindRestoration are propagated.
  CHECKED_STATUS DoRestore(
      const TxnSnapshotId& snapshot_id, HybridTime restore_at,
      const TxnSnapshotRestorationId& restoration_id, const std::vector<TabletId>& restore_tablets,
      RestorePhase phase, int64_t leader_term) {
    TabletInfos tablet_infos;
    bool restore_sys_catalog = false;
    std::unordered_set<TabletId> snapshot_tablets;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      SnapshotState& snapshot = VERIFY_RESULT(FindSnapshot(snapshot_id));
      if (!VERIFY_RESULT(snapshot.Complete())) {
        return STATUS(IllegalState, "The snapshot state is not complete", snapshot_id.ToString(),
                      MasterError(MasterErrorPB::SNAPSHOT_IS_NOT_READY));
      }
      // Only the initial phase of a restore from a scheduled snapshot restores the sys catalog.
      restore_sys_catalog = phase == RestorePhase::kInitial && !snapshot.schedule_id().IsNil();
      if (!FLAGS_allow_consecutive_restore && restore_sys_catalog) {
        SnapshotScheduleState& schedule =
            VERIFY_RESULT(FindSnapshotSchedule(snapshot.schedule_id()));

        // Find the latest restore recorded in the schedule options.
        HybridTime latest_restore_ht = HybridTime::kMin;
        for (const auto& restoration_ht : schedule.options().restoration_times()) {
          const auto candidate_ht = HybridTime::FromPB(restoration_ht);
          if (candidate_ht > latest_restore_ht) {
            latest_restore_ht = candidate_ht;
          }
        }
        LOG(INFO) << "Last successful restoration completed at "
                  << latest_restore_ht;

        if (restore_at <= latest_restore_ht) {
          LOG(INFO) << "Restore with id " << restoration_id << " not supported "
                    << "because it is consecutive. Attempting to restore to "
                    << restore_at << " while last successful restoration completed at "
                    << latest_restore_ht;
          return STATUS_FORMAT(NotSupported,
                               "Cannot restore before the previous restoration time. "
                                  "A Restoration was performed at $0 and the requested "
                                  "restoration is for $1 which is before the last restoration.",
                               latest_restore_ht, restore_at);
        }
      }
      RestorationState* restoration_ptr;
      if (phase == RestorePhase::kInitial) {
        auto restoration = std::make_unique<RestorationState>(&context_, restoration_id, &snapshot);
        restoration_ptr = restorations_.emplace(std::move(restoration)).first->get();
        last_restorations_update_ht_ = context_.Clock()->Now();
      } else {
        restoration_ptr = &VERIFY_RESULT(FindRestoration(restoration_id)).get();
      }
      if (!restore_sys_catalog) {
        if (phase == RestorePhase::kPostSysCatalogLoad) {
          LOG(INFO)
              << "PITR: " << restoration_id << " restore tablets: " << AsString(restore_tablets);
          // The set of tablets could change between the restoration point and the snapshot time,
          // so we take the tablet list from the actual catalog state.
          restoration_ptr->InitTabletIds(restore_tablets);
        }
        tablet_infos = restoration_ptr->PrepareOperations();
      }
      auto tablet_ids = snapshot.tablet_ids();
      snapshot_tablets.insert(tablet_ids.begin(), tablet_ids.end());
    }

    // If sys catalog is restored, then tablets data will be restored after that using postponed
    // restores.
    if (restore_sys_catalog) {
      return SubmitRestore(snapshot_id, restore_at, restoration_id, leader_term);
    }

    auto snapshot_id_str = snapshot_id.AsSlice().ToBuffer();
    SendMetadata send_metadata(phase == RestorePhase::kPostSysCatalogLoad);
    LOG(INFO) << "PITR: " << restoration_id << " restore tablets: " << AsString(tablet_infos);
    for (const auto& tablet : tablet_infos) {
      // A tablet that did not participate in the snapshot (e.g. it was deleted) gets an empty
      // snapshot id. In that case we only change the hybrid time limit and clear the hidden state.
      auto task = context_.CreateAsyncTabletSnapshotOp(
          tablet, snapshot_tablets.count(tablet->id()) ? snapshot_id_str : std::string(),
          tserver::TabletSnapshotOpRequestPB::RESTORE_ON_TABLET,
          MakeDoneCallback(&mutex_, restorations_, restoration_id, tablet->tablet_id(),
                           std::bind(&Impl::FinishRestoration, this, _1, leader_term)));
      task->SetSnapshotHybridTime(restore_at);
      task->SetRestorationId(restoration_id);
      if (send_metadata) {
        task->SetMetadata(tablet->table()->LockForRead()->pb);
      }

      context_.ScheduleTabletSnapshotOp(task);
    }

    // For an empty tablet list, finish the restore right away.
    if (tablet_infos.empty()) {
      std::lock_guard<std::mutex> lock(mutex_);
      // VERIFY_RESULT returns early when the restoration is unknown, and the pointer is taken from
      // a reference, so it can never be null here. No null check is needed.
      RestorationState* restoration_ptr = &VERIFY_RESULT(FindRestoration(restoration_id)).get();
      FinishRestoration(restoration_ptr, leader_term);
    }

    return Status::OK();
  }
1291
1292
  // Computes the maximum outstanding Snapshot Create/Delete/Restore RPC
1293
  // that is permitted. If total limit is specified then it is used otherwise
1294
  // the value is computed by multiplying tserver count with the per tserver limit.
1295
0
  uint64_t GetRpcLimit(int64_t total_limit, int64_t per_tserver_limit, int64_t leader_term) {
1296
    // NO OP for followers.
1297
0
    if (leader_term < 0) {
1298
0
      return std::numeric_limits<int>::max();
1299
0
    }
1300
    // Should execute only for leaders.
1301
0
    if (total_limit == 0) {
1302
0
      return std::numeric_limits<int>::max();
1303
0
    }
1304
0
    if (total_limit > 0) {
1305
0
      return total_limit;
1306
0
    }
1307
0
    return context_.GetNumLiveTServersForActiveCluster() * per_tserver_limit;
1308
0
  }
1309
1310
  SnapshotCoordinatorContext& context_;
1311
  std::mutex mutex_;
1312
  class ScheduleTag;
1313
  using Snapshots = boost::multi_index_container<
1314
      std::unique_ptr<SnapshotState>,
1315
      boost::multi_index::indexed_by<
1316
          // Access snapshots by id.
1317
          boost::multi_index::hashed_unique<
1318
              boost::multi_index::const_mem_fun<
1319
                  SnapshotState, const TxnSnapshotId&, &SnapshotState::id>
1320
          >,
1321
          // Group snapshots by schedule id. Ordered by hybrid time for the same schedule.
1322
          boost::multi_index::ordered_non_unique<
1323
              boost::multi_index::tag<ScheduleTag>,
1324
              boost::multi_index::composite_key<
1325
                  SnapshotState,
1326
                  boost::multi_index::const_mem_fun<
1327
                      SnapshotState, const SnapshotScheduleId&, &SnapshotState::schedule_id>,
1328
                  boost::multi_index::const_mem_fun<
1329
                      SnapshotState, HybridTime, &SnapshotState::snapshot_hybrid_time>
1330
              >
1331
          >
1332
      >
1333
  >;
1334
  // For restorations and schedules we have to use multi_index since there are template
1335
  // functions that expect same interface for those collections.
1336
  using Restorations = boost::multi_index_container<
1337
      std::unique_ptr<RestorationState>,
1338
      boost::multi_index::indexed_by<
1339
          boost::multi_index::hashed_unique<
1340
              boost::multi_index::const_mem_fun<
1341
                  RestorationState, const TxnSnapshotRestorationId&,
1342
                  &RestorationState::restoration_id>
1343
          >
1344
      >
1345
  >;
1346
  using Schedules = boost::multi_index_container<
1347
      std::unique_ptr<SnapshotScheduleState>,
1348
      boost::multi_index::indexed_by<
1349
          boost::multi_index::hashed_unique<
1350
              boost::multi_index::const_mem_fun<
1351
                  SnapshotScheduleState, const SnapshotScheduleId&, &SnapshotScheduleState::id>
1352
          >
1353
      >
1354
  >;
1355
1356
  Snapshots snapshots_ GUARDED_BY(mutex_);
1357
  Restorations restorations_ GUARDED_BY(mutex_);
1358
  HybridTime last_restorations_update_ht_ GUARDED_BY(mutex_);
1359
  Schedules schedules_ GUARDED_BY(mutex_);
1360
  rpc::Poller poller_;
1361
1362
  // Restores postponed until sys catalog is reloaed.
1363
  std::vector<SnapshotScheduleRestorationPtr> postponed_restores_ GUARDED_BY(mutex_);
1364
};
1365
1366
MasterSnapshotCoordinator::MasterSnapshotCoordinator(SnapshotCoordinatorContext* context)
1367
5.45k
    : impl_(new Impl(context)) {}
1368
1369
92
// Defined out of line, where Impl is a complete type.
MasterSnapshotCoordinator::~MasterSnapshotCoordinator() = default;
1370
1371
// Forwards to Impl::Create.
Result<TxnSnapshotId> MasterSnapshotCoordinator::Create(
    const SysRowEntries& entries, bool imported, int64_t leader_term, CoarseTimePoint deadline) {
  auto& impl = *impl_;
  return impl.Create(entries, imported, leader_term, deadline);
}
1375
1376
// Forwards a replicated snapshot-create operation to Impl::CreateReplicated.
Status MasterSnapshotCoordinator::CreateReplicated(
    int64_t leader_term, const tablet::SnapshotOperation& operation) {
  auto& impl = *impl_;
  return impl.CreateReplicated(leader_term, operation);
}
1380
1381
// Forwards a replicated snapshot-delete operation to Impl::DeleteReplicated.
Status MasterSnapshotCoordinator::DeleteReplicated(
    int64_t leader_term, const tablet::SnapshotOperation& operation) {
  auto& impl = *impl_;
  return impl.DeleteReplicated(leader_term, operation);
}
1385
1386
// Forwards to Impl::RestoreSysCatalogReplicated. `complete_status` is passed through unchanged.
Status MasterSnapshotCoordinator::RestoreSysCatalogReplicated(
    int64_t leader_term, const tablet::SnapshotOperation& operation, Status* complete_status) {
  auto& impl = *impl_;
  return impl.RestoreSysCatalogReplicated(leader_term, operation, complete_status);
}
1390
1391
// Forwards to Impl::ListSnapshots, which fills `resp`.
Status MasterSnapshotCoordinator::ListSnapshots(
    const TxnSnapshotId& snapshot_id, bool list_deleted, ListSnapshotsResponsePB* resp) {
  auto& impl = *impl_;
  return impl.ListSnapshots(snapshot_id, list_deleted, resp);
}
1395
1396
// Forwards to Impl::Delete.
Status MasterSnapshotCoordinator::Delete(
    const TxnSnapshotId& snapshot_id, int64_t leader_term, CoarseTimePoint deadline) {
  auto& impl = *impl_;
  return impl.Delete(snapshot_id, leader_term, deadline);
}
1400
1401
// Forwards to Impl::Restore and returns the restoration id it produces.
Result<TxnSnapshotRestorationId> MasterSnapshotCoordinator::Restore(
    const TxnSnapshotId& snapshot_id, HybridTime restore_at, int64_t leader_term) {
  auto& impl = *impl_;
  return impl.Restore(snapshot_id, restore_at, leader_term);
}
1405
1406
// Forwards to Impl::ListRestorations, which fills `resp`.
Status MasterSnapshotCoordinator::ListRestorations(
    const TxnSnapshotRestorationId& restoration_id, const TxnSnapshotId& snapshot_id,
    ListSnapshotRestorationsResponsePB* resp) {
  auto& impl = *impl_;
  return impl.ListRestorations(restoration_id, snapshot_id, resp);
}
1411
1412
// Forwards to Impl::CreateSchedule and returns the new schedule's id.
Result<SnapshotScheduleId> MasterSnapshotCoordinator::CreateSchedule(
    const CreateSnapshotScheduleRequestPB& request, int64_t leader_term,
    CoarseTimePoint deadline) {
  auto& impl = *impl_;
  return impl.CreateSchedule(request, leader_term, deadline);
}
1417
1418
// Forwards to Impl::ListSnapshotSchedules, which fills `resp`.
Status MasterSnapshotCoordinator::ListSnapshotSchedules(
    const SnapshotScheduleId& snapshot_schedule_id, ListSnapshotSchedulesResponsePB* resp) {
  auto& impl = *impl_;
  return impl.ListSnapshotSchedules(snapshot_schedule_id, resp);
}
1422
1423
// Forwards to Impl::DeleteSnapshotSchedule.
Status MasterSnapshotCoordinator::DeleteSnapshotSchedule(
    const SnapshotScheduleId& snapshot_schedule_id, int64_t leader_term, CoarseTimePoint deadline) {
  auto& impl = *impl_;
  return impl.DeleteSnapshotSchedule(snapshot_schedule_id, leader_term, deadline);
}
1427
1428
62
// Forwards to Impl::Load for the given tablet.
Status MasterSnapshotCoordinator::Load(tablet::Tablet* tablet) {
  auto& impl = *impl_;
  return impl.Load(tablet);
}
1431
1432
5.35k
void MasterSnapshotCoordinator::Start() {
1433
5.35k
  impl_->Start();
1434
5.35k
}
1435
1436
92
void MasterSnapshotCoordinator::Shutdown() {
1437
92
  impl_->Shutdown();
1438
92
}
1439
1440
20.3M
// Forwards one key/value pair to Impl::ApplyWritePair.
Status MasterSnapshotCoordinator::ApplyWritePair(const Slice& key, const Slice& value) {
  auto& impl = *impl_;
  return impl.ApplyWritePair(key, value);
}
1443
1444
384k
// Forwards to Impl::FillHeartbeatResponse, which fills `resp`.
Status MasterSnapshotCoordinator::FillHeartbeatResponse(TSHeartbeatResponsePB* resp) {
  auto& impl = *impl_;
  return impl.FillHeartbeatResponse(resp);
}
1447
1448
// Forwards to Impl::MakeSnapshotSchedulesToObjectIdsMap for entries of the given type.
Result<SnapshotSchedulesToObjectIdsMap>
MasterSnapshotCoordinator::MakeSnapshotSchedulesToObjectIdsMap(SysRowEntryType type) {
  auto& impl = *impl_;
  return impl.MakeSnapshotSchedulesToObjectIdsMap(type);
}
1452
1453
// Forwards to Impl::IsTableCoveredBySomeSnapshotSchedule.
Result<bool> MasterSnapshotCoordinator::IsTableCoveredBySomeSnapshotSchedule(
    const TableInfo& table_info) {
  auto& impl = *impl_;
  return impl.IsTableCoveredBySomeSnapshotSchedule(table_info);
}
1457
1458
2.00k
// Forwards the sys-catalog-loaded notification for `term` to Impl::SysCatalogLoaded.
void MasterSnapshotCoordinator::SysCatalogLoaded(int64_t term) {
  auto& impl = *impl_;
  impl.SysCatalogLoaded(term);
}
1461
1462
// Forwards to Impl::CreateForSchedule and returns the resulting snapshot id.
Result<TxnSnapshotId> MasterSnapshotCoordinator::CreateForSchedule(
    const SnapshotScheduleId& schedule_id, int64_t leader_term, CoarseTimePoint deadline) {
  auto& impl = *impl_;
  return impl.CreateForSchedule(schedule_id, leader_term, deadline);
}
1466
1467
} // namespace master
1468
} // namespace yb