YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_service.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/remote_bootstrap_service.h"
34
35
#include <algorithm>
36
#include <string>
37
#include <vector>
38
39
#include <gflags/gflags.h>
40
#include <glog/logging.h>
41
42
#include "yb/common/wire_protocol.h"
43
44
#include "yb/consensus/log_util.h"
45
46
#include "yb/gutil/casts.h"
47
48
#include "yb/rpc/rpc_context.h"
49
50
#include "yb/tablet/tablet_peer.h"
51
52
#include "yb/tserver/tablet_peer_lookup.h"
53
54
#include "yb/util/crc.h"
55
#include "yb/util/fault_injection.h"
56
#include "yb/util/flag_tags.h"
57
#include "yb/util/status_format.h"
58
#include "yb/util/status_log.h"
59
#include "yb/util/thread.h"
60
61
using namespace std::literals;
62
63
// Error-handling helpers for the RPC handlers in this file. Both macros assume the existence of a
// local variable named 'context' (the handler's rpc::RpcContext) and both return from the
// enclosing handler, which therefore must return void.

// Responds to the RPC with a RemoteBootstrapErrorPB application error built from `app_err`,
// `message` and the Status `s`, then returns from the enclosing handler.
#define RPC_RETURN_APP_ERROR(app_err, message, s) \
  do { \
    SetupErrorAndRespond(&context, (app_err), (message), (s)); \
    return; \
  } while (false)

// Evaluates `expr` exactly once. If the resulting Status is not OK, responds with an application
// error (see RPC_RETURN_APP_ERROR) and returns from the enclosing handler. `app_err` and `message`
// are evaluated only on the error path, i.e. after `expr`, so they may depend on values that
// `expr` fills in.
// The temporary has a deliberately unusual name: a plain `s` would shadow (and self-initialize
// from) any variable called `s` that appears inside `expr`.
#define RPC_RETURN_NOT_OK(expr, app_err, message) \
  do { \
    Status rpc_return_not_ok_status = (expr); \
    if (!rpc_return_not_ok_status.ok()) { \
      RPC_RETURN_APP_ERROR(app_err, message, rpc_return_not_ok_status); \
    } \
  } while (false)
77
78
DEFINE_uint64(remote_bootstrap_idle_timeout_ms, 180000,
79
              "Amount of time without activity before a remote bootstrap "
80
              "session will expire, in millis");
81
TAG_FLAG(remote_bootstrap_idle_timeout_ms, hidden);
82
83
DEFINE_uint64(remote_bootstrap_timeout_poll_period_ms, 10000,
84
              "How often the remote_bootstrap service polls for expired "
85
              "remote bootstrap sessions, in millis");
86
TAG_FLAG(remote_bootstrap_timeout_poll_period_ms, hidden);
87
88
DEFINE_test_flag(double, fault_crash_on_handle_rb_fetch_data, 0.0,
89
                 "Fraction of the time when the tablet will crash while "
90
                 "servicing a RemoteBootstrapService FetchData() RPC call.");
91
92
DEFINE_test_flag(uint64, inject_latency_before_change_role_secs, 0,
93
                 "Number of seconds to sleep before we call ChangeRole.");
94
95
DEFINE_test_flag(bool, skip_change_role, false,
96
                 "When set, we don't call ChangeRole after successfully finishing a remote "
97
                 "bootstrap.");
98
99
DEFINE_test_flag(double, fault_crash_leader_before_changing_role, 0.0,
100
                 "The leader will crash before changing the role (from PRE_VOTER or PRE_OBSERVER "
101
                 "to VOTER or OBSERVER respectively) of the tablet server it is remote "
102
                 "bootstrapping.");
103
104
DEFINE_test_flag(double, fault_crash_leader_after_changing_role, 0.0,
105
                 "The leader will crash after successfully sending a ChangeConfig (CHANGE_ROLE "
106
                 "from PRE_VOTER or PRE_OBSERVER to VOTER or OBSERVER respectively) for the tablet "
107
                 "server it is remote bootstrapping, but before it sends a success response.");
