/Users/deen/code/yugabyte-db/src/yb/tserver/remote_bootstrap_service.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tserver/remote_bootstrap_service.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <string> |
37 | | #include <vector> |
38 | | |
39 | | #include <gflags/gflags.h> |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/common/wire_protocol.h" |
43 | | |
44 | | #include "yb/consensus/log_util.h" |
45 | | |
46 | | #include "yb/gutil/casts.h" |
47 | | |
48 | | #include "yb/rpc/rpc_context.h" |
49 | | |
50 | | #include "yb/tablet/tablet_peer.h" |
51 | | |
52 | | #include "yb/tserver/tablet_peer_lookup.h" |
53 | | |
54 | | #include "yb/util/crc.h" |
55 | | #include "yb/util/fault_injection.h" |
56 | | #include "yb/util/flag_tags.h" |
57 | | #include "yb/util/status_format.h" |
58 | | #include "yb/util/status_log.h" |
59 | | #include "yb/util/thread.h" |
60 | | |
61 | | using namespace std::literals; |
62 | | |
// Responds to the current RPC with an application-level error built from
// (app_err, message, s) and returns from the enclosing handler, which must
// therefore have a void return type.
// Note, this macro assumes the existence of a local var named 'context'.
#define RPC_RETURN_APP_ERROR(app_err, message, s) \
  do { \
    SetupErrorAndRespond(&context, app_err, message, s); \
    return; \
  } while (false)

// Evaluates 'expr' exactly once. If the resulting Status is not OK, responds
// with an application error and returns from the enclosing handler; otherwise
// falls through. 'app_err' and 'message' are only evaluated on failure, so they
// may refer to values that 'expr' fills in (e.g. an out-parameter error code).
//
// The temporary deliberately has a long, macro-specific name: a short name
// such as 's' would shadow a caller variable of the same name appearing inside
// 'expr' and turn the initialization into a self-initialization.
// Like RPC_RETURN_APP_ERROR, this assumes a local var named 'context'.
#define RPC_RETURN_NOT_OK(expr, app_err, message) \
  do { \
    const Status rpc_return_not_ok_status_ = (expr); \
    if (!rpc_return_not_ok_status_.ok()) { \
      RPC_RETURN_APP_ERROR(app_err, message, rpc_return_not_ok_status_); \
    } \
  } while (false)
77 | | |
// Idle timeout for a remote bootstrap session, in milliseconds (default 180000).
// Read when a session's expiration is reset and reported to the client in the
// BeginRemoteBootstrapSession response. Tagged hidden.
DEFINE_uint64(remote_bootstrap_idle_timeout_ms, 180000,
              "Amount of time without activity before a remote bootstrap "
              "session will expire, in millis");
TAG_FLAG(remote_bootstrap_idle_timeout_ms, hidden);

// Period, in milliseconds (default 10000), between scans for expired sessions
// performed by the session expiration thread. Tagged hidden.
DEFINE_uint64(remote_bootstrap_timeout_poll_period_ms, 10000,
              "How often the remote_bootstrap service polls for expired "
              "remote bootstrap sessions, in millis");
TAG_FLAG(remote_bootstrap_timeout_poll_period_ms, hidden);

// Test-only fault injection: passed to MAYBE_FAULT while handling FetchData().
DEFINE_test_flag(double, fault_crash_on_handle_rb_fetch_data, 0.0,
                 "Fraction of the time when the tablet will crash while "
                 "servicing a RemoteBootstrapService FetchData() RPC call.");

// Test-only: when non-zero, the service sleeps this many seconds before
// attempting ChangeRole at the end of a successful session.
DEFINE_test_flag(uint64, inject_latency_before_change_role_secs, 0,
                 "Number of seconds to sleep before we call ChangeRole.");

// Test-only: when set, ending a successful session returns without calling
// ChangeRole.
DEFINE_test_flag(bool, skip_change_role, false,
                 "When set, we don't call ChangeRole after successfully finishing a remote "
                 "bootstrap.");

// Test-only fault injection: passed to MAYBE_FAULT right before the ChangeRole
// retry loop.
DEFINE_test_flag(double, fault_crash_leader_before_changing_role, 0.0,
                 "The leader will crash before changing the role (from PRE_VOTER or PRE_OBSERVER "
                 "to VOTER or OBSERVER respectively) of the tablet server it is remote "
                 "bootstrapping.");

