/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_session.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tserver/remote_bootstrap_session.h" |
34 | | |
35 | | #include <boost/optional.hpp> |
36 | | #include <glog/logging.h> |
37 | | |
38 | | #include "yb/consensus/consensus.h" |
39 | | #include "yb/consensus/log.h" |
40 | | #include "yb/consensus/opid_util.h" |
41 | | |
42 | | #include "yb/gutil/casts.h" |
43 | | #include "yb/gutil/strings/substitute.h" |
44 | | #include "yb/gutil/type_traits.h" |
45 | | |
46 | | #include "yb/tablet/tablet.h" |
47 | | #include "yb/tablet/tablet_metadata.h" |
48 | | #include "yb/tablet/tablet_peer.h" |
49 | | #include "yb/tablet/tablet_snapshots.h" |
50 | | |
51 | | #include "yb/tserver/remote_bootstrap_snapshots.h" |
52 | | |
53 | | #include "yb/util/env_util.h" |
54 | | #include "yb/util/logging.h" |
55 | | #include "yb/util/size_literals.h" |
56 | | #include "yb/util/status_format.h" |
57 | | #include "yb/util/status_log.h" |
58 | | #include "yb/util/stopwatch.h" |
59 | | #include "yb/util/trace.h" |
60 | | |
61 | | DECLARE_uint64(rpc_max_message_size); |
62 | | DECLARE_int64(remote_bootstrap_rate_limit_bytes_per_sec); |
63 | | |
64 | | namespace yb { |
65 | | namespace tserver { |
66 | | |
67 | | using std::shared_ptr; |
68 | | using std::vector; |
69 | | using std::string; |
70 | | |
71 | | using consensus::MinimumOpId; |
72 | | using consensus::PeerMemberType; |
73 | | using consensus::RaftPeerPB; |
74 | | using log::LogAnchorRegistry; |
75 | | using log::ReadableLogSegment; |
76 | | using strings::Substitute; |
77 | | using tablet::RaftGroupMetadata; |
78 | | using tablet::RaftGroupMetadataPtr; |
79 | | using tablet::TabletPeer; |
80 | | using tablet::RaftGroupReplicaSuperBlockPB; |
81 | | |
82 | | RemoteBootstrapSession::RemoteBootstrapSession( |
83 | | const std::shared_ptr<TabletPeer>& tablet_peer, std::string session_id, |
84 | | std::string requestor_uuid, const std::atomic<int>* nsessions) |
85 | | : tablet_peer_(tablet_peer), |
86 | | session_id_(std::move(session_id)), |
87 | | requestor_uuid_(std::move(requestor_uuid)), |
88 | | succeeded_(false), |
89 | 1.44k | nsessions_(nsessions) { |
90 | 1.44k | AddSource<RemoteBootstrapSnapshotsSource>(); |
91 | 1.44k | } |
92 | | |
93 | 960 | RemoteBootstrapSession::~RemoteBootstrapSession() { |
94 | | // No lock taken in the destructor, should only be 1 thread with access now. |
95 | 960 | CHECK_OK(UnregisterAnchorIfNeededUnlocked()); |
96 | | |
97 | | // Delete checkpoint directory. |
98 | 960 | if (!checkpoint_dir_.empty()) { |
99 | 960 | auto s = env()->DeleteRecursively(checkpoint_dir_); |
100 | 960 | if (!s.ok()) { |
101 | 0 | LOG(WARNING) << "Unable to delete checkpoint directory " << checkpoint_dir_; |
102 | 960 | } else { |
103 | 960 | LOG(INFO) << "Successfully deleted checkpoint directory " << checkpoint_dir_; |
104 | 960 | } |
105 | 0 | } else { |
106 | 0 | LOG(INFO) << "No checkpoint directory was created for this session"; |
107 | 0 | } |
108 | | |
109 | 960 | } |
110 | | |
111 | 1.00k | Status RemoteBootstrapSession::ChangeRole() { |
112 | 1.00k | CHECK(Succeeded()); |
113 | | |
114 | 1.00k | shared_ptr<consensus::Consensus> consensus = tablet_peer_->shared_consensus(); |
115 | | // This check fixes an issue with test TestDeleteTabletDuringRemoteBootstrap in which a tablet is |
116 | | // tombstoned while the bootstrap is happening. This causes the peer's consensus object to be |
117 | | // null. |
118 | 1.00k | if (!consensus) { |
119 | 0 | tablet::RaftGroupStatePB tablet_state = tablet_peer_->state(); |
120 | 0 | return STATUS(IllegalState, Substitute( |
121 | 0 | "Unable to change role for server $0 in config for tablet $1. Consensus is not available. " |
122 | 0 | "Tablet state: $2 ($3)", requestor_uuid_, tablet_peer_->tablet_id(), |
123 | 0 | tablet::RaftGroupStatePB_Name(tablet_state), tablet_state)); |
124 | 0 | } |
125 | | |
126 | | // If peer being bootstrapped is already a VOTER, don't send the ChangeConfig request. This could |
127 | | // happen when a tserver that is already a VOTER in the configuration tombstones its tablet, and |
128 | | // the leader starts bootstrapping it. |
129 | 1.00k | const consensus::RaftConfigPB config = tablet_peer_->RaftConfig(); |
130 | 4.15k | for (const RaftPeerPB& peer_pb : config.peers()) { |
131 | 4.15k | if (peer_pb.permanent_uuid() != requestor_uuid_) { |
132 | 3.18k | continue; |
133 | 3.18k | } |
134 | | |
135 | 965 | switch(peer_pb.member_type()) { |
136 | 0 | case PeerMemberType::OBSERVER: FALLTHROUGH_INTENDED; |
137 | 8 | case PeerMemberType::VOTER: |
138 | 8 | LOG(ERROR) << "Peer " << peer_pb.permanent_uuid() << " is a " |
139 | 8 | << PeerMemberType_Name(peer_pb.member_type()) |
140 | 8 | << " Not changing its role after remote bootstrap"; |
141 | | |
142 | | // Even though this is an error, we return Status::OK() so the remote server doesn't |
143 | | // tombstone its tablet. |
144 | 8 | return Status::OK(); |
145 | |
|
146 | 57 | case PeerMemberType::PRE_OBSERVER: FALLTHROUGH_INTENDED; |
147 | 957 | case PeerMemberType::PRE_VOTER: { |
148 | 957 | consensus::ChangeConfigRequestPB req; |
149 | 957 | consensus::ChangeConfigResponsePB resp; |
150 | | |
151 | 957 | req.set_tablet_id(tablet_peer_->tablet_id()); |
152 | 957 | req.set_type(consensus::CHANGE_ROLE); |
153 | 957 | RaftPeerPB* peer = req.mutable_server(); |
154 | 957 | peer->set_permanent_uuid(requestor_uuid_); |
155 | | |
156 | 957 | boost::optional<TabletServerErrorPB::Code> error_code; |
157 | | |
158 | 957 | LOG(INFO) << "Changing config with request: { " << req.ShortDebugString() << " } " |
159 | 957 | << "in bootstrap session " << session_id_; |
160 | | |
161 | | // If another ChangeConfig is being processed, our request will be rejected. |
162 | 957 | return consensus->ChangeConfig(req, &DoNothingStatusCB, &error_code); |
163 | 57 | } |
164 | 0 | case PeerMemberType::UNKNOWN_MEMBER_TYPE: |
165 | 0 | return STATUS(IllegalState, Substitute("Unable to change role for peer $0 in config for " |
166 | 0 | "tablet $1. Peer has an invalid member type $2", |
167 | 0 | peer_pb.permanent_uuid(), tablet_peer_->tablet_id(), |
168 | 0 | PeerMemberType_Name(peer_pb.member_type()))); |
169 | 0 | } |
170 | 0 | LOG(FATAL) << "Unexpected peer member type " |
171 | 0 | << PeerMemberType_Name(peer_pb.member_type()); |
172 | 0 | } |
173 | 35 | return STATUS(IllegalState, Substitute("Unable to find peer $0 in config for tablet $1", |
174 | 1.00k | requestor_uuid_, tablet_peer_->tablet_id())); |
175 | 1.00k | } |
176 | | |
177 | 1.44k | Status RemoteBootstrapSession::SetInitialCommittedState() { |
178 | 1.44k | shared_ptr <consensus::Consensus> consensus = tablet_peer_->shared_consensus(); |
179 | 1.44k | if (!consensus) { |
180 | 0 | tablet::RaftGroupStatePB tablet_state = tablet_peer_->state(); |
181 | 0 | return STATUS(IllegalState, |
182 | 0 | Substitute("Unable to initialize remote bootstrap session " |
183 | 0 | "for tablet $0. Consensus is not available. Tablet state: $1 ($2)", |
184 | 0 | tablet_peer_->tablet_id(), tablet::RaftGroupStatePB_Name(tablet_state), |
185 | 0 | tablet_state)); |
186 | 0 | } |
187 | 1.44k | initial_committed_cstate_ = consensus->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED); |
188 | 1.44k | return Status::OK(); |
189 | 1.44k | } |
190 | | |
191 | 1.33k | Result<google::protobuf::RepeatedPtrField<tablet::FilePB>> ListFiles(const std::string& dir) { |
192 | 1.33k | std::vector<std::string> files; |
193 | 1.33k | auto env = Env::Default(); |
194 | 1.33k | auto status = env->GetChildren(dir, ExcludeDots::kTrue, &files); |
195 | 1.33k | if (!status.ok()) { |
196 | 0 | return STATUS(IllegalState, Substitute("Unable to get RocksDB files in dir $0: $1", dir, |
197 | 0 | status.ToString())); |
198 | 0 | } |
199 | | |
200 | 1.33k | google::protobuf::RepeatedPtrField<tablet::FilePB> result; |
201 | 1.33k | result.Reserve(narrow_cast<int>(files.size())); |
202 | 4.49k | for (const auto& file : files) { |
203 | 4.49k | auto full_path = JoinPathSegments(dir, file); |
204 | 4.49k | if (VERIFY_RESULT(env->IsDirectory(full_path))) { |
205 | 55 | auto sub_files = VERIFY_RESULT(ListFiles(full_path)); |
206 | 114 | for (auto& subfile : sub_files) { |
207 | 114 | subfile.set_name(JoinPathSegments(file, subfile.name())); |
208 | 114 | *result.Add() = std::move(subfile); |
209 | 114 | } |
210 | 55 | continue; |
211 | 4.44k | } |
212 | 4.44k | auto file_pb = result.Add(); |
213 | 4.44k | file_pb->set_name(file); |
214 | 4.44k | file_pb->set_size_bytes(VERIFY_RESULT(env->GetFileSize(full_path))); |
215 | 4.44k | file_pb->set_inode(VERIFY_RESULT(env->GetFileINode(full_path))); |
216 | 4.44k | } |
217 | | |
218 | 1.33k | return result; |
219 | 1.33k | } |
220 | | |
221 | | const std::string RemoteBootstrapSession::kCheckpointsDir = "checkpoints"; |
222 | | |
223 | 1.44k | Status RemoteBootstrapSession::Init() { |
224 | | // Take locks to support re-initialization of the same session. |
225 | 1.44k | std::lock_guard<std::mutex> lock(mutex_); |
226 | 1.44k | RETURN_NOT_OK(UnregisterAnchorIfNeededUnlocked()); |
227 | | |
228 | 1.44k | const string& tablet_id = tablet_peer_->tablet_id(); |
229 | | |
230 | | // Prevent log GC while we grab log segments and Tablet metadata. |
231 | 1.44k | string anchor_owner_token = Substitute("RemoteBootstrap-$0", session_id_); |
232 | 1.44k | tablet_peer_->log_anchor_registry()->Register( |
233 | 1.44k | MinimumOpId().index(), anchor_owner_token, &log_anchor_); |
234 | | |
235 | | // Read the SuperBlock from disk. |
236 | 1.44k | const RaftGroupMetadataPtr& metadata = tablet_peer_->tablet_metadata(); |
237 | 1.44k | RETURN_NOT_OK_PREPEND(metadata->ReadSuperBlockFromDisk(&tablet_superblock_), |
238 | 1.44k | Substitute("Unable to access superblock for tablet $0", |
239 | 1.44k | tablet_id)); |
240 | | |
241 | 1.44k | if (!tablet_peer_->log_available()) { |
242 | 0 | return STATUS(IllegalState, "Tablet is not running (log is uninitialized)"); |
243 | 0 | } |
244 | | // Get the latest opid in the log at this point in time so we can re-anchor. |
245 | 1.44k | auto last_logged_opid = tablet_peer_->GetLatestLogEntryOpId(); |
246 | | |
247 | 1.44k | auto tablet = tablet_peer_->shared_tablet(); |
248 | 1.44k | if (PREDICT_FALSE(!tablet)) { |
249 | 0 | return STATUS(IllegalState, "Tablet is not running"); |
250 | 0 | } |
251 | | |
252 | 1.44k | MonoTime now = MonoTime::Now(); |
253 | 1.44k | auto* kv_store = tablet_superblock_.mutable_kv_store(); |
254 | 1.44k | const auto checkpoints_dir = JoinPathSegments(kv_store->rocksdb_dir(), kCheckpointsDir); |
255 | | |
256 | 1.44k | auto session_checkpoint_dir = std::to_string(last_logged_opid.index) + "_" + now.ToString(); |
257 | 1.44k | checkpoint_dir_ = JoinPathSegments(checkpoints_dir, session_checkpoint_dir); |
258 | | |
259 | | // Clear any previous RocksDB files in the superblock. Each session should create a new list |
260 | | // based the checkpoint directory files. |
261 | 1.44k | kv_store->clear_rocksdb_files(); |
262 | 1.44k | auto status = tablet->snapshots().CreateCheckpoint(checkpoint_dir_); |
263 | 1.44k | if (status.ok()) { |
264 | 1.28k | *kv_store->mutable_rocksdb_files() = VERIFY_RESULT(ListFiles(checkpoint_dir_)); |
265 | 168 | } else if (!status.IsNotSupported()) { |
266 | 0 | RETURN_NOT_OK(status); |
267 | 0 | } |
268 | | |
269 | 7.23k | for (const auto& source : sources_) { |
270 | 7.23k | if (source) { |
271 | 1.44k | RETURN_NOT_OK(source->Init()); |
272 | 1.44k | } |
273 | 7.23k | } |
274 | | |
275 | | // Get the current segments from the log, including the active segment. |
276 | | // The Log doesn't add the active segment to the log reader's list until |
277 | | // a header has been written to it (but it will not have a footer). |
278 | 1.44k | RETURN_NOT_OK(tablet_peer_->log()->GetSegmentsSnapshot(&log_segments_)); |
279 | 1.44k | log_anchor_index_ = last_logged_opid.index; |
280 | 1.44k | for (const auto& log_segment : log_segments_) { |
281 | 1.44k | if (log_segment->HasFooter() && log_segment->footer().has_min_replicate_index()) { |
282 | 236 | log_anchor_index_ = log_segment->footer().min_replicate_index(); |
283 | 236 | break; |
284 | 236 | } |
285 | 1.44k | } |
286 | | |
287 | | // Re-anchor on the highest OpId that was in the log right before we |
288 | | // snapshotted the log segments. This helps ensure that we don't end up in a |
289 | | // remote bootstrap loop due to a follower falling too far behind the |
290 | | // leader's log when remote bootstrap is slow. The remote controls when |
291 | | // this anchor is released by ending the remote bootstrap session. |
292 | 1.44k | RETURN_NOT_OK(tablet_peer_->log_anchor_registry()->UpdateRegistration( |
293 | 1.44k | log_anchor_index_, &log_anchor_)); |
294 | | |
295 | | // Look up the committed consensus state. |
296 | | // We do this after snapshotting the log for YB table types to avoid a scenario where the latest |
297 | | // entry in the log has a term higher than the term stored in the consensus metadata, which |
298 | | // will result in a CHECK failure on RaftConsensus init. |
299 | 1.44k | RETURN_NOT_OK(SetInitialCommittedState()); |
300 | | |
301 | 1.44k | start_time_ = MonoTime::Now(); |
302 | | |
303 | 1.44k | return Status::OK(); |
304 | 1.44k | } |
305 | | |
306 | 10 | const std::string& RemoteBootstrapSession::tablet_id() const { |
307 | 10 | return tablet_peer_->tablet_id(); |
308 | 10 | } |
309 | | |
310 | 957 | const std::string& RemoteBootstrapSession::requestor_uuid() const { |
311 | 957 | return requestor_uuid_; |
312 | 957 | } |
313 | | |
314 | | namespace { |
315 | | |
316 | | // Determine the length of the data chunk to return to the client. |
317 | 4.82k | int64_t DetermineReadLength(int64_t bytes_remaining, int64_t requested_len) { |
318 | | // Determine the size of the chunks we want to read. |
319 | | // Choose "system max" as a multiple of typical HDD block size (4K) with 4K to |
320 | | // spare for other stuff in the message, like headers, other protobufs, etc. |
321 | 4.82k | const int32_t kSpareBytes = 4096; |
322 | 4.82k | const int32_t kDiskSectorSize = 4096; |
323 | 4.82k | auto system_max_chunk_size = |
324 | 4.82k | ((FLAGS_rpc_max_message_size - kSpareBytes) / kDiskSectorSize) * kDiskSectorSize; |
325 | 0 | CHECK_GT(system_max_chunk_size, 0) << "rpc_max_message_size is too low to transfer data: " |
326 | 0 | << FLAGS_rpc_max_message_size; |
327 | | |
328 | | // The min of the {requested, system} maxes is the effective max. |
329 | 4.82k | int64_t maxlen = requested_len > 0 ? std::min<int64_t>(requested_len, system_max_chunk_size) |
330 | 0 | : system_max_chunk_size; |
331 | 4.82k | return std::min(bytes_remaining, maxlen); |
332 | 4.82k | } |
333 | | |
334 | | // Calculate the size of the data to return given a maximum client message |
335 | | // length, the file itself, and the offset into the file to be read from. |
336 | 4.82k | Result<int64_t> GetResponseDataSize(GetDataPieceInfo* info) { |
337 | | // If requested offset is off the end of the data, bail. |
338 | 4.82k | if (info->offset >= info->data_size) { |
339 | 0 | info->error_code = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST; |
340 | 0 | return STATUS_FORMAT(InvalidArgument, |
341 | 0 | "Requested offset ($0) is beyond the data size ($1)", |
342 | 0 | info->offset, info->data_size); |
343 | 0 | } |
344 | | |
345 | 4.82k | auto result = DetermineReadLength(info->bytes_remaining(), info->client_maxlen); |
346 | 4.82k | DCHECK_GT(result, 0); |
347 | 4.82k | if (info->client_maxlen > 0) { |
348 | 4.82k | DCHECK_LE(result, info->client_maxlen); |
349 | 4.82k | } |
350 | | |
351 | 4.82k | return result; |
352 | 4.82k | } |
353 | | |
354 | | // Read a chunk of a file into a buffer. |
355 | | // data_name provides a string for the block/log to be used in error messages. |
356 | 4.82k | Status ReadFileChunkToBuf(RandomAccessFile* file, const string& data_name, GetDataPieceInfo* info) { |
357 | 4.82k | auto response_data_size = VERIFY_RESULT_PREPEND( |
358 | 4.82k | GetResponseDataSize(info), Format("Error reading $0", data_name)); |
359 | | |
360 | 4.82k | Stopwatch chunk_timer(Stopwatch::THIS_THREAD); |
361 | 4.82k | chunk_timer.start(); |
362 | | |
363 | | // Writing into a std::string buffer is basically guaranteed to work on C++11, |
364 | | // however any modern compiler should be compatible with it. |
365 | | // Violates the API contract, but avoids excessive copies. |
366 | 4.82k | info->data.resize(response_data_size); |
367 | 4.82k | auto buf = reinterpret_cast<uint8_t*>(const_cast<char*>(info->data.data())); |
368 | 4.82k | Slice slice; |
369 | 4.82k | Status s = env_util::ReadFully(file, info->offset, response_data_size, &slice, buf); |
370 | 4.82k | if (PREDICT_FALSE(!s.ok())) { |
371 | 0 | s = s.CloneAndPrepend(Format("Unable to read existing file for $0", data_name)); |
372 | 0 | LOG(WARNING) << s; |
373 | 0 | info->error_code = RemoteBootstrapErrorPB::IO_ERROR; |
374 | 0 | return s; |
375 | 0 | } |
376 | | // Figure out if Slice points to buf or if Slice points to the mmap. |
377 | | // If it points to the mmap then copy into buf. |
378 | 4.82k | if (slice.data() != buf) { |
379 | 0 | memcpy(buf, slice.data(), slice.size()); |
380 | 0 | } |
381 | 4.82k | chunk_timer.stop(); |
382 | 4.82k | TRACE("Remote bootstrap: $0: $1 total bytes read. Total time elapsed: $2", |
383 | 4.82k | data_name, response_data_size, chunk_timer.elapsed().ToString()); |
384 | | |
385 | 4.82k | return Status::OK(); |
386 | 4.82k | } |
387 | | |
388 | | } // namespace |
389 | | |
390 | 4.46k | Env* RemoteBootstrapSession::env() const { |
391 | 4.46k | return tablet_peer_->tablet_metadata()->fs_manager()->env(); |
392 | 4.46k | } |
393 | | |
394 | 5.82k | RemoteBootstrapSource* RemoteBootstrapSession::Source(DataIdPB::IdType id_type) const { |
395 | 5.82k | size_t idx = id_type; |
396 | 18.4E | return idx < sources_.size() ? sources_[idx].get() : nullptr; |
397 | 5.82k | } |
398 | | |
399 | 5.83k | Status RemoteBootstrapSession::ValidateDataId(const yb::tserver::DataIdPB& data_id) { |
400 | 5.83k | const auto& source = Source(data_id.type()); |
401 | | |
402 | 5.83k | if (source) { |
403 | 18 | return source->ValidateDataId(data_id); |
404 | 18 | } |
405 | | |
406 | 5.81k | switch (data_id.type()) { |
407 | 2.30k | case DataIdPB::LOG_SEGMENT: |
408 | 2.30k | if (PREDICT_FALSE(!data_id.wal_segment_seqno())) { |
409 | 0 | return STATUS(InvalidArgument, |
410 | 0 | "segment sequence number must be specified for type == LOG_SEGMENT", |
411 | 0 | data_id.ShortDebugString()); |
412 | 0 | } |
413 | 2.30k | return Status::OK(); |
414 | 3.50k | case DataIdPB::ROCKSDB_FILE: |
415 | 3.50k | if (PREDICT_FALSE(data_id.file_name().empty())) { |
416 | 0 | return STATUS(InvalidArgument, |
417 | 0 | "file name must be specified for type == ROCKSDB_FILE", |
418 | 0 | data_id.ShortDebugString()); |
419 | 0 | } |
420 | 3.50k | return Status::OK(); |
421 | 0 | case DataIdPB::SNAPSHOT_FILE: FALLTHROUGH_INTENDED; |
422 | 0 | case DataIdPB::UNKNOWN: |
423 | 0 | return STATUS(InvalidArgument, "Type not supported", data_id.ShortDebugString()); |
424 | 0 | } |
425 | 0 | LOG(FATAL) << "Invalid data id type: " << data_id.type(); |
426 | 0 | } |
427 | | |
428 | 5.82k | Status RemoteBootstrapSession::GetDataPiece(const DataIdPB& data_id, GetDataPieceInfo* info) { |
429 | 5.82k | const auto& source = sources_[data_id.type()]; |
430 | | |
431 | 5.82k | if (source) { |
432 | | // Fetching a snapshot file chunk. |
433 | 18 | RETURN_NOT_OK_PREPEND( |
434 | 18 | source->GetDataPiece(data_id, info), |
435 | 16 | "Unable to get piece of snapshot file"); |
436 | 16 | return Status::OK(); |
437 | 5.81k | } |
438 | | |
439 | | |
440 | 5.81k | switch (data_id.type()) { |
441 | 2.30k | case DataIdPB::LOG_SEGMENT: { |
442 | | // Fetching a log segment chunk. |
443 | 2.30k | RETURN_NOT_OK_PREPEND(GetLogSegmentPiece(data_id.wal_segment_seqno(), info), |
444 | 1.30k | "Unable to get piece of log segment"); |
445 | 1.30k | break; |
446 | 2.30k | } |
447 | 3.50k | case DataIdPB::ROCKSDB_FILE: { |
448 | | // Fetching a RocksDB file chunk. |
449 | 3.50k | const string file_name = data_id.file_name(); |
450 | 3.50k | RETURN_NOT_OK_PREPEND(GetRocksDBFilePiece(data_id.file_name(), info), |
451 | 3.50k | "Unable to get piece of RocksDB file"); |
452 | 3.50k | break; |
453 | 3.50k | } |
454 | 0 | default: |
455 | 0 | info->error_code = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST; |
456 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Invalid request type $0", data_id.type()); |
457 | 4.81k | } |
458 | 1 | DCHECK(info->client_maxlen == 0 || |
459 | 1 | info->data.size() <= implicit_cast<size_t>(info->client_maxlen)) |
460 | 1 | << "client_maxlen: " << info->client_maxlen << ", data->size(): " << info->data.size(); |
461 | | |
462 | 4.81k | return Status::OK(); |
463 | 4.81k | } |
464 | | |
465 | 2.30k | Status RemoteBootstrapSession::GetLogSegmentPiece(uint64_t segment_seqno, GetDataPieceInfo* info) { |
466 | 2.30k | std::shared_ptr<RandomAccessFile> file; |
467 | 2.30k | { |
468 | 2.30k | std::lock_guard<std::mutex> lock(mutex_); |
469 | 2.30k | if (opened_log_segment_seqno_ != segment_seqno) { |
470 | 2.24k | RETURN_NOT_OK(OpenLogSegment(segment_seqno, &info->error_code)); |
471 | 2.24k | } |
472 | 1.30k | info->data_size = opened_log_segment_file_size_; |
473 | 1.30k | file = opened_log_segment_file_; |
474 | 1.30k | } |
475 | 1.30k | RETURN_NOT_OK(ReadFileChunkToBuf(file.get(), Substitute("log segment $0", segment_seqno), info)); |
476 | | |
477 | | // Note: We do not eagerly close log segment files, since we share ownership |
478 | | // of the LogSegment objects with the Log itself. |
479 | | |
480 | 1.30k | return Status::OK(); |
481 | 1.30k | } |
482 | | |
483 | | Status RemoteBootstrapSession::GetRocksDBFilePiece( |
484 | 3.50k | const std::string& file_name, GetDataPieceInfo* info) { |
485 | 3.50k | return GetFilePiece(checkpoint_dir_, file_name, env(), info); |
486 | 3.50k | } |
487 | | |
488 | | Status RemoteBootstrapSession::GetFilePiece( |
489 | 3.52k | const std::string& path, const std::string& file_name, Env* env, GetDataPieceInfo* info) { |
490 | 3.52k | auto file_path = JoinPathSegments(path, file_name); |
491 | 3.52k | if (!env->FileExists(file_path)) { |
492 | 4 | info->error_code = RemoteBootstrapErrorPB::ROCKSDB_FILE_NOT_FOUND; |
493 | 4 | return STATUS(NotFound, Substitute("Unable to find RocksDB file $0 in directory $1", |
494 | 4 | file_name, path)); |
495 | 4 | } |
496 | | |
497 | 3.52k | std::unique_ptr<RandomAccessFile> readable_file; |
498 | | |
499 | 3.52k | RETURN_NOT_OK(env->NewRandomAccessFile(file_path, &readable_file)); |
500 | | |
501 | 3.52k | info->data_size = VERIFY_RESULT(readable_file->Size()); |
502 | 3.52k | auto inode = VERIFY_RESULT(readable_file->INode()); |
503 | 2 | VLOG(2) << "Reading RocksDB file. File path: " << file_path << ", file size: " << info->data_size |
504 | 2 | << ", inode: " << inode; |
505 | | |
506 | 3.52k | RETURN_NOT_OK(ReadFileChunkToBuf( |
507 | 3.52k | readable_file.get(), Substitute("rocksdb file $0", file_name), info)); |
508 | | |
509 | 3.52k | return Status::OK(); |
510 | 3.52k | } |
511 | | |
512 | | // Add a file to the cache and populate the given ImmutableRandomAcccessFileInfo |
513 | | // object with the file ref and size. |
514 | | template <class Collection, class Key, class Readable> |
515 | | static Status AddImmutableFileToMap(Collection* const cache, |
516 | | const Key& key, |
517 | | const Readable& readable, |
518 | | uint64_t size) { |
519 | | // Sanity check for 0-length files. |
520 | | if (size == 0) { |
521 | | return STATUS(Corruption, "Found 0-length object"); |
522 | | } |
523 | | |
524 | | // Looks good, add it to the cache. |
525 | | typedef typename Collection::mapped_type InfoPtr; |
526 | | typedef typename InfoPtr::element_type Info; |
527 | | CHECK(cache->emplace(key, std::make_unique<Info>(readable, size)).second); |
528 | | |
529 | | return Status::OK(); |
530 | | } |
531 | | |
532 | | Status RemoteBootstrapSession::OpenLogSegment( |
533 | 2.24k | uint64_t segment_seqno, RemoteBootstrapErrorPB::Code* error_code) { |
534 | 2.24k | auto active_seqno = tablet_peer_->log()->active_segment_sequence_number(); |
535 | 2.24k | auto log_segment = tablet_peer_->log()->GetSegmentBySequenceNumber(segment_seqno); |
536 | | // Usually active log segment is extended, while sent of the wire. So we cannot send next segment, |
537 | | // Otherwise entries at end of previously active log segment could be missing. |
538 | 2.24k | if (opened_log_segment_active_) { |
539 | 999 | *error_code = RemoteBootstrapErrorPB::WAL_SEGMENT_NOT_FOUND; |
540 | 999 | return STATUS_FORMAT(NotFound, "Already sent active log segment, don't send $0", segment_seqno); |
541 | 999 | } |
542 | 1.24k | if (!log_segment) { |
543 | 1 | *error_code = RemoteBootstrapErrorPB::WAL_SEGMENT_NOT_FOUND; |
544 | 1 | return STATUS_FORMAT(NotFound, "Log segment $0 not found", segment_seqno); |
545 | 1 | } |
546 | 1.24k | opened_log_segment_file_size_ = log_segment->readable_up_to() + log_segment->get_header_size(); |
547 | 1.24k | opened_log_segment_seqno_ = segment_seqno; |
548 | 1.24k | opened_log_segment_file_ = log_segment->readable_file_checkpoint(); |
549 | 1.24k | opened_log_segment_active_ = active_seqno == segment_seqno; |
550 | | |
551 | 1.24k | if (log_segment->HasFooter() && |
552 | 243 | log_segment->footer().min_replicate_index() > log_anchor_index_) { |
553 | 14 | log_anchor_index_ = log_segment->footer().min_replicate_index(); |
554 | | |
555 | | // Update log anchor, since we don't need older logs anymore. |
556 | 14 | auto status = tablet_peer_->log_anchor_registry()->UpdateRegistration( |
557 | 14 | log_anchor_index_, &log_anchor_); |
558 | 14 | if (!status.ok()) { |
559 | 0 | *error_code = RemoteBootstrapErrorPB::UNKNOWN_ERROR; |
560 | 0 | return status; |
561 | 0 | } |
562 | 1.24k | } |
563 | | |
564 | 1.24k | return Status::OK(); |
565 | 1.24k | } |
566 | | |
567 | 2.40k | Status RemoteBootstrapSession::UnregisterAnchorIfNeededUnlocked() { |
568 | 2.40k | return tablet_peer_->log_anchor_registry()->UnregisterIfAnchored(&log_anchor_); |
569 | 2.40k | } |
570 | | |
571 | 1.00k | void RemoteBootstrapSession::SetSuccess() { |
572 | 1.00k | std::lock_guard<std::mutex> lock(mutex_); |
573 | 1.00k | succeeded_ = true; |
574 | 1.00k | } |
575 | | |
576 | 1.01k | bool RemoteBootstrapSession::Succeeded() { |
577 | 1.01k | std::lock_guard<std::mutex> lock(mutex_); |
578 | 1.01k | return succeeded_; |
579 | 1.01k | } |
580 | | |
581 | 5.83k | void RemoteBootstrapSession::EnsureRateLimiterIsInitialized() { |
582 | 5.83k | if (!rate_limiter_.IsInitialized()) { |
583 | 1.00k | InitRateLimiter(); |
584 | 1.00k | } |
585 | 5.83k | } |
586 | | |
587 | | |
588 | 1.00k | void RemoteBootstrapSession::InitRateLimiter() { |
589 | 1.00k | if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec > 0 && nsessions_) { |
590 | | // Calling SetTargetRateUpdater will activate the rate limiter. |
591 | 10.6k | rate_limiter_.SetTargetRateUpdater([this]() -> uint64_t { |
592 | 10.6k | DCHECK_GT(FLAGS_remote_bootstrap_rate_limit_bytes_per_sec, 0); |
593 | 10.6k | if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec <= 0) { |
594 | 0 | YB_LOG_EVERY_N(ERROR, 1000) |
595 | 0 | << "Invalid value for remote_bootstrap_rate_limit_bytes_per_sec: " |
596 | 0 | << FLAGS_remote_bootstrap_rate_limit_bytes_per_sec; |
597 | | // Since the rate limiter is initialized, it's expected that the value of |
598 | | // FLAGS_remote_bootstrap_rate_limit_bytes_per_sec is greater than 0. Since this is not the |
599 | | // case, we'll log an error, and set the rate to 50 MB/s. |
600 | 0 | return 50_MB; |
601 | 0 | } |
602 | 10.6k | auto nsessions = nsessions_->load(std::memory_order_acquire); |
603 | 10.6k | if (nsessions > 0) { |
604 | 10.6k | return FLAGS_remote_bootstrap_rate_limit_bytes_per_sec / nsessions; |
605 | 1 | } else { |
606 | 1 | LOG(DFATAL) << "Invalid number of sessions: " << nsessions; |
607 | 1 | return FLAGS_remote_bootstrap_rate_limit_bytes_per_sec; |
608 | 1 | } |
609 | 10.6k | }); |
610 | 1.00k | } |
611 | 1.00k | rate_limiter_.Init(); |
612 | 1.00k | } |
613 | | |
614 | | } // namespace tserver |
615 | | } // namespace yb |