108
109
DEFINE_uint64(remote_bootstrap_change_role_timeout_ms, 15000,
110
              "Timeout for change role operation during remote bootstrap.");
111
112
namespace yb {
113
namespace tserver {
114
115
using crc::Crc32c;
116
using strings::Substitute;
117
using tablet::TabletPeer;
118
119
static void SetupErrorAndRespond(rpc::RpcContext* context,
120
                                 RemoteBootstrapErrorPB::Code code,
121
                                 const string& message,
122
1.02k
                                 const Status& s) {
123
1.02k
  LOG(WARNING) << "Error handling RemoteBootstrapService RPC request from "
124
1.02k
               << context->requestor_string() << ": "
125
1.02k
               << s.ToString();
126
1.02k
  RemoteBootstrapErrorPB error;
127
1.02k
  StatusToPB(s, error.mutable_status());
128
1.02k
  error.set_code(code);
129
1.02k
  context->RespondApplicationError(RemoteBootstrapErrorPB::remote_bootstrap_error_ext.number(),
130
1.02k
                                   message, error);
131
1.02k
}
132
133
// Stores the (required, non-null) FsManager and tablet peer lookup and starts the
// "rb-session-exp" thread. That thread runs EndExpiredSessions() until Shutdown() counts down
// shutdown_latch_.
RemoteBootstrapServiceImpl::RemoteBootstrapServiceImpl(
    FsManager* fs_manager,
    TabletPeerLookupIf* tablet_peer_lookup,
    const scoped_refptr<MetricEntity>& metric_entity)
    : RemoteBootstrapServiceIf(metric_entity),
      fs_manager_(CHECK_NOTNULL(fs_manager)),
      tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)),
      shutdown_latch_(1) {
  // CHECK_OK: failing to start the expiration thread aborts the process.
  CHECK_OK(Thread::Create(
      "remote-bootstrap", "rb-session-exp",
      &RemoteBootstrapServiceImpl::EndExpiredSessions, this, &session_expiration_thread_));
}
145
146
160
// Nothing is released explicitly here; members clean up through their own destructors.
// NOTE(review): the expiration thread is only stopped by Shutdown(); presumably owners call
// Shutdown() before destroying the service — confirm against callers.
RemoteBootstrapServiceImpl::~RemoteBootstrapServiceImpl() = default;
148
149
void RemoteBootstrapServiceImpl::BeginRemoteBootstrapSession(
150
        const BeginRemoteBootstrapSessionRequestPB* req,
151
        BeginRemoteBootstrapSessionResponsePB* resp,
152
1.45k
        rpc::RpcContext context) {
153
1.45k
  const string& requestor_uuid = req->requestor_uuid();
154
1.45k
  const string& tablet_id = req->tablet_id();
155
  // For now, we use the requestor_uuid with the tablet id as the session id,
156
  // but there is no guarantee this will not change in the future.
157
1.45k
  MonoTime now = MonoTime::Now();
158
1.45k
  const string session_id = Substitute("$0-$1-$2", requestor_uuid, tablet_id, now.ToString());
159
160
1.45k
  std::shared_ptr<TabletPeer> tablet_peer;
161
1.45k
  RPC_RETURN_NOT_OK(tablet_peer_lookup_->GetTabletPeer(tablet_id, &tablet_peer),
162
1.45k
                    RemoteBootstrapErrorPB::TABLET_NOT_FOUND,
163
1.45k
                    Substitute("Unable to find specified tablet: $0", tablet_id));
164
1.45k
  RPC_RETURN_NOT_OK(tablet_peer->CheckRunning(),
165
1.45k
                    RemoteBootstrapErrorPB::TABLET_NOT_FOUND,
166
1.45k
                    Substitute("Tablet is not running yet: $0", tablet_id));
167
168
1.44k
  scoped_refptr<RemoteBootstrapSession> session;
169
1.44k
  {
170
1.44k
    std::lock_guard<std::mutex> l(sessions_mutex_);
171
1.44k
    auto it = sessions_.find(session_id);
172
1.44k
    if (it == sessions_.end()) {
173
1.44k
      LOG(INFO) << "Beginning new remote bootstrap session on tablet " << tablet_id
174
1.44k
                << " from peer " << requestor_uuid << " at " << context.requestor_string()
175
1.44k
                << ": session id = " << session_id;
176
1.44k
      session.reset(new RemoteBootstrapSession(
177
1.44k
          tablet_peer, session_id, requestor_uuid, &nsessions_));
178
1.44k
      it = sessions_.emplace(session_id, SessionData{session, CoarseTimePoint()}).first;
179
1.44k
      auto new_nsessions = nsessions_.fetch_add(1, std::memory_order_acq_rel) + 1;
180
0
      LOG_IF(DFATAL, implicit_cast<size_t>(new_nsessions) != sessions_.size())
181
0
          << "nsessions_ " << new_nsessions << " !=  number of sessions " << sessions_.size();
182
0
    } else {
183
0
      session = it->second.session;
184
0
      LOG(INFO) << "Re-initializing existing remote bootstrap session on tablet " << tablet_id
185
0
                << " from peer " << requestor_uuid << " at " << context.requestor_string()
186
0
                << ": session id = " << session_id;
187
0
    }
188
1.44k
    it->second.ResetExpiration();
189
1.44k
  }
190
191
1.44k
  RPC_RETURN_NOT_OK(session->Init(),
192
1.44k
                    RemoteBootstrapErrorPB::UNKNOWN_ERROR,
193
1.44k
                    Substitute("Error initializing remote bootstrap session for tablet $0",
194
1.44k
                               tablet_id));
195
196
1.44k
  resp->set_session_id(session_id);
197
1.44k
  resp->set_session_idle_timeout_millis(FLAGS_remote_bootstrap_idle_timeout_ms);
198
1.44k
  resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
199
1.44k
  resp->mutable_initial_committed_cstate()->CopyFrom(session->initial_committed_cstate());
200
201
1.44k
  auto const& log_segments = session->log_segments();
202
1.44k
  resp->mutable_deprecated_wal_segment_seqnos()->Reserve(narrow_cast<int>(log_segments.size()));
203
1.69k
  for (const scoped_refptr<log::ReadableLogSegment>& segment : log_segments) {
204
1.69k
    resp->add_deprecated_wal_segment_seqnos(segment->header().sequence_number());
205
1.69k
  }
206
1.44k
  if (!log_segments.empty()) {
207
1.44k
    resp->set_first_wal_segment_seqno(log_segments.front()->header().sequence_number());
208
1.44k
  }
209
210
1.44k
  context.RespondSuccess();
211
1.44k
}
212
213
void RemoteBootstrapServiceImpl::CheckSessionActive(
214
        const CheckRemoteBootstrapSessionActiveRequestPB* req,
215
        CheckRemoteBootstrapSessionActiveResponsePB* resp,
216
2
        rpc::RpcContext context) {
217
  // Look up and validate remote bootstrap session.
218
2
  std::lock_guard<std::mutex> l(sessions_mutex_);
219
2
  auto it = sessions_.find(req->session_id());
220
2
  if (it != sessions_.end()) {
221
1
    if (req->keepalive()) {
222
0
      it->second.ResetExpiration();
223
0
    }
224
1
    resp->set_session_is_active(true);
225
1
    context.RespondSuccess();
226
1
  } else {
227
1
    resp->set_session_is_active(false);
228
1
    context.RespondSuccess();
229
1
  }
230
2
}
231
232
void RemoteBootstrapServiceImpl::FetchData(const FetchDataRequestPB* req,
233
                                           FetchDataResponsePB* resp,
234
5.83k
                                           rpc::RpcContext context) {
235
5.83k
  const string& session_id = req->session_id();
236
237
  // Look up and validate remote bootstrap session.
238
5.83k
  scoped_refptr<RemoteBootstrapSession> session;
239
5.83k
  {
240
5.83k
    std::lock_guard<std::mutex> l(sessions_mutex_);
241
5.83k
    auto it = sessions_.find(session_id);
242
5.83k
    if (it == sessions_.end()) {
243
2
      RPC_RETURN_APP_ERROR(
244
2
          RemoteBootstrapErrorPB::NO_SESSION, "No such session",
245
2
          STATUS_FORMAT(NotFound, "Fetch data for unknown sessions id: $0", session_id));
246
2
    }
247
5.83k
    it->second.ResetExpiration();
248
5.83k
    session = it->second.session;
249
5.83k
  }
250
251
5.83k
  session->EnsureRateLimiterIsInitialized();
252
253
5.83k
  MAYBE_FAULT(FLAGS_TEST_fault_crash_on_handle_rb_fetch_data);
254
255
5.83k
  int64_t rate_limit = session->rate_limiter().GetMaxSizeForNextTransmission();
256
1
  VLOG(3) << " rate limiter max len: " << rate_limit;
257
5.83k
  GetDataPieceInfo info = {
258
5.83k
    .offset = req->offset(),
259
5.83k
    .client_maxlen = rate_limit == 0 ? req->max_length() : std::min(req->max_length(), rate_limit),
260
5.83k
    .data = std::string(),
261
5.83k
    .data_size = 0,
262
5.83k
    .error_code = RemoteBootstrapErrorPB::UNKNOWN_ERROR,
263
5.83k
  };
264
5.83k
  const DataIdPB& data_id = req->data_id();
265
5.83k
  RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &info.error_code, session),
