YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_file_downloader.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/remote_bootstrap_file_downloader.h"
15
16
#include "yb/common/wire_protocol.h"
17
18
#include "yb/gutil/casts.h"
19
20
#include "yb/fs/fs_manager.h"
21
22
#include "yb/rpc/rpc_controller.h"
23
24
#include "yb/tserver/remote_bootstrap.proxy.h"
25
26
#include "yb/util/crc.h"
27
#include "yb/util/flag_tags.h"
28
#include "yb/util/logging.h"
29
#include "yb/util/net/rate_limiter.h"
30
#include "yb/util/size_literals.h"
31
#include "yb/util/status_format.h"
32
33
using namespace yb::size_literals;
34
35
// Declared here (defined in another translation unit). DownloadFile() subtracts a fixed header
// reserve from it and uses the result as one of the two caps on the per-request chunk length.
DECLARE_uint64(rpc_max_message_size);
36
// The other cap on the chunk length requested by each FetchData call in DownloadFile().
DEFINE_int32(remote_bootstrap_max_chunk_size, 1_MB,
37
             "Maximum chunk size to be transferred at a time during remote bootstrap.");
38
39
// Deprecated because it's misspelled.  But if set, this flag takes precedence over
40
// remote_bootstrap_rate_limit_bytes_per_sec for compatibility.
41
// NOTE(review): nothing in this file reads the deprecated flag; the precedence described above
// must be implemented elsewhere -- confirm.
DEFINE_int64(remote_boostrap_rate_limit_bytes_per_sec, 0,
42
             "DEPRECATED. Replaced by flag remote_bootstrap_rate_limit_bytes_per_sec.");
43
TAG_FLAG(remote_boostrap_rate_limit_bytes_per_sec, hidden);
44
45
// DownloadFile() only builds an active RateLimiter when this is > 0, and divides the value by the
// number of started remote bootstrap clients to get the per-download target rate.
DEFINE_int64(remote_bootstrap_rate_limit_bytes_per_sec, 256_MB,
46
             "Maximum transmission rate during a remote bootstrap. This is across all the remote "
47
             "bootstrap sessions for which this process is acting as a sender or receiver. So "
48
             "the total limit will be 2 * remote_bootstrap_rate_limit_bytes_per_sec because a "
49
             "tserver or master can act both as a sender and receiver at the same time.");
50
51
// DownloadFile() calls Sync() on the destination once more than this many MB have been appended
// since the previous sync; a value of 0 disables the periodic sync.
DEFINE_int32(bytes_remote_bootstrap_durable_write_mb, 8,
52
             "Explicitly call fsync after downloading the specified amount of data in MB "
53
             "during a remote bootstrap session. If 0 fsync() is not called.");
54
55
// RETURN_NOT_OK_PREPEND() with a remote-error unwinding step.
// The status is first passed through UnwindRemoteError(status, controller); the resulting status
// is then handed to RETURN_NOT_OK_PREPEND together with msg.
56
#define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \
57
11.3k
  RETURN_NOT_OK_PREPEND(UnwindRemoteError(status, controller), msg)
