YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_service.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/remote_bootstrap_service.h"
34
35
#include <algorithm>
36
#include <string>
37
#include <vector>
38
39
#include <gflags/gflags.h>
40
#include <glog/logging.h>
41
42
#include "yb/common/wire_protocol.h"
43
44
#include "yb/consensus/log_util.h"
45
46
#include "yb/gutil/casts.h"
47
48
#include "yb/rpc/rpc_context.h"
49
50
#include "yb/tablet/tablet_peer.h"
51
52
#include "yb/tserver/tablet_peer_lookup.h"
53
54
#include "yb/util/crc.h"
55
#include "yb/util/fault_injection.h"
56
#include "yb/util/flag_tags.h"
57
#include "yb/util/status_format.h"
58
#include "yb/util/status_log.h"
59
#include "yb/util/thread.h"
60
61
using namespace std::literals;
62
63
// Note, this macro assumes the existence of a local var named 'context'.
64
// Responds to the RPC held in the local variable 'context' with the application
// error described by (app_err, message, s) via SetupErrorAndRespond(), and then
// returns from the enclosing function (which must return void).
#define RPC_RETURN_APP_ERROR(app_err, message, s) \
  do { \
    SetupErrorAndRespond(&context, (app_err), (message), (s)); \
    return; \
  } while (false)
69
70
// Evaluates 'expr' (a Status) exactly once. When the status is not OK, the RPC
// is answered through RPC_RETURN_APP_ERROR and the enclosing function returns.
// 'app_err' and 'message' are only evaluated on that failure path, so they may
// refer to values that 'expr' fills in when it fails.
// Like RPC_RETURN_APP_ERROR, this relies on a local variable named 'context'.
#define RPC_RETURN_NOT_OK(expr, app_err, message) \
  do { \
    Status rpc_return_status = (expr); \
    if (!rpc_return_status.ok()) { \
      RPC_RETURN_APP_ERROR(app_err, message, rpc_return_status); \
    } \
  } while (false)
77
78
// Idle timeout for a session. It is added to the current time in
// SessionData::ResetExpiration() and is also returned to the client in the
// BeginRemoteBootstrapSession() response.
DEFINE_uint64(remote_bootstrap_idle_timeout_ms, 180000,
79
              "Amount of time without activity before a remote bootstrap "
80
              "session will expire, in millis");
81
TAG_FLAG(remote_bootstrap_idle_timeout_ms, hidden);
82
83
// Wait between two passes of EndExpiredSessions().
DEFINE_uint64(remote_bootstrap_timeout_poll_period_ms, 10000,
84
              "How often the remote_bootstrap service polls for expired "
85
              "remote bootstrap sessions, in millis");
86
TAG_FLAG(remote_bootstrap_timeout_poll_period_ms, hidden);
87
88
// Test-only: passed to MAYBE_FAULT in FetchData().
DEFINE_test_flag(double, fault_crash_on_handle_rb_fetch_data, 0.0,
89
                 "Fraction of the time when the tablet will crash while "
90
                 "servicing a RemoteBootstrapService FetchData() RPC call.");
91
92
// Test-only flags below are all read in DoEndRemoteBootstrapSession().
DEFINE_test_flag(uint64, inject_latency_before_change_role_secs, 0,
93
                 "Number of seconds to sleep before we call ChangeRole.");
94
95
DEFINE_test_flag(bool, skip_change_role, false,
96
                 "When set, we don't call ChangeRole after successfully finishing a remote "
97
                 "bootstrap.");
98
99
DEFINE_test_flag(double, fault_crash_leader_before_changing_role, 0.0,
100
                 "The leader will crash before changing the role (from PRE_VOTER or PRE_OBSERVER "
101
                 "to VOTER or OBSERVER respectively) of the tablet server it is remote "
102
                 "bootstrapping.");
