YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_session.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/remote_bootstrap_session.h"
34
35
#include <boost/optional.hpp>
36
#include <glog/logging.h>
37
38
#include "yb/consensus/consensus.h"
39
#include "yb/consensus/log.h"
40
#include "yb/consensus/opid_util.h"
41
42
#include "yb/gutil/casts.h"
43
#include "yb/gutil/strings/substitute.h"
44
#include "yb/gutil/type_traits.h"
45
46
#include "yb/tablet/tablet.h"
47
#include "yb/tablet/tablet_metadata.h"
48
#include "yb/tablet/tablet_peer.h"
49
#include "yb/tablet/tablet_snapshots.h"
50
51
#include "yb/tserver/remote_bootstrap_snapshots.h"
52
53
#include "yb/util/env_util.h"
54
#include "yb/util/logging.h"
55
#include "yb/util/size_literals.h"
56
#include "yb/util/status_format.h"
57
#include "yb/util/status_log.h"
58
#include "yb/util/stopwatch.h"
59
#include "yb/util/trace.h"
60
61
DECLARE_uint64(rpc_max_message_size);
62
DECLARE_int64(remote_bootstrap_rate_limit_bytes_per_sec);
63
64
namespace yb {
65
namespace tserver {
66
67
using std::shared_ptr;
68
using std::vector;
69
using std::string;
70
71
using consensus::MinimumOpId;
72
using consensus::PeerMemberType;
73
using consensus::RaftPeerPB;
74
using log::LogAnchorRegistry;
75
using log::ReadableLogSegment;
76
using strings::Substitute;
77
using tablet::RaftGroupMetadata;
78
using tablet::RaftGroupMetadataPtr;
79
using tablet::TabletPeer;
80
using tablet::RaftGroupReplicaSuperBlockPB;
81
82
RemoteBootstrapSession::RemoteBootstrapSession(
83
    const std::shared_ptr<TabletPeer>& tablet_peer, std::string session_id,
84
    std::string requestor_uuid, const std::atomic<int>* nsessions)
85
    : tablet_peer_(tablet_peer),
86
      session_id_(std::move(session_id)),
87
      requestor_uuid_(std::move(requestor_uuid)),
88
      succeeded_(false),
89
1.44k
      nsessions_(nsessions) {
90
1.44k
  AddSource<RemoteBootstrapSnapshotsSource>();
91
1.44k
}
92
93
960
// Releases this session's log anchor and removes the RocksDB checkpoint directory created by
// Init(), if any. Failure to delete the directory is logged (with its cause) but not fatal.
RemoteBootstrapSession::~RemoteBootstrapSession() {
  // No lock taken in the destructor, should only be 1 thread with access now.
  CHECK_OK(UnregisterAnchorIfNeededUnlocked());

  if (checkpoint_dir_.empty()) {
    LOG(INFO) << "No checkpoint directory was created for this session";
    return;
  }

  // Delete checkpoint directory.
  auto s = env()->DeleteRecursively(checkpoint_dir_);
  if (s.ok()) {
    LOG(INFO) << "Successfully deleted checkpoint directory " << checkpoint_dir_;
  } else {
    // Best effort: a leftover directory is not fatal, but keep the cause so it can be diagnosed.
    LOG(WARNING) << "Unable to delete checkpoint directory " << checkpoint_dir_ << ": " << s;
  }
}
110
111
1.00k
// After a successful bootstrap, asks consensus to promote the requesting peer from
// PRE_VOTER/PRE_OBSERVER via a CHANGE_ROLE config change. Must only be called once the
// session has succeeded (CHECKed).
// Returns:
//  - OK without doing anything if the peer is already a VOTER or OBSERVER;
//  - the status of ChangeConfig for PRE_VOTER/PRE_OBSERVER peers;
//  - IllegalState if consensus is unavailable, the peer is not in the config, or its member
//    type is UNKNOWN_MEMBER_TYPE.
Status RemoteBootstrapSession::ChangeRole() {
  CHECK(Succeeded());

  std::shared_ptr<consensus::Consensus> raft_consensus = tablet_peer_->shared_consensus();
  // The tablet may have been tombstoned while the bootstrap was in progress (see
  // TestDeleteTabletDuringRemoteBootstrap); the peer's consensus object is then null.
  if (!raft_consensus) {
    const tablet::RaftGroupStatePB state = tablet_peer_->state();
    return STATUS(IllegalState, Substitute(
        "Unable to change role for server $0 in config for tablet $1. Consensus is not available. "
        "Tablet state: $2 ($3)", requestor_uuid_, tablet_peer_->tablet_id(),
        tablet::RaftGroupStatePB_Name(state), state));
  }

  const consensus::RaftConfigPB config = tablet_peer_->RaftConfig();
  for (const RaftPeerPB& member : config.peers()) {
    if (member.permanent_uuid() != requestor_uuid_) {
      continue;
    }

    switch (member.member_type()) {
      case PeerMemberType::OBSERVER: FALLTHROUGH_INTENDED;
      case PeerMemberType::VOTER:
        // Already a full member, e.g. a VOTER that tombstoned its tablet and is being
        // bootstrapped again by the leader. No ChangeConfig is sent. This is logged as an error
        // but OK is returned so the remote server does not tombstone its tablet.
        LOG(ERROR) << "Peer " << member.permanent_uuid() << " is a "
                   << PeerMemberType_Name(member.member_type())
                   << " Not changing its role after remote bootstrap";
        return Status::OK();

      case PeerMemberType::PRE_OBSERVER: FALLTHROUGH_INTENDED;
      case PeerMemberType::PRE_VOTER: {
        consensus::ChangeConfigRequestPB request;
        request.set_tablet_id(tablet_peer_->tablet_id());
        request.set_type(consensus::CHANGE_ROLE);
        request.mutable_server()->set_permanent_uuid(requestor_uuid_);

        LOG(INFO) << "Changing config with request: { " << request.ShortDebugString() << " } "
                  << "in bootstrap session " << session_id_;

        // If another ChangeConfig is being processed, our request will be rejected.
        boost::optional<TabletServerErrorPB::Code> error_code;
        return raft_consensus->ChangeConfig(request, &DoNothingStatusCB, &error_code);
      }

      case PeerMemberType::UNKNOWN_MEMBER_TYPE:
        return STATUS(IllegalState, Substitute("Unable to change role for peer $0 in config for "
                                               "tablet $1. Peer has an invalid member type $2",
                                               member.permanent_uuid(), tablet_peer_->tablet_id(),
                                               PeerMemberType_Name(member.member_type())));
    }
    LOG(FATAL) << "Unexpected peer member type "
               << PeerMemberType_Name(member.member_type());
  }

  return STATUS(IllegalState, Substitute("Unable to find peer $0 in config for tablet $1",
                                         requestor_uuid_, tablet_peer_->tablet_id()));
}
176
177
1.44k
// Stores the tablet's currently committed consensus state in initial_committed_cstate_.
// Returns IllegalState (including the tablet state) if the consensus object is unavailable.
Status RemoteBootstrapSession::SetInitialCommittedState() {
  std::shared_ptr<consensus::Consensus> raft_consensus = tablet_peer_->shared_consensus();
  if (raft_consensus) {
    initial_committed_cstate_ =
        raft_consensus->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED);
    return Status::OK();
  }