58
59
namespace yb {
60
namespace tserver {
61
62
namespace {
63
64
// Decode the remote error into a human-readable Status object.
65
CHECKED_STATUS ExtractRemoteError(
66
2.02k
    const rpc::ErrorStatusPB& remote_error, const Status& original_status) {
67
2.02k
  if (!remote_error.HasExtension(RemoteBootstrapErrorPB::remote_bootstrap_error_ext)) {
68
0
    return original_status;
69
0
  }
70
71
2.02k
  const RemoteBootstrapErrorPB& error =
72
2.02k
      remote_error.GetExtension(RemoteBootstrapErrorPB::remote_bootstrap_error_ext);
73
2.02k
  LOG(INFO) << "ExtractRemoteError: " << error.ShortDebugString();
74
2.02k
  return StatusFromPB(error.status()).CloneAndPrepend(
75
2.02k
      "Received error code " + RemoteBootstrapErrorPB::Code_Name(error.code()) +
76
2.02k
          " from remote service");
77
2.02k
}
78
79
} // namespace
80
81
// Counter defined in another translation unit. DownloadFile() loads it to split
// FLAGS_remote_bootstrap_rate_limit_bytes_per_sec across the started remote bootstrap clients.
extern std::atomic<int32_t> remote_bootstrap_clients_started_;
82
83
// Builds a downloader bound to the caller-supplied log prefix and filesystem manager.
// Both pointers are dereferenced here without a null check, so callers must pass non-null values.
RemoteBootstrapFileDownloader::RemoteBootstrapFileDownloader(
    const std::string* log_prefix, FsManager* fs_manager)
    : log_prefix_(*log_prefix),
      fs_manager_(*fs_manager) {}
87
88
void RemoteBootstrapFileDownloader::Start(
89
    std::shared_ptr<RemoteBootstrapServiceProxy> proxy, std::string session_id,
90
2.02k
    MonoDelta session_idle_timeout) {
91
2.02k
  proxy_ = std::move(proxy);
92
2.02k
  session_id_ = std::move(session_id);
93
2.02k
  session_idle_timeout_ = session_idle_timeout;
94
2.02k
}
95
96
13.1k
// Returns the Env exposed by the filesystem manager this downloader was constructed with.
Env& RemoteBootstrapFileDownloader::env() const {
  Env& manager_env = *fs_manager_.env();
  return manager_env;
}
99
100
// Materializes the file described by file_pb as <dir>/<file_pb.name()>.
//
// If file_pb carries a non-zero inode that was already downloaded by this object, the new path is
// linked to the previously downloaded file and no data is transferred. If that link fails, the
// failure is logged and the file is downloaded normally. Otherwise the file is fetched from the
// remote through the chunked DownloadFile() overload, and its inode (when non-zero) is remembered
// so later files with the same inode can be linked.
//
// data_id is updated with the file name before the transfer; it must be non-null.
Status RemoteBootstrapFileDownloader::DownloadFile(
    const tablet::FilePB& file_pb, const std::string& dir, DataIdPB *data_id) {
  const auto target_path = JoinPathSegments(dir, file_pb.name());
  RETURN_NOT_OK(env().CreateDirs(DirName(target_path)));

  const bool has_inode = file_pb.inode() != 0;

  if (has_inode) {
    const auto known = inode2file_.find(file_pb.inode());
    if (known != inode2file_.end()) {
      VLOG_WITH_PREFIX(2) << "File with the same inode already found: " << target_path
                          << " => " << known->second;
      const auto link_status = env().LinkFile(known->second, target_path);
      if (link_status.ok()) {
        return Status::OK();
      }
      // TODO fallback to copy.
      LOG_WITH_PREFIX(ERROR) << "Failed to link file: " << target_path << " => " << known->second
                             << ": " << link_status;
    }
  }

  // Destination is synced when it is closed.
  WritableFileOptions write_options;
  write_options.sync_on_close = true;
  std::unique_ptr<WritableFile> destination;
  RETURN_NOT_OK(env().NewWritableFile(write_options, target_path, &destination));

  data_id->set_file_name(file_pb.name());
  RETURN_NOT_OK_PREPEND(DownloadFile(*data_id, destination.get()),
                        Format("Unable to download $0 file $1",
                               DataIdPB::IdType_Name(data_id->type()), target_path));
  VLOG_WITH_PREFIX(2) << "Downloaded file " << target_path;

  if (has_inode) {
    // emplace() keeps an existing mapping for this inode, if any.
    inode2file_.emplace(file_pb.inode(), target_path);
  }

  return Status::OK();
}
137
138
template<class Appendable>
139
Status RemoteBootstrapFileDownloader::DownloadFile(
140
11.2k
    const DataIdPB& data_id, Appendable* appendable) {
141
11.2k
  constexpr int kBytesReservedForMessageHeaders = 16384;
142
143
  // For periodic sync, indicates number of bytes which need to be sync'ed.
144
11.2k
  size_t periodic_sync_unsynced_bytes = 0;
145
11.2k
  uint64_t offset = 0;
146
11.2k
  auto max_length = std::min<size_t>(FLAGS_remote_bootstrap_max_chunk_size,
147
11.2k
                                     FLAGS_rpc_max_message_size - kBytesReservedForMessageHeaders);
148
149
11.2k
  std::unique_ptr<RateLimiter> rate_limiter;
150
151
11.2k
  if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec > 0) {
152
22.6k
    static auto rate_updater = []() {
153
22.6k
      auto remote_bootstrap_clients_started =
154
22.6k
          remote_bootstrap_clients_started_.load(std::memory_order_acquire);
155
22.6k
      if (remote_bootstrap_clients_started < 1) {
156
0
        YB_LOG_EVERY_N(ERROR, 100) << "Invalid number of remote bootstrap sessions: "
157
0
                                   << remote_bootstrap_clients_started;
158
0
        return static_cast<uint64_t>(FLAGS_remote_bootstrap_rate_limit_bytes_per_sec);
159
0
      }
160
22.6k
      return static_cast<uint64_t>(
161
22.6k
          FLAGS_remote_bootstrap_rate_limit_bytes_per_sec / remote_bootstrap_clients_started);
162
22.6k
    };
163
164
11.2k
    rate_limiter = std::make_unique<RateLimiter>(rate_updater);
165
11.2k
  } else {
166
    // Inactive RateLimiter.
167
0
    rate_limiter = std::make_unique<RateLimiter>();
168
0
  }
169
170
11.2k
  rpc::RpcController controller;
171
11.2k
  controller.set_timeout(session_idle_timeout_);
172
11.2k
  FetchDataRequestPB req;
173
174
11.2k
  bool done = false;
175
20.5k
  while (!done) {
176
11.3k
    controller.Reset();
177
11.3k
    req.set_session_id(session_id_);
178
11.3k
    req.mutable_data_id()->CopyFrom(data_id);
179
11.3k
    req.set_offset(offset);
180
11.3k
    if (
rate_limiter->active()11.3k
) {
181
11.3k
      auto max_size = rate_limiter->GetMaxSizeForNextTransmission();
182
11.3k
      if (max_size > std::numeric_limits<decltype(max_length)>::max()) {
183
0
        max_size = std::numeric_limits<decltype(max_length)>::max();
184
0
      }
185
11.3k
      max_length = std::min(max_length, decltype(max_length)(max_size));
186
11.3k
    }
187
11.3k
    req.set_max_length(max_length);
188
189
11.3k
    FetchDataResponsePB resp;
190
11.3k
    auto status = rate_limiter->SendOrReceiveData([this, &req, &resp, &controller]() {
191
11.3k
      return proxy_->FetchData(req, &resp, &controller);
192
11.3k
    }, [&resp]() 
{ return resp.ByteSize(); }9.30k
);
193
11.3k
    RETURN_NOT_OK_UNWIND_PREPEND(status, controller, "Unable to fetch data from remote");
194
9.29k
    DCHECK_LE(resp.chunk().data().size(), max_length);
195
196
    // Sanity-check for corruption.
197
9.29k
    RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
198
9.29k
                          Format("Error validating data item $0", data_id));
199
200
    // Write the data.
201
9.29k
    RETURN_NOT_OK(appendable->Append(resp.chunk().data()));
202
18.4E
    VLOG_WITH_PREFIX(3)
203
18.4E
        << "resp size: " << resp.ByteSize() << ", chunk size: " << resp.chunk().data().size();
204
205
9.29k
    if (offset + resp.chunk().data().size() ==
206
9.29k
            implicit_cast<size_t>(resp.chunk().total_data_length())) {
207
9.23k
      done = true;
208
9.23k
    }
209
9.29k
    offset += resp.chunk().data().size();
210
9.30k
    if (
FLAGS_bytes_remote_bootstrap_durable_write_mb != 09.29k
) {
211
9.30k
      periodic_sync_unsynced_bytes += resp.chunk().data().size();
212
9.30k
      if (periodic_sync_unsynced_bytes > FLAGS_bytes_remote_bootstrap_durable_write_mb * 1_MB) {
213
0
        RETURN_NOT_OK(appendable->Sync());
214
0
        periodic_sync_unsynced_bytes = 0;
215
0
      }
216
9.30k
    }
217
9.29k
  }
218
219
18.4E
  VLOG_WITH_PREFIX(2) << "Transmission rate: " << rate_limiter->GetRate();
220
221
9.23k
  return Status::OK();
222
11.2k
}
223
224
9.29k
// Validates a received chunk against what was requested.
//
// Returns InvalidArgument if the chunk's offset differs from the requested offset, Corruption if
// the CRC32C computed over the chunk's data differs from the checksum carried in the chunk, and
// Status::OK() otherwise.
Status RemoteBootstrapFileDownloader::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
  // The remote must answer for exactly the offset we asked for.
  const bool offset_matches = (offset == chunk.offset());
  if (!offset_matches) {
    return STATUS_FORMAT(
        InvalidArgument, "Offset did not match what was asked for $0 vs $1",
        offset, chunk.offset());
  }

  // Recompute the checksum locally and compare it with the one sent by the remote.
  const auto& payload = chunk.data();
  uint32_t computed_crc = crc::Crc32c(payload.data(), payload.length());
  if (PREDICT_FALSE(computed_crc != chunk.crc32())) {
    return STATUS_FORMAT(
        Corruption, "CRC32 does not match at offset $0 size $1: $2 vs $3",
        offset, payload.size(), computed_crc, chunk.crc32());
  }

  return Status::OK();
}
241
242
// Enhance a RemoteError Status message with additional details from the remote.
243
13.3k
Status UnwindRemoteError(const Status& status, const rpc::RpcController& controller) {
244
13.3k
  if (!status.IsRemoteError()) {
245
11.3k
    return status;
246
11.3k
  }
247
2.02k
  return ExtractRemoteError(*controller.error_response(), status);
248
13.3k
}
249
250
} // namespace tserver
251
} // namespace yb