YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/operations/snapshot_operation.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
3
#include "yb/tablet/operations/snapshot_operation.h"
4
5
#include <glog/logging.h>
6
7
#include "yb/common/snapshot.h"
8
9
#include "yb/consensus/consensus_round.h"
10
#include "yb/consensus/consensus.pb.h"
11
12
#include "yb/docdb/consensus_frontier.h"
13
14
#include "yb/tablet/snapshot_coordinator.h"
15
#include "yb/tablet/tablet.h"
16
#include "yb/tablet/tablet_metadata.h"
17
#include "yb/tablet/tablet_snapshots.h"
18
19
#include "yb/tserver/backup.pb.h"
20
#include "yb/tserver/tserver_error.h"
21
22
#include "yb/util/logging.h"
23
#include "yb/util/status_format.h"
24
#include "yb/util/trace.h"
25
26
DEFINE_bool(consistent_restore, false, "Whether to enable consistent restoration of snapshots");
27
28
namespace yb {
29
namespace tablet {
30
31
using tserver::TabletServerError;
32
using tserver::TabletServerErrorPB;
33
using tserver::TabletSnapshotOpRequestPB;
34
35
template <>
36
void RequestTraits<TabletSnapshotOpRequestPB>::SetAllocatedRequest(
37
0
    consensus::ReplicateMsg* replicate, TabletSnapshotOpRequestPB* request) {
38
0
  replicate->set_allocated_snapshot_request(request);
39
0
}
40
41
template <>
42
TabletSnapshotOpRequestPB* RequestTraits<TabletSnapshotOpRequestPB>::MutableRequest(
43
1.22k
    consensus::ReplicateMsg* replicate) {
44
1.22k
  return replicate->mutable_snapshot_request();
45
1.22k
}
46
47
877
Result<std::string> SnapshotOperation::GetSnapshotDir() const {
48
877
  auto& request = *this->request();
49
877
  if (!request.snapshot_dir_override().empty()) {
50
861
    return request.snapshot_dir_override();
51
861
  }
52
16
  if (request.snapshot_id().empty()) {
53
0
    return std::string();
54
0
  }
55
16
  std::string snapshot_id_str;
56
16
  auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(request.snapshot_id());
57
16
  if (txn_snapshot_id) {
58
0
    snapshot_id_str = txn_snapshot_id.ToString();
59
16
  } else {
60
16
    snapshot_id_str = request.snapshot_id();
61
16
  }
62
63
16
  return JoinPathSegments(VERIFY_RESULT(tablet()->metadata()->TopSnapshotsDir()), snapshot_id_str);
64
16
}
65
66
3
Status SnapshotOperation::DoCheckOperationRequirements() {
67
3
  if (operation() != TabletSnapshotOpRequestPB::RESTORE_ON_TABLET) {
68
2
    return Status::OK();
69
2
  }
70
71
1
  const string snapshot_dir = VERIFY_RESULT(GetSnapshotDir());
72
1
  if (snapshot_dir.empty()) {
73
0
    return Status::OK();
74
0
  }
75
1
  Status s = tablet()->rocksdb_env().FileExists(snapshot_dir);
76
77
1
  if (!s.ok()) {
78
0
    return s.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::INVALID_SNAPSHOT)).
79
0
             CloneAndPrepend(Format("Snapshot dir: $0", snapshot_dir));
80
0
  }
81
82
1
  return Status::OK();
83
1
}
84
85
3
bool SnapshotOperation::CheckOperationRequirements() {
86
3
  auto status = DoCheckOperationRequirements();
87
3
  if (status.ok()) {
88
3
    return true;
89
3
  }
90
91
  // LogPrefix() calls ToString() which needs correct hybrid_time.
92
0
  LOG_WITH_PREFIX(WARNING) << status;
93
0
  TRACE("Requirements was not satisfied for snapshot operation: $0", operation());
94
  // Run the callback, finish RPC and return the error to the sender.
95
0
  CompleteWithStatus(status);
96
0
  Release();
97
0
  return false;
98
0
}
99
100
0
Result<SnapshotCoordinator&> GetSnapshotCoordinator(SnapshotOperation* operation) {
101
0
  auto snapshot_coordinator = operation->tablet()->snapshot_coordinator();
102
0
  if (!snapshot_coordinator) {
103
0
    return STATUS_FORMAT(IllegalState, "Replicated $0 to tablet without snapshot coordinator",
104
0
                         TabletSnapshotOpRequestPB::Operation_Name(
105
0
                             operation->request()->operation()));
106
0
  }
107
0
  return *snapshot_coordinator;
108
0
}
109
110
864
Status SnapshotOperation::Apply(int64_t leader_term, Status* complete_status) {
111
864
  TRACE("APPLY SNAPSHOT: Starting");
112
864
  auto operation = request()->operation();
113
864
  switch (operation) {
114
0
    case TabletSnapshotOpRequestPB::CREATE_ON_MASTER:
115
0
      return VERIFY_RESULT(GetSnapshotCoordinator(this)).get().CreateReplicated(leader_term, *this);
116
0
    case TabletSnapshotOpRequestPB::DELETE_ON_MASTER:
117
0
      return VERIFY_RESULT(GetSnapshotCoordinator(this)).get().DeleteReplicated(leader_term, *this);
118
0
    case TabletSnapshotOpRequestPB::RESTORE_SYS_CATALOG:
119
0
      return VERIFY_RESULT(GetSnapshotCoordinator(this)).get().RestoreSysCatalogReplicated(
120
0
          leader_term, *this, complete_status);
121
2
    case TabletSnapshotOpRequestPB::CREATE_ON_TABLET:
122
2
      return tablet()->snapshots().Create(this);
123
862
    case TabletSnapshotOpRequestPB::RESTORE_ON_TABLET:
124
862
      return tablet()->snapshots().Restore(this);
125
0
    case TabletSnapshotOpRequestPB::DELETE_ON_TABLET:
126
0
      return tablet()->snapshots().Delete(*this);
127
0
    case TabletSnapshotOpRequestPB::RESTORE_FINISHED:
128
0
      return tablet()->snapshots().RestoreFinished(this);
129
0
    case google::protobuf::kint32min: FALLTHROUGH_INTENDED;
130
0
    case google::protobuf::kint32max: FALLTHROUGH_INTENDED;
131
0
    case TabletSnapshotOpRequestPB::UNKNOWN:
132
0
      break;
133
0
  }
134
0
  FATAL_INVALID_ENUM_VALUE(TabletSnapshotOpRequestPB::Operation, operation);
135
0
}
136
137
1.72k
bool SnapshotOperation::NeedOperationFilter() const {
138
1.72k
  return request()->operation() == TabletSnapshotOpRequestPB::RESTORE_ON_TABLET ||
139
4
         request()->operation() == TabletSnapshotOpRequestPB::RESTORE_SYS_CATALOG;
140
1.72k
}
141
142
864
void SnapshotOperation::AddedAsPending() {
143
864
  if (NeedOperationFilter()) {
144
862
    tablet()->RegisterOperationFilter(this);
145
862
  }
146
864
}
147
148
864
void SnapshotOperation::RemovedFromPending() {
149
864
  if (NeedOperationFilter()) {
150
862
    tablet()->UnregisterOperationFilter(this);
151
862
  }
152
864
}
153
154
Status SnapshotOperation::RejectionStatus(
155
0
    OpId rejected_op_id, consensus::OperationType op_type) {
156
0
  return STATUS_FORMAT(
157
0
      IllegalState, "Operation $0 ($1) is not allowed during restore",
158
0
      OperationType_Name(op_type), rejected_op_id);
159
0
}
160
161
8
bool SnapshotOperation::ShouldAllowOpDuringRestore(consensus::OperationType op_type) {
162
8
  switch (op_type) {
163
0
    case consensus::NO_OP: FALLTHROUGH_INTENDED;
164
0
    case consensus::UNKNOWN_OP: FALLTHROUGH_INTENDED;
165
8
    case consensus::CHANGE_METADATA_OP: FALLTHROUGH_INTENDED;
166
8
    case consensus::CHANGE_CONFIG_OP: FALLTHROUGH_INTENDED;
167
8
    case consensus::HISTORY_CUTOFF_OP: FALLTHROUGH_INTENDED;
168
8
    case consensus::SNAPSHOT_OP: FALLTHROUGH_INTENDED;
169
8
    case consensus::TRUNCATE_OP: FALLTHROUGH_INTENDED;
170
8
    case consensus::SPLIT_OP:
171
8
      return true;
172
0
    case consensus::UPDATE_TRANSACTION_OP: FALLTHROUGH_INTENDED;
173
0
    case consensus::WRITE_OP:
174
0
      return !FLAGS_consistent_restore;
175
0
  }
176
0
  FATAL_INVALID_ENUM_VALUE(consensus::OperationType, op_type);
177
0
}
178
179
Status SnapshotOperation::CheckOperationAllowed(
180
370
    const OpId& id, consensus::OperationType op_type) const {
181
370
  if (id == op_id() || ShouldAllowOpDuringRestore(op_type)) {
182
370
    return Status::OK();
183
370
  }
184
185
0
  return RejectionStatus(id, op_type);
186
0
}
187
188
// ------------------------------------------------------------------------------------------------
189
// SnapshotOperation
190
// ------------------------------------------------------------------------------------------------
191
192
864
Status SnapshotOperation::Prepare() {
193
864
  TRACE("PREPARE SNAPSHOT: Starting");
194
864
  RETURN_NOT_OK(tablet()->snapshots().Prepare(this));
195
196
864
  TRACE("PREPARE SNAPSHOT: finished");
197
864
  return Status::OK();
198
864
}
199
200
0
Status SnapshotOperation::DoAborted(const Status& status) {
201
0
  TRACE("SnapshotOperation: operation aborted");
202
0
  return status;
203
0
}
204
205
864
Status SnapshotOperation::DoReplicated(int64_t leader_term, Status* complete_status) {
206
864
  RETURN_NOT_OK(Apply(leader_term, complete_status));
207
  // Record the fact that we've executed the "create snapshot" Raft operation. We are not forcing
208
  // the flushed frontier to have this exact value, although in practice it will, since this is the
209
  // latest operation we've ever executed in this Raft group. This way we keep the current value
210
  // of history cutoff.
211
864
  docdb::ConsensusFrontier frontier;
212
864
  frontier.set_op_id(op_id());
213
864
  frontier.set_hybrid_time(hybrid_time());
214
864
  return tablet()->ModifyFlushedFrontier(
215
864
      frontier, rocksdb::FrontierModificationMode::kUpdate);
216
864
}
217
218
}  // namespace tablet
219
}  // namespace yb