  const tablet::RaftGroupStatePB state = tablet_peer_->state();
  return STATUS(IllegalState,
                Substitute("Unable to initialize remote bootstrap session "
                           "for tablet $0. Consensus is not available. Tablet state: $1 ($2)",
                           tablet_peer_->tablet_id(), tablet::RaftGroupStatePB_Name(state),
                           state));
}
190
191
1.33k
// Recursively lists the regular files under `dir` using the default Env.
// Each entry's name is relative to `dir` (subdirectory components are joined in front), and it
// carries the file's size and inode. Returns IllegalState if `dir` itself cannot be listed;
// errors from the per-file directory/size/inode queries are propagated unchanged.
Result<google::protobuf::RepeatedPtrField<tablet::FilePB>> ListFiles(const std::string& dir) {
  auto fs = Env::Default();
  std::vector<std::string> children;
  auto list_status = fs->GetChildren(dir, ExcludeDots::kTrue, &children);
  if (!list_status.ok()) {
    return STATUS(IllegalState, Substitute("Unable to get RocksDB files in dir $0: $1", dir,
                                           list_status.ToString()));
  }

  google::protobuf::RepeatedPtrField<tablet::FilePB> listing;
  listing.Reserve(narrow_cast<int>(children.size()));
  for (const auto& child : children) {
    const auto child_path = JoinPathSegments(dir, child);
    const bool is_dir = VERIFY_RESULT(fs->IsDirectory(child_path));
    if (!is_dir) {
      auto* entry = listing.Add();
      entry->set_name(child);
      entry->set_size_bytes(VERIFY_RESULT(fs->GetFileSize(child_path)));
      entry->set_inode(VERIFY_RESULT(fs->GetFileINode(child_path)));
      continue;
    }

    // Flatten the subdirectory's files into this listing, prefixing their names with `child`.
    auto nested = VERIFY_RESULT(ListFiles(child_path));
    for (auto& nested_entry : nested) {
      nested_entry.set_name(JoinPathSegments(child, nested_entry.name()));
      *listing.Add() = std::move(nested_entry);
    }
  }

  return listing;
}
220
221
// Name of the subdirectory of the tablet's RocksDB directory under which per-session
// checkpoints are created.
const std::string RemoteBootstrapSession::kCheckpointsDir{"checkpoints"};
222
223
1.44k
// (Re)initializes the session under mutex_, in this order:
//  1. drops any previous log anchor and anchors the log at the minimum index;
//  2. reads the tablet superblock from disk;
//  3. creates a RocksDB checkpoint and records its files in the superblock;
//  4. initializes the registered extra sources;
//  5. snapshots the log segments and moves the anchor forward;
//  6. records the committed consensus state and the start time.
// Holding the lock for the whole call allows the same session to be re-initialized.
Status RemoteBootstrapSession::Init() {
  std::lock_guard<std::mutex> guard(mutex_);
  RETURN_NOT_OK(UnregisterAnchorIfNeededUnlocked());

  const string& id_of_tablet = tablet_peer_->tablet_id();

  // Prevent log GC while we grab log segments and Tablet metadata.
  string anchor_owner = Substitute("RemoteBootstrap-$0", session_id_);
  tablet_peer_->log_anchor_registry()->Register(
      MinimumOpId().index(), anchor_owner, &log_anchor_);

  // Read the SuperBlock from disk.
  const RaftGroupMetadataPtr& group_metadata = tablet_peer_->tablet_metadata();
  RETURN_NOT_OK_PREPEND(group_metadata->ReadSuperBlockFromDisk(&tablet_superblock_),
                        Substitute("Unable to access superblock for tablet $0", id_of_tablet));

  if (!tablet_peer_->log_available()) {
    return STATUS(IllegalState, "Tablet is not running (log is uninitialized)");
  }
  // Latest op id logged at this point; it names the checkpoint and is the fallback anchor index.
  auto latest_op_id = tablet_peer_->GetLatestLogEntryOpId();

  auto running_tablet = tablet_peer_->shared_tablet();
  if (PREDICT_FALSE(!running_tablet)) {
    return STATUS(IllegalState, "Tablet is not running");
  }

  // This session checkpoints into <rocksdb_dir>/checkpoints/<latest log index>_<current time>.
  MonoTime checkpoint_time = MonoTime::Now();
  auto* kv_store = tablet_superblock_.mutable_kv_store();
  const auto all_checkpoints_dir = JoinPathSegments(kv_store->rocksdb_dir(), kCheckpointsDir);
  checkpoint_dir_ = JoinPathSegments(
      all_checkpoints_dir,
      std::to_string(latest_op_id.index) + "_" + checkpoint_time.ToString());

  // Start from an empty RocksDB file list so that only this session's checkpoint is advertised.
  kv_store->clear_rocksdb_files();
  auto checkpoint_status = running_tablet->snapshots().CreateCheckpoint(checkpoint_dir_);
  if (!checkpoint_status.IsNotSupported()) {
    // NotSupported is tolerated (the file list stays empty); any other failure aborts Init().
    RETURN_NOT_OK(checkpoint_status);
    *kv_store->mutable_rocksdb_files() = VERIFY_RESULT(ListFiles(checkpoint_dir_));
  }

  for (const auto& extra_source : sources_) {
    if (!extra_source) {
      continue;
    }
    RETURN_NOT_OK(extra_source->Init());
  }

  // Get the current segments from the log, including the active segment.
  // The Log doesn't add the active segment to the log reader's list until
  // a header has been written to it (but it will not have a footer).
  RETURN_NOT_OK(tablet_peer_->log()->GetSegmentsSnapshot(&log_segments_));
  // Anchor at the min_replicate_index of the first segment whose footer records one; if no
  // segment does, fall back to the latest logged index captured above.
  log_anchor_index_ = latest_op_id.index;
  for (const auto& segment : log_segments_) {
    if (!segment->HasFooter() || !segment->footer().has_min_replicate_index()) {
      continue;
    }
    log_anchor_index_ = segment->footer().min_replicate_index();
    break;
  }

  // Re-anchor on the highest OpId that was in the log right before we
  // snapshotted the log segments. This helps ensure that we don't end up in a
  // remote bootstrap loop due to a follower falling too far behind the
  // leader's log when remote bootstrap is slow. The remote controls when
  // this anchor is released by ending the remote bootstrap session.
  RETURN_NOT_OK(tablet_peer_->log_anchor_registry()->UpdateRegistration(
      log_anchor_index_, &log_anchor_));

  // Look up the committed consensus state.
  // We do this after snapshotting the log for YB table types to avoid a scenario where the latest
  // entry in the log has a term higher than the term stored in the consensus metadata, which
  // will result in a CHECK failure on RaftConsensus init.
  RETURN_NOT_OK(SetInitialCommittedState());

  start_time_ = MonoTime::Now();
  return Status::OK();
}
305
306
10
const std::string& RemoteBootstrapSession::tablet_id() const {
307
10
  return tablet_peer_->tablet_id();
308
10
}
309
310
957
const std::string& RemoteBootstrapSession::requestor_uuid() const {
311
957
  return requestor_uuid_;
312
957
}
313
314
namespace {
315
316
// Determine the length of the data chunk to return to the client.
317
4.82k
int64_t DetermineReadLength(int64_t bytes_remaining, int64_t requested_len) {
318
  // Determine the size of the chunks we want to read.
319
  // Choose "system max" as a multiple of typical HDD block size (4K) with 4K to
320
  // spare for other stuff in the message, like headers, other protobufs, etc.
321
4.82k
  const int32_t kSpareBytes = 4096;
322
4.82k
  const int32_t kDiskSectorSize = 4096;
323
4.82k
  auto system_max_chunk_size =
324
4.82k
      ((FLAGS_rpc_max_message_size - kSpareBytes) / kDiskSectorSize) * kDiskSectorSize;
325
0
  CHECK_GT(system_max_chunk_size, 0) << "rpc_max_message_size is too low to transfer data: "
326
0
                                     << FLAGS_rpc_max_message_size;
327
328
  // The min of the {requested, system} maxes is the effective max.
329
4.82k
  int64_t maxlen = requested_len > 0 ? std::min<int64_t>(requested_len, system_max_chunk_size)
330
0
                                     : system_max_chunk_size;
331
4.82k
  return std::min(bytes_remaining, maxlen);
332
4.82k
}
333
334
// Calculate the size of the data to return given a maximum client message
335
// length, the file itself, and the offset into the file to be read from.
336
4.82k
Result<int64_t> GetResponseDataSize(GetDataPieceInfo* info) {
337
  // If requested offset is off the end of the data, bail.
338
4.82k
  if (info->offset >= info->data_size) {
339
0
    info->error_code = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST;
340
0
    return STATUS_FORMAT(InvalidArgument,
341
0
                         "Requested offset ($0) is beyond the data size ($1)",
342
0
                         info->offset, info->data_size);
343
0
  }
344
345
4.82k
  auto result = DetermineReadLength(info->bytes_remaining(), info->client_maxlen);
346
4.82k
  DCHECK_GT(result, 0);
347
4.82k
  if (info->client_maxlen > 0) {
348
4.82k
    DCHECK_LE(result, info->client_maxlen);
349
4.82k
  }
350
351
4.82k
  return result;
352
4.82k
}
353
354
// Read a chunk of a file into a buffer.
355
// data_name provides a string for the block/log to be used in error messages.
356
4.82k
Status ReadFileChunkToBuf(RandomAccessFile* file, const string& data_name, GetDataPieceInfo* info) {
357
4.82k
  auto response_data_size = VERIFY_RESULT_PREPEND(
358
4.82k
      GetResponseDataSize(info), Format("Error reading $0", data_name));
359
360
4.82k
  Stopwatch chunk_timer(Stopwatch::THIS_THREAD);
361
4.82k
  chunk_timer.start();
362
363
  // Writing into a std::string buffer is basically guaranteed to work on C++11,
364
  // however any modern compiler should be compatible with it.
365
  // Violates the API contract, but avoids excessive copies.
366
4.82k
  info->data.resize(response_data_size);
367
4.82k
  auto buf = reinterpret_cast<uint8_t*>(const_cast<char*>(info->data.data()));
368
4.82k
  Slice slice;
369
4.82k
  Status s = env_util::ReadFully(file, info->offset, response_data_size, &slice, buf);
370
4.82k
  if (PREDICT_FALSE(!s.ok())) {
371
0
    s = s.CloneAndPrepend(Format("Unable to read existing file for $0", data_name));
372
0
    LOG(WARNING) << s;
373
0
    info->error_code = RemoteBootstrapErrorPB::IO_ERROR;
374
0
    return s;
375
0
  }
376
  // Figure out if Slice points to buf or if Slice points to the mmap.
377
  // If it points to the mmap then copy into buf.
378
4.82k
  if (slice.data() != buf) {
379
0
    memcpy(buf, slice.data(), slice.size());
380
0
  }
381
4.82k
  chunk_timer.stop();
382
4.82k
  TRACE("Remote bootstrap: $0: $1 total bytes read. Total time elapsed: $2",
383
4.82k
        data_name, response_data_size, chunk_timer.elapsed().ToString());
384
385
4.82k
  return Status::OK();
386
4.82k
}
387
388
} // namespace
389
390
4.46k
// Env of the FsManager behind this tablet's metadata. It is used to read checkpoint files in
// GetRocksDBFilePiece() and to delete the checkpoint directory in the destructor.
Env* RemoteBootstrapSession::env() const {
  const auto& metadata = tablet_peer_->tablet_metadata();
  return metadata->fs_manager()->env();
}
393
394
5.82k
// Returns the source registered for `id_type`, or nullptr when the type is out of range for
// sources_ or has no registered source. Negative enum values become huge indices and are
// therefore out of range.
RemoteBootstrapSource* RemoteBootstrapSession::Source(DataIdPB::IdType id_type) const {
  const size_t index = id_type;
  if (index >= sources_.size()) {
    return nullptr;
  }
  return sources_[index].get();
}
398
399
5.83k
// Checks that `data_id` is well-formed before data is served for it.
// Types handled by a registered source are delegated to that source. LOG_SEGMENT needs a
// non-zero segment sequence number and ROCKSDB_FILE a non-empty file name; other types are
// rejected with InvalidArgument.
Status RemoteBootstrapSession::ValidateDataId(const yb::tserver::DataIdPB& data_id) {
  if (auto* source = Source(data_id.type())) {
    return source->ValidateDataId(data_id);
  }

  switch (data_id.type()) {
    case DataIdPB::LOG_SEGMENT: {
      const bool missing_seqno = !data_id.wal_segment_seqno();
      if (PREDICT_FALSE(missing_seqno)) {
        return STATUS(InvalidArgument,
            "segment sequence number must be specified for type == LOG_SEGMENT",
            data_id.ShortDebugString());
      }
      return Status::OK();
    }
    case DataIdPB::ROCKSDB_FILE: {
      const bool missing_name = data_id.file_name().empty();
      if (PREDICT_FALSE(missing_name)) {
        return STATUS(InvalidArgument,
            "file name must be specified for type == ROCKSDB_FILE",
            data_id.ShortDebugString());
      }
      return Status::OK();
    }
    case DataIdPB::SNAPSHOT_FILE: FALLTHROUGH_INTENDED;
    case DataIdPB::UNKNOWN:
      return STATUS(InvalidArgument, "Type not supported", data_id.ShortDebugString());
  }
  LOG(FATAL) << "Invalid data id type: " << data_id.type();
}
427
428
5.82k
// Reads the next chunk of the item identified by `data_id` into `info`.
// Registered sources (snapshot files) handle their own types. LOG_SEGMENT and ROCKSDB_FILE
// are served directly. Any other type sets info->error_code to
// INVALID_REMOTE_BOOTSTRAP_REQUEST and returns InvalidArgument.
Status RemoteBootstrapSession::GetDataPiece(const DataIdPB& data_id, GetDataPieceInfo* info) {
  // Use the bounds-checked lookup (as ValidateDataId does). Indexing sources_ directly with an
  // unexpected type value taken from the request could read out of bounds.
  if (auto* source = Source(data_id.type())) {
    // Fetching a snapshot file chunk.
    RETURN_NOT_OK_PREPEND(
        source->GetDataPiece(data_id, info),
        "Unable to get piece of snapshot file");
    return Status::OK();
  }

  switch (data_id.type()) {
    case DataIdPB::LOG_SEGMENT: {
      // Fetching a log segment chunk.
      RETURN_NOT_OK_PREPEND(GetLogSegmentPiece(data_id.wal_segment_seqno(), info),
                            "Unable to get piece of log segment");
      break;
    }
    case DataIdPB::ROCKSDB_FILE: {
      // Fetching a RocksDB file chunk.
      RETURN_NOT_OK_PREPEND(GetRocksDBFilePiece(data_id.file_name(), info),
                            "Unable to get piece of RocksDB file");
      break;
    }
    default:
      info->error_code = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST;
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid request type $0", data_id.type());
  }
  DCHECK(info->client_maxlen == 0 ||
         info->data.size() <= implicit_cast<size_t>(info->client_maxlen))
      << "client_maxlen: " << info->client_maxlen << ", data->size(): " << info->data.size();

  return Status::OK();
}
464
465
2.30k
// Reads the next chunk of WAL segment `segment_seqno` into `info`.
// The segment is (re)opened under mutex_ only when it differs from the currently opened one.
// The read itself happens outside the lock, on a shared reference to the segment file.
Status RemoteBootstrapSession::GetLogSegmentPiece(uint64_t segment_seqno, GetDataPieceInfo* info) {
  std::shared_ptr<RandomAccessFile> segment_file;
  {
    std::lock_guard<std::mutex> guard(mutex_);
    const bool needs_open = opened_log_segment_seqno_ != segment_seqno;
    if (needs_open) {
      RETURN_NOT_OK(OpenLogSegment(segment_seqno, &info->error_code));
    }
    info->data_size = opened_log_segment_file_size_;
    segment_file = opened_log_segment_file_;
  }

  const std::string data_name = Substitute("log segment $0", segment_seqno);
  RETURN_NOT_OK(ReadFileChunkToBuf(segment_file.get(), data_name, info));

  // Log segment files are not closed eagerly: ownership of the LogSegment objects is shared
  // with the Log itself.
  return Status::OK();
}
482
483
// Reads the next chunk of `file_name` from this session's RocksDB checkpoint directory.
Status RemoteBootstrapSession::GetRocksDBFilePiece(
    const std::string& file_name, GetDataPieceInfo* info) {
  Env* const session_env = env();
  return GetFilePiece(checkpoint_dir_, file_name, session_env, info);
}
487
488
// Reads the next chunk of file `path`/`file_name` (through `env`) into `info`, setting
// info->data_size to the file's size. If the file does not exist, sets info->error_code to
// ROCKSDB_FILE_NOT_FOUND and returns NotFound. Open, size and read errors are propagated.
Status RemoteBootstrapSession::GetFilePiece(
    const std::string& path, const std::string& file_name, Env* env, GetDataPieceInfo* info) {
  auto file_path = JoinPathSegments(path, file_name);
  if (!env->FileExists(file_path)) {
    info->error_code = RemoteBootstrapErrorPB::ROCKSDB_FILE_NOT_FOUND;
    return STATUS(NotFound, Substitute("Unable to find RocksDB file $0 in directory $1",
                                       file_name, path));
  }

  std::unique_ptr<RandomAccessFile> readable_file;
  RETURN_NOT_OK(env->NewRandomAccessFile(file_path, &readable_file));

  info->data_size = VERIFY_RESULT(readable_file->Size());
  // The inode is only needed for this debug log line. Fetch it only when the log is enabled,
  // and never let a failure to read it abort the data transfer.
  if (VLOG_IS_ON(2)) {
    auto inode = readable_file->INode();
    if (inode.ok()) {
      VLOG(2) << "Reading RocksDB file. File path: " << file_path
              << ", file size: " << info->data_size << ", inode: " << *inode;
    } else {
      VLOG(2) << "Reading RocksDB file. File path: " << file_path
              << ", file size: " << info->data_size << ", inode unavailable: " << inode.status();
    }
  }

  RETURN_NOT_OK(ReadFileChunkToBuf(
      readable_file.get(), Substitute("rocksdb file $0", file_name), info));

  return Status::OK();
}
511
512
// Add a file to the cache and populate the given ImmutableRandomAcccessFileInfo
513
// object with the file ref and size.
514
template <class Collection, class Key, class Readable>
515
static Status AddImmutableFileToMap(Collection* const cache,
516
                                    const Key& key,
517
                                    const Readable& readable,
518
                                    uint64_t size) {
519
  // Sanity check for 0-length files.
520
  if (size == 0) {
521
    return STATUS(Corruption, "Found 0-length object");
522
  }
523
524
  // Looks good, add it to the cache.
525
  typedef typename Collection::mapped_type InfoPtr;
526
  typedef typename InfoPtr::element_type Info;
527
  CHECK(cache->emplace(key, std::make_unique<Info>(readable, size)).second);
528
529
  return Status::OK();
530
}
531
532
// Makes WAL segment `segment_seqno` the currently opened segment of this session: records its
// readable size, sequence number and file, and whether it was the log's active segment.
// Expects mutex_ to be held by the caller (GetLogSegmentPiece does).
// On failure, sets *error_code (WAL_SEGMENT_NOT_FOUND or UNKNOWN_ERROR) and returns the error.
Status RemoteBootstrapSession::OpenLogSegment(
    uint64_t segment_seqno, RemoteBootstrapErrorPB::Code* error_code) {
  // The active log segment usually keeps growing while it is being sent over the wire. Once it
  // has been sent, no later segment may be sent either: entries appended to the end of the
  // previously active segment after it was read would otherwise be missing.
  // This is checked before any log lookup, since the request is rejected regardless.
  if (opened_log_segment_active_) {
    *error_code = RemoteBootstrapErrorPB::WAL_SEGMENT_NOT_FOUND;
    return STATUS_FORMAT(NotFound, "Already sent active log segment, don't send $0", segment_seqno);
  }

  // The active sequence number is read before the segment lookup. NOTE(review): presumably so a
  // concurrent log roll errs on treating the segment as active — confirm before reordering.
  auto active_seqno = tablet_peer_->log()->active_segment_sequence_number();
  auto log_segment = tablet_peer_->log()->GetSegmentBySequenceNumber(segment_seqno);
  if (!log_segment) {
    *error_code = RemoteBootstrapErrorPB::WAL_SEGMENT_NOT_FOUND;
    return STATUS_FORMAT(NotFound, "Log segment $0 not found", segment_seqno);
  }
  opened_log_segment_file_size_ = log_segment->readable_up_to() + log_segment->get_header_size();
  opened_log_segment_seqno_ = segment_seqno;
  opened_log_segment_file_ = log_segment->readable_file_checkpoint();
  opened_log_segment_active_ = active_seqno == segment_seqno;

  if (log_segment->HasFooter() &&
      log_segment->footer().min_replicate_index() > log_anchor_index_) {
    log_anchor_index_ = log_segment->footer().min_replicate_index();

    // Update log anchor, since we don't need older logs anymore.
    auto status = tablet_peer_->log_anchor_registry()->UpdateRegistration(
        log_anchor_index_, &log_anchor_);
    if (!status.ok()) {
      *error_code = RemoteBootstrapErrorPB::UNKNOWN_ERROR;
      return status;
    }
  }

  return Status::OK();
}
566
567
2.40k
// Unregisters log_anchor_ from the tablet's log anchor registry if it is currently anchored.
// Takes no lock: callers hold mutex_ (Init) or have exclusive access (the destructor).
Status RemoteBootstrapSession::UnregisterAnchorIfNeededUnlocked() {
  const auto& anchor_registry = tablet_peer_->log_anchor_registry();
  return anchor_registry->UnregisterIfAnchored(&log_anchor_);
}
570
571
1.00k
void RemoteBootstrapSession::SetSuccess() {
572
1.00k
  std::lock_guard<std::mutex> lock(mutex_);
573
1.00k
  succeeded_ = true;
574
1.00k
}
575
576
1.01k
bool RemoteBootstrapSession::Succeeded() {
577
1.01k
  std::lock_guard<std::mutex> lock(mutex_);
578
1.01k
  return succeeded_;
579
1.01k
}
580
581
5.83k
void RemoteBootstrapSession::EnsureRateLimiterIsInitialized() {
582
5.83k
  if (!rate_limiter_.IsInitialized()) {
583
1.00k
    InitRateLimiter();
584
1.00k
  }
585
5.83k
}
586
587
588
1.00k
void RemoteBootstrapSession::InitRateLimiter() {
589
1.00k
  if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec > 0 && nsessions_) {
590
    // Calling SetTargetRateUpdater will activate the rate limiter.
591
10.6k
    rate_limiter_.SetTargetRateUpdater([this]() -> uint64_t {
592
10.6k
      DCHECK_GT(FLAGS_remote_bootstrap_rate_limit_bytes_per_sec, 0);
593
10.6k
      if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec <= 0) {
594
0
        YB_LOG_EVERY_N(ERROR, 1000)
595
0
          << "Invalid value for remote_bootstrap_rate_limit_bytes_per_sec: "
596
0
          << FLAGS_remote_bootstrap_rate_limit_bytes_per_sec;
597
        // Since the rate limiter is initialized, it's expected that the value of
598
        // FLAGS_remote_bootstrap_rate_limit_bytes_per_sec is greater than 0. Since this is not the
599
        // case, we'll log an error, and set the rate to 50 MB/s.
600
0
        return 50_MB;
601
0
      }
602
10.6k
      auto nsessions = nsessions_->load(std::memory_order_acquire);
603
10.6k
      if (nsessions > 0) {
604
10.6k
        return FLAGS_remote_bootstrap_rate_limit_bytes_per_sec / nsessions;
605
1
      } else {
606
1
        LOG(DFATAL) << "Invalid number of sessions: " << nsessions;
607
1
        return FLAGS_remote_bootstrap_rate_limit_bytes_per_sec;
608
1
      }
609
10.6k
    });
610
1.00k
  }
611
1.00k
  rate_limiter_.Init();
612
1.00k
}
613
614
} // namespace tserver
615
} // namespace yb