103
104
DEFINE_test_flag(double, fault_crash_leader_after_changing_role, 0.0,
105
                 "The leader will crash after successfully sending a ChangeConfig (CHANGE_ROLE "
106
                 "from PRE_VOTER or PRE_OBSERVER to VOTER or OBSERVER respectively) for the tablet "
107
                 "server it is remote bootstrapping, but before it sends a success response.");
108
109
// Bounds the ChangeRole retry loop in DoEndRemoteBootstrapSession(); retries only
// happen while ChangeRole fails with a LeaderHasNoLease status.
DEFINE_uint64(remote_bootstrap_change_role_timeout_ms, 15000,
110
              "Timeout for change role operation during remote bootstrap.");
111
112
namespace yb {
113
namespace tserver {
114
115
using crc::Crc32c;
116
using strings::Substitute;
117
using tablet::TabletPeer;
118
119
static void SetupErrorAndRespond(rpc::RpcContext* context,
120
                                 RemoteBootstrapErrorPB::Code code,
121
                                 const string& message,
122
2.03k
                                 const Status& s) {
123
2.03k
  LOG(WARNING) << "Error handling RemoteBootstrapService RPC request from "
124
2.03k
               << context->requestor_string() << ": "
125
2.03k
               << s.ToString();
126
2.03k
  RemoteBootstrapErrorPB error;
127
2.03k
  StatusToPB(s, error.mutable_status());
128
2.03k
  error.set_code(code);
129
2.03k
  context->RespondApplicationError(RemoteBootstrapErrorPB::remote_bootstrap_error_ext.number(),
130
2.03k
                                   message, error);
131
2.03k
}
132
133
// Constructs the service and immediately starts the background thread that runs
// EndExpiredSessions(). Both 'fs_manager' and 'tablet_peer_lookup' must be
// non-null (CHECK_NOTNULL); failing to create the thread is fatal (CHECK_OK).
RemoteBootstrapServiceImpl::RemoteBootstrapServiceImpl(
    FsManager* fs_manager,
    TabletPeerLookupIf* tablet_peer_lookup,
    const scoped_refptr<MetricEntity>& metric_entity)
    : RemoteBootstrapServiceIf(metric_entity),
      fs_manager_(CHECK_NOTNULL(fs_manager)),
      tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)),
      shutdown_latch_(1) {
  // The latch starts at 1; Shutdown() counts it down to stop the thread's loop.
  const Status thread_status = Thread::Create(
      "remote-bootstrap", "rb-session-exp",
      &RemoteBootstrapServiceImpl::EndExpiredSessions, this,
      &session_expiration_thread_);
  CHECK_OK(thread_status);
}
145
146
182
// Nothing to do here: stopping the expiration thread and ending the remaining
// sessions is done explicitly in Shutdown().
RemoteBootstrapServiceImpl::~RemoteBootstrapServiceImpl() = default;
148
149
void RemoteBootstrapServiceImpl::BeginRemoteBootstrapSession(
150
        const BeginRemoteBootstrapSessionRequestPB* req,
151
        BeginRemoteBootstrapSessionResponsePB* resp,
152
2.03k
        rpc::RpcContext context) {
153
2.03k
  const string& requestor_uuid = req->requestor_uuid();
154
2.03k
  const string& tablet_id = req->tablet_id();
155
  // For now, we use the requestor_uuid with the tablet id as the session id,
156
  // but there is no guarantee this will not change in the future.
157
2.03k
  MonoTime now = MonoTime::Now();
158
2.03k
  const string session_id = Substitute("$0-$1-$2", requestor_uuid, tablet_id, now.ToString());
159
160
2.03k
  std::shared_ptr<TabletPeer> tablet_peer;
161
2.03k
  RPC_RETURN_NOT_OK(tablet_peer_lookup_->GetTabletPeer(tablet_id, &tablet_peer),
162
2.03k
                    RemoteBootstrapErrorPB::TABLET_NOT_FOUND,
163
2.03k
                    Substitute("Unable to find specified tablet: $0", tablet_id));
164
2.03k
  RPC_RETURN_NOT_OK(tablet_peer->CheckRunning(),
165
2.03k
                    RemoteBootstrapErrorPB::TABLET_NOT_FOUND,
166
2.03k
                    Substitute("Tablet is not running yet: $0", tablet_id));
167
168
2.03k
  scoped_refptr<RemoteBootstrapSession> session;
169
2.03k
  {
170
2.03k
    std::lock_guard<std::mutex> l(sessions_mutex_);
171
2.03k
    auto it = sessions_.find(session_id);
172
2.03k
    if (it == sessions_.end()) {
173
2.03k
      LOG(INFO) << "Beginning new remote bootstrap session on tablet " << tablet_id
174
2.03k
                << " from peer " << requestor_uuid << " at " << context.requestor_string()
175
2.03k
                << ": session id = " << session_id;
176
2.03k
      session.reset(new RemoteBootstrapSession(
177
2.03k
          tablet_peer, session_id, requestor_uuid, &nsessions_));
178
2.03k
      it = sessions_.emplace(session_id, SessionData{session, CoarseTimePoint()}).first;
179
2.03k
      auto new_nsessions = nsessions_.fetch_add(1, std::memory_order_acq_rel) + 1;
180
2.03k
      LOG_IF(DFATAL, implicit_cast<size_t>(new_nsessions) != sessions_.size())
181
0
          << "nsessions_ " << new_nsessions << " !=  number of sessions " << sessions_.size();
182
2.03k
    } else {
183
0
      session = it->second.session;
184
0
      LOG(INFO) << "Re-initializing existing remote bootstrap session on tablet " << tablet_id
185
0
                << " from peer " << requestor_uuid << " at " << context.requestor_string()
186
0
                << ": session id = " << session_id;
187
0
    }
188
2.03k
    it->second.ResetExpiration();
189
2.03k
  }
190
191
2.03k
  RPC_RETURN_NOT_OK(session->Init(),
192
2.03k
                    RemoteBootstrapErrorPB::UNKNOWN_ERROR,
193
2.03k
                    Substitute("Error initializing remote bootstrap session for tablet $0",
194
2.03k
                               tablet_id));
195
196
2.03k
  resp->set_session_id(session_id);
197
2.03k
  resp->set_session_idle_timeout_millis(FLAGS_remote_bootstrap_idle_timeout_ms);
198
2.03k
  resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
199
2.03k
  resp->mutable_initial_committed_cstate()->CopyFrom(session->initial_committed_cstate());
200
201
2.03k
  auto const& log_segments = session->log_segments();
202
2.03k
  resp->mutable_deprecated_wal_segment_seqnos()->Reserve(narrow_cast<int>(log_segments.size()));
203
2.71k
  for (const scoped_refptr<log::ReadableLogSegment>& segment : log_segments) {
204
2.71k
    resp->add_deprecated_wal_segment_seqnos(segment->header().sequence_number());
205
2.71k
  }
206
2.03k
  if (!log_segments.empty()) {
207
2.03k
    resp->set_first_wal_segment_seqno(log_segments.front()->header().sequence_number());
208
2.03k
  }
209
210
2.03k
  context.RespondSuccess();
211
2.03k
}
212
213
void RemoteBootstrapServiceImpl::CheckSessionActive(
214
        const CheckRemoteBootstrapSessionActiveRequestPB* req,
215
        CheckRemoteBootstrapSessionActiveResponsePB* resp,
216
5
        rpc::RpcContext context) {
217
  // Look up and validate remote bootstrap session.
218
5
  std::lock_guard<std::mutex> l(sessions_mutex_);
219
5
  auto it = sessions_.find(req->session_id());
220
5
  if (it != sessions_.end()) {
221
4
    if (req->keepalive()) {
222
0
      it->second.ResetExpiration();
223
0
    }
224
4
    resp->set_session_is_active(true);
225
4
    context.RespondSuccess();
226
4
  } else {
227
1
    resp->set_session_is_active(false);
228
1
    context.RespondSuccess();
229
1
  }
230
5
}
231
232
void RemoteBootstrapServiceImpl::FetchData(const FetchDataRequestPB* req,
233
                                           FetchDataResponsePB* resp,
234
11.3k
                                           rpc::RpcContext context) {
235
11.3k
  const string& session_id = req->session_id();
236
237
  // Look up and validate remote bootstrap session.
238
11.3k
  scoped_refptr<RemoteBootstrapSession> session;
239
11.3k
  {
240
11.3k
    std::lock_guard<std::mutex> l(sessions_mutex_);
241
11.3k
    auto it = sessions_.find(session_id);
242
11.3k
    if (it == sessions_.end()) {
243
2
      RPC_RETURN_APP_ERROR(
244
2
          RemoteBootstrapErrorPB::NO_SESSION, "No such session",
245
2
          STATUS_FORMAT(NotFound, "Fetch data for unknown sessions id: $0", session_id));
246
2
    }
247
11.3k
    it->second.ResetExpiration();
248
11.3k
    session = it->second.session;
249
11.3k
  }
250
251
0
  session->EnsureRateLimiterIsInitialized();
252
253
11.3k
  MAYBE_FAULT(FLAGS_TEST_fault_crash_on_handle_rb_fetch_data);
254
255
11.3k
  int64_t rate_limit = session->rate_limiter().GetMaxSizeForNextTransmission();
256
18.4E
  VLOG(3) << " rate limiter max len: " << rate_limit;
257
11.3k
  GetDataPieceInfo info = {
258
11.3k
    .offset = req->offset(),
259
11.3k
    .client_maxlen = rate_limit == 0 ? 
req->max_length()0
: std::min(req->max_length(), rate_limit),
260
11.3k
    .data = std::string(),
261
11.3k
    .data_size = 0,
262
11.3k
    .error_code = RemoteBootstrapErrorPB::UNKNOWN_ERROR,
263
11.3k
  };
264
11.3k
  const DataIdPB& data_id = req->data_id();
265
11.3k
  RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &info.error_code, session),
