/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_file_downloader.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tserver/remote_bootstrap_file_downloader.h" |
15 | | |
16 | | #include "yb/common/wire_protocol.h" |
17 | | |
18 | | #include "yb/gutil/casts.h" |
19 | | |
20 | | #include "yb/fs/fs_manager.h" |
21 | | |
22 | | #include "yb/rpc/rpc_controller.h" |
23 | | |
24 | | #include "yb/tserver/remote_bootstrap.proxy.h" |
25 | | |
26 | | #include "yb/util/crc.h" |
27 | | #include "yb/util/flag_tags.h" |
28 | | #include "yb/util/logging.h" |
29 | | #include "yb/util/net/rate_limiter.h" |
30 | | #include "yb/util/size_literals.h" |
31 | | #include "yb/util/status_format.h" |
32 | | |
33 | | using namespace yb::size_literals; |
34 | | |
35 | | DECLARE_uint64(rpc_max_message_size); |
36 | | DEFINE_int32(remote_bootstrap_max_chunk_size, 1_MB, |
37 | | "Maximum chunk size to be transferred at a time during remote bootstrap."); |
38 | | |
39 | | // Deprecated because it's misspelled. But if set, this flag takes precedence over |
40 | | // remote_bootstrap_rate_limit_bytes_per_sec for compatibility. |
41 | | DEFINE_int64(remote_boostrap_rate_limit_bytes_per_sec, 0, |
42 | | "DEPRECATED. Replaced by flag remote_bootstrap_rate_limit_bytes_per_sec."); |
43 | | TAG_FLAG(remote_boostrap_rate_limit_bytes_per_sec, hidden); |
44 | | |
45 | | DEFINE_int64(remote_bootstrap_rate_limit_bytes_per_sec, 256_MB, |
46 | | "Maximum transmission rate during a remote bootstrap. This is across all the remote " |
47 | | "bootstrap sessions for which this process is acting as a sender or receiver. So " |
48 | | "the total limit will be 2 * remote_bootstrap_rate_limit_bytes_per_sec because a " |
49 | | "tserver or master can act both as a sender and receiver at the same time."); |
50 | | |
51 | | DEFINE_int32(bytes_remote_bootstrap_durable_write_mb, 8, |
52 | | "Explicitly call fsync after downloading the specified amount of data in MB " |
53 | | "during a remote bootstrap session. If 0 fsync() is not called."); |
54 | | |
55 | | // RETURN_NOT_OK_PREPEND() with a remote-error unwinding step. |
56 | | #define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \ |
57 | 11.3k | RETURN_NOT_OK_PREPEND(UnwindRemoteError(status, controller), msg) |
58 | | |
59 | | namespace yb { |
60 | | namespace tserver { |
61 | | |
62 | | namespace { |
63 | | |
64 | | // Decode the remote error into a human-readable Status object. |
65 | | CHECKED_STATUS ExtractRemoteError( |
66 | 2.02k | const rpc::ErrorStatusPB& remote_error, const Status& original_status) { |
67 | 2.02k | if (!remote_error.HasExtension(RemoteBootstrapErrorPB::remote_bootstrap_error_ext)) { |
68 | 0 | return original_status; |
69 | 0 | } |
70 | | |
71 | 2.02k | const RemoteBootstrapErrorPB& error = |
72 | 2.02k | remote_error.GetExtension(RemoteBootstrapErrorPB::remote_bootstrap_error_ext); |
73 | 2.02k | LOG(INFO) << "ExtractRemoteError: " << error.ShortDebugString(); |
74 | 2.02k | return StatusFromPB(error.status()).CloneAndPrepend( |
75 | 2.02k | "Received error code " + RemoteBootstrapErrorPB::Code_Name(error.code()) + |
76 | 2.02k | " from remote service"); |
77 | 2.02k | } |
78 | | |
79 | | } // namespace |
80 | | |
81 | | extern std::atomic<int32_t> remote_bootstrap_clients_started_; |
82 | | |
83 | | RemoteBootstrapFileDownloader::RemoteBootstrapFileDownloader( |
84 | | const std::string* log_prefix, FsManager* fs_manager) |
85 | 2.02k | : log_prefix_(*log_prefix), fs_manager_(*fs_manager) { |
86 | 2.02k | } |
87 | | |
88 | | void RemoteBootstrapFileDownloader::Start( |
89 | | std::shared_ptr<RemoteBootstrapServiceProxy> proxy, std::string session_id, |
90 | 2.02k | MonoDelta session_idle_timeout) { |
91 | 2.02k | proxy_ = std::move(proxy); |
92 | 2.02k | session_id_ = std::move(session_id); |
93 | 2.02k | session_idle_timeout_ = session_idle_timeout; |
94 | 2.02k | } |
95 | | |
96 | 13.1k | Env& RemoteBootstrapFileDownloader::env() const { |
97 | 13.1k | return *fs_manager_.env(); |
98 | 13.1k | } |
99 | | |
100 | | Status RemoteBootstrapFileDownloader::DownloadFile( |
101 | 6.58k | const tablet::FilePB& file_pb, const std::string& dir, DataIdPB *data_id) { |
102 | 6.58k | auto file_path = JoinPathSegments(dir, file_pb.name()); |
103 | 6.58k | RETURN_NOT_OK(env().CreateDirs(DirName(file_path))); |
104 | | |
105 | 6.58k | if (file_pb.inode() != 0) { |
106 | 6.58k | auto it = inode2file_.find(file_pb.inode()); |
107 | 6.58k | if (it != inode2file_.end()) { |
108 | 38 | VLOG_WITH_PREFIX0 (2) << "File with the same inode already found: " << file_path |
109 | 0 | << " => " << it->second; |
110 | 38 | auto link_status = env().LinkFile(it->second, file_path); |
111 | 38 | if (link_status.ok()) { |
112 | 38 | return Status::OK(); |
113 | 38 | } |
114 | | // TODO fallback to copy. |
115 | 0 | LOG_WITH_PREFIX(ERROR) << "Failed to link file: " << file_path << " => " << it->second |
116 | 0 | << ": " << link_status; |
117 | 0 | } |
118 | 6.58k | } |
119 | | |
120 | 6.54k | WritableFileOptions opts; |
121 | 6.54k | opts.sync_on_close = true; |
122 | 6.54k | std::unique_ptr<WritableFile> file; |
123 | 6.54k | RETURN_NOT_OK(env().NewWritableFile(opts, file_path, &file)); |
124 | | |
125 | 6.54k | data_id->set_file_name(file_pb.name()); |
126 | 6.54k | RETURN_NOT_OK_PREPEND(DownloadFile(*data_id, file.get()), |
127 | 6.54k | Format("Unable to download $0 file $1", |
128 | 6.54k | DataIdPB::IdType_Name(data_id->type()), file_path)); |
129 | 6.54k | VLOG_WITH_PREFIX1 (2) << "Downloaded file " << file_path1 ; |
130 | | |
131 | 6.54k | if (file_pb.inode() != 0) { |
132 | 6.54k | inode2file_.emplace(file_pb.inode(), file_path); |
133 | 6.54k | } |
134 | | |
135 | 6.54k | return Status::OK(); |
136 | 6.54k | } |
137 | | |
138 | | template<class Appendable> |
139 | | Status RemoteBootstrapFileDownloader::DownloadFile( |
140 | 11.2k | const DataIdPB& data_id, Appendable* appendable) { |
141 | 11.2k | constexpr int kBytesReservedForMessageHeaders = 16384; |
142 | | |
143 | | // For periodic sync, indicates number of bytes which need to be sync'ed. |
144 | 11.2k | size_t periodic_sync_unsynced_bytes = 0; |
145 | 11.2k | uint64_t offset = 0; |
146 | 11.2k | auto max_length = std::min<size_t>(FLAGS_remote_bootstrap_max_chunk_size, |
147 | 11.2k | FLAGS_rpc_max_message_size - kBytesReservedForMessageHeaders); |
148 | | |
149 | 11.2k | std::unique_ptr<RateLimiter> rate_limiter; |
150 | | |
151 | 11.2k | if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec > 0) { |
152 | 22.6k | static auto rate_updater = []() { |
153 | 22.6k | auto remote_bootstrap_clients_started = |
154 | 22.6k | remote_bootstrap_clients_started_.load(std::memory_order_acquire); |
155 | 22.6k | if (remote_bootstrap_clients_started < 1) { |
156 | 0 | YB_LOG_EVERY_N(ERROR, 100) << "Invalid number of remote bootstrap sessions: " |
157 | 0 | << remote_bootstrap_clients_started; |
158 | 0 | return static_cast<uint64_t>(FLAGS_remote_bootstrap_rate_limit_bytes_per_sec); |
159 | 0 | } |
160 | 22.6k | return static_cast<uint64_t>( |
161 | 22.6k | FLAGS_remote_bootstrap_rate_limit_bytes_per_sec / remote_bootstrap_clients_started); |
162 | 22.6k | }; |
163 | | |
164 | 11.2k | rate_limiter = std::make_unique<RateLimiter>(rate_updater); |
165 | 11.2k | } else { |
166 | | // Inactive RateLimiter. |
167 | 0 | rate_limiter = std::make_unique<RateLimiter>(); |
168 | 0 | } |
169 | | |
170 | 11.2k | rpc::RpcController controller; |
171 | 11.2k | controller.set_timeout(session_idle_timeout_); |
172 | 11.2k | FetchDataRequestPB req; |
173 | | |
174 | 11.2k | bool done = false; |
175 | 20.5k | while (!done) { |
176 | 11.3k | controller.Reset(); |
177 | 11.3k | req.set_session_id(session_id_); |
178 | 11.3k | req.mutable_data_id()->CopyFrom(data_id); |
179 | 11.3k | req.set_offset(offset); |
180 | 11.3k | if (rate_limiter->active()11.3k ) { |
181 | 11.3k | auto max_size = rate_limiter->GetMaxSizeForNextTransmission(); |
182 | 11.3k | if (max_size > std::numeric_limits<decltype(max_length)>::max()) { |
183 | 0 | max_size = std::numeric_limits<decltype(max_length)>::max(); |
184 | 0 | } |
185 | 11.3k | max_length = std::min(max_length, decltype(max_length)(max_size)); |
186 | 11.3k | } |
187 | 11.3k | req.set_max_length(max_length); |
188 | | |
189 | 11.3k | FetchDataResponsePB resp; |
190 | 11.3k | auto status = rate_limiter->SendOrReceiveData([this, &req, &resp, &controller]() { |
191 | 11.3k | return proxy_->FetchData(req, &resp, &controller); |
192 | 11.3k | }, [&resp]() { return resp.ByteSize(); }9.30k ); |
193 | 11.3k | RETURN_NOT_OK_UNWIND_PREPEND(status, controller, "Unable to fetch data from remote"); |
194 | 9.29k | DCHECK_LE(resp.chunk().data().size(), max_length); |
195 | | |
196 | | // Sanity-check for corruption. |
197 | 9.29k | RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()), |
198 | 9.29k | Format("Error validating data item $0", data_id)); |
199 | | |
200 | | // Write the data. |
201 | 9.29k | RETURN_NOT_OK(appendable->Append(resp.chunk().data())); |
202 | 18.4E | VLOG_WITH_PREFIX(3) |
203 | 18.4E | << "resp size: " << resp.ByteSize() << ", chunk size: " << resp.chunk().data().size(); |
204 | | |
205 | 9.29k | if (offset + resp.chunk().data().size() == |
206 | 9.29k | implicit_cast<size_t>(resp.chunk().total_data_length())) { |
207 | 9.23k | done = true; |
208 | 9.23k | } |
209 | 9.29k | offset += resp.chunk().data().size(); |
210 | 9.30k | if (FLAGS_bytes_remote_bootstrap_durable_write_mb != 09.29k ) { |
211 | 9.30k | periodic_sync_unsynced_bytes += resp.chunk().data().size(); |
212 | 9.30k | if (periodic_sync_unsynced_bytes > FLAGS_bytes_remote_bootstrap_durable_write_mb * 1_MB) { |
213 | 0 | RETURN_NOT_OK(appendable->Sync()); |
214 | 0 | periodic_sync_unsynced_bytes = 0; |
215 | 0 | } |
216 | 9.30k | } |
217 | 9.29k | } |
218 | | |
219 | 18.4E | VLOG_WITH_PREFIX(2) << "Transmission rate: " << rate_limiter->GetRate(); |
220 | | |
221 | 9.23k | return Status::OK(); |
222 | 11.2k | } |
223 | | |
224 | 9.29k | Status RemoteBootstrapFileDownloader::VerifyData(uint64_t offset, const DataChunkPB& chunk) { |
225 | | // Verify the offset is what we expected. |
226 | 9.29k | if (offset != chunk.offset()) { |
227 | 0 | return STATUS_FORMAT( |
228 | 0 | InvalidArgument, "Offset did not match what was asked for $0 vs $1", |
229 | 0 | offset, chunk.offset()); |
230 | 0 | } |
231 | | |
232 | | // Verify the checksum. |
233 | 9.29k | uint32_t crc32 = crc::Crc32c(chunk.data().data(), chunk.data().length()); |
234 | 9.29k | if (PREDICT_FALSE(crc32 != chunk.crc32())) { |
235 | 0 | return STATUS_FORMAT( |
236 | 0 | Corruption, "CRC32 does not match at offset $0 size $1: $2 vs $3", |
237 | 0 | offset, chunk.data().size(), crc32, chunk.crc32()); |
238 | 0 | } |
239 | 9.29k | return Status::OK(); |
240 | 9.29k | } |
241 | | |
242 | | // Enhance a RemoteError Status message with additional details from the remote. |
243 | 13.3k | Status UnwindRemoteError(const Status& status, const rpc::RpcController& controller) { |
244 | 13.3k | if (!status.IsRemoteError()) { |
245 | 11.3k | return status; |
246 | 11.3k | } |
247 | 2.02k | return ExtractRemoteError(*controller.error_response(), status); |
248 | 13.3k | } |
249 | | |
250 | | } // namespace tserver |
251 | | } // namespace yb |