/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_peer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/tablet_peer.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | #include <string> |
38 | | #include <utility> |
39 | | #include <vector> |
40 | | |
41 | | #include <gflags/gflags.h> |
42 | | |
43 | | #include "yb/consensus/consensus.h" |
44 | | #include "yb/consensus/consensus.pb.h" |
45 | | #include "yb/consensus/consensus_util.h" |
46 | | #include "yb/consensus/log.h" |
47 | | #include "yb/consensus/log_anchor_registry.h" |
48 | | #include "yb/consensus/log_util.h" |
49 | | #include "yb/consensus/opid_util.h" |
50 | | #include "yb/consensus/raft_consensus.h" |
51 | | #include "yb/consensus/retryable_requests.h" |
52 | | #include "yb/consensus/state_change_context.h" |
53 | | |
54 | | #include "yb/docdb/consensus_frontier.h" |
55 | | |
56 | | #include "yb/gutil/casts.h" |
57 | | #include "yb/gutil/strings/substitute.h" |
58 | | #include "yb/gutil/sysinfo.h" |
59 | | |
60 | | #include "yb/rocksdb/db/memtable.h" |
61 | | |
62 | | #include "yb/rpc/messenger.h" |
63 | | #include "yb/rpc/strand.h" |
64 | | #include "yb/rpc/thread_pool.h" |
65 | | |
66 | | #include "yb/tablet/operations/change_metadata_operation.h" |
67 | | #include "yb/tablet/operations/history_cutoff_operation.h" |
68 | | #include "yb/tablet/operations/operation_driver.h" |
69 | | #include "yb/tablet/operations/snapshot_operation.h" |
70 | | #include "yb/tablet/operations/split_operation.h" |
71 | | #include "yb/tablet/operations/truncate_operation.h" |
72 | | #include "yb/tablet/operations/update_txn_operation.h" |
73 | | #include "yb/tablet/operations/write_operation.h" |
74 | | #include "yb/tablet/tablet.h" |
75 | | #include "yb/tablet/tablet.pb.h" |
76 | | #include "yb/tablet/tablet_bootstrap_if.h" |
77 | | #include "yb/tablet/tablet_metadata.h" |
78 | | #include "yb/tablet/tablet_metrics.h" |
79 | | #include "yb/tablet/tablet_peer_mm_ops.h" |
80 | | #include "yb/tablet/tablet_retention_policy.h" |
81 | | #include "yb/tablet/transaction_participant.h" |
82 | | #include "yb/tablet/write_query.h" |
83 | | |
84 | | #include "yb/util/debug-util.h" |
85 | | #include "yb/util/flag_tags.h" |
86 | | #include "yb/util/format.h" |
87 | | #include "yb/util/logging.h" |
88 | | #include "yb/util/metrics.h" |
89 | | #include "yb/util/status_format.h" |
90 | | #include "yb/util/status_log.h" |
91 | | #include "yb/util/stopwatch.h" |
92 | | #include "yb/util/threadpool.h" |
93 | | #include "yb/util/trace.h" |
94 | | |
95 | | using namespace std::literals; |
96 | | using namespace std::placeholders; |
97 | | using std::shared_ptr; |
98 | | using std::string; |
99 | | |
100 | | DEFINE_test_flag(int32, delay_init_tablet_peer_ms, 0, |
101 | | "Wait before executing init tablet peer for specified amount of milliseconds."); |
102 | | |
103 | | DEFINE_int32(cdc_min_replicated_index_considered_stale_secs, 900, |
104 | | "If cdc_min_replicated_index hasn't been replicated in this amount of time, we reset its" |
105 | | "value to max int64 to avoid retaining any logs"); |
106 | | |
107 | | DEFINE_bool(propagate_safe_time, true, "Propagate safe time to read from leader to followers"); |
108 | | |
109 | | DECLARE_int32(ysql_transaction_abort_timeout_ms); |
110 | | |
111 | | namespace yb { |
112 | | namespace tablet { |
113 | | |
// Table-level histogram of the number of operations waiting in the prepare queue.
METRIC_DEFINE_coarse_histogram(table, op_prepare_queue_length, "Operation Prepare Queue Length",
                        MetricUnit::kTasks,
                        "Number of operations waiting to be prepared within this tablet. "
                        "High queue lengths indicate that the server is unable to process "
                        "operations as fast as they are being written to the WAL.");

// Table-level histogram (microseconds) of the time operations wait in the prepare queue.
METRIC_DEFINE_coarse_histogram(table, op_prepare_queue_time, "Operation Prepare Queue Time",
                        MetricUnit::kMicroseconds,
                        "Time that operations spent waiting in the prepare queue before being "
                        "processed. High queue times indicate that the server is unable to "
                        "process operations as fast as they are being written to the WAL.");

// Table-level histogram (microseconds) of the time operations spend being prepared.
METRIC_DEFINE_coarse_histogram(table, op_prepare_run_time, "Operation Prepare Run Time",
                        MetricUnit::kMicroseconds,
                        "Time that operations spent being prepared in the tablet. "
                        "High values may indicate that the server is under-provisioned or "
                        "that operations are experiencing high contention with one another for "
                        "locks.");
132 | | |
133 | | using consensus::Consensus; |
134 | | using consensus::ConsensusBootstrapInfo; |
135 | | using consensus::ConsensusMetadata; |
136 | | using consensus::ConsensusOptions; |
137 | | using consensus::ConsensusRound; |
138 | | using consensus::StateChangeContext; |
139 | | using consensus::StateChangeReason; |
140 | | using consensus::RaftConfigPB; |
141 | | using consensus::RaftPeerPB; |
142 | | using consensus::RaftConsensus; |
143 | | using consensus::ReplicateMsg; |
144 | | using consensus::OpIdType; |
145 | | using log::Log; |
146 | | using log::LogAnchorRegistry; |
147 | | using rpc::Messenger; |
148 | | using strings::Substitute; |
149 | | using tserver::TabletServerErrorPB; |
150 | | |
151 | | // ============================================================================ |
152 | | // Tablet Peer |
153 | | // ============================================================================ |
// Constructs a peer in the NOT_STARTED state. The constructor only stores the supplied
// collaborators and creates the status listener and the log anchor registry; the tablet, log and
// consensus instance are attached later by InitTabletPeer().
//
// meta             - Raft group metadata; also the source of the tablet id.
// local_peer_pb    - description of this peer, later handed to RaftConsensus::Create().
// clock            - clock used for hybrid time leases and UpdateClock().
// permanent_uuid   - UUID used (with the tablet id) to build the log prefix.
// mark_dirty_clbk  - callback run when the peer state changes (see Start()).
// metric_registry  - may be null (shutdown paths check it before use).
// tablet_splitter, client_future - stored as-is for later use.
TabletPeer::TabletPeer(
    const RaftGroupMetadataPtr& meta,
    const consensus::RaftPeerPB& local_peer_pb,
    const scoped_refptr<server::Clock>& clock,
    const std::string& permanent_uuid,
    Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
    MetricRegistry* metric_registry,
    TabletSplitter* tablet_splitter,
    const std::shared_future<client::YBClient*>& client_future)
    : meta_(meta),
      tablet_id_(meta->raft_group_id()),
      local_peer_pb_(local_peer_pb),
      state_(RaftGroupStatePB::NOT_STARTED),
      operation_tracker_(consensus::MakeTabletLogPrefix(tablet_id_, permanent_uuid)),
      status_listener_(new TabletStatusListener(meta)),
      clock_(clock),
      log_anchor_registry_(new LogAnchorRegistry()),
      mark_dirty_clbk_(std::move(mark_dirty_clbk)),
      permanent_uuid_(permanent_uuid),
      preparing_operations_counter_(operation_tracker_.LogPrefix()),
      metric_registry_(metric_registry),
      tablet_splitter_(tablet_splitter),
      client_future_(client_future) {}
177 | | |
178 | 47.0k | TabletPeer::~TabletPeer() { |
179 | 47.0k | std::lock_guard<simple_spinlock> lock(lock_); |
180 | | // We should either have called Shutdown(), or we should have never called |
181 | | // Init(). |
182 | 0 | LOG_IF_WITH_PREFIX(DFATAL, tablet_) << "TabletPeer not fully shut down."; |
183 | 47.0k | } |
184 | | |
// Attaches the bootstrapped tablet and its log to this peer and creates the consensus instance.
//
// The peer must be in the BOOTSTRAPPING state, otherwise IllegalState is returned. Under lock_
// this method publishes tablet/log/messenger pointers, installs the memtable flush filter,
// loads the consensus metadata and creates RaftConsensus plus the Preparer. After releasing the
// lock it starts the preparer, operation tracker instrumentation/memory tracking, the
// transaction coordinator/participant (when present) and applies the persisted
// cdc_min_replicated_index.
//
// Returns the first error from loading consensus metadata, starting the preparer, or setting
// the CDC min replicated index; Status::OK() otherwise.
Status TabletPeer::InitTabletPeer(
    const TabletPtr& tablet,
    const std::shared_ptr<MemTracker>& server_mem_tracker,
    Messenger* messenger,
    rpc::ProxyCache* proxy_cache,
    const scoped_refptr<Log>& log,
    const scoped_refptr<MetricEntity>& table_metric_entity,
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
    ThreadPool* raft_pool,
    ThreadPool* tablet_prepare_pool,
    consensus::RetryableRequests* retryable_requests,
    consensus::MultiRaftManager* multi_raft_manager) {
  DCHECK(tablet) << "A TabletPeer must be provided with a Tablet";
  DCHECK(log) << "A TabletPeer must be provided with a Log";

  // Test hook: optionally stall initialization.
  if (FLAGS_TEST_delay_init_tablet_peer_ms > 0) {
    std::this_thread::sleep_for(FLAGS_TEST_delay_init_tablet_peer_ms * 1ms);
  }

  {
    std::lock_guard<simple_spinlock> lock(lock_);
    auto state = state_.load(std::memory_order_acquire);
    if (state != RaftGroupStatePB::BOOTSTRAPPING) {
      return STATUS_FORMAT(
          IllegalState, "Invalid tablet state for init: $0", RaftGroupStatePB_Name(state));
    }
    tablet_ = tablet;
    proxy_cache_ = proxy_cache;
    log_ = log;
    // "Publish" the log pointer so it can be retrieved using the log() accessor.
    log_atomic_ = log.get();
    service_thread_pool_ = &messenger->ThreadPool();
    strand_.reset(new rpc::Strand(&messenger->ThreadPool()));
    messenger_ = messenger;

    // The factory snapshots the latest log op index each time it is invoked; the filter it
    // returns then decides, per memtable, whether that memtable may be flushed.
    tablet->SetMemTableFlushFilterFactory([log] {
      auto largest_log_op_index = log->GetLatestEntryOpId().index;
      return [largest_log_op_index] (const rocksdb::MemTable& memtable) -> Result<bool> {
        auto frontiers = memtable.Frontiers();
        if (frontiers) {
          const auto largest_memtable_op_index =
              down_cast<const docdb::ConsensusFrontier&>(frontiers->Largest()).op_id().index;
          // We can only flush this memtable if all operations written to it have also been written
          // to the log (maybe not synced, if durable_wal_write is disabled, but that's OK).
          auto should_flush = largest_memtable_op_index <= largest_log_op_index;
          if (!should_flush) {
            LOG(WARNING)
                << "Skipping flush on memtable with ops ahead of log. "
                << "Memtable index: " << largest_memtable_op_index
                << " - log index: " << largest_log_op_index;
          }
          return should_flush;
        }

        // It is correct to not have frontiers when memtable is empty
        if (memtable.IsEmpty()) {
          return true;
        }

        // This is a degenerate case that should ideally never occur: a non-empty memtable without
        // frontiers. It is reported as an error (with a stack trace) rather than flushed.
        static const char* error_msg =
            "A memtable with no frontiers set found when deciding what memtables to "
            "flush! This should not happen.";
        LOG(ERROR) << error_msg << " Stack trace:\n" << GetStackTrace();
        return STATUS(IllegalState, error_msg);
      };
    });

    tablet_->SetCleanupPool(raft_pool);

    ConsensusOptions options;
    options.tablet_id = meta_->raft_group_id();

    TRACE("Creating consensus instance");

    std::unique_ptr<ConsensusMetadata> cmeta;
    RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
                                          meta_->fs_manager()->uuid(), &cmeta));

    // retryable_requests may be null; only wire up metrics when it is provided.
    if (retryable_requests) {
      retryable_requests->SetMetricEntity(tablet->GetTabletMetricsEntity());
    }

    consensus_ = RaftConsensus::Create(
        options,
        std::move(cmeta),
        local_peer_pb_,
        table_metric_entity,
        tablet_metric_entity,
        clock_,
        this,
        messenger,
        proxy_cache_,
        log_.get(),
        server_mem_tracker,
        tablet_->mem_tracker(),
        mark_dirty_clbk_,
        tablet_->table_type(),
        raft_pool,
        retryable_requests,
        multi_raft_manager);
    // Lets lock-free readers (see WaitUntilConsensusRunning) know consensus_ is now set.
    has_consensus_.store(true, std::memory_order_release);

    tablet_->SetHybridTimeLeaseProvider(std::bind(&TabletPeer::HybridTimeLease, this, _1, _2));
    operation_tracker_.SetPostTracker(
        std::bind(&RaftConsensus::TrackOperationMemory, consensus_.get(), _1));

    prepare_thread_ = std::make_unique<Preparer>(consensus_.get(), tablet_prepare_pool);

    ChangeConfigReplicated(RaftConfig()); // Set initial flag value.
  }

  RETURN_NOT_OK(prepare_thread_->Start());

  if (tablet_->metrics() != nullptr) {
    TRACE("Starting instrumentation");
    operation_tracker_.StartInstrumentation(tablet_->GetTabletMetricsEntity());
  }
  operation_tracker_.StartMemoryTracking(tablet_->mem_tracker());

  if (tablet_->transaction_coordinator()) {
    tablet_->transaction_coordinator()->Start();
  }

  if (tablet_->transaction_participant()) {
    tablet_->transaction_participant()->Start();
  }

  RETURN_NOT_OK(set_cdc_min_replicated_index(meta_->cdc_min_replicated_index()));

  TRACE("TabletPeer::Init() finished");
  VLOG_WITH_PREFIX(2) << "Peer Initted";

  return Status::OK();
}
321 | | |
322 | | Result<FixedHybridTimeLease> TabletPeer::HybridTimeLease( |
323 | 33.2M | HybridTime min_allowed, CoarseTimePoint deadline) { |
324 | 33.2M | auto time = VERIFY_RESULT(WaitUntil(clock_.get(), min_allowed, deadline)); |
325 | | // min_allowed could contain non zero logical part, so we add one microsecond to be sure that |
326 | | // the resulting ht_lease is at least min_allowed. |
327 | 33.2M | auto min_allowed_micros = min_allowed.CeilPhysicalValueMicros(); |
328 | 33.2M | MicrosTime lease_micros = VERIFY_RESULT(consensus_->MajorityReplicatedHtLeaseExpiration( |
329 | 33.2M | min_allowed_micros, deadline)); |
330 | 33.2M | if (lease_micros >= kMaxHybridTimePhysicalMicros) { |
331 | | // This could happen when leader leases are disabled. |
332 | 383k | return FixedHybridTimeLease(); |
333 | 383k | } |
334 | 32.8M | return FixedHybridTimeLease { |
335 | 32.8M | .time = time, |
336 | 32.8M | .lease = HybridTime(lease_micros, /* logical */ 0) |
337 | 32.8M | }; |
338 | 32.8M | } |
339 | | |
340 | 13.3M | Result<HybridTime> TabletPeer::PreparePeerRequest() { |
341 | 13.3M | auto leader_term = consensus_->GetLeaderState(/* allow_stale= */ true).term; |
342 | 13.3M | if (leader_term >= 0) { |
343 | 10.2M | auto last_write_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime(); |
344 | 10.2M | auto propagated_history_cutoff = |
345 | 10.2M | tablet_->RetentionPolicy()->HistoryCutoffToPropagate(last_write_ht); |
346 | | |
347 | 10.2M | if (propagated_history_cutoff) { |
348 | 0 | VLOG_WITH_PREFIX(2) << "Propagate history cutoff: " << propagated_history_cutoff; |
349 | |
|
350 | 0 | auto operation = std::make_unique<HistoryCutoffOperation>(tablet_.get()); |
351 | 0 | auto request = operation->AllocateRequest(); |
352 | 0 | request->set_history_cutoff(propagated_history_cutoff.ToUint64()); |
353 | |
|
354 | 0 | Submit(std::move(operation), leader_term); |
355 | 0 | } |
356 | 10.2M | } |
357 | | |
358 | 13.3M | if (!FLAGS_propagate_safe_time) { |
359 | 6 | return HybridTime::kInvalid; |
360 | 6 | } |
361 | | |
362 | | // Get the current majority-replicated HT leader lease without any waiting. |
363 | 13.3M | auto ht_lease = VERIFY_RESULT(HybridTimeLease( |
364 | 13.3M | /* min_allowed= */ HybridTime::kMin, /* deadline */ CoarseTimePoint::max())); |
365 | 13.3M | return tablet_->mvcc_manager()->SafeTime(ht_lease); |
366 | 13.3M | } |
367 | | |
368 | 15.1M | void TabletPeer::MajorityReplicated() { |
369 | 15.1M | auto ht_lease = HybridTimeLease( |
370 | 15.1M | /* min_allowed= */ HybridTime::kMin, /* deadline */ CoarseTimePoint::max()); |
371 | 15.1M | if (!ht_lease.ok()) { |
372 | 0 | LOG_WITH_PREFIX(DFATAL) << "Failed to get current lease: " << ht_lease.status(); |
373 | 0 | return; |
374 | 0 | } |
375 | | |
376 | 15.1M | tablet_->mvcc_manager()->UpdatePropagatedSafeTimeOnLeader(*ht_lease); |
377 | 15.1M | } |
378 | | |
379 | 98.8k | void TabletPeer::ChangeConfigReplicated(const RaftConfigPB& config) { |
380 | 98.8k | tablet_->mvcc_manager()->SetLeaderOnlyMode(config.peers_size() == 1); |
381 | 98.8k | } |
382 | | |
383 | 13.1M | uint64_t TabletPeer::NumSSTFiles() { |
384 | 13.1M | return tablet_->GetCurrentVersionNumSSTFiles(); |
385 | 13.1M | } |
386 | | |
387 | 136k | void TabletPeer::ListenNumSSTFilesChanged(std::function<void()> listener) { |
388 | 136k | tablet_->ListenNumSSTFilesChanged(std::move(listener)); |
389 | 136k | } |
390 | | |
391 | 2.73M | Status TabletPeer::CheckOperationAllowed(const OpId& op_id, consensus::OperationType op_type) { |
392 | 2.73M | return tablet_->CheckOperationAllowed(op_id, op_type); |
393 | 2.73M | } |
394 | | |
395 | 88.7k | Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) { |
396 | 88.7k | { |
397 | 88.7k | std::lock_guard<simple_spinlock> l(state_change_lock_); |
398 | 88.7k | TRACE("Starting consensus"); |
399 | | |
400 | 18.4E | VLOG_WITH_PREFIX(2) << "Peer starting"; |
401 | | |
402 | 11 | VLOG(2) << "RaftConfig before starting: " << consensus_->CommittedConfig().DebugString(); |
403 | | |
404 | | // If tablet was previously considered shutdown w.r.t. metrics, |
405 | | // fix that for a tablet now being reinstated. |
406 | 41 | DVLOG_WITH_PREFIX(3) |
407 | 41 | << "Remove from set of tablets that have been shutdown so as to allow reporting metrics"; |
408 | 88.7k | metric_registry_->tablets_shutdown_erase(tablet_id()); |
409 | | |
410 | 88.7k | RETURN_NOT_OK(consensus_->Start(bootstrap_info)); |
411 | 88.7k | RETURN_NOT_OK(UpdateState(RaftGroupStatePB::BOOTSTRAPPING, RaftGroupStatePB::RUNNING, |
412 | 88.7k | "Incorrect state to start TabletPeer, ")); |
413 | 88.7k | } |
414 | | // The context tracks that the current caller does not hold the lock for consensus state. |
415 | | // So mark dirty callback, e.g., consensus->ConsensusState() for master consensus callback of |
416 | | // SysCatalogStateChanged, can get the lock when needed. |
417 | 88.7k | auto context = |
418 | 88.7k | std::make_shared<StateChangeContext>(StateChangeReason::TABLET_PEER_STARTED, false); |
419 | | // Because we changed the tablet state, we need to re-report the tablet to the master. |
420 | 88.7k | mark_dirty_clbk_.Run(context); |
421 | | |
422 | 88.7k | return tablet_->EnableCompactions(/* non_abortable_ops_pause */ nullptr); |
423 | 88.7k | } |
424 | | |
425 | 89.7k | consensus::RaftConfigPB TabletPeer::RaftConfig() const { |
426 | 0 | CHECK(consensus_) << "consensus is null"; |
427 | 89.7k | return consensus_->CommittedConfig(); |
428 | 89.7k | } |
429 | | |
430 | 47.8k | bool TabletPeer::StartShutdown(IsDropTable is_drop_table) { |
431 | 47.8k | LOG_WITH_PREFIX(INFO) << "Initiating TabletPeer shutdown"; |
432 | | |
433 | 47.8k | { |
434 | 47.8k | std::lock_guard<decltype(lock_)> lock(lock_); |
435 | 47.8k | if (tablet_) { |
436 | 47.7k | tablet_->StartShutdown(is_drop_table); |
437 | 47.7k | } |
438 | 47.8k | } |
439 | | |
440 | 47.8k | { |
441 | 47.8k | RaftGroupStatePB state = state_.load(std::memory_order_acquire); |
442 | 47.8k | for (;;) { |
443 | 47.8k | if (state == RaftGroupStatePB::QUIESCING || state == RaftGroupStatePB::SHUTDOWN) { |
444 | 24 | return false; |
445 | 24 | } |
446 | 47.7k | if (state_.compare_exchange_strong( |
447 | 47.7k | state, RaftGroupStatePB::QUIESCING, std::memory_order_acq_rel)) { |
448 | 47.7k | LOG_WITH_PREFIX(INFO) << "Started shutdown from state: " << RaftGroupStatePB_Name(state); |
449 | 47.7k | break; |
450 | 47.7k | } |
451 | 47.7k | } |
452 | 47.8k | } |
453 | | |
454 | 47.7k | std::lock_guard<simple_spinlock> l(state_change_lock_); |
455 | | // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here |
456 | | // to ensure that any currently running operation finishes before we proceed with |
457 | | // the rest of the shutdown sequence. In particular, a maintenance operation could |
458 | | // indirectly end up calling into the log, which we are about to shut down. |
459 | 47.7k | UnregisterMaintenanceOps(); |
460 | | |
461 | 47.7k | std::shared_ptr<consensus::RaftConsensus> consensus; |
462 | 47.7k | { |
463 | 47.7k | std::lock_guard<decltype(lock_)> lock(lock_); |
464 | 47.7k | consensus = consensus_; |
465 | 47.7k | } |
466 | 47.7k | if (consensus) { |
467 | 47.7k | consensus->Shutdown(); |
468 | 47.7k | } |
469 | | |
470 | 47.7k | return true; |
471 | 47.8k | } |
472 | | |
// Finishes the shutdown started by StartShutdown(): stops the strand, waits for in-flight
// operations, stops the preparer, closes the log, completes the tablet shutdown and finally
// releases owned components and marks the peer SHUTDOWN. The peer is expected to be in the
// QUIESCING state on entry; anything else is reported via DFATAL.
void TabletPeer::CompleteShutdown(IsDropTable is_drop_table) {
  auto* strand = strand_.get();
  if (strand) {
    strand->Shutdown();
  }

  preparing_operations_counter_.Shutdown();

  // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
  LOG_SLOW_EXECUTION(WARNING, 1000,
      Substitute("TabletPeer: tablet $0: Waiting for Operations to complete", tablet_id())) {
    operation_tracker_.WaitForAllToFinish();
  }

  if (prepare_thread_) {
    prepare_thread_->Stop();
  }

  if (log_) {
    // A failure to close the log is logged as a warning; shutdown continues regardless.
    WARN_NOT_OK(log_->Close(), LogPrefix() + "Error closing the Log");
  }

  VLOG_WITH_PREFIX(1) << "Shut down!";

  if (tablet_) {
    tablet_->CompleteShutdown(is_drop_table);
  }

  // Only mark the peer as SHUTDOWN when all other components have shut down.
  {
    std::lock_guard<simple_spinlock> lock(lock_);
    strand_.reset();
    // Release mem tracker resources.
    // has_consensus_ is cleared before consensus_ is reset so lock-free readers stop using it.
    has_consensus_.store(false, std::memory_order_release);
    consensus_.reset();
    prepare_thread_.reset();
    tablet_.reset();
    auto state = state_.load(std::memory_order_acquire);
    LOG_IF_WITH_PREFIX(DFATAL, state != RaftGroupStatePB::QUIESCING) <<
        "Bad state when completing shutdown: " << RaftGroupStatePB_Name(state);
    state_.store(RaftGroupStatePB::SHUTDOWN, std::memory_order_release);

    if (metric_registry_) {
      DVLOG_WITH_PREFIX(3)
          << "Add to set of tablets that have been shutdown so as to avoid reporting metrics";
      metric_registry_->tablets_shutdown_insert(tablet_id());
    }
  }
}
522 | | |
523 | 23 | void TabletPeer::WaitUntilShutdown() { |
524 | 23 | const MonoDelta kSingleWait = 10ms; |
525 | 23 | const MonoDelta kReportInterval = 5s; |
526 | 23 | const MonoDelta kMaxWait = 30s; |
527 | | |
528 | 23 | MonoDelta waited = MonoDelta::kZero; |
529 | 23 | MonoDelta last_reported = MonoDelta::kZero; |
530 | 23 | while (state_.load(std::memory_order_acquire) != RaftGroupStatePB::SHUTDOWN) { |
531 | 0 | if (waited >= last_reported + kReportInterval) { |
532 | 0 | if (waited >= kMaxWait) { |
533 | 0 | LOG_WITH_PREFIX(DFATAL) |
534 | 0 | << "Wait for shutdown " << waited << " exceeded kMaxWait " << kMaxWait; |
535 | 0 | } else { |
536 | 0 | LOG_WITH_PREFIX(WARNING) << "Long wait for shutdown: " << waited; |
537 | 0 | } |
538 | 0 | last_reported = waited; |
539 | 0 | } |
540 | 0 | SleepFor(kSingleWait); |
541 | 0 | waited += kSingleWait; |
542 | 0 | } |
543 | | |
544 | 23 | if (metric_registry_) { |
545 | 0 | DVLOG_WITH_PREFIX(3) |
546 | 0 | << "Add to set of tablets that have been shutdown so as to avoid reporting metrics"; |
547 | 23 | metric_registry_->tablets_shutdown_insert(tablet_id()); |
548 | 23 | } |
549 | 23 | } |
550 | | |
551 | 47.6k | Status TabletPeer::Shutdown(IsDropTable is_drop_table) { |
552 | 47.6k | bool isShutdownInitiated = StartShutdown(is_drop_table); |
553 | | |
554 | 47.6k | RETURN_NOT_OK(AbortSQLTransactions()); |
555 | | |
556 | 47.6k | if (isShutdownInitiated) { |
557 | 47.5k | CompleteShutdown(is_drop_table); |
558 | 33 | } else { |
559 | 33 | WaitUntilShutdown(); |
560 | 33 | } |
561 | 47.6k | return Status::OK(); |
562 | 47.6k | } |
563 | | |
564 | 47.6k | Status TabletPeer::AbortSQLTransactions() { |
565 | | // Once raft group state enters QUIESCING state, |
566 | | // new queries cannot be processed from then onwards. |
567 | | // Aborting any remaining active transactions in the tablet. |
568 | 47.6k | if (tablet_ && tablet_->table_type() == TableType::PGSQL_TABLE_TYPE) { |
569 | 9.90k | if (tablet_->transaction_participant()) { |
570 | 9.90k | HybridTime maxCutoff = HybridTime::kMax; |
571 | 9.90k | LOG(INFO) << "Aborting transactions that started prior to " << maxCutoff |
572 | 9.90k | << " for tablet id " << tablet_->tablet_id(); |
573 | 9.90k | CoarseTimePoint deadline = CoarseMonoClock::Now() + |
574 | 9.90k | MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_abort_timeout_ms); |
575 | 9.90k | WARN_NOT_OK(tablet_->transaction_participant()->StopActiveTxnsPriorTo(maxCutoff, deadline), |
576 | 9.90k | "Cannot abort transactions for tablet " + tablet_->tablet_id()); |
577 | 9.90k | } |
578 | 9.90k | } |
579 | 47.6k | return Status::OK(); |
580 | 47.6k | } |
581 | | |
582 | 16.0M | Status TabletPeer::CheckRunning() const { |
583 | 16.0M | auto state = state_.load(std::memory_order_acquire); |
584 | 16.0M | if (state != RaftGroupStatePB::RUNNING) { |
585 | 955 | if (state == RaftGroupStatePB::QUIESCING) { |
586 | 955 | return STATUS(ShutdownInProgress, "The tablet is shutting down"); |
587 | 955 | } |
588 | 0 | return STATUS_FORMAT(IllegalState, "The tablet is not in a running state: $0", |
589 | 0 | RaftGroupStatePB_Name(state)); |
590 | 0 | } |
591 | | |
592 | 16.0M | return Status::OK(); |
593 | 16.0M | } |
594 | | |
595 | 0 | bool TabletPeer::IsShutdownStarted() const { |
596 | 0 | auto state = state_.load(std::memory_order_acquire); |
597 | 0 | return state == RaftGroupStatePB::QUIESCING || state == RaftGroupStatePB::SHUTDOWN; |
598 | 0 | } |
599 | | |
600 | 102 | Status TabletPeer::CheckShutdownOrNotStarted() const { |
601 | 102 | RaftGroupStatePB value = state_.load(std::memory_order_acquire); |
602 | 102 | if (value != RaftGroupStatePB::SHUTDOWN && value != RaftGroupStatePB::NOT_STARTED) { |
603 | 0 | return STATUS(IllegalState, Substitute("The tablet is not in a shutdown state: $0", |
604 | 0 | RaftGroupStatePB_Name(value))); |
605 | 0 | } |
606 | | |
607 | 102 | return Status::OK(); |
608 | 102 | } |
609 | | |
610 | 5.39k | Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) { |
611 | 5.39k | MonoTime start(MonoTime::Now()); |
612 | | |
613 | 5.39k | int backoff_exp = 0; |
614 | 5.39k | const int kMaxBackoffExp = 8; |
615 | 5.67k | while (true) { |
616 | 5.67k | RaftGroupStatePB cached_state = state_.load(std::memory_order_acquire); |
617 | 5.67k | if (cached_state == RaftGroupStatePB::QUIESCING || cached_state == RaftGroupStatePB::SHUTDOWN) { |
618 | 0 | return STATUS(IllegalState, |
619 | 0 | Substitute("The tablet is already shutting down or shutdown. State: $0", |
620 | 0 | RaftGroupStatePB_Name(cached_state))); |
621 | 0 | } |
622 | 5.67k | if (cached_state == RUNNING && has_consensus_.load(std::memory_order_acquire) && |
623 | 5.39k | consensus_->IsRunning()) { |
624 | 5.39k | break; |
625 | 5.39k | } |
626 | 284 | MonoTime now(MonoTime::Now()); |
627 | 284 | MonoDelta elapsed(now.GetDeltaSince(start)); |
628 | 284 | if (elapsed.MoreThan(timeout)) { |
629 | 1 | return STATUS(TimedOut, Substitute("Consensus is not running after waiting for $0. State; $1", |
630 | 1 | elapsed.ToString(), RaftGroupStatePB_Name(cached_state))); |
631 | 1 | } |
632 | 283 | SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp)); |
633 | 283 | backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp); |
634 | 283 | } |
635 | 5.39k | return Status::OK(); |
636 | 5.39k | } |
637 | | |
638 | 1.65M | void TabletPeer::WriteAsync(std::unique_ptr<WriteQuery> query) { |
639 | 1.65M | ScopedOperation preparing_token(&preparing_operations_counter_); |
640 | 1.65M | auto status = CheckRunning(); |
641 | 1.65M | if (!status.ok()) { |
642 | 1 | query->Cancel(status); |
643 | 1 | return; |
644 | 1 | } |
645 | | |
646 | 1.65M | query->operation().set_preparing_token(std::move(preparing_token)); |
647 | 1.65M | tablet_->AcquireLocksAndPerformDocOperations(std::move(query)); |
648 | 1.65M | } |
649 | | |
650 | 0 | Result<HybridTime> TabletPeer::ReportReadRestart() { |
651 | 0 | tablet_->metrics()->restart_read_requests->Increment(); |
652 | 0 | return tablet_->SafeTime(RequireLease::kTrue); |
653 | 0 | } |
654 | | |
655 | 2.69M | void TabletPeer::Submit(std::unique_ptr<Operation> operation, int64_t term) { |
656 | 2.69M | auto status = CheckRunning(); |
657 | | |
658 | 2.69M | if (status.ok()) { |
659 | 2.69M | auto driver = NewLeaderOperationDriver(&operation, term); |
660 | 2.69M | if (driver.ok()) { |
661 | 2.69M | (**driver).ExecuteAsync(); |
662 | 18.4E | } else { |
663 | 18.4E | status = driver.status(); |
664 | 18.4E | } |
665 | 2.69M | } |
666 | 2.69M | if (!status.ok()) { |
667 | 6 | operation->Aborted(status, /* was_pending= */ false); |
668 | 6 | } |
669 | 2.69M | } |
670 | | |
671 | | void TabletPeer::SubmitUpdateTransaction( |
672 | 873k | std::unique_ptr<UpdateTxnOperation> operation, int64_t term) { |
673 | 873k | if (!operation->tablet()) { |
674 | 0 | operation->SetTablet(tablet()); |
675 | 0 | } |
676 | 873k | Submit(std::move(operation), term); |
677 | 873k | } |
678 | | |
679 | 321k | HybridTime TabletPeer::SafeTimeForTransactionParticipant() { |
680 | 321k | return tablet_->mvcc_manager()->SafeTimeForFollower( |
681 | 321k | /* min_allowed= */ HybridTime::kMin, /* deadline= */ CoarseTimePoint::min()); |
682 | 321k | } |
683 | | |
684 | 10.2k | Result<HybridTime> TabletPeer::WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) { |
685 | 10.2k | return tablet_->SafeTime(RequireLease::kFallbackToFollower, safe_time, deadline); |
686 | 10.2k | } |
687 | | |
688 | 969k | void TabletPeer::GetLastReplicatedData(RemoveIntentsData* data) { |
689 | 969k | data->op_id = consensus_->GetLastCommittedOpId(); |
690 | 969k | data->log_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime(); |
691 | 969k | } |
692 | | |
693 | 984k | void TabletPeer::GetLastCDCedData(RemoveIntentsData* data) { |
694 | 984k | if (consensus_ != nullptr) { |
695 | 983k | data->op_id.index = consensus_->GetLastCDCedOpId().index; |
696 | 983k | data->op_id.term = consensus_->GetLastCDCedOpId().term; |
697 | 983k | } |
698 | | |
699 | 984k | if((tablet_ != nullptr) && (tablet_->mvcc_manager() != nullptr)) { |
700 | | // for now use this hybrid time, ideally it should be of last_updated_time |
701 | 983k | data->log_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime(); |
702 | 983k | } |
703 | 984k | } |
704 | | |
705 | 728k | void TabletPeer::UpdateClock(HybridTime hybrid_time) { |
706 | 728k | clock_->Update(hybrid_time); |
707 | 728k | } |
708 | | |
709 | | std::unique_ptr<UpdateTxnOperation> TabletPeer::CreateUpdateTransaction( |
710 | 239k | TransactionStatePB* request) { |
711 | 239k | auto result = std::make_unique<UpdateTxnOperation>(tablet()); |
712 | 239k | result->TakeRequest(request); |
713 | 239k | return result; |
714 | 239k | } |
715 | | |
716 | 7.35k | void TabletPeer::GetTabletStatusPB(TabletStatusPB* status_pb_out) { |
717 | 7.35k | std::lock_guard<simple_spinlock> lock(lock_); |
718 | 7.35k | DCHECK(status_pb_out != nullptr); |
719 | 7.35k | DCHECK(status_listener_.get() != nullptr); |
720 | 7.35k | const auto disk_size_info = GetOnDiskSizeInfo(); |
721 | 7.35k | status_pb_out->set_tablet_id(status_listener_->tablet_id()); |
722 | 7.35k | status_pb_out->set_namespace_name(status_listener_->namespace_name()); |
723 | 7.35k | status_pb_out->set_table_name(status_listener_->table_name()); |
724 | 7.35k | status_pb_out->set_table_id(status_listener_->table_id()); |
725 | 7.35k | status_pb_out->set_last_status(status_listener_->last_status()); |
726 | 7.35k | status_listener_->partition()->ToPB(status_pb_out->mutable_partition()); |
727 | 7.35k | status_pb_out->set_state(state_); |
728 | 7.35k | status_pb_out->set_tablet_data_state(meta_->tablet_data_state()); |
729 | 7.35k | auto tablet = tablet_; |
730 | 7.35k | if (tablet) { |
731 | 7.29k | status_pb_out->set_table_type(tablet->table_type()); |
732 | 7.29k | } |
733 | 7.35k | disk_size_info.ToPB(status_pb_out); |
734 | | // Set hide status of the tablet. |
735 | 7.35k | status_pb_out->set_is_hidden(meta_->hidden()); |
736 | 7.35k | } |
737 | | |
738 | 6 | Status TabletPeer::RunLogGC() { |
739 | 6 | if (!CheckRunning().ok()) { |
740 | 0 | return Status::OK(); |
741 | 0 | } |
742 | 6 | auto s = reset_cdc_min_replicated_index_if_stale(); |
743 | 6 | if (!s.ok()) { |
744 | 0 | LOG_WITH_PREFIX(WARNING) << "Unable to reset cdc min replicated index " << s; |
745 | 0 | } |
746 | 6 | int64_t min_log_index; |
747 | 0 | if (VLOG_IS_ON(2)) { |
748 | 0 | std::string details; |
749 | 0 | min_log_index = VERIFY_RESULT(GetEarliestNeededLogIndex(&details)); |
750 | 0 | LOG_WITH_PREFIX(INFO) << __func__ << ": " << details; |
751 | 6 | } else { |
752 | 6 | min_log_index = VERIFY_RESULT(GetEarliestNeededLogIndex()); |
753 | 6 | } |
754 | 6 | int32_t num_gced = 0; |
755 | 6 | return log_->GC(min_log_index, &num_gced); |
756 | 6 | } |
757 | | |
758 | 44 | TabletDataState TabletPeer::data_state() const { |
759 | 44 | std::lock_guard<simple_spinlock> lock(lock_); |
760 | 44 | return meta_->tablet_data_state(); |
761 | 44 | } |
762 | | |
763 | 7 | string TabletPeer::HumanReadableState() const { |
764 | 7 | std::lock_guard<simple_spinlock> lock(lock_); |
765 | 7 | TabletDataState data_state = meta_->tablet_data_state(); |
766 | 7 | RaftGroupStatePB state = this->state(); |
767 | | // If failed, any number of things could have gone wrong. |
768 | 7 | if (state == RaftGroupStatePB::FAILED) { |
769 | 0 | return Substitute("$0 ($1): $2", RaftGroupStatePB_Name(state), |
770 | 0 | TabletDataState_Name(data_state), |
771 | 0 | error_.get()->ToString()); |
772 | | // If it's remotely bootstrapping, or tombstoned, that is the important thing |
773 | | // to show. |
774 | 7 | } else if (!CanServeTabletData(data_state)) { |
775 | 0 | return TabletDataState_Name(data_state); |
776 | 7 | } else if (data_state == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) { |
777 | 0 | return RaftGroupStatePB_Name(state) + " (split)"; |
778 | 0 | } |
779 | | // Otherwise, the tablet's data is in a "normal" state, so we just display |
780 | | // the runtime state (BOOTSTRAPPING, RUNNING, etc). |
781 | 7 | return RaftGroupStatePB_Name(state); |
782 | 7 | } |
783 | | |
784 | | namespace { |
785 | | |
786 | 0 | consensus::OperationType MapOperationTypeToPB(OperationType operation_type) { |
787 | 0 | switch (operation_type) { |
788 | 0 | case OperationType::kWrite: |
789 | 0 | return consensus::WRITE_OP; |
790 | | |
791 | 0 | case OperationType::kChangeMetadata: |
792 | 0 | return consensus::CHANGE_METADATA_OP; |
793 | | |
794 | 0 | case OperationType::kUpdateTransaction: |
795 | 0 | return consensus::UPDATE_TRANSACTION_OP; |
796 | | |
797 | 0 | case OperationType::kSnapshot: |
798 | 0 | return consensus::SNAPSHOT_OP; |
799 | | |
800 | 0 | case OperationType::kTruncate: |
801 | 0 | return consensus::TRUNCATE_OP; |
802 | | |
803 | 0 | case OperationType::kHistoryCutoff: |
804 | 0 | return consensus::HISTORY_CUTOFF_OP; |
805 | | |
806 | 0 | case OperationType::kSplit: |
807 | 0 | return consensus::SPLIT_OP; |
808 | | |
809 | 0 | case OperationType::kEmpty: |
810 | 0 | LOG(FATAL) << "OperationType::kEmpty cannot be converted to consensus::OperationType"; |
811 | 0 | } |
812 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, operation_type); |
813 | 0 | } |
814 | | |
815 | | } // namespace |
816 | | |
817 | | void TabletPeer::GetInFlightOperations(Operation::TraceType trace_type, |
818 | 0 | vector<consensus::OperationStatusPB>* out) const { |
819 | 0 | for (const auto& driver : operation_tracker_.GetPendingOperations()) { |
820 | 0 | if (driver->operation() == nullptr) { |
821 | 0 | continue; |
822 | 0 | } |
823 | 0 | auto op_type = driver->operation_type(); |
824 | 0 | if (op_type == OperationType::kEmpty) { |
825 | | // This is a special-purpose in-memory-only operation for updating propagated safe time on |
826 | | // a follower. |
827 | 0 | continue; |
828 | 0 | } |
829 | | |
830 | 0 | consensus::OperationStatusPB status_pb; |
831 | 0 | driver->GetOpId().ToPB(status_pb.mutable_op_id()); |
832 | 0 | status_pb.set_operation_type(MapOperationTypeToPB(op_type)); |
833 | 0 | status_pb.set_description(driver->ToString()); |
834 | 0 | int64_t running_for_micros = |
835 | 0 | MonoTime::Now().GetDeltaSince(driver->start_time()).ToMicroseconds(); |
836 | 0 | status_pb.set_running_for_micros(running_for_micros); |
837 | 0 | if (trace_type == Operation::TRACE_TXNS) { |
838 | 0 | status_pb.set_trace_buffer(driver->trace()->DumpToString(true)); |
839 | 0 | } |
840 | 0 | out->push_back(status_pb); |
841 | 0 | } |
842 | 0 | } |
843 | | |
844 | 6.61M | Result<int64_t> TabletPeer::GetEarliestNeededLogIndex(std::string* details) const { |
845 | | // First, we anchor on the last OpId in the Log to establish a lower bound |
846 | | // and avoid racing with the other checks. This limits the Log GC candidate |
847 | | // segments before we check the anchors. |
848 | 6.61M | auto latest_log_entry_op_id = log_->GetLatestEntryOpId(); |
849 | 6.61M | int64_t min_index = latest_log_entry_op_id.index; |
850 | 6.61M | if (details) { |
851 | 0 | *details += Format("Latest log entry op id: $0\n", latest_log_entry_op_id); |
852 | 0 | } |
853 | | |
854 | | // If we never have written to the log, no need to proceed. |
855 | 6.61M | if (min_index == 0) { |
856 | 208k | return min_index; |
857 | 208k | } |
858 | | |
859 | | // Next, we interrogate the anchor registry. |
860 | | // Returns OK if minimum known, NotFound if no anchors are registered. |
861 | 6.40M | { |
862 | 6.40M | int64_t min_anchor_index; |
863 | 6.40M | Status s = log_anchor_registry_->GetEarliestRegisteredLogIndex(&min_anchor_index); |
864 | 6.40M | if (PREDICT_FALSE(!s.ok())) { |
865 | 0 | DCHECK(s.IsNotFound()) << "Unexpected error calling LogAnchorRegistry: " << s.ToString(); |
866 | 9.04k | } else { |
867 | 9.04k | min_index = std::min(min_index, min_anchor_index); |
868 | 9.04k | if (details) { |
869 | 0 | *details += Format("Min anchor index: $0\n", min_anchor_index); |
870 | 0 | } |
871 | 9.04k | } |
872 | 6.40M | } |
873 | | |
874 | | // Next, interrogate the OperationTracker. |
875 | 6.40M | int64_t min_pending_op_index = std::numeric_limits<int64_t>::max(); |
876 | 278k | for (const auto& driver : operation_tracker_.GetPendingOperations()) { |
877 | 278k | auto tx_op_id = driver->GetOpId(); |
878 | | // A operation which doesn't have an opid hasn't been submitted for replication yet and |
879 | | // thus has no need to anchor the log. |
880 | 278k | if (tx_op_id != yb::OpId::Invalid()) { |
881 | 250k | min_pending_op_index = std::min(min_pending_op_index, tx_op_id.index); |
882 | 250k | } |
883 | 278k | } |
884 | | |
885 | 6.40M | min_index = std::min(min_index, min_pending_op_index); |
886 | 6.40M | if (details && min_pending_op_index != std::numeric_limits<int64_t>::max()) { |
887 | 0 | *details += Format("Min pending op id index: $0\n", min_pending_op_index); |
888 | 0 | } |
889 | | |
890 | 6.40M | auto min_retryable_request_op_id = consensus_->MinRetryableRequestOpId(); |
891 | 6.40M | min_index = std::min(min_index, min_retryable_request_op_id.index); |
892 | 6.40M | if (details) { |
893 | 0 | *details += Format("Min retryable request op id: $0\n", min_retryable_request_op_id); |
894 | 0 | } |
895 | | |
896 | 6.40M | auto* transaction_coordinator = tablet()->transaction_coordinator(); |
897 | 6.40M | if (transaction_coordinator) { |
898 | 2.68M | auto transaction_coordinator_min_op_index = transaction_coordinator->PrepareGC(details); |
899 | 2.68M | min_index = std::min(min_index, transaction_coordinator_min_op_index); |
900 | 2.68M | } |
901 | | |
902 | | // We keep at least one committed operation in the log so that we can always recover safe time |
903 | | // during bootstrap. |
904 | | // Last committed op id should be read before MaxPersistentOpId to avoid race condition |
905 | | // described in MaxPersistentOpIdForDb. |
906 | | // |
907 | | // If we read last committed op id AFTER reading last persistent op id (INCORRECT): |
908 | | // - We read max persistent op id and find there is no new data, so we ignore it. |
909 | | // - New data gets written and Raft-committed, but not yet flushed to an SSTable. |
910 | | // - We read the last committed op id, which is greater than what max persistent op id would have |
911 | | // returned. |
912 | | // - We garbage-collect the Raft log entries corresponding to the new data. |
913 | | // - Power is lost and the server reboots, losing committed data. |
914 | | // |
915 | | // If we read last committed op id BEFORE reading last persistent op id (CORRECT): |
916 | | // - We read the last committed op id. |
917 | | // - We read max persistent op id and find there is no new data, so we ignore it. |
918 | | // - New data gets written and Raft-committed, but not yet flushed to an SSTable. |
919 | | // - We still don't garbage-collect the logs containing the committed but unflushed data, |
920 | | // because the earlier value of the last committed op id that we read prevents us from doing so. |
921 | 6.40M | auto last_committed_op_id = consensus()->GetLastCommittedOpId(); |
922 | 6.40M | min_index = std::min(min_index, last_committed_op_id.index); |
923 | 6.40M | if (details) { |
924 | 0 | *details += Format("Last committed op id: $0\n", last_committed_op_id); |
925 | 0 | } |
926 | | |
927 | 6.40M | if (tablet_->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
928 | 3.71M | tablet_->FlushIntentsDbIfNecessary(latest_log_entry_op_id); |
929 | 3.71M | auto max_persistent_op_id = VERIFY_RESULT( |
930 | 3.71M | tablet_->MaxPersistentOpId(true /* invalid_if_no_new_data */)); |
931 | 3.71M | if (max_persistent_op_id.regular.valid()) { |
932 | 1.56M | min_index = std::min(min_index, max_persistent_op_id.regular.index); |
933 | 1.56M | if (details) { |
934 | 0 | *details += Format("Max persistent regular op id: $0\n", max_persistent_op_id.regular); |
935 | 0 | } |
936 | 1.56M | } |
937 | 3.71M | if (max_persistent_op_id.intents.valid()) { |
938 | 427k | min_index = std::min(min_index, max_persistent_op_id.intents.index); |
939 | 427k | if (details) { |
940 | 0 | *details += Format("Max persistent intents op id: $0\n", max_persistent_op_id.intents); |
941 | 0 | } |
942 | 427k | } |
943 | 3.71M | } |
944 | | |
945 | 6.40M | if (details) { |
946 | 0 | *details += Format("Earliest needed log index: $0\n", min_index); |
947 | 0 | } |
948 | | |
949 | 6.40M | return min_index; |
950 | 6.40M | } |
951 | | |
952 | 6.61M | Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const { |
953 | 6.61M | RETURN_NOT_OK(CheckRunning()); |
954 | 6.61M | int64_t min_op_idx = VERIFY_RESULT(GetEarliestNeededLogIndex()); |
955 | 6.61M | RETURN_NOT_OK(log_->GetGCableDataSize(min_op_idx, retention_size)); |
956 | 6.61M | return Status::OK(); |
957 | 6.61M | } |
958 | | |
959 | 846k | log::Log* TabletPeer::log() const { |
960 | 846k | Log* log = log_atomic_.load(std::memory_order_acquire); |
961 | 39 | LOG_IF_WITH_PREFIX(FATAL, !log) << "log() called before the log instance is initialized."; |
962 | 846k | return log; |
963 | 846k | } |
964 | | |
965 | 49.0k | yb::OpId TabletPeer::GetLatestLogEntryOpId() const { |
966 | 49.0k | Log* log = log_atomic_.load(std::memory_order_acquire); |
967 | 49.0k | if (log) { |
968 | 49.0k | return log->GetLatestEntryOpId(); |
969 | 49.0k | } |
970 | 5 | return yb::OpId(); |
971 | 5 | } |
972 | | |
973 | 89.0k | Status TabletPeer::set_cdc_min_replicated_index_unlocked(int64_t cdc_min_replicated_index) { |
974 | 89.0k | LOG_WITH_PREFIX(INFO) << "Setting cdc min replicated index to " << cdc_min_replicated_index; |
975 | 89.0k | RETURN_NOT_OK(meta_->set_cdc_min_replicated_index(cdc_min_replicated_index)); |
976 | 89.0k | Log* log = log_atomic_.load(std::memory_order_acquire); |
977 | 89.0k | if (log) { |
978 | 88.9k | log->set_cdc_min_replicated_index(cdc_min_replicated_index); |
979 | 88.9k | } |
980 | 89.0k | cdc_min_replicated_index_refresh_time_ = MonoTime::Now(); |
981 | 89.0k | return Status::OK(); |
982 | 89.0k | } |
983 | | |
984 | 89.0k | Status TabletPeer::set_cdc_min_replicated_index(int64_t cdc_min_replicated_index) { |
985 | 89.0k | std::lock_guard<decltype(cdc_min_replicated_index_lock_)> l(cdc_min_replicated_index_lock_); |
986 | 89.0k | return set_cdc_min_replicated_index_unlocked(cdc_min_replicated_index); |
987 | 89.0k | } |
988 | | |
989 | 6 | Status TabletPeer::reset_cdc_min_replicated_index_if_stale() { |
990 | 6 | std::lock_guard<decltype(cdc_min_replicated_index_lock_)> l(cdc_min_replicated_index_lock_); |
991 | 6 | auto seconds_since_last_refresh = |
992 | 6 | MonoTime::Now().GetDeltaSince(cdc_min_replicated_index_refresh_time_).ToSeconds(); |
993 | 6 | if (seconds_since_last_refresh > FLAGS_cdc_min_replicated_index_considered_stale_secs) { |
994 | 0 | LOG_WITH_PREFIX(INFO) << "Resetting cdc min replicated index. Seconds since last update: " |
995 | 0 | << seconds_since_last_refresh; |
996 | 0 | RETURN_NOT_OK(set_cdc_min_replicated_index_unlocked(std::numeric_limits<int64_t>::max())); |
997 | 0 | } |
998 | 6 | return Status::OK(); |
999 | 6 | } |
1000 | | |
1001 | 5.09M | std::unique_ptr<Operation> TabletPeer::CreateOperation(consensus::ReplicateMsg* replicate_msg) { |
1002 | 5.09M | switch (replicate_msg->op_type()) { |
1003 | 2.93M | case consensus::WRITE_OP: |
1004 | 1.57k | DCHECK(replicate_msg->has_write()) << "WRITE_OP replica" |
1005 | 1.57k | " operation must receive a WriteRequestPB"; |
1006 | | // We use separate preparing token only on leader, so here it could be empty. |
1007 | 2.93M | return std::make_unique<WriteOperation>(tablet()); |
1008 | | |
1009 | 320k | case consensus::CHANGE_METADATA_OP: |
1010 | 300 | DCHECK(replicate_msg->has_change_metadata_request()) << "CHANGE_METADATA_OP replica" |
1011 | 300 | " operation must receive an ChangeMetadataRequestPB"; |
1012 | 320k | return std::make_unique<ChangeMetadataOperation>(tablet(), log()); |
1013 | | |
1014 | 1.73M | case consensus::UPDATE_TRANSACTION_OP: |
1015 | 1.24k | DCHECK(replicate_msg->has_transaction_state()) << "UPDATE_TRANSACTION_OP replica" |
1016 | 1.24k | " operation must receive an TransactionStatePB"; |
1017 | 1.73M | return std::make_unique<UpdateTxnOperation>(tablet()); |
1018 | | |
1019 | 106k | case consensus::TRUNCATE_OP: |
1020 | 354 | DCHECK(replicate_msg->has_truncate()) << "TRUNCATE_OP replica" |
1021 | 354 | " operation must receive an TruncateRequestPB"; |
1022 | 106k | return std::make_unique<TruncateOperation>(tablet()); |
1023 | | |
1024 | 500 | case consensus::SNAPSHOT_OP: |
1025 | 0 | DCHECK(replicate_msg->has_snapshot_request()) << "SNAPSHOT_OP replica" |
1026 | 0 | " operation must receive an TabletSnapshotOpRequestPB"; |
1027 | 500 | return std::make_unique<SnapshotOperation>(tablet()); |
1028 | | |
1029 | 0 | case consensus::HISTORY_CUTOFF_OP: |
1030 | 0 | DCHECK(replicate_msg->has_history_cutoff()) << "HISTORY_CUTOFF_OP replica" |
1031 | 0 | " transaction must receive an HistoryCutoffPB"; |
1032 | 0 | return std::make_unique<HistoryCutoffOperation>(tablet()); |
1033 | | |
1034 | 31 | case consensus::SPLIT_OP: |
1035 | 0 | DCHECK(replicate_msg->has_split_request()) << "SPLIT_OP replica" |
1036 | 0 | " operation must receive an SplitOpRequestPB"; |
1037 | 31 | return std::make_unique<SplitOperation>(tablet(), tablet_splitter_); |
1038 | | |
1039 | 0 | case consensus::UNKNOWN_OP: FALLTHROUGH_INTENDED; |
1040 | 0 | case consensus::NO_OP: FALLTHROUGH_INTENDED; |
1041 | 0 | case consensus::CHANGE_CONFIG_OP: |
1042 | 0 | FATAL_INVALID_ENUM_VALUE(consensus::OperationType, replicate_msg->op_type()); |
1043 | 5.09M | } |
1044 | 0 | FATAL_INVALID_ENUM_VALUE(consensus::OperationType, replicate_msg->op_type()); |
1045 | 0 | } |
1046 | | |
1047 | | Status TabletPeer::StartReplicaOperation( |
1048 | 5.09M | const scoped_refptr<ConsensusRound>& round, HybridTime propagated_safe_time) { |
1049 | 5.09M | RaftGroupStatePB value = state(); |
1050 | 5.09M | if (value != RaftGroupStatePB::RUNNING && value != RaftGroupStatePB::BOOTSTRAPPING) { |
1051 | 4 | return STATUS(IllegalState, RaftGroupStatePB_Name(value)); |
1052 | 4 | } |
1053 | | |
1054 | 5.09M | consensus::ReplicateMsg* replicate_msg = round->replicate_msg().get(); |
1055 | 5.09M | DCHECK(replicate_msg->has_hybrid_time()); |
1056 | 5.09M | auto operation = CreateOperation(replicate_msg); |
1057 | | |
1058 | | // TODO(todd) Look at wiring the stuff below on the driver |
1059 | | // It's imperative that we set the round here on any type of operation, as this |
1060 | | // allows us to keep the reference to the request in the round instead of copying it. |
1061 | 5.09M | operation->set_consensus_round(round); |
1062 | 5.09M | HybridTime ht(replicate_msg->hybrid_time()); |
1063 | 5.09M | operation->set_hybrid_time(ht); |
1064 | 5.09M | clock_->Update(ht); |
1065 | | |
1066 | | // This sets the monotonic counter to at least replicate_msg.monotonic_counter() atomically. |
1067 | 5.09M | tablet_->UpdateMonotonicCounter(replicate_msg->monotonic_counter()); |
1068 | | |
1069 | 5.09M | auto* operation_ptr = operation.get(); |
1070 | 5.09M | OperationDriverPtr driver = VERIFY_RESULT(NewReplicaOperationDriver(&operation)); |
1071 | | |
1072 | 5.09M | operation_ptr->consensus_round()->SetCallback(driver.get()); |
1073 | | |
1074 | 5.09M | if (propagated_safe_time) { |
1075 | 4.42M | driver->SetPropagatedSafeTime(propagated_safe_time, tablet_->mvcc_manager()); |
1076 | 4.42M | } |
1077 | | |
1078 | 5.09M | driver->ExecuteAsync(); |
1079 | 5.09M | return Status::OK(); |
1080 | 5.09M | } |
1081 | | |
1082 | 5.71M | void TabletPeer::SetPropagatedSafeTime(HybridTime ht) { |
1083 | 5.71M | auto driver = NewReplicaOperationDriver(nullptr); |
1084 | 5.71M | if (!driver.ok()) { |
1085 | 0 | LOG_WITH_PREFIX(ERROR) << "Failed to create operation driver to set propagated hybrid time"; |
1086 | 0 | return; |
1087 | 0 | } |
1088 | 5.71M | (**driver).SetPropagatedSafeTime(ht, tablet_->mvcc_manager()); |
1089 | 5.71M | (**driver).ExecuteAsync(); |
1090 | 5.71M | } |
1091 | | |
1092 | 2.93M | bool TabletPeer::ShouldApplyWrite() { |
1093 | 2.93M | return tablet_->ShouldApplyWrite(); |
1094 | 2.93M | } |
1095 | | |
1096 | 16.7M | consensus::Consensus* TabletPeer::consensus() const { |
1097 | 16.7M | return raft_consensus(); |
1098 | 16.7M | } |
1099 | | |
1100 | 18.2M | consensus::RaftConsensus* TabletPeer::raft_consensus() const { |
1101 | 18.2M | std::lock_guard<simple_spinlock> lock(lock_); |
1102 | 18.2M | return consensus_.get(); |
1103 | 18.2M | } |
1104 | | |
1105 | 11.6M | shared_ptr<consensus::Consensus> TabletPeer::shared_consensus() const { |
1106 | 11.6M | std::lock_guard<simple_spinlock> lock(lock_); |
1107 | 11.6M | return consensus_; |
1108 | 11.6M | } |
1109 | | |
1110 | 10.4M | shared_ptr<consensus::RaftConsensus> TabletPeer::shared_raft_consensus() const { |
1111 | 10.4M | std::lock_guard<simple_spinlock> lock(lock_); |
1112 | 10.4M | return consensus_; |
1113 | 10.4M | } |
1114 | | |
1115 | | Result<OperationDriverPtr> TabletPeer::NewLeaderOperationDriver( |
1116 | 2.69M | std::unique_ptr<Operation>* operation, int64_t term) { |
1117 | 2.69M | if (term == OpId::kUnknownTerm) { |
1118 | 0 | return STATUS(InvalidArgument, "Leader operation driver for unknown term"); |
1119 | 0 | } |
1120 | 2.69M | return NewOperationDriver(operation, term); |
1121 | 2.69M | } |
1122 | | |
1123 | | Result<OperationDriverPtr> TabletPeer::NewReplicaOperationDriver( |
1124 | 10.8M | std::unique_ptr<Operation>* operation) { |
1125 | 10.8M | return NewOperationDriver(operation, OpId::kUnknownTerm); |
1126 | 10.8M | } |
1127 | | |
1128 | | Result<OperationDriverPtr> TabletPeer::NewOperationDriver(std::unique_ptr<Operation>* operation, |
1129 | 13.4M | int64_t term) { |
1130 | 13.4M | auto operation_driver = CreateOperationDriver(); |
1131 | 13.4M | RETURN_NOT_OK(operation_driver->Init(operation, term)); |
1132 | 13.4M | return operation_driver; |
1133 | 13.4M | } |
1134 | | |
1135 | 88.6k | void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) { |
1136 | | // Taking state_change_lock_ ensures that we don't shut down concurrently with |
1137 | | // this last start-up task. |
1138 | | // Note that the state_change_lock_ is taken in Shutdown(), |
1139 | | // prior to calling UnregisterMaintenanceOps(). |
1140 | | |
1141 | 88.6k | std::lock_guard<simple_spinlock> l(state_change_lock_); |
1142 | | |
1143 | 88.6k | if (state() != RaftGroupStatePB::RUNNING) { |
1144 | 0 | LOG_WITH_PREFIX(WARNING) << "Not registering maintenance operations: tablet not RUNNING"; |
1145 | 0 | return; |
1146 | 0 | } |
1147 | | |
1148 | 88.6k | DCHECK(maintenance_ops_.empty()); |
1149 | | |
1150 | 88.6k | auto log_gc = std::make_unique<LogGCOp>(this); |
1151 | 88.6k | maint_mgr->RegisterOp(log_gc.get()); |
1152 | 88.6k | maintenance_ops_.push_back(std::move(log_gc)); |
1153 | 88.6k | LOG_WITH_PREFIX(INFO) << "Registered log gc"; |
1154 | 88.6k | } |
1155 | | |
1156 | 47.7k | void TabletPeer::UnregisterMaintenanceOps() { |
1157 | 47.7k | DCHECK(state_change_lock_.is_locked()); |
1158 | 47.7k | for (auto& op : maintenance_ops_) { |
1159 | 47.7k | op->Unregister(); |
1160 | 47.7k | } |
1161 | 47.7k | maintenance_ops_.clear(); |
1162 | 47.7k | } |
1163 | | |
1164 | 7.35k | TabletOnDiskSizeInfo TabletPeer::GetOnDiskSizeInfo() const { |
1165 | 7.35k | TabletOnDiskSizeInfo info; |
1166 | | |
1167 | 7.35k | if (consensus_) { |
1168 | 7.29k | info.consensus_metadata_disk_size = consensus_->OnDiskSize(); |
1169 | 7.29k | } |
1170 | | |
1171 | 7.35k | if (tablet_) { |
1172 | 7.29k | info.sst_files_disk_size = tablet_->GetCurrentVersionSstFilesSize(); |
1173 | 7.29k | info.uncompressed_sst_files_disk_size = |
1174 | 7.29k | tablet_->GetCurrentVersionSstFilesUncompressedSize(); |
1175 | 7.29k | } |
1176 | | |
1177 | 7.35k | auto log = log_atomic_.load(std::memory_order_acquire); |
1178 | 7.35k | if (log) { |
1179 | 7.32k | info.wal_files_disk_size = log->OnDiskSize(); |
1180 | 7.32k | } |
1181 | | |
1182 | 7.35k | info.RecomputeTotalSize(); |
1183 | 7.35k | return info; |
1184 | 7.35k | } |
1185 | | |
1186 | 3.77k | size_t TabletPeer::GetNumLogSegments() const { |
1187 | 3.77k | auto log = log_atomic_.load(std::memory_order_acquire); |
1188 | 3.74k | return log ? log->num_segments() : 0; |
1189 | 3.77k | } |
1190 | | |
1191 | 531k | std::string TabletPeer::LogPrefix() const { |
1192 | 531k | return Substitute("T $0 P $1 [state=$2]: ", |
1193 | 531k | tablet_id_, permanent_uuid_, RaftGroupStatePB_Name(state())); |
1194 | 531k | } |
1195 | | |
1196 | 13.5M | scoped_refptr<OperationDriver> TabletPeer::CreateOperationDriver() { |
1197 | 13.5M | return scoped_refptr<OperationDriver>(new OperationDriver( |
1198 | 13.5M | &operation_tracker_, |
1199 | 13.5M | consensus_.get(), |
1200 | 13.5M | prepare_thread_.get(), |
1201 | 13.5M | tablet_->table_type())); |
1202 | 13.5M | } |
1203 | | |
1204 | 1.63M | int64_t TabletPeer::LeaderTerm() const { |
1205 | 1.63M | shared_ptr<consensus::Consensus> consensus; |
1206 | 1.63M | { |
1207 | 1.63M | std::lock_guard<simple_spinlock> lock(lock_); |
1208 | 1.63M | consensus = consensus_; |
1209 | 1.63M | } |
1210 | 1.63M | return consensus ? consensus->LeaderTerm() : yb::OpId::kUnknownTerm; |
1211 | 1.63M | } |
1212 | | |
1213 | 165k | Result<HybridTime> TabletPeer::LeaderSafeTime() const { |
1214 | 165k | return tablet_->SafeTime(); |
1215 | 165k | } |
1216 | | |
1217 | 11.8M | consensus::LeaderStatus TabletPeer::LeaderStatus(bool allow_stale) const { |
1218 | 11.8M | shared_ptr<consensus::Consensus> consensus; |
1219 | 11.8M | { |
1220 | 11.8M | std::lock_guard<simple_spinlock> lock(lock_); |
1221 | 11.8M | consensus = consensus_; |
1222 | 11.8M | } |
1223 | 11.4M | return consensus ? consensus->GetLeaderStatus(allow_stale) : consensus::LeaderStatus::NOT_LEADER; |
1224 | 11.8M | } |
1225 | | |
1226 | 139k | HybridTime TabletPeer::HtLeaseExpiration() const { |
1227 | 139k | HybridTime result( |
1228 | 139k | CHECK_RESULT(consensus_->MajorityReplicatedHtLeaseExpiration(0, CoarseTimePoint::max())), 0); |
1229 | 139k | return std::max(result, tablet_->mvcc_manager()->LastReplicatedHybridTime()); |
1230 | 139k | } |
1231 | | |
1232 | 0 | TableType TabletPeer::table_type() EXCLUDES(lock_) { |
1233 | | // TODO: what if tablet is not set? |
1234 | 0 | return DCHECK_NOTNULL(tablet())->table_type(); |
1235 | 0 | } |
1236 | | |
1237 | 7 | void TabletPeer::SetFailed(const Status& error) { |
1238 | 7 | DCHECK(error_.get(std::memory_order_acquire) == nullptr); |
1239 | 7 | error_ = MakeAtomicUniquePtr<Status>(error); |
1240 | 7 | auto state = state_.load(std::memory_order_acquire); |
1241 | 7 | while (state != RaftGroupStatePB::FAILED && state != RaftGroupStatePB::QUIESCING && |
1242 | 7 | state != RaftGroupStatePB::SHUTDOWN) { |
1243 | 7 | if (state_.compare_exchange_weak(state, RaftGroupStatePB::FAILED, std::memory_order_acq_rel)) { |
1244 | 7 | LOG_WITH_PREFIX(INFO) << "Changed state from " << RaftGroupStatePB_Name(state) |
1245 | 7 | << " to FAILED"; |
1246 | 7 | break; |
1247 | 7 | } |
1248 | 7 | } |
1249 | 7 | } |
1250 | | |
1251 | | Status TabletPeer::UpdateState(RaftGroupStatePB expected, RaftGroupStatePB new_state, |
1252 | 177k | const std::string& error_message) { |
1253 | 177k | RaftGroupStatePB old = expected; |
1254 | 177k | if (!state_.compare_exchange_strong(old, new_state, std::memory_order_acq_rel)) { |
1255 | 0 | return STATUS_FORMAT( |
1256 | 0 | InvalidArgument, "$0 Expected state: $1, got: $2", |
1257 | 0 | error_message, RaftGroupStatePB_Name(expected), RaftGroupStatePB_Name(old)); |
1258 | 0 | } |
1259 | | |
1260 | 177k | LOG_WITH_PREFIX(INFO) << "Changed state from " << RaftGroupStatePB_Name(old) << " to " |
1261 | 177k | << RaftGroupStatePB_Name(new_state); |
1262 | 177k | return Status::OK(); |
1263 | 177k | } |
1264 | | |
1265 | 97.6k | void TabletPeer::Enqueue(rpc::ThreadPoolTask* task) { |
1266 | 97.6k | rpc::ThreadPool* thread_pool = service_thread_pool_.load(std::memory_order_acquire); |
1267 | 97.6k | if (!thread_pool) { |
1268 | 0 | task->Done(STATUS(Aborted, "Thread pool not ready")); |
1269 | 0 | return; |
1270 | 0 | } |
1271 | | |
1272 | 97.6k | thread_pool->Enqueue(task); |
1273 | 97.6k | } |
1274 | | |
1275 | 968k | void TabletPeer::StrandEnqueue(rpc::StrandTask* task) { |
1276 | 968k | rpc::Strand* strand = strand_.get(); |
1277 | 968k | if (!strand) { |
1278 | 0 | task->Done(STATUS(Aborted, "Thread pool not ready")); |
1279 | 0 | return; |
1280 | 0 | } |
1281 | | |
1282 | 968k | strand->Enqueue(task); |
1283 | 968k | } |
1284 | | |
1285 | 24.1k | bool TabletPeer::CanBeDeleted() { |
1286 | 24.1k | const auto consensus = shared_raft_consensus(); |
1287 | 24.1k | if (!consensus || consensus->LeaderTerm() == OpId::kUnknownTerm) { |
1288 | 16.6k | return false; |
1289 | 16.6k | } |
1290 | | |
1291 | 7.51k | const auto tablet = shared_tablet(); |
1292 | 7.51k | if (!tablet) { |
1293 | 0 | return false; |
1294 | 0 | } |
1295 | | |
1296 | 7.51k | auto op_id = tablet->metadata()->GetOpIdToDeleteAfterAllApplied(); |
1297 | 7.51k | if (!op_id.valid()) { |
1298 | 7.45k | return false; |
1299 | 7.45k | } |
1300 | | |
1301 | 62 | const auto all_applied_op_id = consensus->GetAllAppliedOpId(); |
1302 | 62 | if (all_applied_op_id < op_id) { |
1303 | 47 | return false; |
1304 | 47 | } |
1305 | | |
1306 | 15 | LOG_WITH_PREFIX(INFO) << Format( |
1307 | 15 | "Marked tablet $0 as requiring cleanup due to all replicas have been split (all applied op " |
1308 | 15 | "id: $1, split op id: $2)", |
1309 | 15 | tablet_id(), all_applied_op_id, op_id); |
1310 | | |
1311 | 15 | return true; |
1312 | 15 | } |
1313 | | |
1314 | 25.8k | rpc::Scheduler& TabletPeer::scheduler() const { |
1315 | 25.8k | return messenger_->scheduler(); |
1316 | 25.8k | } |
1317 | | |
1318 | | } // namespace tablet |
1319 | | } // namespace yb |