266
11.3k
                    info.error_code, "Invalid DataId");
267
268
11.3k
  RPC_RETURN_NOT_OK(session->GetDataPiece(data_id, &info),
269
11.3k
                    info.error_code, "Unable to get piece of data file");
270
271
9.30k
  session->rate_limiter().UpdateDataSizeAndMaybeSleep(info.data.size());
272
9.30k
  uint32_t crc32 = Crc32c(info.data.data(), info.data.length());
273
274
9.30k
  DataChunkPB* data_chunk = resp->mutable_chunk();
275
9.30k
  *data_chunk->mutable_data() = std::move(info.data);
276
9.30k
  data_chunk->set_total_data_length(info.data_size);
277
9.30k
  data_chunk->set_offset(info.offset);
278
279
  // Calculate checksum.
280
9.30k
  data_chunk->set_crc32(crc32);
281
9.30k
  context.RespondSuccess();
282
9.30k
}
283
284
void RemoteBootstrapServiceImpl::EndRemoteBootstrapSession(
285
        const EndRemoteBootstrapSessionRequestPB* req,
286
        EndRemoteBootstrapSessionResponsePB* resp,
287
2.02k
        rpc::RpcContext context) {
288
2.02k
  {
289
2.02k
    std::lock_guard<std::mutex> l(sessions_mutex_);
290
2.02k
    RemoteBootstrapErrorPB::Code app_error;
291
2.02k
    RPC_RETURN_NOT_OK(DoEndRemoteBootstrapSession(
292
2.02k
                          req->session_id(), req->is_success(), &app_error),
293
2.02k
                      app_error, "No such session");
294
2.02k
    LOG(INFO) << "Request end of remote bootstrap session " << req->session_id()
295
2.02k
              << " received from " << context.requestor_string();
296
297
2.02k
    if (!req->keep_session()) {
298
3
      RemoveSession(req->session_id());
299
2.01k
    } else {
300
2.01k
      resp->set_session_kept(true);
301
2.01k
    }
302
2.02k
  }
303
0
  context.RespondSuccess();
304
2.02k
}
305
306
void RemoteBootstrapServiceImpl::RemoveSession(
307
        const RemoveSessionRequestPB* req,
308
        RemoveSessionResponsePB* resp,
309
1.93k
        rpc::RpcContext context) {
310
1.93k
  {
311
1.93k
    std::lock_guard<std::mutex> l(sessions_mutex_);
312
1.93k
    RemoveSession(req->session_id());
313
1.93k
  }
314
1.93k
  context.RespondSuccess();
315
1.93k
}
316
317
1.94k
void RemoteBootstrapServiceImpl::RemoveSession(const std::string& session_id) {
318
  // Remove the session from the map.
319
  // It will get destroyed once there are no outstanding refs.
320
1.94k
  auto it = sessions_.find(session_id);
321
1.94k
  if (it == sessions_.end()) {
322
0
    LOG(WARNING) << "Attempt to remove session with unknown id: " << session_id;
323
0
    return;
324
0
  }
325
1.94k
  LOG(INFO) << "Removing remote bootstrap session " << session_id << " on tablet "
326
1.94k
            << session_id << " with peer " << it->second.session->requestor_uuid();
327
1.94k
  sessions_.erase(it);
328
1.94k
  nsessions_.fetch_sub(1, std::memory_order_acq_rel);
329
1.94k
}
330
331
185
void RemoteBootstrapServiceImpl::Shutdown() {
332
185
  shutdown_latch_.CountDown();
333
185
  session_expiration_thread_->Join();
334
335
185
  std::lock_guard<std::mutex> lock(sessions_mutex_);
336
  // Destroy all remote bootstrap sessions.
337
185
  std::vector<string> session_ids;
338
185
  session_ids.reserve(sessions_.size());
339
185
  for (const auto& entry : sessions_) {
340
8
    session_ids.push_back(entry.first);
341
8
  }
342
185
  for (const string& session_id : session_ids) {
343
8
    LOG(INFO) << "Destroying remote bootstrap session " << session_id << " due to service shutdown";
344
8
    RemoteBootstrapErrorPB::Code app_error;
345
8
    CHECK_OK(DoEndRemoteBootstrapSession(session_id, false, &app_error));
346
8
  }
347
185
}
348
349
// Checks that a FetchData request identifies exactly one thing to fetch: either
// a WAL segment sequence number or a file name, but not both and not neither.
//
// Returns InvalidArgument and sets *app_error to
// INVALID_REMOTE_BOOTSTRAP_REQUEST when that rule is violated. Otherwise the
// result of session->ValidateDataId() is returned and *app_error is left
// untouched by this function.
Status RemoteBootstrapServiceImpl::ValidateFetchRequestDataId(
        const DataIdPB& data_id,
        RemoteBootstrapErrorPB::Code* app_error,
        const scoped_refptr<RemoteBootstrapSession>& session) const {
  const bool has_seqno = data_id.has_wal_segment_seqno();
  const bool has_file_name = data_id.has_file_name();

  // Exactly one must be set, i.e. the two flags must differ.
  if (PREDICT_FALSE(has_seqno == has_file_name)) {
    *app_error = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST;
    return STATUS(InvalidArgument,
        Substitute("Only one of segment sequence number, and file name can be specified. "
                   "DataTypeID: $0", data_id.ShortDebugString()));
  }

  return session->ValidateDataId(data_id);
}
363
364
13.4k
void RemoteBootstrapServiceImpl::SessionData::ResetExpiration() {
365
13.4k
  expiration = CoarseMonoClock::now() + FLAGS_remote_bootstrap_idle_timeout_ms * 1ms;
366
13.4k
}
367
368
// Ends the session with the given id. Every caller in this file holds
// sessions_mutex_ while calling; this function does not lock.
//
// session_id:        id of the session to end.
// session_succeeded: whether the caller reports the bootstrap as successful.
// app_error:         set to NO_SESSION when the id is unknown; not written
//                    otherwise.
//
// Returns NotFound for an unknown id and OK in every other case, including when
// the role change fails.
//
// When the bootstrap succeeded (per the argument or per session->Succeeded()),
// the session is marked successful and ChangeRole() is attempted. ChangeRole()
// is retried, with no delay between attempts, only while it fails with
// LeaderHasNoLease and remote_bootstrap_change_role_timeout_ms has not elapsed.
// On any other failure, or once the deadline passes, the session's expiration
// is reset and OK is returned. When the bootstrap failed, the failure is only
// logged.
Status RemoteBootstrapServiceImpl::DoEndRemoteBootstrapSession(
        const std::string& session_id,
        bool session_succeeded,
        RemoteBootstrapErrorPB::Code* app_error) {
  auto it = sessions_.find(session_id);
  if (it == sessions_.end()) {
    *app_error = RemoteBootstrapErrorPB::NO_SESSION;
    return STATUS_FORMAT(NotFound, "End of unknown session id: $0", session_id);
  }
  auto session = it->second.session;

  // Failure path: nothing to do beyond logging.
  if (!session_succeeded && !session->Succeeded()) {
    LOG(ERROR) << "Remote bootstrap session " << session_id << " on tablet " << session->tablet_id()
               << " with peer " << session->requestor_uuid() << " failed. session_succeeded = "
               << session_succeeded;
    return Status::OK();
  }

  session->SetSuccess();

  if (PREDICT_FALSE(FLAGS_TEST_inject_latency_before_change_role_secs)) {
    LOG(INFO) << "Injecting latency for test";
    SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_inject_latency_before_change_role_secs));
  }

  if (PREDICT_FALSE(FLAGS_TEST_skip_change_role)) {
    LOG(INFO) << "Not changing role for " << session->requestor_uuid()
              << " because flag FLAGS_TEST_skip_change_role is set";
    return Status::OK();
  }

  MAYBE_FAULT(FLAGS_TEST_fault_crash_leader_before_changing_role);

  MonoTime deadline =
      MonoTime::Now() +
      MonoDelta::FromMilliseconds(FLAGS_remote_bootstrap_change_role_timeout_ms);
  while (true) {
    Status status = session->ChangeRole();
    if (status.ok()) {
      LOG(INFO) << "ChangeRole succeeded for bootstrap session " << session_id;
      MAYBE_FAULT(FLAGS_TEST_fault_crash_leader_after_changing_role);
      return Status::OK();
    }

    LOG(WARNING) << "ChangeRole failed for bootstrap session " << session_id
                 << ", error : " << status;

    // Give up unless the failure is a missing leader lease and there is still
    // time left; the session gets a fresh idle deadline in that case.
    if (!status.IsLeaderHasNoLease() || MonoTime::Now() >= deadline) {
      it->second.ResetExpiration();
      return Status::OK();
    }
  }
}
420
421
16.7k
void RemoteBootstrapServiceImpl::EndExpiredSessions() {
422
794k
  do {
423
794k
    std::lock_guard<std::mutex> l(sessions_mutex_);
424
794k
    auto now = CoarseMonoClock::Now();
425
426
794k
    std::vector<string> expired_session_ids;
427
794k
    for (const auto& entry : sessions_) {
428
391
      if (entry.second.expiration < now) {
429
5
        expired_session_ids.push_back(entry.first);
430
5
      }
431
391
    }
432
794k
    for (const string& session_id : expired_session_ids) {
433
5
      LOG(INFO) << "Remote bootstrap session " << session_id
434
5
                << " has expired. Terminating session.";
435
5
      RemoteBootstrapErrorPB::Code app_error;
436
5
      CHECK_OK(DoEndRemoteBootstrapSession(session_id, false, &app_error));
437
5
      RemoveSession(session_id);
438
5
    }
439
794k
  } while (!shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds(
440
794k
                                    FLAGS_remote_bootstrap_timeout_poll_period_ms)));
441
16.7k
}
442
443
} // namespace tserver
444
} // namespace yb