YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_session.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/remote_bootstrap_session.h"
34
35
#include <boost/optional.hpp>
36
#include <glog/logging.h>
37
38
#include "yb/consensus/consensus.h"
39
#include "yb/consensus/log.h"
40
#include "yb/consensus/opid_util.h"
41
42
#include "yb/gutil/casts.h"
43
#include "yb/gutil/strings/substitute.h"
44
#include "yb/gutil/type_traits.h"
45
46
#include "yb/tablet/tablet.h"
47
#include "yb/tablet/tablet_metadata.h"
48
#include "yb/tablet/tablet_peer.h"
49
#include "yb/tablet/tablet_snapshots.h"
50
51
#include "yb/tserver/remote_bootstrap_snapshots.h"
52
53
#include "yb/util/env_util.h"
54
#include "yb/util/logging.h"
55
#include "yb/util/size_literals.h"
56
#include "yb/util/status_format.h"
57
#include "yb/util/status_log.h"
58
#include "yb/util/stopwatch.h"
59
#include "yb/util/trace.h"
60
61
DECLARE_uint64(rpc_max_message_size);
62
DECLARE_int64(remote_bootstrap_rate_limit_bytes_per_sec);
63
64
namespace yb {
65
namespace tserver {
66
67
using std::shared_ptr;
68
using std::vector;
69
using std::string;
70
71
using consensus::MinimumOpId;
72
using consensus::PeerMemberType;
73
using consensus::RaftPeerPB;
74
using log::LogAnchorRegistry;
75
using log::ReadableLogSegment;
76
using strings::Substitute;
77
using tablet::RaftGroupMetadata;
78
using tablet::RaftGroupMetadataPtr;
79
using tablet::TabletPeer;
80
using tablet::RaftGroupReplicaSuperBlockPB;
81
82
RemoteBootstrapSession::RemoteBootstrapSession(
83
    const std::shared_ptr<TabletPeer>& tablet_peer, std::string session_id,
84
    std::string requestor_uuid, const std::atomic<int>* nsessions)
85
    : tablet_peer_(tablet_peer),
86
      session_id_(std::move(session_id)),
87
      requestor_uuid_(std::move(requestor_uuid)),
88
      succeeded_(false),
89
2.03k
      nsessions_(nsessions) {
90
2.03k
  AddSource<RemoteBootstrapSnapshotsSource>();
91
2.03k
}
92
93
1.95k
// Releases the log anchor held by this session (if any) and removes the session's checkpoint
// directory. A failure to delete the directory is logged together with the failure status but
// is otherwise ignored, since nothing can be propagated from a destructor.
RemoteBootstrapSession::~RemoteBootstrapSession() {
  // No lock taken in the destructor, should only be 1 thread with access now.
  CHECK_OK(UnregisterAnchorIfNeededUnlocked());

  if (checkpoint_dir_.empty()) {
    LOG(INFO) << "No checkpoint directory was created for this session";
    return;
  }

  // Delete checkpoint directory.
  const auto s = env()->DeleteRecursively(checkpoint_dir_);
  if (s.ok()) {
    LOG(INFO) << "Successfully deleted checkpoint directory " << checkpoint_dir_;
  } else {
    // Include the status so the reason for the leftover directory can be diagnosed.
    LOG(WARNING) << "Unable to delete checkpoint directory " << checkpoint_dir_ << ": " << s;
  }
}
110
111
2.02k
// Asks consensus to change the role of the bootstrapped peer (requestor_uuid_) once the
// session has succeeded.
//
// Returns:
//   - IllegalState if consensus is not available, if the requestor is not part of the current
//     Raft config, or if its member type is UNKNOWN_MEMBER_TYPE.
//   - OK without sending anything if the requestor is already a VOTER or OBSERVER.
//   - Otherwise the status of the CHANGE_ROLE ChangeConfig request.
Status RemoteBootstrapSession::ChangeRole() {
  CHECK(Succeeded());

  shared_ptr<consensus::Consensus> consensus = tablet_peer_->shared_consensus();
  // This check fixes an issue with test TestDeleteTabletDuringRemoteBootstrap in which a tablet is
  // tombstoned while the bootstrap is happening. This causes the peer's consensus object to be
  // null.
  if (!consensus) {
    tablet::RaftGroupStatePB tablet_state = tablet_peer_->state();
    return STATUS(IllegalState, Substitute(
        "Unable to change role for server $0 in config for tablet $1. Consensus is not available. "
        "Tablet state: $2 ($3)", requestor_uuid_, tablet_peer_->tablet_id(),
        tablet::RaftGroupStatePB_Name(tablet_state), tablet_state));
  }

  // Locate the first entry of the Raft config that describes the requestor.
  const consensus::RaftConfigPB config = tablet_peer_->RaftConfig();
  const RaftPeerPB* requestor = nullptr;
  for (const RaftPeerPB& candidate : config.peers()) {
    if (candidate.permanent_uuid() == requestor_uuid_) {
      requestor = &candidate;
      break;
    }
  }
  if (requestor == nullptr) {
    return STATUS(IllegalState, Substitute("Unable to find peer $0 in config for tablet $1",
                                           requestor_uuid_, tablet_peer_->tablet_id()));
  }

  switch (requestor->member_type()) {
    // If peer being bootstrapped is already a VOTER, don't send the ChangeConfig request. This
    // could happen when a tserver that is already a VOTER in the configuration tombstones its
    // tablet, and the leader starts bootstrapping it.
    case PeerMemberType::OBSERVER: FALLTHROUGH_INTENDED;
    case PeerMemberType::VOTER:
      LOG(ERROR) << "Peer " << requestor->permanent_uuid() << " is a "
                 << PeerMemberType_Name(requestor->member_type())
                 << " Not changing its role after remote bootstrap";

      // Even though this is an error, we return Status::OK() so the remote server doesn't
      // tombstone its tablet.
      return Status::OK();

    case PeerMemberType::PRE_OBSERVER: FALLTHROUGH_INTENDED;
    case PeerMemberType::PRE_VOTER: {
      consensus::ChangeConfigRequestPB req;
      consensus::ChangeConfigResponsePB resp;

      req.set_tablet_id(tablet_peer_->tablet_id());
      req.set_type(consensus::CHANGE_ROLE);
      req.mutable_server()->set_permanent_uuid(requestor_uuid_);

      boost::optional<TabletServerErrorPB::Code> error_code;

      LOG(INFO) << "Changing config with request: { " << req.ShortDebugString() << " } "
                << "in bootstrap session " << session_id_;

      // If another ChangeConfig is being processed, our request will be rejected.
      return consensus->ChangeConfig(req, &DoNothingStatusCB, &error_code);
    }

    case PeerMemberType::UNKNOWN_MEMBER_TYPE:
      return STATUS(IllegalState, Substitute("Unable to change role for peer $0 in config for "
                                             "tablet $1. Peer has an invalid member type $2",
                                             requestor->permanent_uuid(),
                                             tablet_peer_->tablet_id(),
                                             PeerMemberType_Name(requestor->member_type())));
  }

  // Only reachable if member_type() holds a value outside of the enum.
  LOG(FATAL) << "Unexpected peer member type "
             << PeerMemberType_Name(requestor->member_type());
}
176
177
2.03k
// Captures the committed consensus state of the tablet peer into initial_committed_cstate_.
// Returns IllegalState (including the current tablet state) when the peer has no consensus
// object, OK otherwise.
Status RemoteBootstrapSession::SetInitialCommittedState() {
  shared_ptr<consensus::Consensus> consensus = tablet_peer_->shared_consensus();
  if (consensus) {
    initial_committed_cstate_ = consensus->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED);
    return Status::OK();
  }

