YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/snapshot_state.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/snapshot_state.h"
15
16
#include "yb/common/transaction_error.h"
17
18
#include "yb/docdb/docdb.pb.h"
19
#include "yb/docdb/key_bytes.h"
20
#include "yb/docdb/value_type.h"
21
22
#include "yb/master/master_backup.pb.h"
23
#include "yb/master/master_error.h"
24
#include "yb/master/snapshot_coordinator_context.h"
25
26
#include "yb/tablet/operations/snapshot_operation.h"
27
#include "yb/tablet/tablet_snapshots.h"
28
29
#include "yb/tserver/backup.pb.h"
30
31
#include "yb/util/atomic.h"
32
#include "yb/util/flag_tags.h"
33
#include "yb/util/pb_util.h"
34
#include "yb/util/result.h"
35
36
using namespace std::literals;
37
38
// Delay, in milliseconds, used by SnapshotState::NeedCleanup() when deciding whether a snapshot in
// the DELETING state has been complete long enough to be cleaned up.
DEFINE_uint64(snapshot_coordinator_cleanup_delay_ms, 30000,
39
              "Delay for snapshot cleanup after deletion.");
40
41
// Overall cap on outstanding tablet snapshot RPCs. Per the help text below: a value >= 0 is used
// directly (0 meaning INT_MAX), while a negative value defers to
// max_concurrent_snapshot_rpcs_per_tserver. The flag is not read in this part of the file.
DEFINE_int64(max_concurrent_snapshot_rpcs, -1,
42
             "Maximum number of tablet snapshot RPCs that can be outstanding. "
43
             "Only used if its value is >= 0. If its value is 0 then it means that "
44
             "INT_MAX number of snapshot rpcs can be concurrent. "
45
             "If its value is < 0 then the max_concurrent_snapshot_rpcs_per_tserver gflag and "
46
             "the number of TServers in the primary cluster are used to determine "
47
             "the number of maximum number of tablet snapshot RPCs that can be outstanding.");
48
// NOTE(review): the "runtime" tag presumably marks the flag as changeable without a restart --
// confirm against yb/util/flag_tags.h.
TAG_FLAG(max_concurrent_snapshot_rpcs, runtime);
49
50
// Per-tserver cap on outstanding tablet snapshot RPCs. Per the help text below it only applies
// when max_concurrent_snapshot_rpcs is negative. The flag is not read in this part of the file.
DEFINE_int64(max_concurrent_snapshot_rpcs_per_tserver, 1,
51
             "Maximum number of tablet snapshot RPCs per tserver that can be outstanding. "
52
             "Only used if the value of the gflag max_concurrent_snapshot_rpcs is < 0. "
53
             "When used it is multiplied with the number of TServers in the active cluster "
54
             "(not read-replicas) to obtain the total maximum concurrent snapshot RPCs. If "
55
             "the cluster config is not found and we are not able to determine the number of "
56
             "live tservers then the total maximum concurrent snapshot RPCs is just the "
57
             "value of this flag.");