266
5.83k
                    info.error_code, "Invalid DataId");
267
268
5.83k
  RPC_RETURN_NOT_OK(session->GetDataPiece(data_id, &info),
269
5.83k
                    info.error_code, "Unable to get piece of data file");
270
271
4.82k
  session->rate_limiter().UpdateDataSizeAndMaybeSleep(info.data.size());
272
4.82k
  uint32_t crc32 = Crc32c(info.data.data(), info.data.length());
273
274
4.82k
  DataChunkPB* data_chunk = resp->mutable_chunk();
275
4.82k
  *data_chunk->mutable_data() = std::move(info.data);
276
4.82k
  data_chunk->set_total_data_length(info.data_size);
277
4.82k
  data_chunk->set_offset(info.offset);
278
279
  // Calculate checksum.
280
4.82k
  data_chunk->set_crc32(crc32);
281
4.82k
  context.RespondSuccess();
282
4.82k
}
283
284
void RemoteBootstrapServiceImpl::EndRemoteBootstrapSession(
285
        const EndRemoteBootstrapSessionRequestPB* req,
286
        EndRemoteBootstrapSessionResponsePB* resp,
287
1.00k
        rpc::RpcContext context) {
288
1.00k
  {
289
1.00k
    std::lock_guard<std::mutex> l(sessions_mutex_);
290
1.00k
    RemoteBootstrapErrorPB::Code app_error;
291
1.00k
    RPC_RETURN_NOT_OK(DoEndRemoteBootstrapSession(
292
1.00k
                          req->session_id(), req->is_success(), &app_error),
293
1.00k
                      app_error, "No such session");
294
999
    LOG(INFO) << "Request end of remote bootstrap session " << req->session_id()
295
999
              << " received from " << context.requestor_string();
296
297
999
    if (!req->keep_session()) {
298
3
      RemoveSession(req->session_id());
299
996
    } else {
300
996
      resp->set_session_kept(true);
301
996
    }
302
999
  }
303
999
  context.RespondSuccess();
304
999
}
305
306
void RemoteBootstrapServiceImpl::RemoveSession(
307
        const RemoveSessionRequestPB* req,
308
        RemoveSessionResponsePB* resp,
309
941
        rpc::RpcContext context) {
310
941
  {
311
941
    std::lock_guard<std::mutex> l(sessions_mutex_);
312
941
    RemoveSession(req->session_id());
313
941
  }
314
941
  context.RespondSuccess();
315
941
}
316
317
947
void RemoteBootstrapServiceImpl::RemoveSession(const std::string& session_id) {
318
  // Remove the session from the map.
319
  // It will get destroyed once there are no outstanding refs.
320
947
  auto it = sessions_.find(session_id);
321
947
  if (it == sessions_.end()) {
322
0
    LOG(WARNING) << "Attempt to remove session with unknown id: " << session_id;
323
0
    return;
324
0
  }
325
947
  LOG(INFO) << "Removing remote bootstrap session " << session_id << " on tablet "
326
947
            << session_id << " with peer " << it->second.session->requestor_uuid();
327
947
  sessions_.erase(it);
328
947
  nsessions_.fetch_sub(1, std::memory_order_acq_rel);
329
947
}
330
331
160
void RemoteBootstrapServiceImpl::Shutdown() {
332
160
  shutdown_latch_.CountDown();
333
160
  session_expiration_thread_->Join();
334
335
160
  std::lock_guard<std::mutex> lock(sessions_mutex_);
336
  // Destroy all remote bootstrap sessions.
337
160
  std::vector<string> session_ids;
338
160
  session_ids.reserve(sessions_.size());
339
8
  for (const auto& entry : sessions_) {
340
8
    session_ids.push_back(entry.first);
341
8
  }
342
8
  for (const string& session_id : session_ids) {
343
8
    LOG(INFO) << "Destroying remote bootstrap session " << session_id << " due to service shutdown";
344
8
    RemoteBootstrapErrorPB::Code app_error;
345
8
    CHECK_OK(DoEndRemoteBootstrapSession(session_id, false, &app_error));
346
8
  }
347
160
}
348
349
// Checks that `data_id` names exactly one data source: a WAL segment sequence number or a file
// name.
//   - If both or neither are set, it sets *app_error to INVALID_REMOTE_BOOTSTRAP_REQUEST and
//     returns InvalidArgument.
//   - Otherwise it returns the result of session->ValidateDataId(data_id) and leaves *app_error
//     unchanged.
Status RemoteBootstrapServiceImpl::ValidateFetchRequestDataId(
        const DataIdPB& data_id,
        RemoteBootstrapErrorPB::Code* app_error,
        const scoped_refptr<RemoteBootstrapSession>& session) const {
  const bool has_segment = data_id.has_wal_segment_seqno();
  const bool has_file = data_id.has_file_name();
  // Equal flags mean "both set" or "neither set"; exactly one is required.
  if (PREDICT_FALSE(has_segment == has_file)) {
    *app_error = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST;
    return STATUS(InvalidArgument,
        Substitute("Only one of segment sequence number, and file name can be specified. "
                   "DataTypeID: $0", data_id.ShortDebugString()));
  }
  return session->ValidateDataId(data_id);
}
363
364
7.31k
void RemoteBootstrapServiceImpl::SessionData::ResetExpiration() {
365
7.31k
  expiration = CoarseMonoClock::now() + FLAGS_remote_bootstrap_idle_timeout_ms * 1ms;
366
7.31k
}
367
368
// Performs the end-of-session work for `session_id`. Callers in this file hold sessions_mutex_.
// This function does not remove the session from sessions_ (see RemoveSession()).
//
// If the id is unknown, it sets *app_error to NO_SESSION and returns NotFound. In all other cases
// it returns OK.
//
// Success path: the requestor reported success (`session_succeeded`), or the session was already
// marked as succeeded.
//   - The session is marked successful and session->ChangeRole() is attempted. The test flag
//     descriptions in this file describe this as moving the bootstrapped peer from
//     PRE_VOTER/PRE_OBSERVER to VOTER/OBSERVER.
//   - A LeaderHasNoLease failure is retried, with a short pause between attempts, until
//     FLAGS_remote_bootstrap_change_role_timeout_ms elapses.
//   - Any other failure, or running out of time, refreshes the session's expiration.
// Otherwise the failure is only logged.
Status RemoteBootstrapServiceImpl::DoEndRemoteBootstrapSession(
        const std::string& session_id,
        bool session_succeeded,
        RemoteBootstrapErrorPB::Code* app_error) {
  auto it = sessions_.find(session_id);
  if (it == sessions_.end()) {
    *app_error = RemoteBootstrapErrorPB::NO_SESSION;
    return STATUS_FORMAT(NotFound, "End of unknown session id: $0", session_id);
  }
  auto session = it->second.session;

  if (session_succeeded || session->Succeeded()) {
    session->SetSuccess();

    if (PREDICT_FALSE(FLAGS_TEST_inject_latency_before_change_role_secs)) {
      LOG(INFO) << "Injecting latency for test";
      SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_inject_latency_before_change_role_secs));
    }

    if (PREDICT_FALSE(FLAGS_TEST_skip_change_role)) {
      LOG(INFO) << "Not changing role for " << session->requestor_uuid()
                << " because flag FLAGS_TEST_skip_change_role is set";
      return Status::OK();
    }

    MAYBE_FAULT(FLAGS_TEST_fault_crash_leader_before_changing_role);

    MonoTime deadline =
        MonoTime::Now() +
        MonoDelta::FromMilliseconds(FLAGS_remote_bootstrap_change_role_timeout_ms);
    // Pause between ChangeRole() attempts while the leader has no lease. Without it the loop
    // below retries back-to-back, logging a warning per attempt, for up to the whole timeout
    // while the caller holds sessions_mutex_.
    const MonoDelta kChangeRoleRetryDelay = MonoDelta::FromMilliseconds(100);
    for (;;) {
      Status status = session->ChangeRole();
      if (status.ok()) {
        LOG(INFO) << "ChangeRole succeeded for bootstrap session " << session_id;
        MAYBE_FAULT(FLAGS_TEST_fault_crash_leader_after_changing_role);
        break;
      }
      LOG(WARNING) << "ChangeRole failed for bootstrap session " << session_id
                   << ", error : " << status;
      if (!status.IsLeaderHasNoLease() || MonoTime::Now() >= deadline) {
        it->second.ResetExpiration();
        return Status::OK();
      }
      SleepFor(kChangeRoleRetryDelay);
    }
  } else {
    LOG(ERROR) << "Remote bootstrap session " << session_id << " on tablet " << session->tablet_id()
               << " with peer " << session->requestor_uuid() << " failed. session_succeeded = "
               << session_succeeded;
  }

  return Status::OK();
}
420
421
11.2k
void RemoteBootstrapServiceImpl::EndExpiredSessions() {
422
36.9k
  do {
423
36.9k
    std::lock_guard<std::mutex> l(sessions_mutex_);
424
36.9k
    auto now = CoarseMonoClock::Now();
425
426
36.9k
    std::vector<string> expired_session_ids;
427
599
    for (const auto& entry : sessions_) {
428
599
      if (entry.second.expiration < now) {
429
3
        expired_session_ids.push_back(entry.first);
430
3
      }
431
599
    }
432
3
    for (const string& session_id : expired_session_ids) {
433
3
      LOG(INFO) << "Remote bootstrap session " << session_id
434
3
                << " has expired. Terminating session.";
435
3
      RemoteBootstrapErrorPB::Code app_error;
436
3
      CHECK_OK(DoEndRemoteBootstrapSession(session_id, false, &app_error));
437
3
      RemoveSession(session_id);
438
3
    }
439
36.9k
  } while (!shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds(
440
36.9k
                                    FLAGS_remote_bootstrap_timeout_poll_period_ms)));
441
11.2k
}
442
443
} // namespace tserver
444
} // namespace yb