/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_file_downloader.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tserver/remote_bootstrap_file_downloader.h" |
15 | | |
16 | | #include "yb/common/wire_protocol.h" |
17 | | |
18 | | #include "yb/gutil/casts.h" |
19 | | |
20 | | #include "yb/fs/fs_manager.h" |
21 | | |
22 | | #include "yb/rpc/rpc_controller.h" |
23 | | |
24 | | #include "yb/tserver/remote_bootstrap.proxy.h" |
25 | | |
26 | | #include "yb/util/crc.h" |
27 | | #include "yb/util/flag_tags.h" |
28 | | #include "yb/util/logging.h" |
29 | | #include "yb/util/net/rate_limiter.h" |
30 | | #include "yb/util/size_literals.h" |
31 | | #include "yb/util/status_format.h" |
32 | | |
33 | | using namespace yb::size_literals; |
34 | | |
35 | | DECLARE_uint64(rpc_max_message_size); |
36 | | DEFINE_int32(remote_bootstrap_max_chunk_size, 1_MB, |
37 | | "Maximum chunk size to be transferred at a time during remote bootstrap."); |
38 | | |
39 | | // Deprecated because it's misspelled. But if set, this flag takes precedence over |
40 | | // remote_bootstrap_rate_limit_bytes_per_sec for compatibility. |
41 | | DEFINE_int64(remote_boostrap_rate_limit_bytes_per_sec, 0, |
42 | | "DEPRECATED. Replaced by flag remote_bootstrap_rate_limit_bytes_per_sec."); |
43 | | TAG_FLAG(remote_boostrap_rate_limit_bytes_per_sec, hidden); |
44 | | |
45 | | DEFINE_int64(remote_bootstrap_rate_limit_bytes_per_sec, 256_MB, |
46 | | "Maximum transmission rate during a remote bootstrap. This is across all the remote " |
47 | | "bootstrap sessions for which this process is acting as a sender or receiver. So " |
48 | | "the total limit will be 2 * remote_bootstrap_rate_limit_bytes_per_sec because a " |
49 | | "tserver or master can act both as a sender and receiver at the same time."); |
50 | | |
51 | | DEFINE_int32(bytes_remote_bootstrap_durable_write_mb, 8, |
52 | | "Explicitly call fsync after downloading the specified amount of data in MB " |
53 | | "during a remote bootstrap session. If 0 fsync() is not called."); |
54 | | |
// Same as RETURN_NOT_OK_PREPEND(), except that `status` is first passed through
// UnwindRemoteError() together with `controller`, so a remote error is replaced by the more
// detailed status carried in the controller's error response.
#define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \
  RETURN_NOT_OK_PREPEND(UnwindRemoteError((status), (controller)), (msg))