// Test-only fault injection: passed to MAYBE_FAULT right after ChangeRole
// reports success.
DEFINE_test_flag(double, fault_crash_leader_after_changing_role, 0.0,
                 "The leader will crash after successfully sending a ChangeConfig (CHANGE_ROLE "
                 "from PRE_VOTER or PRE_OBSERVER to VOTER or OBSERVER respectively) for the tablet "
                 "server it is remote bootstrapping, but before it sends a success response.");

// Upper bound, in milliseconds (default 15000), on how long ChangeRole is
// retried while it keeps failing with a "leader has no lease" status.
DEFINE_uint64(remote_bootstrap_change_role_timeout_ms, 15000,
              "Timeout for change role operation during remote bootstrap.");
111 | | |
112 | | namespace yb { |
113 | | namespace tserver { |
114 | | |
115 | | using crc::Crc32c; |
116 | | using strings::Substitute; |
117 | | using tablet::TabletPeer; |
118 | | |
119 | | static void SetupErrorAndRespond(rpc::RpcContext* context, |
120 | | RemoteBootstrapErrorPB::Code code, |
121 | | const string& message, |
122 | 1.02k | const Status& s) { |
123 | 1.02k | LOG(WARNING) << "Error handling RemoteBootstrapService RPC request from " |
124 | 1.02k | << context->requestor_string() << ": " |
125 | 1.02k | << s.ToString(); |
126 | 1.02k | RemoteBootstrapErrorPB error; |
127 | 1.02k | StatusToPB(s, error.mutable_status()); |
128 | 1.02k | error.set_code(code); |
129 | 1.02k | context->RespondApplicationError(RemoteBootstrapErrorPB::remote_bootstrap_error_ext.number(), |
130 | 1.02k | message, error); |
131 | 1.02k | } |
132 | | |
133 | | RemoteBootstrapServiceImpl::RemoteBootstrapServiceImpl( |
134 | | FsManager* fs_manager, |
135 | | TabletPeerLookupIf* tablet_peer_lookup, |
136 | | const scoped_refptr<MetricEntity>& metric_entity) |
137 | | : RemoteBootstrapServiceIf(metric_entity), |
138 | | fs_manager_(CHECK_NOTNULL(fs_manager)), |
139 | | tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)), |
140 | 11.2k | shutdown_latch_(1) { |
141 | 11.2k | CHECK_OK(Thread::Create("remote-bootstrap", "rb-session-exp", |
142 | 11.2k | &RemoteBootstrapServiceImpl::EndExpiredSessions, this, |
143 | 11.2k | &session_expiration_thread_)); |
144 | 11.2k | } |
145 | | |
146 | 160 | RemoteBootstrapServiceImpl::~RemoteBootstrapServiceImpl() { |
147 | 160 | } |
148 | | |
149 | | void RemoteBootstrapServiceImpl::BeginRemoteBootstrapSession( |
150 | | const BeginRemoteBootstrapSessionRequestPB* req, |
151 | | BeginRemoteBootstrapSessionResponsePB* resp, |
152 | 1.45k | rpc::RpcContext context) { |
153 | 1.45k | const string& requestor_uuid = req->requestor_uuid(); |
154 | 1.45k | const string& tablet_id = req->tablet_id(); |
155 | | // For now, we use the requestor_uuid with the tablet id as the session id, |
156 | | // but there is no guarantee this will not change in the future. |
157 | 1.45k | MonoTime now = MonoTime::Now(); |
158 | 1.45k | const string session_id = Substitute("$0-$1-$2", requestor_uuid, tablet_id, now.ToString()); |
159 | | |
160 | 1.45k | std::shared_ptr<TabletPeer> tablet_peer; |
161 | 1.45k | RPC_RETURN_NOT_OK(tablet_peer_lookup_->GetTabletPeer(tablet_id, &tablet_peer), |
162 | 1.45k | RemoteBootstrapErrorPB::TABLET_NOT_FOUND, |
163 | 1.45k | Substitute("Unable to find specified tablet: $0", tablet_id)); |
164 | 1.45k | RPC_RETURN_NOT_OK(tablet_peer->CheckRunning(), |
165 | 1.45k | RemoteBootstrapErrorPB::TABLET_NOT_FOUND, |
166 | 1.45k | Substitute("Tablet is not running yet: $0", tablet_id)); |
167 | | |
168 | 1.44k | scoped_refptr<RemoteBootstrapSession> session; |
169 | 1.44k | { |
170 | 1.44k | std::lock_guard<std::mutex> l(sessions_mutex_); |
171 | 1.44k | auto it = sessions_.find(session_id); |
172 | 1.44k | if (it == sessions_.end()) { |
173 | 1.44k | LOG(INFO) << "Beginning new remote bootstrap session on tablet " << tablet_id |
174 | 1.44k | << " from peer " << requestor_uuid << " at " << context.requestor_string() |
175 | 1.44k | << ": session id = " << session_id; |
176 | 1.44k | session.reset(new RemoteBootstrapSession( |
177 | 1.44k | tablet_peer, session_id, requestor_uuid, &nsessions_)); |
178 | 1.44k | it = sessions_.emplace(session_id, SessionData{session, CoarseTimePoint()}).first; |
179 | 1.44k | auto new_nsessions = nsessions_.fetch_add(1, std::memory_order_acq_rel) + 1; |
180 | 0 | LOG_IF(DFATAL, implicit_cast<size_t>(new_nsessions) != sessions_.size()) |
181 | 0 | << "nsessions_ " << new_nsessions << " != number of sessions " << sessions_.size(); |
182 | 0 | } else { |
183 | 0 | session = it->second.session; |
184 | 0 | LOG(INFO) << "Re-initializing existing remote bootstrap session on tablet " << tablet_id |
185 | 0 | << " from peer " << requestor_uuid << " at " << context.requestor_string() |
186 | 0 | << ": session id = " << session_id; |
187 | 0 | } |
188 | 1.44k | it->second.ResetExpiration(); |
189 | 1.44k | } |
190 | | |
191 | 1.44k | RPC_RETURN_NOT_OK(session->Init(), |
192 | 1.44k | RemoteBootstrapErrorPB::UNKNOWN_ERROR, |
193 | 1.44k | Substitute("Error initializing remote bootstrap session for tablet $0", |
194 | 1.44k | tablet_id)); |
195 | | |
196 | 1.44k | resp->set_session_id(session_id); |
197 | 1.44k | resp->set_session_idle_timeout_millis(FLAGS_remote_bootstrap_idle_timeout_ms); |
198 | 1.44k | resp->mutable_superblock()->CopyFrom(session->tablet_superblock()); |
199 | 1.44k | resp->mutable_initial_committed_cstate()->CopyFrom(session->initial_committed_cstate()); |
200 | | |
201 | 1.44k | auto const& log_segments = session->log_segments(); |
202 | 1.44k | resp->mutable_deprecated_wal_segment_seqnos()->Reserve(narrow_cast<int>(log_segments.size())); |
203 | 1.69k | for (const scoped_refptr<log::ReadableLogSegment>& segment : log_segments) { |
204 | 1.69k | resp->add_deprecated_wal_segment_seqnos(segment->header().sequence_number()); |
205 | 1.69k | } |
206 | 1.44k | if (!log_segments.empty()) { |
207 | 1.44k | resp->set_first_wal_segment_seqno(log_segments.front()->header().sequence_number()); |
208 | 1.44k | } |
209 | | |
210 | 1.44k | context.RespondSuccess(); |
211 | 1.44k | } |
212 | | |
213 | | void RemoteBootstrapServiceImpl::CheckSessionActive( |
214 | | const CheckRemoteBootstrapSessionActiveRequestPB* req, |
215 | | CheckRemoteBootstrapSessionActiveResponsePB* resp, |
216 | 2 | rpc::RpcContext context) { |
217 | | // Look up and validate remote bootstrap session. |
218 | 2 | std::lock_guard<std::mutex> l(sessions_mutex_); |
219 | 2 | auto it = sessions_.find(req->session_id()); |
220 | 2 | if (it != sessions_.end()) { |
221 | 1 | if (req->keepalive()) { |
222 | 0 | it->second.ResetExpiration(); |
223 | 0 | } |
224 | 1 | resp->set_session_is_active(true); |
225 | 1 | context.RespondSuccess(); |
226 | 1 | } else { |
227 | 1 | resp->set_session_is_active(false); |
228 | 1 | context.RespondSuccess(); |
229 | 1 | } |
230 | 2 | } |
231 | | |
232 | | void RemoteBootstrapServiceImpl::FetchData(const FetchDataRequestPB* req, |
233 | | FetchDataResponsePB* resp, |
234 | 5.83k | rpc::RpcContext context) { |
235 | 5.83k | const string& session_id = req->session_id(); |
236 | | |
237 | | // Look up and validate remote bootstrap session. |
238 | 5.83k | scoped_refptr<RemoteBootstrapSession> session; |
239 | 5.83k | { |
240 | 5.83k | std::lock_guard<std::mutex> l(sessions_mutex_); |
241 | 5.83k | auto it = sessions_.find(session_id); |
242 | 5.83k | if (it == sessions_.end()) { |
243 | 2 | RPC_RETURN_APP_ERROR( |
244 | 2 | RemoteBootstrapErrorPB::NO_SESSION, "No such session", |
245 | 2 | STATUS_FORMAT(NotFound, "Fetch data for unknown sessions id: $0", session_id)); |
246 | 2 | } |
247 | 5.83k | it->second.ResetExpiration(); |
248 | 5.83k | session = it->second.session; |
249 | 5.83k | } |
250 | | |
251 | 5.83k | session->EnsureRateLimiterIsInitialized(); |
252 | | |
253 | 5.83k | MAYBE_FAULT(FLAGS_TEST_fault_crash_on_handle_rb_fetch_data); |
254 | | |
255 | 5.83k | int64_t rate_limit = session->rate_limiter().GetMaxSizeForNextTransmission(); |
256 | 1 | VLOG(3) << " rate limiter max len: " << rate_limit; |
257 | 5.83k | GetDataPieceInfo info = { |
258 | 5.83k | .offset = req->offset(), |
259 | 5.83k | .client_maxlen = rate_limit == 0 ? req->max_length() : std::min(req->max_length(), rate_limit), |
260 | 5.83k | .data = std::string(), |
261 | 5.83k | .data_size = 0, |
262 | 5.83k | .error_code = RemoteBootstrapErrorPB::UNKNOWN_ERROR, |
263 | 5.83k | }; |
264 | 5.83k | const DataIdPB& data_id = req->data_id(); |
265 | 5.83k | RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &info.error_code, session), |
266 | 5.83k | info.error_code, "Invalid DataId"); |
267 | | |
268 | 5.83k | RPC_RETURN_NOT_OK(session->GetDataPiece(data_id, &info), |
269 | 5.83k | info.error_code, "Unable to get piece of data file"); |
270 | | |
271 | 4.82k | session->rate_limiter().UpdateDataSizeAndMaybeSleep(info.data.size()); |
272 | 4.82k | uint32_t crc32 = Crc32c(info.data.data(), info.data.length()); |
273 | | |
274 | 4.82k | DataChunkPB* data_chunk = resp->mutable_chunk(); |
275 | 4.82k | *data_chunk->mutable_data() = std::move(info.data); |
276 | 4.82k | data_chunk->set_total_data_length(info.data_size); |
277 | 4.82k | data_chunk->set_offset(info.offset); |
278 | | |
279 | | // Calculate checksum. |
280 | 4.82k | data_chunk->set_crc32(crc32); |
281 | 4.82k | context.RespondSuccess(); |
282 | 4.82k | } |
283 | | |
284 | | void RemoteBootstrapServiceImpl::EndRemoteBootstrapSession( |
285 | | const EndRemoteBootstrapSessionRequestPB* req, |
286 | | EndRemoteBootstrapSessionResponsePB* resp, |
287 | 1.00k | rpc::RpcContext context) { |
288 | 1.00k | { |
289 | 1.00k | std::lock_guard<std::mutex> l(sessions_mutex_); |
290 | 1.00k | RemoteBootstrapErrorPB::Code app_error; |
291 | 1.00k | RPC_RETURN_NOT_OK(DoEndRemoteBootstrapSession( |
292 | 1.00k | req->session_id(), req->is_success(), &app_error), |
293 | 1.00k | app_error, "No such session"); |
294 | 999 | LOG(INFO) << "Request end of remote bootstrap session " << req->session_id() |
295 | 999 | << " received from " << context.requestor_string(); |
296 | | |
297 | 999 | if (!req->keep_session()) { |
298 | 3 | RemoveSession(req->session_id()); |
299 | 996 | } else { |
300 | 996 | resp->set_session_kept(true); |
301 | 996 | } |
302 | 999 | } |
303 | 999 | context.RespondSuccess(); |
304 | 999 | } |
305 | | |
306 | | void RemoteBootstrapServiceImpl::RemoveSession( |
307 | | const RemoveSessionRequestPB* req, |
308 | | RemoveSessionResponsePB* resp, |
309 | 941 | rpc::RpcContext context) { |
310 | 941 | { |
311 | 941 | std::lock_guard<std::mutex> l(sessions_mutex_); |
312 | 941 | RemoveSession(req->session_id()); |
313 | 941 | } |
314 | 941 | context.RespondSuccess(); |
315 | 941 | } |
316 | | |
317 | 947 | void RemoteBootstrapServiceImpl::RemoveSession(const std::string& session_id) { |
318 | | // Remove the session from the map. |
319 | | // It will get destroyed once there are no outstanding refs. |
320 | 947 | auto it = sessions_.find(session_id); |
321 | 947 | if (it == sessions_.end()) { |
322 | 0 | LOG(WARNING) << "Attempt to remove session with unknown id: " << session_id; |
323 | 0 | return; |
324 | 0 | } |
325 | 947 | LOG(INFO) << "Removing remote bootstrap session " << session_id << " on tablet " |
326 | 947 | << session_id << " with peer " << it->second.session->requestor_uuid(); |
327 | 947 | sessions_.erase(it); |
328 | 947 | nsessions_.fetch_sub(1, std::memory_order_acq_rel); |
329 | 947 | } |
330 | | |
331 | 160 | void RemoteBootstrapServiceImpl::Shutdown() { |
332 | 160 | shutdown_latch_.CountDown(); |
333 | 160 | session_expiration_thread_->Join(); |
334 | | |
335 | 160 | std::lock_guard<std::mutex> lock(sessions_mutex_); |
336 | | // Destroy all remote bootstrap sessions. |
337 | 160 | std::vector<string> session_ids; |
338 | 160 | session_ids.reserve(sessions_.size()); |
339 | 8 | for (const auto& entry : sessions_) { |
340 | 8 | session_ids.push_back(entry.first); |
341 | 8 | } |
342 | 8 | for (const string& session_id : session_ids) { |
343 | 8 | LOG(INFO) << "Destroying remote bootstrap session " << session_id << " due to service shutdown"; |
344 | 8 | RemoteBootstrapErrorPB::Code app_error; |
345 | 8 | CHECK_OK(DoEndRemoteBootstrapSession(session_id, false, &app_error)); |
346 | 8 | } |
347 | 160 | } |
348 | | |
349 | | Status RemoteBootstrapServiceImpl::ValidateFetchRequestDataId( |
350 | | const DataIdPB& data_id, |
351 | | RemoteBootstrapErrorPB::Code* app_error, |
352 | 5.83k | const scoped_refptr<RemoteBootstrapSession>& session) const { |
353 | 5.83k | int num_set = data_id.has_wal_segment_seqno() + data_id.has_file_name(); |
354 | 5.83k | if (PREDICT_FALSE(num_set != 1)) { |
355 | 2 | *app_error = RemoteBootstrapErrorPB::INVALID_REMOTE_BOOTSTRAP_REQUEST; |
356 | 2 | return STATUS(InvalidArgument, |
357 | 2 | Substitute("Only one of segment sequence number, and file name can be specified. " |
358 | 2 | "DataTypeID: $0", data_id.ShortDebugString())); |
359 | 2 | } |
360 | | |
361 | 5.82k | return session->ValidateDataId(data_id); |
362 | 5.82k | } |
363 | | |
364 | 7.31k | void RemoteBootstrapServiceImpl::SessionData::ResetExpiration() { |
365 | 7.31k | expiration = CoarseMonoClock::now() + FLAGS_remote_bootstrap_idle_timeout_ms * 1ms; |
366 | 7.31k | } |
367 | | |
368 | | Status RemoteBootstrapServiceImpl::DoEndRemoteBootstrapSession( |
369 | | const std::string& session_id, |
370 | | bool session_succeeded, |
371 | 1.01k | RemoteBootstrapErrorPB::Code* app_error) { |
372 | 1.01k | auto it = sessions_.find(session_id); |
373 | 1.01k | if (it == sessions_.end()) { |
374 | 2 | *app_error = RemoteBootstrapErrorPB::NO_SESSION; |
375 | 2 | return STATUS_FORMAT(NotFound, "End of unknown session id: $0", session_id); |
376 | 2 | } |
377 | 1.01k | auto session = it->second.session; |
378 | | |
379 | 1.01k | if (session_succeeded || session->Succeeded()) { |
380 | 1.00k | session->SetSuccess(); |
381 | | |
382 | 1.00k | if (PREDICT_FALSE(FLAGS_TEST_inject_latency_before_change_role_secs)) { |
383 | 5 | LOG(INFO) << "Injecting latency for test"; |
384 | 5 | SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_inject_latency_before_change_role_secs)); |
385 | 5 | } |
386 | | |
387 | 1.00k | if (PREDICT_FALSE(FLAGS_TEST_skip_change_role)) { |
388 | 0 | LOG(INFO) << "Not changing role for " << session->requestor_uuid() |
389 | 0 | << " because flag FLAGS_TEST_skip_change_role is set"; |
390 | 0 | return Status::OK(); |
391 | 0 | } |
392 | | |
393 | 1.00k | MAYBE_FAULT(FLAGS_TEST_fault_crash_leader_before_changing_role); |
394 | | |
395 | 1.00k | MonoTime deadline = |
396 | 1.00k | MonoTime::Now() + |
397 | 1.00k | MonoDelta::FromMilliseconds(FLAGS_remote_bootstrap_change_role_timeout_ms); |
398 | 1.00k | for (;;) { |
399 | 1.00k | Status status = session->ChangeRole(); |
400 | 1.00k | if (status.ok()) { |
401 | 965 | LOG(INFO) << "ChangeRole succeeded for bootstrap session " << session_id; |
402 | 965 | MAYBE_FAULT(FLAGS_TEST_fault_crash_leader_after_changing_role); |
403 | 965 | break; |
404 | 965 | } |
405 | 35 | LOG(WARNING) << "ChangeRole failed for bootstrap session " << session_id |
406 | 35 | << ", error : " << status; |
407 | 35 | if (!status.IsLeaderHasNoLease() || MonoTime::Now() >= deadline) { |
408 | 35 | it->second.ResetExpiration(); |
409 | 35 | return Status::OK(); |
410 | 35 | } |
411 | 35 | } |
412 | 10 | } else { |
413 | 10 | LOG(ERROR) << "Remote bootstrap session " << session_id << " on tablet " << session->tablet_id() |
414 | 10 | << " with peer " << session->requestor_uuid() << " failed. session_succeeded = " |
415 | 10 | << session_succeeded; |
416 | 10 | } |
417 | | |
418 | 975 | return Status::OK(); |
419 | 1.01k | } |
420 | | |
421 | 11.2k | void RemoteBootstrapServiceImpl::EndExpiredSessions() { |
422 | 36.9k | do { |
423 | 36.9k | std::lock_guard<std::mutex> l(sessions_mutex_); |
424 | 36.9k | auto now = CoarseMonoClock::Now(); |
425 | | |
426 | 36.9k | std::vector<string> expired_session_ids; |
427 | 599 | for (const auto& entry : sessions_) { |
428 | 599 | if (entry.second.expiration < now) { |
429 | 3 | expired_session_ids.push_back(entry.first); |
430 | 3 | } |
431 | 599 | } |
432 | 3 | for (const string& session_id : expired_session_ids) { |
433 | 3 | LOG(INFO) << "Remote bootstrap session " << session_id |
434 | 3 | << " has expired. Terminating session."; |
435 | 3 | RemoteBootstrapErrorPB::Code app_error; |
436 | 3 | CHECK_OK(DoEndRemoteBootstrapSession(session_id, false, &app_error)); |
437 | 3 | RemoveSession(session_id); |
438 | 3 | } |
439 | 36.9k | } while (!shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds( |
440 | 36.9k | FLAGS_remote_bootstrap_timeout_poll_period_ms))); |
441 | 11.2k | } |
442 | | |
443 | | } // namespace tserver |
444 | | } // namespace yb |