58
// NOTE(review): same "runtime" tag as above -- confirm its meaning against yb/util/flag_tags.h.
TAG_FLAG(max_concurrent_snapshot_rpcs_per_tserver, runtime);
59
60
namespace yb {
61
namespace master {
62
63
// Builds the encoded key for the SNAPSHOT row entry identified by `id`.
//
// Delegates to EncodedKey with SysRowEntryType::SNAPSHOT and the slice view of `id`; whatever
// EncodedKey returns (key bytes or an error) is returned unchanged.
Result<docdb::KeyBytes> EncodedSnapshotKey(
    const TxnSnapshotId& id, SnapshotCoordinatorContext* context) {
  auto id_slice = id.AsSlice();
  return EncodedKey(SysRowEntryType::SNAPSHOT, id_slice, context);
}
67
68
namespace {
69
70
std::string MakeSnapshotStateLogPrefix(
71
145
    const TxnSnapshotId& id, const std::string& schedule_id_str) {
72
145
  auto schedule_id = TryFullyDecodeSnapshotScheduleId(schedule_id_str);
73
145
  if (schedule_id) {
74
139
    return Format("Snapshot[$0/$1]: ", id, schedule_id);
75
139
  }
76
6
  return Format("Snapshot[$0]: ", id);
77
145
}
78
79
} // namespace
80
81
// Constructs the state of a snapshot from a tablet snapshot request.
//
// The base StateWithTablets is started in the CREATING state, the version starts at 1 and the
// throttler is configured with `throttle_limit`. Hybrid times and the schedule id are taken from
// `request`.
SnapshotState::SnapshotState(
    SnapshotCoordinatorContext* context, const TxnSnapshotId& id,
    const tserver::TabletSnapshotOpRequestPB& request, uint64_t throttle_limit)
    : StateWithTablets(
          context, SysSnapshotEntryPB::CREATING,
          MakeSnapshotStateLogPrefix(id, request.schedule_id())),
      id_(id),
      snapshot_hybrid_time_(request.snapshot_hybrid_time()),
      previous_snapshot_hybrid_time_(HybridTime::FromPB(request.previous_snapshot_hybrid_time())),
      schedule_id_(TryFullyDecodeSnapshotScheduleId(request.schedule_id())),
      version_(1),
      throttler_(throttle_limit) {
  // Tablets of an imported snapshot are registered with COMPLETE, all others with CREATING.
  const auto tablets_state =
      request.imported() ? SysSnapshotEntryPB::COMPLETE : SysSnapshotEntryPB::CREATING;
  InitTabletIds(request.tablet_id(), tablets_state);

  // The result of UnpackTo is not checked here.
  request.extra_data().UnpackTo(&entries_);
}
94
95
// Constructs the state of a snapshot from an existing SysSnapshotEntryPB.
//
// State, hybrid times, schedule id and version are all copied from `entry`; the tablet list is
// initialized from entry.tablet_snapshots() and the stored entries from entry.entries().
SnapshotState::SnapshotState(
    SnapshotCoordinatorContext* context, const TxnSnapshotId& id,
    const SysSnapshotEntryPB& entry)
    : StateWithTablets(
          context, entry.state(),
          MakeSnapshotStateLogPrefix(id, entry.schedule_id())),
      id_(id),
      snapshot_hybrid_time_(entry.snapshot_hybrid_time()),
      previous_snapshot_hybrid_time_(HybridTime::FromPB(entry.previous_snapshot_hybrid_time())),
      schedule_id_(TryFullyDecodeSnapshotScheduleId(entry.schedule_id())),
      version_(entry.version()) {
  InitTablets(entry.tablet_snapshots());

  auto* stored_entries = entries_.mutable_entries();
  *stored_entries = entry.entries();
}
107
108
9
std::string SnapshotState::ToString() const {
109
9
  return Format(
110
9
      "{ id: $0 snapshot_hybrid_time: $1 schedule_id: $2 previous_snapshot_hybrid_time: $3 "
111
9
          "version: $4 initial_state: $5 tablets: $6 }",
112
9
      id_, snapshot_hybrid_time_, schedule_id_, previous_snapshot_hybrid_time_, version_,
113
9
      InitialStateName(), tablets());
114
9
}
115
116
25
// Fills `out` with the snapshot id and the client-facing entry of this snapshot.
//
// Returns the status produced by ToEntryPB(..., ForClient::kTrue).
Status SnapshotState::ToPB(SnapshotInfoPB* out) {
  out->set_id(id_.data(), id_.size());

  auto* entry_pb = out->mutable_entry();
  return ToEntryPB(entry_pb, ForClient::kTrue);
}
120
121
93
// Serializes this snapshot into `out`.
//
// When `for_client` is set the reported state is AggregatedState() (a failure to compute it is
// returned to the caller); otherwise the initial state is stored. The previous snapshot hybrid
// time and the schedule id are only written when they are set.
Status SnapshotState::ToEntryPB(SysSnapshotEntryPB* out, ForClient for_client) {
  if (for_client) {
    out->set_state(VERIFY_RESULT(AggregatedState()));
  } else {
    out->set_state(initial_state());
  }

  out->set_snapshot_hybrid_time(snapshot_hybrid_time_.ToUint64());
  if (previous_snapshot_hybrid_time_) {
    out->set_previous_snapshot_hybrid_time(previous_snapshot_hybrid_time_.ToUint64());
  }

  auto* tablet_snapshots = out->mutable_tablet_snapshots();
  TabletsToPB(tablet_snapshots);

  auto* out_entries = out->mutable_entries();
  *out_entries = entries_.entries();

  if (schedule_id_) {
    out->set_schedule_id(schedule_id_.data(), schedule_id_.size());
  }

  out->set_version(version_);
  return Status::OK();
}
140
141
68
// Appends a write pair describing this snapshot to `out`.
//
// The version is incremented first, so the stored entry carries the new version. The key is the
// encoded snapshot key; the value starts with the kString value type byte, followed by the entry
// appended through pb_util::AppendToString. Errors from key encoding or entry building are
// returned to the caller.
Status SnapshotState::StoreToWriteBatch(docdb::KeyValueWriteBatchPB* out) {
  ++version_;

  // Key part of the pair.
  auto key_bytes = VERIFY_RESULT(EncodedSnapshotKey(id_, &context()));
  auto write_pair = out->add_write_pairs();
  write_pair->set_key(key_bytes.AsSlice().cdata(), key_bytes.size());

  // Value part of the pair: type marker followed by the (non client-facing) entry.
  faststring serialized;
  serialized.push_back(docdb::ValueTypeAsChar::kString);
  SysSnapshotEntryPB entry_pb;
  RETURN_NOT_OK(ToEntryPB(&entry_pb, ForClient::kFalse));
  pb_util::AppendToString(entry_pb, &serialized);
  write_pair->set_value(serialized.data(), serialized.size());

  return Status::OK();
}
154
155
0
// Marks the start of a delete for this snapshot.
//
// Returns OK and remembers that a delete was started, unless the snapshot is already in the
// DELETING state or a delete was started earlier. In that case a NotFound status carrying
// SNAPSHOT_NOT_FOUND is returned, with a message that depends on whether all tablets are
// already DELETED.
Status SnapshotState::TryStartDelete() {
  const bool delete_in_progress =
      initial_state() == SysSnapshotEntryPB::DELETING || delete_started_;
  if (!delete_in_progress) {
    delete_started_ = true;
    return Status::OK();
  }

  if (AllInState(SysSnapshotEntryPB::DELETED)) {
    return STATUS(NotFound, "The snapshot was deleted", id_.ToString(),
                  MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
  }
  return STATUS(NotFound, "The snapshot is being deleted", id_.ToString(),
                MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND));
}
168
169
0
void SnapshotState::DeleteAborted(const Status& status) {
170
0
  delete_started_ = false;
171
0
}
172
173
270
// Collects snapshot operations for tablets into `out`.
//
// The callback handed to DoPrepareOperations consults the throttler for every tablet it is given:
// a throttled tablet is skipped (callback returns false), otherwise an operation built from this
// snapshot's ids, initial state and hybrid time is appended (callback returns true).
void SnapshotState::PrepareOperations(TabletSnapshotOperations* out) {
  DoPrepareOperations([this, out](const TabletData& tablet) -> bool {
    if (!Throttler().Throttle()) {
      out->push_back(TabletSnapshotOperation {
        .tablet_id = tablet.id,
        .schedule_id = schedule_id_,
        .snapshot_id = id_,
        .state = initial_state(),
        .snapshot_hybrid_time = snapshot_hybrid_time_,
      });
      return true;
    }
    return false;
  });
}
188
189
0
void SnapshotState::SetVersion(int value) {
190
0
  version_ = value;
191
0
}
192
193
249
bool SnapshotState::NeedCleanup() const {
194
249
  return initial_state() == SysSnapshotEntryPB::DELETING &&
195
249
         PassedSinceCompletion(
196
0
            GetAtomicFlag(&FLAGS_snapshot_coordinator_cleanup_delay_ms) * 1ms) &&
197
249
         
!cleanup_tracker_.Started()0
;
198
249
}
199
200
0
bool SnapshotState::IsTerminalFailure(const Status& status) {
201
  // Table was removed.
202
0
  if (status.IsExpired()) {
203
0
    return true;
204
0
  }
205
  // Would not be able to create snapshot at specific time, since history was garbage collected.
206
0
  if (TransactionError(status) == TransactionErrorCode::kSnapshotTooOld) {
207
0
    return true;
208
0
  }
209
0
  return false;
210
0
}
211
212
94
bool SnapshotState::ShouldUpdate(const SnapshotState& other) const {
213
  // Backward compatibility mode
214
94
  auto other_version = other.version() == 0 ? 
version() + 10
: other.version();
215
  // If we have several updates for single snapshot, they are loaded in chronological order.
216
  // So latest update should be picked.
217
94
  return version() < other_version;
218
94
}
219
220
// Builds the data needed to create a sys catalog snapshot for `operation`.
//
// Only snapshots that have a schedule id produce data; for the rest a shared Uninitialized
// status with an empty message is returned. A failure to obtain the snapshot directory from
// `operation` is returned to the caller.
Result<tablet::CreateSnapshotData> SnapshotState::SysCatalogSnapshotData(
    const tablet::SnapshotOperation& operation) const {
  if (schedule_id_) {
    return tablet::CreateSnapshotData {
      .snapshot_hybrid_time = snapshot_hybrid_time_,
      .hybrid_time = operation.hybrid_time(),
      .op_id = operation.op_id(),
      .snapshot_dir = VERIFY_RESULT(operation.GetSnapshotDir()),
      .schedule_id = schedule_id_,
    };
  }

  // No schedule: hand out the same pre-built status object on every call.
  static Status result(STATUS(Uninitialized, ""));
  return result;
}
235
236
25
// Post-processes the completion status of an operation.
//
// While the initial state is DELETING, a status whose master error is TABLET_NOT_RUNNING or
// TABLE_NOT_RUNNING is converted to OK. In every other case `status` is returned as is.
Status SnapshotState::CheckDoneStatus(const Status& status) {
  if (initial_state() == SysSnapshotEntryPB::DELETING) {
    MasterError master_error(status);
    if (master_error == MasterErrorPB::TABLET_NOT_RUNNING ||
        master_error == MasterErrorPB::TABLE_NOT_RUNNING) {
      return Status::OK();
    }
  }
  return status;
}
246
247
} // namespace master
248
} // namespace yb