58 | | |
59 | | namespace yb { |
60 | | namespace tserver { |
61 | | |
62 | | namespace { |
63 | | |
64 | | // Decode the remote error into a human-readable Status object. |
65 | | CHECKED_STATUS ExtractRemoteError( |
66 | 1.01k | const rpc::ErrorStatusPB& remote_error, const Status& original_status) { |
67 | 1.01k | if (!remote_error.HasExtension(RemoteBootstrapErrorPB::remote_bootstrap_error_ext)) { |
68 | 0 | return original_status; |
69 | 0 | } |
70 | | |
71 | 1.01k | const RemoteBootstrapErrorPB& error = |
72 | 1.01k | remote_error.GetExtension(RemoteBootstrapErrorPB::remote_bootstrap_error_ext); |
73 | 1.01k | LOG(INFO) << "ExtractRemoteError: " << error.ShortDebugString(); |
74 | 1.01k | return StatusFromPB(error.status()).CloneAndPrepend( |
75 | 1.01k | "Received error code " + RemoteBootstrapErrorPB::Code_Name(error.code()) + |
76 | 1.01k | " from remote service"); |
77 | 1.01k | } |
78 | | |
79 | | } // namespace |
80 | | |
81 | | extern std::atomic<int32_t> remote_bootstrap_clients_started_; |
82 | | |
83 | | RemoteBootstrapFileDownloader::RemoteBootstrapFileDownloader( |
84 | | const std::string* log_prefix, FsManager* fs_manager) |
85 | 1.44k | : log_prefix_(*log_prefix), fs_manager_(*fs_manager) { |
86 | 1.44k | } |
87 | | |
88 | | void RemoteBootstrapFileDownloader::Start( |
89 | | std::shared_ptr<RemoteBootstrapServiceProxy> proxy, std::string session_id, |
90 | 1.43k | MonoDelta session_idle_timeout) { |
91 | 1.43k | proxy_ = std::move(proxy); |
92 | 1.43k | session_id_ = std::move(session_id); |
93 | 1.43k | session_idle_timeout_ = session_idle_timeout; |
94 | 1.43k | } |
95 | | |
96 | 7.13k | Env& RemoteBootstrapFileDownloader::env() const { |
97 | 7.13k | return *fs_manager_.env(); |
98 | 7.13k | } |
99 | | |
100 | | Status RemoteBootstrapFileDownloader::DownloadFile( |
101 | 3.56k | const tablet::FilePB& file_pb, const std::string& dir, DataIdPB *data_id) { |
102 | 3.56k | auto file_path = JoinPathSegments(dir, file_pb.name()); |
103 | 3.56k | RETURN_NOT_OK(env().CreateDirs(DirName(file_path))); |
104 | | |
105 | 3.56k | if (file_pb.inode() != 0) { |
106 | 3.56k | auto it = inode2file_.find(file_pb.inode()); |
107 | 3.56k | if (it != inode2file_.end()) { |
108 | 0 | VLOG_WITH_PREFIX(2) << "File with the same inode already found: " << file_path |
109 | 0 | << " => " << it->second; |
110 | 38 | auto link_status = env().LinkFile(it->second, file_path); |
111 | 38 | if (link_status.ok()) { |
112 | 38 | return Status::OK(); |
113 | 38 | } |
114 | | // TODO fallback to copy. |
115 | 0 | LOG_WITH_PREFIX(ERROR) << "Failed to link file: " << file_path << " => " << it->second |
116 | 0 | << ": " << link_status; |
117 | 0 | } |
118 | 3.56k | } |
119 | | |
120 | 3.52k | WritableFileOptions opts; |
121 | 3.52k | opts.sync_on_close = true; |
122 | 3.52k | std::unique_ptr<WritableFile> file; |
123 | 3.52k | RETURN_NOT_OK(env().NewWritableFile(opts, file_path, &file)); |
124 | | |
125 | 3.52k | data_id->set_file_name(file_pb.name()); |
126 | 3.52k | RETURN_NOT_OK_PREPEND(DownloadFile(*data_id, file.get()), |
127 | 3.52k | Format("Unable to download $0 file $1", |
128 | 3.52k | DataIdPB::IdType_Name(data_id->type()), file_path)); |
129 | 2 | VLOG_WITH_PREFIX(2) << "Downloaded file " << file_path; |
130 | | |
131 | 3.52k | if (file_pb.inode() != 0) { |
132 | 3.52k | inode2file_.emplace(file_pb.inode(), file_path); |
133 | 3.52k | } |
134 | | |
135 | 3.52k | return Status::OK(); |
136 | 3.52k | } |
137 | | |
138 | | template<class Appendable> |
139 | | Status RemoteBootstrapFileDownloader::DownloadFile( |
140 | 5.76k | const DataIdPB& data_id, Appendable* appendable) { |
141 | 5.76k | constexpr int kBytesReservedForMessageHeaders = 16384; |
142 | | |
143 | | // For periodic sync, indicates number of bytes which need to be sync'ed. |
144 | 5.76k | size_t periodic_sync_unsynced_bytes = 0; |
145 | 5.76k | uint64_t offset = 0; |
146 | 5.76k | auto max_length = std::min<size_t>(FLAGS_remote_bootstrap_max_chunk_size, |
147 | 5.76k | FLAGS_rpc_max_message_size - kBytesReservedForMessageHeaders); |
148 | | |
149 | 5.76k | std::unique_ptr<RateLimiter> rate_limiter; |
150 | | |
151 | 5.76k | if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec > 0) { |
152 | 11.6k | static auto rate_updater = []() { |
153 | 11.6k | auto remote_bootstrap_clients_started = |
154 | 11.6k | remote_bootstrap_clients_started_.load(std::memory_order_acquire); |
155 | 11.6k | if (remote_bootstrap_clients_started < 1) { |
156 | 0 | YB_LOG_EVERY_N(ERROR, 100) << "Invalid number of remote bootstrap sessions: " |
157 | 0 | << remote_bootstrap_clients_started; |
158 | 0 | return static_cast<uint64_t>(FLAGS_remote_bootstrap_rate_limit_bytes_per_sec); |
159 | 0 | } |
160 | 11.6k | return static_cast<uint64_t>( |
161 | 11.6k | FLAGS_remote_bootstrap_rate_limit_bytes_per_sec / remote_bootstrap_clients_started); |
162 | 11.6k | }; |
163 | | |
164 | 5.76k | rate_limiter = std::make_unique<RateLimiter>(rate_updater); |
165 | 0 | } else { |
166 | | // Inactive RateLimiter. |
167 | 0 | rate_limiter = std::make_unique<RateLimiter>(); |
168 | 0 | } |
169 | | |
170 | 5.76k | rpc::RpcController controller; |
171 | 5.76k | controller.set_timeout(session_idle_timeout_); |
172 | 5.76k | FetchDataRequestPB req; |
173 | | |
174 | 5.76k | bool done = false; |
175 | 10.5k | while (!done) { |
176 | 5.82k | controller.Reset(); |
177 | 5.82k | req.set_session_id(session_id_); |
178 | 5.82k | req.mutable_data_id()->CopyFrom(data_id); |
179 | 5.82k | req.set_offset(offset); |
180 | 5.82k | if (rate_limiter->active()) { |
181 | 5.82k | auto max_size = rate_limiter->GetMaxSizeForNextTransmission(); |
182 | 5.82k | if (max_size > std::numeric_limits<decltype(max_length)>::max()) { |
183 | 0 | max_size = std::numeric_limits<decltype(max_length)>::max(); |
184 | 0 | } |
185 | 5.82k | max_length = std::min(max_length, decltype(max_length)(max_size)); |
186 | 5.82k | } |
187 | 5.82k | req.set_max_length(max_length); |
188 | | |
189 | 5.82k | FetchDataResponsePB resp; |
190 | 5.82k | auto status = rate_limiter->SendOrReceiveData([this, &req, &resp, &controller]() { |
191 | 5.82k | return proxy_->FetchData(req, &resp, &controller); |
192 | 4.82k | }, [&resp]() { return resp.ByteSize(); }); |
193 | 5.82k | RETURN_NOT_OK_UNWIND_PREPEND(status, controller, "Unable to fetch data from remote"); |
194 | 4.82k | DCHECK_LE(resp.chunk().data().size(), max_length); |
195 | | |
196 | | // Sanity-check for corruption. |
197 | 4.82k | RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()), |
198 | 4.82k | Format("Error validating data item $0", data_id)); |
199 | | |
200 | | // Write the data. |
201 | 4.82k | RETURN_NOT_OK(appendable->Append(resp.chunk().data())); |
202 | 18.4E | VLOG_WITH_PREFIX(3) |
203 | 18.4E | << "resp size: " << resp.ByteSize() << ", chunk size: " << resp.chunk().data().size(); |
204 | | |
205 | 4.82k | if (offset + resp.chunk().data().size() == |
206 | 4.76k | implicit_cast<size_t>(resp.chunk().total_data_length())) { |
207 | 4.76k | done = true; |
208 | 4.76k | } |
209 | 4.82k | offset += resp.chunk().data().size(); |
210 | 4.82k | if (FLAGS_bytes_remote_bootstrap_durable_write_mb != 0) { |
211 | 4.82k | periodic_sync_unsynced_bytes += resp.chunk().data().size(); |
212 | 4.82k | if (periodic_sync_unsynced_bytes > FLAGS_bytes_remote_bootstrap_durable_write_mb * 1_MB) { |
213 | 0 | RETURN_NOT_OK(appendable->Sync()); |
214 | 0 | periodic_sync_unsynced_bytes = 0; |
215 | 0 | } |
216 | 4.82k | } |
217 | 4.82k | } |
218 | | |
219 | 1 | VLOG_WITH_PREFIX(2) << "Transmission rate: " << rate_limiter->GetRate(); |
220 | | |
221 | 4.76k | return Status::OK(); |
222 | 5.76k | } |
223 | | |
224 | 4.82k | Status RemoteBootstrapFileDownloader::VerifyData(uint64_t offset, const DataChunkPB& chunk) { |
225 | | // Verify the offset is what we expected. |
226 | 4.82k | if (offset != chunk.offset()) { |
227 | 0 | return STATUS_FORMAT( |
228 | 0 | InvalidArgument, "Offset did not match what was asked for $0 vs $1", |
229 | 0 | offset, chunk.offset()); |
230 | 0 | } |
231 | | |
232 | | // Verify the checksum. |
233 | 4.82k | uint32_t crc32 = crc::Crc32c(chunk.data().data(), chunk.data().length()); |
234 | 4.82k | if (PREDICT_FALSE(crc32 != chunk.crc32())) { |
235 | 0 | return STATUS_FORMAT( |
236 | 0 | Corruption, "CRC32 does not match at offset $0 size $1: $2 vs $3", |
237 | 0 | offset, chunk.data().size(), crc32, chunk.crc32()); |
238 | 0 | } |
239 | 4.82k | return Status::OK(); |
240 | 4.82k | } |
241 | | |
242 | | // Enhance a RemoteError Status message with additional details from the remote. |
243 | 7.28k | Status UnwindRemoteError(const Status& status, const rpc::RpcController& controller) { |
244 | 7.28k | if (!status.IsRemoteError()) { |
245 | 6.27k | return status; |
246 | 6.27k | } |
247 | 1.01k | return ExtractRemoteError(*controller.error_response(), status); |
248 | 1.01k | } |
249 | | |
250 | | } // namespace tserver |
251 | | } // namespace yb |