/Users/deen/code/yugabyte-db/src/yb/master/snapshot_state.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/snapshot_state.h" |
15 | | |
16 | | #include "yb/common/transaction_error.h" |
17 | | |
18 | | #include "yb/docdb/docdb.pb.h" |
19 | | #include "yb/docdb/key_bytes.h" |
20 | | #include "yb/docdb/value_type.h" |
21 | | |
22 | | #include "yb/master/master_backup.pb.h" |
23 | | #include "yb/master/master_error.h" |
24 | | #include "yb/master/snapshot_coordinator_context.h" |
25 | | |
26 | | #include "yb/tablet/operations/snapshot_operation.h" |
27 | | #include "yb/tablet/tablet_snapshots.h" |
28 | | |
29 | | #include "yb/tserver/backup.pb.h" |
30 | | |
31 | | #include "yb/util/atomic.h" |
32 | | #include "yb/util/flag_tags.h" |
33 | | #include "yb/util/pb_util.h" |
34 | | #include "yb/util/result.h" |
35 | | |
36 | | using namespace std::literals; |
37 | | |
// Delay before a deleted snapshot becomes eligible for cleanup. SnapshotState::NeedCleanup()
// reads this flag via GetAtomicFlag() and interprets the value as milliseconds.
DEFINE_uint64(snapshot_coordinator_cleanup_delay_ms, 30000,
              "Delay for snapshot cleanup after deletion.");
40 | | |
41 | | DEFINE_int64(max_concurrent_snapshot_rpcs, 0, |
42 | | "Maximum number of tablet snapshot RPCs that can be outstanding. " |
43 | | "Only used if its value is >= 0. Default value is 0 which means that " |
44 | | "INT_MAX number of snapshot rpcs can be concurrent. " |
45 | | "If its value is < 0 then the max_concurrent_snapshot_rpcs_per_tserver gflag and " |
46 | | "the number of TServers in the primary cluster is used to determine " |
47 | | "the number of maximum number of tablet snapshot RPCs that can be outstanding."); |
48 | | TAG_FLAG(max_concurrent_snapshot_rpcs, runtime); |
49 | | |
// Per-tserver cap on outstanding tablet snapshot RPCs. According to its help text it is consulted
// only when max_concurrent_snapshot_rpcs is negative. The flag is tagged as runtime.
DEFINE_int64(max_concurrent_snapshot_rpcs_per_tserver, 5,
             "Maximum number of tablet snapshot RPCs per tserver that can be outstanding. "
             "Only used if the value of the gflag max_concurrent_snapshot_rpcs is < 0. "
             "When used it is multiplied with the number of TServers in the active cluster "
             "(not read-replicas) to obtain the total maximum concurrent snapshot RPCs.");