  tablet::RaftGroupStatePB tablet_state = tablet_peer_->state();
  return STATUS(IllegalState,
                Substitute("Unable to initialize remote bootstrap session "
                           "for tablet $0. Consensus is not available. Tablet state: $1 ($2)",
                           tablet_peer_->tablet_id(), tablet::RaftGroupStatePB_Name(tablet_state),
                           tablet_state));
}
190
191
1.60k
// Recursively lists the regular files below `dir` using the default Env.
//
// Each returned FilePB carries the file's size and inode; its name is the path relative to
// `dir` (entries found in subdirectories are prefixed with the subdirectory name).
// Returns IllegalState if the children of `dir` cannot be listed, or the first error reported
// while inspecting an entry.
Result<google::protobuf::RepeatedPtrField<tablet::FilePB>> ListFiles(const std::string& dir) {
  auto env = Env::Default();

  std::vector<std::string> children;
  auto list_status = env->GetChildren(dir, ExcludeDots::kTrue, &children);
  if (!list_status.ok()) {
    return STATUS(IllegalState, Substitute("Unable to get RocksDB files in dir $0: $1", dir,
                                           list_status.ToString()));
  }

  google::protobuf::RepeatedPtrField<tablet::FilePB> result;
  result.Reserve(narrow_cast<int>(children.size()));
  for (const auto& child : children) {
    auto child_path = JoinPathSegments(dir, child);

    if (!VERIFY_RESULT(env->IsDirectory(child_path))) {
      // Plain file: record it directly.
      auto file_pb = result.Add();
      file_pb->set_name(child);
      file_pb->set_size_bytes(VERIFY_RESULT(env->GetFileSize(child_path)));
      file_pb->set_inode(VERIFY_RESULT(env->GetFileINode(child_path)));
      continue;
    }

    // Directory: descend and re-root the names of everything found below it.
    auto nested_files = VERIFY_RESULT(ListFiles(child_path));
    for (auto& nested : nested_files) {
      nested.set_name(JoinPathSegments(child, nested.name()));
      *result.Add() = std::move(nested);
    }
  }

  return result;
}
220
221
// Name of the subdirectory of the tablet's RocksDB directory under which Init() places the
// per-session checkpoint directories.
const std::string RemoteBootstrapSession::kCheckpointsDir = "checkpoints";
222
223
2.03k
// (Re-)initializes the session under mutex_:
//   1. drops any previous log anchor and registers a new one at the minimum op index,
//   2. reads the tablet superblock from disk,
//   3. creates a RocksDB checkpoint in a fresh session directory and lists its files into the
//      superblock (a NotSupported result from CreateCheckpoint is tolerated),
//   4. initializes all registered sources,
//   5. snapshots the log segments and moves the anchor to the chosen index,
//   6. records the committed consensus state and the session start time.
// Returns the first error encountered, or IllegalState if the log or tablet is unavailable.
Status RemoteBootstrapSession::Init() {
  // Take locks to support re-initialization of the same session.
  std::lock_guard<std::mutex> guard(mutex_);
  RETURN_NOT_OK(UnregisterAnchorIfNeededUnlocked());

  // Prevent log GC while we grab log segments and Tablet metadata.
  const string anchor_owner = Substitute("RemoteBootstrap-$0", session_id_);
  tablet_peer_->log_anchor_registry()->Register(
      MinimumOpId().index(), anchor_owner, &log_anchor_);

  // Read the SuperBlock from disk.
  RETURN_NOT_OK_PREPEND(
      tablet_peer_->tablet_metadata()->ReadSuperBlockFromDisk(&tablet_superblock_),
      Substitute("Unable to access superblock for tablet $0", tablet_peer_->tablet_id()));

  if (!tablet_peer_->log_available()) {
    return STATUS(IllegalState, "Tablet is not running (log is uninitialized)");
  }
  // Get the latest opid in the log at this point in time so we can re-anchor.
  auto latest_opid = tablet_peer_->GetLatestLogEntryOpId();

  auto tablet = tablet_peer_->shared_tablet();
  if (PREDICT_FALSE(!tablet)) {
    return STATUS(IllegalState, "Tablet is not running");
  }

  // The session directory is named "<latest log index>_<current monotonic time>".
  MonoTime now = MonoTime::Now();
  auto* kv_store = tablet_superblock_.mutable_kv_store();
  checkpoint_dir_ = JoinPathSegments(
      JoinPathSegments(kv_store->rocksdb_dir(), kCheckpointsDir),
      std::to_string(latest_opid.index) + "_" + now.ToString());

  // Clear any previous RocksDB files in the superblock. Each session should create a new list
  // based the checkpoint directory files.
  kv_store->clear_rocksdb_files();
  auto checkpoint_status = tablet->snapshots().CreateCheckpoint(checkpoint_dir_);
  if (!checkpoint_status.ok()) {
    // NotSupported is tolerated; the file list simply stays empty in that case.
    if (!checkpoint_status.IsNotSupported()) {
      RETURN_NOT_OK(checkpoint_status);
    }
  } else {
    *kv_store->mutable_rocksdb_files() = VERIFY_RESULT(ListFiles(checkpoint_dir_));
  }

  // Slots without a registered source are skipped.
  for (const auto& source : sources_) {
    if (!source) {
      continue;
    }
    RETURN_NOT_OK(source->Init());
  }

  // Get the current segments from the log, including the active segment.
  // The Log doesn't add the active segment to the log reader's list until
  // a header has been written to it (but it will not have a footer).
  RETURN_NOT_OK(tablet_peer_->log()->GetSegmentsSnapshot(&log_segments_));

  // Default to the latest logged index; prefer the min replicate index of the first segment
  // whose footer provides one.
  log_anchor_index_ = latest_opid.index;
  for (const auto& segment : log_segments_) {
    if (!segment->HasFooter() || !segment->footer().has_min_replicate_index()) {
      continue;
    }
    log_anchor_index_ = segment->footer().min_replicate_index();
    break;
  }

  // Re-anchor on the highest OpId that was in the log right before we
  // snapshotted the log segments. This helps ensure that we don't end up in a
  // remote bootstrap loop due to a follower falling too far behind the
  // leader's log when remote bootstrap is slow. The remote controls when
  // this anchor is released by ending the remote bootstrap session.
  RETURN_NOT_OK(tablet_peer_->log_anchor_registry()->UpdateRegistration(
      log_anchor_index_, &log_anchor_));

  // Look up the committed consensus state.
  // We do this after snapshotting the log for YB table types to avoid a scenario where the latest
  // entry in the log has a term higher than the term stored in the consensus metadata, which
  // will result in a CHECK failure on RaftConsensus init.
  RETURN_NOT_OK(SetInitialCommittedState());

  start_time_ = MonoTime::Now();

  return Status::OK();
}
305
306
10
const std::string& RemoteBootstrapSession::tablet_id() const {
307
10
  return tablet_peer_->tablet_id();
308
10
}
309
310
1.95k
const std::string& RemoteBootstrapSession::requestor_uuid() const {
311
1.95k
  return requestor_uuid_;
312
1.95k
}
313
314
namespace {
315
316
// Determine the length of the data chunk to return to the client.
317
9.30k
int64_t DetermineReadLength(int64_t bytes_remaining, int64_t requested_len) {
318
  // Determine the size of the chunks we want to read.
319
  // Choose "system max" as a multiple of typical HDD block size (4K) with 4K to
320
  // spare for other stuff in the message, like headers, other protobufs, etc.
321
9.30k
  const int32_t kSpareBytes = 4096;
322
9.30k
  const int32_t kDiskSectorSize = 4096;
323
9.30k
  auto system_max_chunk_size =
324
9.30k
      ((FLAGS_rpc_max_message_size - kSpareBytes) / kDiskSectorSize) * kDiskSectorSize;
325
9.30k
  CHECK_GT(system_max_chunk_size, 0) << "rpc_max_message_size is too low to transfer data: "
326
0
                                     << FLAGS_rpc_max_message_size;
327
328
  // The min of the {requested, system} maxes is the effective max.
329
9.30k
  int64_t maxlen = requested_len > 0 ? std::min<int64_t>(requested_len, system_max_chunk_size)
330
9.30k
                                     : 
system_max_chunk_size0
;
331
9.30k
  return std::min(bytes_remaining, maxlen);
332
9.30k
}
333
334
// Calculate the size of the data to return given a maximum client message
335
// length, the file itself, and the offset into the file to be read from.
336
9.30k
Result<int64_t> GetResponseDataSize(GetDataPieceInfo* info) {
337
  // If requested offset is off the end of the data, bail.
338
9.30k
  if (info->offset >= info->data_size) {
339
0
    info->error_code = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST;
340
0
    return STATUS_FORMAT(InvalidArgument,
341
0
                         "Requested offset ($0) is beyond the data size ($1)",
342
0
                         info->offset, info->data_size);
343
0
  }
344
345
9.30k
  auto result = DetermineReadLength(info->bytes_remaining(), info->client_maxlen);
346
9.30k
  DCHECK_GT(result, 0);
347
9.30k
  if (info->client_maxlen > 0) {
348
9.29k
    DCHECK_LE(result, info->client_maxlen);
349
9.29k
  }
350
351
9.30k
  return result;
352
9.30k
}
353
354
// Read a chunk of a file into a buffer.
355
// data_name provides a string for the block/log to be used in error messages.
356
9.30k
Status ReadFileChunkToBuf(RandomAccessFile* file, const string& data_name, GetDataPieceInfo* info) {
357
9.30k
  auto response_data_size = VERIFY_RESULT_PREPEND(
358
9.30k
      GetResponseDataSize(info), Format("Error reading $0", data_name));
359
360
0
  Stopwatch chunk_timer(Stopwatch::THIS_THREAD);
361
9.30k
  chunk_timer.start();
362
363
  // Writing into a std::string buffer is basically guaranteed to work on C++11,
364
  // however any modern compiler should be compatible with it.
365
  // Violates the API contract, but avoids excessive copies.
366
9.30k
  info->data.resize(response_data_size);
367
9.30k
  auto buf = reinterpret_cast<uint8_t*>(const_cast<char*>(info->data.data()));
368
9.30k
  Slice slice;
369
9.30k
  Status s = env_util::ReadFully(file, info->offset, response_data_size, &slice, buf);
370
9.30k
  if (PREDICT_FALSE(!s.ok())) {
371
0
    s = s.CloneAndPrepend(Format("Unable to read existing file for $0", data_name));
372
0
    LOG(WARNING) << s;
373
0
    info->error_code = RemoteBootstrapErrorPB::IO_ERROR;
374
0
    return s;
375
0
  }
376
  // Figure out if Slice points to buf or if Slice points to the mmap.
377
  // If it points to the mmap then copy into buf.
378
9.30k
  if (slice.data() != buf) {
379
0
    memcpy(buf, slice.data(), slice.size());
380
0
  }
381
9.30k
  chunk_timer.stop();
382
9.30k
  TRACE("Remote bootstrap: $0: $1 total bytes read. Total time elapsed: $2",
383
9.30k
        data_name, response_data_size, chunk_timer.elapsed().ToString());
384
385
9.30k
  return Status::OK();
386
9.30k
}
387
388
} // namespace
389
390
8.48k
// Returns the Env of the filesystem manager that belongs to the tablet's metadata.
Env* RemoteBootstrapSession::env() const {
  auto* fs_manager = tablet_peer_->tablet_metadata()->fs_manager();
  return fs_manager->env();
}
393
394
11.3k
// Returns the source registered for `id_type`, or nullptr when `id_type` is outside of
// sources_ or the slot holds no source.
RemoteBootstrapSource* RemoteBootstrapSession::Source(DataIdPB::IdType id_type) const {
  const size_t slot = id_type;
  if (slot >= sources_.size()) {
    return nullptr;
  }
  return sources_[slot].get();
}
398
399
11.3k
// Checks that `data_id` carries the fields required for its type.
//
// If a source is registered for the type, validation is delegated to it. Otherwise:
//   - LOG_SEGMENT requires a non-zero wal_segment_seqno,
//   - ROCKSDB_FILE requires a non-empty file_name,
//   - SNAPSHOT_FILE and UNKNOWN are rejected.
// Returns InvalidArgument on any violation.
Status RemoteBootstrapSession::ValidateDataId(const yb::tserver::DataIdPB& data_id) {
  auto* const registered_source = Source(data_id.type());
  if (registered_source) {
    return registered_source->ValidateDataId(data_id);
  }

  switch (data_id.type()) {
    case DataIdPB::LOG_SEGMENT: {
      const bool seqno_missing = !data_id.wal_segment_seqno();
      if (PREDICT_FALSE(seqno_missing)) {
        return STATUS(InvalidArgument,
            "segment sequence number must be specified for type == LOG_SEGMENT",
            data_id.ShortDebugString());
      }
      return Status::OK();
    }
    case DataIdPB::ROCKSDB_FILE: {
      const bool name_missing = data_id.file_name().empty();
      if (PREDICT_FALSE(name_missing)) {
        return STATUS(InvalidArgument,
            "file name must be specified for type == ROCKSDB_FILE",
            data_id.ShortDebugString());
      }
      return Status::OK();
    }
    case DataIdPB::SNAPSHOT_FILE: FALLTHROUGH_INTENDED;
    case DataIdPB::UNKNOWN:
      return STATUS(InvalidArgument, "Type not supported", data_id.ShortDebugString());
  }

  // Only reachable if type() holds a value outside of the enum.
  LOG(FATAL) << "Invalid data id type: " << data_id.type();
}
427
428
11.3k
// Fills `info` with the requested piece of the data identified by `data_id`.
//
// If a source is registered for the data type, the request is delegated to it. Otherwise
// LOG_SEGMENT and ROCKSDB_FILE are served by this session; any other type sets
// info->error_code to INVALID_REMOTE_BOOTSTRAP_REQUEST and returns InvalidArgument.
// Errors from the underlying readers are returned with a descriptive prefix.
Status RemoteBootstrapSession::GetDataPiece(const DataIdPB& data_id, GetDataPieceInfo* info) {
  // Use the bounds-checked accessor (as ValidateDataId does) instead of indexing sources_
  // directly, so an out-of-range type falls through to the error path below.
  auto* const source = Source(data_id.type());

  if (source) {
    // Fetching a snapshot file chunk.
    RETURN_NOT_OK_PREPEND(
        source->GetDataPiece(data_id, info),
        "Unable to get piece of snapshot file");
    return Status::OK();
  }

  switch (data_id.type()) {
    case DataIdPB::LOG_SEGMENT: {
      // Fetching a log segment chunk.
      RETURN_NOT_OK_PREPEND(GetLogSegmentPiece(data_id.wal_segment_seqno(), info),
                            "Unable to get piece of log segment");
      break;
    }
    case DataIdPB::ROCKSDB_FILE: {
      // Fetching a RocksDB file chunk.
      RETURN_NOT_OK_PREPEND(GetRocksDBFilePiece(data_id.file_name(), info),
                            "Unable to get piece of RocksDB file");
      break;
    }
    default:
      info->error_code = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST;
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid request type $0", data_id.type());
  }

  // A client limit of 0 means "no limit"; otherwise the chunk must respect it.
  DCHECK(info->client_maxlen == 0 ||
         info->data.size() <= implicit_cast<size_t>(info->client_maxlen))
      << "client_maxlen: " << info->client_maxlen << ", data->size(): " << info->data.size();

  return Status::OK();
}
464
465
4.78k
// Reads a piece of the log segment with sequence number `segment_seqno` into `info`.
//
// Under mutex_, the segment is opened if it is not the currently opened one, and
// info->data_size is set to the opened segment's size. The read itself happens outside of
// the lock. Returns the error from opening or reading the segment, if any.
Status RemoteBootstrapSession::GetLogSegmentPiece(uint64_t segment_seqno, GetDataPieceInfo* info) {
  std::shared_ptr<RandomAccessFile> segment_file;
  {
    std::lock_guard<std::mutex> guard(mutex_);
    const bool needs_open = opened_log_segment_seqno_ != segment_seqno;
    if (needs_open) {
      RETURN_NOT_OK(OpenLogSegment(segment_seqno, &info->error_code));
    }
    info->data_size = opened_log_segment_file_size_;
    segment_file = opened_log_segment_file_;
  }

  // Note: We do not eagerly close log segment files, since we share ownership
  // of the LogSegment objects with the Log itself.
  const auto data_name = Substitute("log segment $0", segment_seqno);
  RETURN_NOT_OK(ReadFileChunkToBuf(segment_file.get(), data_name, info));

  return Status::OK();
}
482
483
// Reads a piece of the checkpointed RocksDB file `file_name` (relative to this session's
// checkpoint directory) into `info`.
Status RemoteBootstrapSession::GetRocksDBFilePiece(
    const std::string& file_name, GetDataPieceInfo* info) {
  Env* const session_env = env();
  return GetFilePiece(checkpoint_dir_, file_name, session_env, info);
}
487
488
// Reads a piece of the file `file_name` located in directory `path` into `info`, using `env`
// for file access.
//
// If the file does not exist, sets info->error_code to ROCKSDB_FILE_NOT_FOUND and returns
// NotFound. Otherwise sets info->data_size to the file size and reads the requested chunk.
// Errors from opening, sizing or reading the file are returned as-is.
Status RemoteBootstrapSession::GetFilePiece(
    const std::string& path, const std::string& file_name, Env* env, GetDataPieceInfo* info) {
  auto full_path = JoinPathSegments(path, file_name);
  const bool exists = env->FileExists(full_path);
  if (!exists) {
    info->error_code = RemoteBootstrapErrorPB::ROCKSDB_FILE_NOT_FOUND;
    return STATUS(NotFound, Substitute("Unable to find RocksDB file $0 in directory $1",
                                       file_name, path));
  }

  std::unique_ptr<RandomAccessFile> reader;
  RETURN_NOT_OK(env->NewRandomAccessFile(full_path, &reader));

  info->data_size = VERIFY_RESULT(reader->Size());
  auto inode = VERIFY_RESULT(reader->INode());
  VLOG(2) << "Reading RocksDB file. File path: " << full_path << ", file size: " << info->data_size
          << ", inode: " << inode;

  const auto data_name = Substitute("rocksdb file $0", file_name);
  RETURN_NOT_OK(ReadFileChunkToBuf(reader.get(), data_name, info));

  return Status::OK();
}
511
512
// Add a file to the cache and populate the given ImmutableRandomAcccessFileInfo
513
// object with the file ref and size.
514
template <class Collection, class Key, class Readable>
515
static Status AddImmutableFileToMap(Collection* const cache,
516
                                    const Key& key,
517
                                    const Readable& readable,
518
                                    uint64_t size) {
519
  // Sanity check for 0-length files.
520
  if (size == 0) {
521
    return STATUS(Corruption, "Found 0-length object");
522
  }
523
524
  // Looks good, add it to the cache.
525
  typedef typename Collection::mapped_type InfoPtr;
526
  typedef typename InfoPtr::element_type Info;
527
  CHECK(cache->emplace(key, std::make_unique<Info>(readable, size)).second);
528
529
  return Status::OK();
530
}
531
532
// Makes the log segment with sequence number `segment_seqno` the currently opened segment.
//
// Fails with NotFound (and *error_code = WAL_SEGMENT_NOT_FOUND) if the active segment has
// already been opened by this session or if the segment does not exist. When the segment's
// footer reports a min replicate index above the current anchor index, the log anchor is
// moved forward; a failure to do so sets *error_code = UNKNOWN_ERROR and is returned.
Status RemoteBootstrapSession::OpenLogSegment(
    uint64_t segment_seqno, RemoteBootstrapErrorPB::Code* error_code) {
  auto current_active_seqno = tablet_peer_->log()->active_segment_sequence_number();
  auto segment = tablet_peer_->log()->GetSegmentBySequenceNumber(segment_seqno);

  // The active log segment usually keeps growing while it is being sent over the wire, so once
  // it has been sent we cannot send a following segment: entries appended to the end of the
  // previously active segment could otherwise be missing.
  if (opened_log_segment_active_) {
    *error_code = RemoteBootstrapErrorPB::WAL_SEGMENT_NOT_FOUND;
    return STATUS_FORMAT(NotFound, "Already sent active log segment, don't send $0", segment_seqno);
  }
  if (!segment) {
    *error_code = RemoteBootstrapErrorPB::WAL_SEGMENT_NOT_FOUND;
    return STATUS_FORMAT(NotFound, "Log segment $0 not found", segment_seqno);
  }

  opened_log_segment_file_size_ = segment->readable_up_to() + segment->get_header_size();
  opened_log_segment_seqno_ = segment_seqno;
  opened_log_segment_file_ = segment->readable_file_checkpoint();
  opened_log_segment_active_ = current_active_seqno == segment_seqno;

  // Nothing more to do unless this segment lets us advance the anchor.
  if (!segment->HasFooter() ||
      segment->footer().min_replicate_index() <= log_anchor_index_) {
    return Status::OK();
  }

  // Update log anchor, since we don't need older logs anymore.
  log_anchor_index_ = segment->footer().min_replicate_index();
  auto update_status = tablet_peer_->log_anchor_registry()->UpdateRegistration(
      log_anchor_index_, &log_anchor_);
  if (!update_status.ok()) {
    *error_code = RemoteBootstrapErrorPB::UNKNOWN_ERROR;
    return update_status;
  }

  return Status::OK();
}
566
567
3.99k
// Unregisters log_anchor_ from the tablet peer's log anchor registry if it is currently
// anchored. Does not take mutex_ itself.
Status RemoteBootstrapSession::UnregisterAnchorIfNeededUnlocked() {
  auto* registry_anchor = &log_anchor_;
  return tablet_peer_->log_anchor_registry()->UnregisterIfAnchored(registry_anchor);
}
570
571
2.02k
void RemoteBootstrapSession::SetSuccess() {
572
2.02k
  std::lock_guard<std::mutex> lock(mutex_);
573
2.02k
  succeeded_ = true;
574
2.02k
}
575
576
2.03k
bool RemoteBootstrapSession::Succeeded() {
577
2.03k
  std::lock_guard<std::mutex> lock(mutex_);
578
2.03k
  return succeeded_;
579
2.03k
}
580
581
11.3k
void RemoteBootstrapSession::EnsureRateLimiterIsInitialized() {
582
11.3k
  if (!rate_limiter_.IsInitialized()) {
583
2.02k
    InitRateLimiter();
584
2.02k
  }
585
11.3k
}
586
587
588
2.02k
void RemoteBootstrapSession::InitRateLimiter() {
589
2.02k
  if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec > 0 && nsessions_) {
590
    // Calling SetTargetRateUpdater will activate the rate limiter.
591
20.6k
    rate_limiter_.SetTargetRateUpdater([this]() -> uint64_t {
592
20.6k
      DCHECK_GT(FLAGS_remote_bootstrap_rate_limit_bytes_per_sec, 0);
593
20.6k
      if (FLAGS_remote_bootstrap_rate_limit_bytes_per_sec <= 0) {
594
0
        YB_LOG_EVERY_N(ERROR, 1000)
595
0
          << "Invalid value for remote_bootstrap_rate_limit_bytes_per_sec: "
596
0
          << FLAGS_remote_bootstrap_rate_limit_bytes_per_sec;
597
        // Since the rate limiter is initialized, it's expected that the value of
598
        // FLAGS_remote_bootstrap_rate_limit_bytes_per_sec is greater than 0. Since this is not the
599
        // case, we'll log an error, and set the rate to 50 MB/s.
600
0
        return 50_MB;
601
0
      }
602
20.6k
      auto nsessions = nsessions_->load(std::memory_order_acquire);
603
20.6k
      if (nsessions > 0) {
604
20.6k
        return FLAGS_remote_bootstrap_rate_limit_bytes_per_sec / nsessions;
605
20.6k
      } else {
606
3
        LOG(DFATAL) << "Invalid number of sessions: " << nsessions;
607
3
        return FLAGS_remote_bootstrap_rate_limit_bytes_per_sec;
608
3
      }
609
20.6k
    });
610
2.02k
  }
611
2.02k
  rate_limiter_.Init();
612
2.02k
}
613
614
} // namespace tserver
615
} // namespace yb