TAG_FLAG(max_concurrent_snapshot_rpcs_per_tserver, runtime);
56 | | |
57 | | namespace yb { |
58 | | namespace master { |
59 | | |
60 | | Result<docdb::KeyBytes> EncodedSnapshotKey( |
61 | 0 | const TxnSnapshotId& id, SnapshotCoordinatorContext* context) { |
62 | 0 | return EncodedKey(SysRowEntryType::SNAPSHOT, id.AsSlice(), context); |
63 | 0 | } |
64 | | |
65 | | namespace { |
66 | | |
67 | | std::string MakeSnapshotStateLogPrefix( |
68 | 0 | const TxnSnapshotId& id, const std::string& schedule_id_str) { |
69 | 0 | auto schedule_id = TryFullyDecodeSnapshotScheduleId(schedule_id_str); |
70 | 0 | if (schedule_id) { |
71 | 0 | return Format("Snapshot[$0/$1]: ", id, schedule_id); |
72 | 0 | } |
73 | 0 | return Format("Snapshot[$0]: ", id); |
74 | 0 | } |
75 | | |
76 | | } // namespace |
77 | | |
78 | | SnapshotState::SnapshotState( |
79 | | SnapshotCoordinatorContext* context, const TxnSnapshotId& id, |
80 | | const tserver::TabletSnapshotOpRequestPB& request, uint64_t throttle_limit) |
81 | | : StateWithTablets(context, SysSnapshotEntryPB::CREATING, |
82 | | MakeSnapshotStateLogPrefix(id, request.schedule_id())), |
83 | | id_(id), snapshot_hybrid_time_(request.snapshot_hybrid_time()), |
84 | | previous_snapshot_hybrid_time_(HybridTime::FromPB(request.previous_snapshot_hybrid_time())), |
85 | | schedule_id_(TryFullyDecodeSnapshotScheduleId(request.schedule_id())), version_(1), |
86 | 0 | throttler_(throttle_limit) { |
87 | 0 | InitTabletIds(request.tablet_id(), |
88 | 0 | request.imported() ? SysSnapshotEntryPB::COMPLETE : SysSnapshotEntryPB::CREATING); |
89 | 0 | request.extra_data().UnpackTo(&entries_); |
90 | 0 | } |
91 | | |
92 | | SnapshotState::SnapshotState( |
93 | | SnapshotCoordinatorContext* context, const TxnSnapshotId& id, |
94 | | const SysSnapshotEntryPB& entry) |
95 | | : StateWithTablets(context, entry.state(), |
96 | | MakeSnapshotStateLogPrefix(id, entry.schedule_id())), |
97 | | id_(id), snapshot_hybrid_time_(entry.snapshot_hybrid_time()), |
98 | | previous_snapshot_hybrid_time_(HybridTime::FromPB(entry.previous_snapshot_hybrid_time())), |
99 | | schedule_id_(TryFullyDecodeSnapshotScheduleId(entry.schedule_id())), |
100 | 0 | version_(entry.version()) { |
101 | 0 | InitTablets(entry.tablet_snapshots()); |
102 | 0 | *entries_.mutable_entries() = entry.entries(); |
103 | 0 | } |
104 | | |
105 | 0 | std::string SnapshotState::ToString() const { |
106 | 0 | return Format( |
107 | 0 | "{ id: $0 snapshot_hybrid_time: $1 schedule_id: $2 previous_snapshot_hybrid_time: $3 " |
108 | 0 | "version: $4 initial_state: $5 tablets: $6 }", |
109 | 0 | id_, snapshot_hybrid_time_, schedule_id_, previous_snapshot_hybrid_time_, version_, |
110 | 0 | InitialStateName(), tablets()); |
111 | 0 | } |
112 | | |
113 | 0 | Status SnapshotState::ToPB(SnapshotInfoPB* out) { |
114 | 0 | out->set_id(id_.data(), id_.size()); |
115 | 0 | return ToEntryPB(out->mutable_entry(), ForClient::kTrue); |
116 | 0 | } |
117 | | |
118 | 0 | Status SnapshotState::ToEntryPB(SysSnapshotEntryPB* out, ForClient for_client) { |
119 | 0 | out->set_state(for_client ? VERIFY_RESULT(AggregatedState()) : initial_state()); |
120 | 0 | out->set_snapshot_hybrid_time(snapshot_hybrid_time_.ToUint64()); |
121 | 0 | if (previous_snapshot_hybrid_time_) { |
122 | 0 | out->set_previous_snapshot_hybrid_time(previous_snapshot_hybrid_time_.ToUint64()); |
123 | 0 | } |
124 | |
|
125 | 0 | TabletsToPB(out->mutable_tablet_snapshots()); |
126 | |
|
127 | 0 | *out->mutable_entries() = entries_.entries(); |
128 | |
|
129 | 0 | if (schedule_id_) { |
130 | 0 | out->set_schedule_id(schedule_id_.data(), schedule_id_.size()); |
131 | 0 | } |
132 | |
|
133 | 0 | out->set_version(version_); |
134 | |
|
135 | 0 | return Status::OK(); |
136 | 0 | } |
137 | | |
138 | 0 | Status SnapshotState::StoreToWriteBatch(docdb::KeyValueWriteBatchPB* out) { |
139 | 0 | ++version_; |
140 | 0 | auto encoded_key = VERIFY_RESULT(EncodedSnapshotKey(id_, &context())); |
141 | 0 | auto pair = out->add_write_pairs(); |
142 | 0 | pair->set_key(encoded_key.AsSlice().cdata(), encoded_key.size()); |
143 | 0 | faststring value; |
144 | 0 | value.push_back(docdb::ValueTypeAsChar::kString); |
145 | 0 | SysSnapshotEntryPB entry; |
146 | 0 | RETURN_NOT_OK(ToEntryPB(&entry, ForClient::kFalse)); |
147 | 0 | pb_util::AppendToString(entry, &value); |
148 | 0 | pair->set_value(value.data(), value.size()); |
149 | 0 | return Status::OK(); |
150 | 0 | } |
151 | | |
152 | 0 | Status SnapshotState::TryStartDelete() { |
153 | 0 | if (initial_state() == SysSnapshotEntryPB::DELETING || delete_started_) { |
154 | 0 | if (AllInState(SysSnapshotEntryPB::DELETED)) { |
155 | 0 | return STATUS(NotFound, "The snapshot was deleted", id_.ToString(), |
156 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
157 | 0 | } |
158 | 0 | return STATUS(NotFound, "The snapshot is being deleted", id_.ToString(), |
159 | 0 | MasterError(MasterErrorPB::SNAPSHOT_NOT_FOUND)); |
160 | 0 | } |
161 | 0 | delete_started_ = true; |
162 | |
|
163 | 0 | return Status::OK(); |
164 | 0 | } |
165 | | |
166 | 0 | void SnapshotState::DeleteAborted(const Status& status) { |
167 | 0 | delete_started_ = false; |
168 | 0 | } |
169 | | |
170 | 0 | void SnapshotState::PrepareOperations(TabletSnapshotOperations* out) { |
171 | 0 | DoPrepareOperations([this, out](const TabletData& tablet) -> bool { |
172 | 0 | if (Throttler().Throttle()) { |
173 | 0 | return false; |
174 | 0 | } |
175 | 0 | out->push_back(TabletSnapshotOperation { |
176 | 0 | .tablet_id = tablet.id, |
177 | 0 | .schedule_id = schedule_id_, |
178 | 0 | .snapshot_id = id_, |
179 | 0 | .state = initial_state(), |
180 | 0 | .snapshot_hybrid_time = snapshot_hybrid_time_, |
181 | 0 | }); |
182 | 0 | return true; |
183 | 0 | }); |
184 | 0 | } |
185 | | |
186 | 0 | void SnapshotState::SetVersion(int value) { |
187 | 0 | version_ = value; |
188 | 0 | } |
189 | | |
190 | 0 | bool SnapshotState::NeedCleanup() const { |
191 | 0 | return initial_state() == SysSnapshotEntryPB::DELETING && |
192 | 0 | PassedSinceCompletion( |
193 | 0 | GetAtomicFlag(&FLAGS_snapshot_coordinator_cleanup_delay_ms) * 1ms) && |
194 | 0 | !cleanup_tracker_.Started(); |
195 | 0 | } |
196 | | |
197 | 0 | bool SnapshotState::IsTerminalFailure(const Status& status) { |
198 | | // Table was removed. |
199 | 0 | if (status.IsExpired()) { |
200 | 0 | return true; |
201 | 0 | } |
202 | | // Would not be able to create snapshot at specific time, since history was garbage collected. |
203 | 0 | if (TransactionError(status) == TransactionErrorCode::kSnapshotTooOld) { |
204 | 0 | return true; |
205 | 0 | } |
206 | 0 | return false; |
207 | 0 | } |
208 | | |
209 | 0 | bool SnapshotState::ShouldUpdate(const SnapshotState& other) const { |
210 | | // Backward compatibility mode |
211 | 0 | auto other_version = other.version() == 0 ? version() + 1 : other.version(); |
212 | | // If we have several updates for single snapshot, they are loaded in chronological order. |
213 | | // So latest update should be picked. |
214 | 0 | return version() < other_version; |
215 | 0 | } |
216 | | |
217 | | Result<tablet::CreateSnapshotData> SnapshotState::SysCatalogSnapshotData( |
218 | 0 | const tablet::SnapshotOperation& operation) const { |
219 | 0 | if (!schedule_id_) { |
220 | 0 | static Status result(STATUS(Uninitialized, "")); |
221 | 0 | return result; |
222 | 0 | } |
223 | | |
224 | 0 | return tablet::CreateSnapshotData { |
225 | 0 | .snapshot_hybrid_time = snapshot_hybrid_time_, |
226 | 0 | .hybrid_time = operation.hybrid_time(), |
227 | 0 | .op_id = operation.op_id(), |
228 | 0 | .snapshot_dir = VERIFY_RESULT(operation.GetSnapshotDir()), |
229 | 0 | .schedule_id = schedule_id_, |
230 | 0 | }; |
231 | 0 | } |
232 | | |
233 | 0 | Status SnapshotState::CheckDoneStatus(const Status& status) { |
234 | 0 | if (initial_state() != SysSnapshotEntryPB::DELETING) { |
235 | 0 | return status; |
236 | 0 | } |
237 | 0 | MasterError error(status); |
238 | 0 | if (error == MasterErrorPB::TABLET_NOT_RUNNING || error == MasterErrorPB::TABLE_NOT_RUNNING) { |
239 | 0 | return Status::OK(); |
240 | 0 | } |
241 | 0 | return status; |
242 | 0 | } |
243 | | |
244 | | } // namespace master |
245 | | } // namespace yb |