/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_peer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/tablet_peer.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | #include <string> |
38 | | #include <utility> |
39 | | #include <vector> |
40 | | |
41 | | #include <gflags/gflags.h> |
42 | | |
43 | | #include "yb/consensus/consensus.h" |
44 | | #include "yb/consensus/consensus.pb.h" |
45 | | #include "yb/consensus/consensus_util.h" |
46 | | #include "yb/consensus/log.h" |
47 | | #include "yb/consensus/log_anchor_registry.h" |
48 | | #include "yb/consensus/log_util.h" |
49 | | #include "yb/consensus/opid_util.h" |
50 | | #include "yb/consensus/raft_consensus.h" |
51 | | #include "yb/consensus/retryable_requests.h" |
52 | | #include "yb/consensus/state_change_context.h" |
53 | | |
54 | | #include "yb/docdb/consensus_frontier.h" |
55 | | |
56 | | #include "yb/gutil/casts.h" |
57 | | #include "yb/gutil/strings/substitute.h" |
58 | | #include "yb/gutil/sysinfo.h" |
59 | | |
60 | | #include "yb/rocksdb/db/memtable.h" |
61 | | |
62 | | #include "yb/rpc/messenger.h" |
63 | | #include "yb/rpc/strand.h" |
64 | | #include "yb/rpc/thread_pool.h" |
65 | | |
66 | | #include "yb/tablet/operations/change_metadata_operation.h" |
67 | | #include "yb/tablet/operations/history_cutoff_operation.h" |
68 | | #include "yb/tablet/operations/operation_driver.h" |
69 | | #include "yb/tablet/operations/snapshot_operation.h" |
70 | | #include "yb/tablet/operations/split_operation.h" |
71 | | #include "yb/tablet/operations/truncate_operation.h" |
72 | | #include "yb/tablet/operations/update_txn_operation.h" |
73 | | #include "yb/tablet/operations/write_operation.h" |
74 | | #include "yb/tablet/tablet.h" |
75 | | #include "yb/tablet/tablet.pb.h" |
76 | | #include "yb/tablet/tablet_bootstrap_if.h" |
77 | | #include "yb/tablet/tablet_metadata.h" |
78 | | #include "yb/tablet/tablet_metrics.h" |
79 | | #include "yb/tablet/tablet_peer_mm_ops.h" |
80 | | #include "yb/tablet/tablet_retention_policy.h" |
81 | | #include "yb/tablet/transaction_participant.h" |
82 | | #include "yb/tablet/write_query.h" |
83 | | |
84 | | #include "yb/util/debug-util.h" |
85 | | #include "yb/util/flag_tags.h" |
86 | | #include "yb/util/format.h" |
87 | | #include "yb/util/logging.h" |
88 | | #include "yb/util/metrics.h" |
89 | | #include "yb/util/status_format.h" |
90 | | #include "yb/util/status_log.h" |
91 | | #include "yb/util/stopwatch.h" |
92 | | #include "yb/util/threadpool.h" |
93 | | #include "yb/util/trace.h" |
94 | | |
95 | | using namespace std::literals; |
96 | | using namespace std::placeholders; |
97 | | using std::shared_ptr; |
98 | | using std::string; |
99 | | |
100 | | DEFINE_test_flag(int32, delay_init_tablet_peer_ms, 0, |
101 | | "Wait before executing init tablet peer for specified amount of milliseconds."); |
102 | | |
103 | | DEFINE_int32(cdc_min_replicated_index_considered_stale_secs, 900, |
104 | | "If cdc_min_replicated_index hasn't been replicated in this amount of time, we reset its" |
105 | | "value to max int64 to avoid retaining any logs"); |
106 | | |
107 | | DEFINE_bool(propagate_safe_time, true, "Propagate safe time to read from leader to followers"); |
108 | | |
109 | | DECLARE_int32(ysql_transaction_abort_timeout_ms); |
110 | | |
111 | | namespace yb { |
112 | | namespace tablet { |
113 | | |
114 | | METRIC_DEFINE_coarse_histogram(table, op_prepare_queue_length, "Operation Prepare Queue Length", |
115 | | MetricUnit::kTasks, |
116 | | "Number of operations waiting to be prepared within this tablet. " |
117 | | "High queue lengths indicate that the server is unable to process " |
118 | | "operations as fast as they are being written to the WAL."); |
119 | | |
120 | | METRIC_DEFINE_coarse_histogram(table, op_prepare_queue_time, "Operation Prepare Queue Time", |
121 | | MetricUnit::kMicroseconds, |
122 | | "Time that operations spent waiting in the prepare queue before being " |
123 | | "processed. High queue times indicate that the server is unable to " |
124 | | "process operations as fast as they are being written to the WAL."); |
125 | | |
126 | | METRIC_DEFINE_coarse_histogram(table, op_prepare_run_time, "Operation Prepare Run Time", |
127 | | MetricUnit::kMicroseconds, |
128 | | "Time that operations spent being prepared in the tablet. " |
129 | | "High values may indicate that the server is under-provisioned or " |
130 | | "that operations are experiencing high contention with one another for " |
131 | | "locks."); |
132 | | |
133 | | using consensus::Consensus; |
134 | | using consensus::ConsensusBootstrapInfo; |
135 | | using consensus::ConsensusMetadata; |
136 | | using consensus::ConsensusOptions; |
137 | | using consensus::ConsensusRound; |
138 | | using consensus::StateChangeContext; |
139 | | using consensus::StateChangeReason; |
140 | | using consensus::RaftConfigPB; |
141 | | using consensus::RaftPeerPB; |
142 | | using consensus::RaftConsensus; |
143 | | using consensus::ReplicateMsg; |
144 | | using consensus::OpIdType; |
145 | | using log::Log; |
146 | | using log::LogAnchorRegistry; |
147 | | using rpc::Messenger; |
148 | | using strings::Substitute; |
149 | | using tserver::TabletServerErrorPB; |
150 | | |
151 | | // ============================================================================ |
152 | | // Tablet Peer |
153 | | // ============================================================================ |
154 | | TabletPeer::TabletPeer( |
155 | | const RaftGroupMetadataPtr& meta, |
156 | | const consensus::RaftPeerPB& local_peer_pb, |
157 | | const scoped_refptr<server::Clock>& clock, |
158 | | const std::string& permanent_uuid, |
159 | | Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk, |
160 | | MetricRegistry* metric_registry, |
161 | | TabletSplitter* tablet_splitter, |
162 | | const std::shared_future<client::YBClient*>& client_future) |
163 | | : meta_(meta), |
164 | | tablet_id_(meta->raft_group_id()), |
165 | | local_peer_pb_(local_peer_pb), |
166 | | state_(RaftGroupStatePB::NOT_STARTED), |
167 | | operation_tracker_(consensus::MakeTabletLogPrefix(tablet_id_, permanent_uuid)), |
168 | | status_listener_(new TabletStatusListener(meta)), |
169 | | clock_(clock), |
170 | | log_anchor_registry_(new LogAnchorRegistry()), |
171 | | mark_dirty_clbk_(std::move(mark_dirty_clbk)), |
172 | | permanent_uuid_(permanent_uuid), |
173 | | preparing_operations_counter_(operation_tracker_.LogPrefix()), |
174 | | metric_registry_(metric_registry), |
175 | | tablet_splitter_(tablet_splitter), |
176 | 150k | client_future_(client_future) {} |
177 | | |
178 | 74.1k | TabletPeer::~TabletPeer() { |
179 | 74.1k | std::lock_guard<simple_spinlock> lock(lock_); |
180 | | // We should either have called Shutdown(), or we should have never called |
181 | | // Init(). |
182 | 74.1k | LOG_IF_WITH_PREFIX0 (DFATAL, tablet_) << "TabletPeer not fully shut down."0 ; |
183 | 74.1k | } |
184 | | |
185 | | Status TabletPeer::InitTabletPeer( |
186 | | const TabletPtr& tablet, |
187 | | const std::shared_ptr<MemTracker>& server_mem_tracker, |
188 | | Messenger* messenger, |
189 | | rpc::ProxyCache* proxy_cache, |
190 | | const scoped_refptr<Log>& log, |
191 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
192 | | const scoped_refptr<MetricEntity>& tablet_metric_entity, |
193 | | ThreadPool* raft_pool, |
194 | | ThreadPool* tablet_prepare_pool, |
195 | | consensus::RetryableRequests* retryable_requests, |
196 | 150k | consensus::MultiRaftManager* multi_raft_manager) { |
197 | 150k | DCHECK(tablet) << "A TabletPeer must be provided with a Tablet"38 ; |
198 | 150k | DCHECK(log) << "A TabletPeer must be provided with a Log"46 ; |
199 | | |
200 | 150k | if (FLAGS_TEST_delay_init_tablet_peer_ms > 0) { |
201 | 0 | std::this_thread::sleep_for(FLAGS_TEST_delay_init_tablet_peer_ms * 1ms); |
202 | 0 | } |
203 | | |
204 | 150k | { |
205 | 150k | std::lock_guard<simple_spinlock> lock(lock_); |
206 | 150k | auto state = state_.load(std::memory_order_acquire); |
207 | 150k | if (state != RaftGroupStatePB::BOOTSTRAPPING) { |
208 | 0 | return STATUS_FORMAT( |
209 | 0 | IllegalState, "Invalid tablet state for init: $0", RaftGroupStatePB_Name(state)); |
210 | 0 | } |
211 | 150k | tablet_ = tablet; |
212 | 150k | proxy_cache_ = proxy_cache; |
213 | 150k | log_ = log; |
214 | | // "Publish" the log pointer so it can be retrieved using the log() accessor. |
215 | 150k | log_atomic_ = log.get(); |
216 | 150k | service_thread_pool_ = &messenger->ThreadPool(); |
217 | 150k | strand_.reset(new rpc::Strand(&messenger->ThreadPool())); |
218 | 150k | messenger_ = messenger; |
219 | | |
220 | 150k | tablet->SetMemTableFlushFilterFactory([log] { |
221 | 3.06k | auto largest_log_op_index = log->GetLatestEntryOpId().index; |
222 | 3.31k | return [largest_log_op_index] (const rocksdb::MemTable& memtable) -> Result<bool> { |
223 | 3.31k | auto frontiers = memtable.Frontiers(); |
224 | 3.31k | if (frontiers) { |
225 | 3.30k | const auto largest_memtable_op_index = |
226 | 3.30k | down_cast<const docdb::ConsensusFrontier&>(frontiers->Largest()).op_id().index; |
227 | | // We can only flush this memtable if all operations written to it have also been written |
228 | | // to the log (maybe not synced, if durable_wal_write is disabled, but that's OK). |
229 | 3.30k | auto should_flush = largest_memtable_op_index <= largest_log_op_index; |
230 | 3.30k | if (!should_flush) { |
231 | 34 | LOG(WARNING) |
232 | 34 | << "Skipping flush on memtable with ops ahead of log. " |
233 | 34 | << "Memtable index: " << largest_memtable_op_index |
234 | 34 | << " - log index: " << largest_log_op_index; |
235 | 34 | } |
236 | 3.30k | return should_flush; |
237 | 3.30k | } |
238 | | |
239 | | // It is correct to not have frontiers when memtable is empty |
240 | 7 | if (memtable.IsEmpty()) { |
241 | 7 | return true; |
242 | 7 | } |
243 | | |
244 | | // This is a degenerate case that should ideally never occur. An empty memtable got into the |
245 | | // list of immutable memtables. We say it is OK to flush it and move on. |
246 | 0 | static const char* error_msg = |
247 | 0 | "A memtable with no frontiers set found when deciding what memtables to " |
248 | 0 | "flush! This should not happen."; |
249 | 0 | LOG(ERROR) << error_msg << " Stack trace:\n" << GetStackTrace(); |
250 | 0 | return STATUS(IllegalState, error_msg); |
251 | 7 | }; |
252 | 3.06k | }); |
253 | | |
254 | 150k | tablet_->SetCleanupPool(raft_pool); |
255 | | |
256 | 150k | ConsensusOptions options; |
257 | 150k | options.tablet_id = meta_->raft_group_id(); |
258 | | |
259 | 150k | TRACE("Creating consensus instance"); |
260 | | |
261 | 150k | std::unique_ptr<ConsensusMetadata> cmeta; |
262 | 150k | RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_, |
263 | 150k | meta_->fs_manager()->uuid(), &cmeta)); |
264 | | |
265 | 150k | if (retryable_requests) { |
266 | 142k | retryable_requests->SetMetricEntity(tablet->GetTabletMetricsEntity()); |
267 | 142k | } |
268 | | |
269 | 150k | consensus_ = RaftConsensus::Create( |
270 | 150k | options, |
271 | 150k | std::move(cmeta), |
272 | 150k | local_peer_pb_, |
273 | 150k | table_metric_entity, |
274 | 150k | tablet_metric_entity, |
275 | 150k | clock_, |
276 | 150k | this, |
277 | 150k | messenger, |
278 | 150k | proxy_cache_, |
279 | 150k | log_.get(), |
280 | 150k | server_mem_tracker, |
281 | 150k | tablet_->mem_tracker(), |
282 | 150k | mark_dirty_clbk_, |
283 | 150k | tablet_->table_type(), |
284 | 150k | raft_pool, |
285 | 150k | retryable_requests, |
286 | 150k | multi_raft_manager); |
287 | 150k | has_consensus_.store(true, std::memory_order_release); |
288 | | |
289 | 150k | tablet_->SetHybridTimeLeaseProvider(std::bind(&TabletPeer::HybridTimeLease, this, _1, _2)); |
290 | 150k | operation_tracker_.SetPostTracker( |
291 | 150k | std::bind(&RaftConsensus::TrackOperationMemory, consensus_.get(), _1)); |
292 | | |
293 | 150k | prepare_thread_ = std::make_unique<Preparer>(consensus_.get(), tablet_prepare_pool); |
294 | | |
295 | 150k | ChangeConfigReplicated(RaftConfig()); // Set initial flag value. |
296 | 150k | } |
297 | | |
298 | 150k | RETURN_NOT_OK(prepare_thread_->Start()); |
299 | | |
300 | 150k | if (tablet_->metrics() != nullptr) { |
301 | 150k | TRACE("Starting instrumentation"); |
302 | 150k | operation_tracker_.StartInstrumentation(tablet_->GetTabletMetricsEntity()); |
303 | 150k | } |
304 | 150k | operation_tracker_.StartMemoryTracking(tablet_->mem_tracker()); |
305 | | |
306 | 150k | if (tablet_->transaction_coordinator()) { |
307 | 47.9k | tablet_->transaction_coordinator()->Start(); |
308 | 47.9k | } |
309 | | |
310 | 150k | if (tablet_->transaction_participant()) { |
311 | 56.8k | tablet_->transaction_participant()->Start(); |
312 | 56.8k | } |
313 | | |
314 | 150k | RETURN_NOT_OK(set_cdc_min_replicated_index(meta_->cdc_min_replicated_index())); |
315 | | |
316 | 150k | TRACE("TabletPeer::Init() finished"); |
317 | 150k | VLOG_WITH_PREFIX28 (2) << "Peer Initted"28 ; |
318 | | |
319 | 150k | return Status::OK(); |
320 | 150k | } |
321 | | |
322 | | Result<FixedHybridTimeLease> TabletPeer::HybridTimeLease( |
323 | 73.3M | HybridTime min_allowed, CoarseTimePoint deadline) { |
324 | 73.3M | auto time = VERIFY_RESULT(WaitUntil(clock_.get(), min_allowed, deadline)); |
325 | | // min_allowed could contain non zero logical part, so we add one microsecond to be sure that |
326 | | // the resulting ht_lease is at least min_allowed. |
327 | 0 | auto min_allowed_micros = min_allowed.CeilPhysicalValueMicros(); |
328 | 73.3M | MicrosTime lease_micros = VERIFY_RESULT73.3M (consensus_->MajorityReplicatedHtLeaseExpiration( |
329 | 73.3M | min_allowed_micros, deadline)); |
330 | 73.3M | if (lease_micros >= kMaxHybridTimePhysicalMicros) { |
331 | | // This could happen when leader leases are disabled. |
332 | 1.09M | return FixedHybridTimeLease(); |
333 | 1.09M | } |
334 | 72.2M | return FixedHybridTimeLease { |
335 | 72.2M | .time = time, |
336 | 72.2M | .lease = HybridTime(lease_micros, /* logical */ 0) |
337 | 72.2M | }; |
338 | 73.3M | } |
339 | | |
340 | 28.8M | Result<HybridTime> TabletPeer::PreparePeerRequest() { |
341 | 28.8M | auto leader_term = consensus_->GetLeaderState(/* allow_stale= */ true).term; |
342 | 28.8M | if (leader_term >= 0) { |
343 | 25.4M | auto last_write_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime(); |
344 | 25.4M | auto propagated_history_cutoff = |
345 | 25.4M | tablet_->RetentionPolicy()->HistoryCutoffToPropagate(last_write_ht); |
346 | | |
347 | 25.4M | if (propagated_history_cutoff) { |
348 | 0 | VLOG_WITH_PREFIX(2) << "Propagate history cutoff: " << propagated_history_cutoff; |
349 | |
|
350 | 0 | auto operation = std::make_unique<HistoryCutoffOperation>(tablet_.get()); |
351 | 0 | auto request = operation->AllocateRequest(); |
352 | 0 | request->set_history_cutoff(propagated_history_cutoff.ToUint64()); |
353 | |
|
354 | 0 | Submit(std::move(operation), leader_term); |
355 | 0 | } |
356 | 25.4M | } |
357 | | |
358 | 28.8M | if (!FLAGS_propagate_safe_time) { |
359 | 6 | return HybridTime::kInvalid; |
360 | 6 | } |
361 | | |
362 | | // Get the current majority-replicated HT leader lease without any waiting. |
363 | 28.8M | auto ht_lease = VERIFY_RESULT(HybridTimeLease( |
364 | 28.8M | /* min_allowed= */ HybridTime::kMin, /* deadline */ CoarseTimePoint::max())); |
365 | 0 | return tablet_->mvcc_manager()->SafeTime(ht_lease); |
366 | 28.8M | } |
367 | | |
368 | 34.6M | void TabletPeer::MajorityReplicated() { |
369 | 34.6M | auto ht_lease = HybridTimeLease( |
370 | 34.6M | /* min_allowed= */ HybridTime::kMin, /* deadline */ CoarseTimePoint::max()); |
371 | 34.6M | if (!ht_lease.ok()) { |
372 | 0 | LOG_WITH_PREFIX(DFATAL) << "Failed to get current lease: " << ht_lease.status(); |
373 | 0 | return; |
374 | 0 | } |
375 | | |
376 | 34.6M | tablet_->mvcc_manager()->UpdatePropagatedSafeTimeOnLeader(*ht_lease); |
377 | 34.6M | } |
378 | | |
379 | 169k | void TabletPeer::ChangeConfigReplicated(const RaftConfigPB& config) { |
380 | 169k | tablet_->mvcc_manager()->SetLeaderOnlyMode(config.peers_size() == 1); |
381 | 169k | } |
382 | | |
383 | 24.3M | uint64_t TabletPeer::NumSSTFiles() { |
384 | 24.3M | return tablet_->GetCurrentVersionNumSSTFiles(); |
385 | 24.3M | } |
386 | | |
387 | 225k | void TabletPeer::ListenNumSSTFilesChanged(std::function<void()> listener) { |
388 | 225k | tablet_->ListenNumSSTFilesChanged(std::move(listener)); |
389 | 225k | } |
390 | | |
391 | 5.11M | Status TabletPeer::CheckOperationAllowed(const OpId& op_id, consensus::OperationType op_type) { |
392 | 5.11M | return tablet_->CheckOperationAllowed(op_id, op_type); |
393 | 5.11M | } |
394 | | |
395 | 150k | Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) { |
396 | 150k | { |
397 | 150k | std::lock_guard<simple_spinlock> l(state_change_lock_); |
398 | 150k | TRACE("Starting consensus"); |
399 | | |
400 | 18.4E | VLOG_WITH_PREFIX(2) << "Peer starting"; |
401 | | |
402 | 150k | VLOG(2) << "RaftConfig before starting: " << consensus_->CommittedConfig().DebugString()9 ; |
403 | | |
404 | | // If tablet was previously considered shutdown w.r.t. metrics, |
405 | | // fix that for a tablet now being reinstated. |
406 | 150k | DVLOG_WITH_PREFIX22 (3) |
407 | 22 | << "Remove from set of tablets that have been shutdown so as to allow reporting metrics"; |
408 | 150k | metric_registry_->tablets_shutdown_erase(tablet_id()); |
409 | | |
410 | 150k | RETURN_NOT_OK(consensus_->Start(bootstrap_info)); |
411 | 150k | RETURN_NOT_OK(UpdateState(RaftGroupStatePB::BOOTSTRAPPING, RaftGroupStatePB::RUNNING, |
412 | 150k | "Incorrect state to start TabletPeer, ")); |
413 | 150k | } |
414 | | // The context tracks that the current caller does not hold the lock for consensus state. |
415 | | // So mark dirty callback, e.g., consensus->ConsensusState() for master consensus callback of |
416 | | // SysCatalogStateChanged, can get the lock when needed. |
417 | 150k | auto context = |
418 | 150k | std::make_shared<StateChangeContext>(StateChangeReason::TABLET_PEER_STARTED, false); |
419 | | // Because we changed the tablet state, we need to re-report the tablet to the master. |
420 | 150k | mark_dirty_clbk_.Run(context); |
421 | | |
422 | 150k | return tablet_->EnableCompactions(/* non_abortable_ops_pause */ nullptr); |
423 | 150k | } |
424 | | |
425 | 152k | consensus::RaftConfigPB TabletPeer::RaftConfig() const { |
426 | 152k | CHECK(consensus_) << "consensus is null"0 ; |
427 | 152k | return consensus_->CommittedConfig(); |
428 | 152k | } |
429 | | |
430 | 75.7k | bool TabletPeer::StartShutdown(IsDropTable is_drop_table) { |
431 | 75.7k | LOG_WITH_PREFIX(INFO) << "Initiating TabletPeer shutdown"; |
432 | | |
433 | 75.7k | { |
434 | 75.7k | std::lock_guard<decltype(lock_)> lock(lock_); |
435 | 75.7k | if (tablet_) { |
436 | 75.6k | tablet_->StartShutdown(is_drop_table); |
437 | 75.6k | } |
438 | 75.7k | } |
439 | | |
440 | 75.7k | { |
441 | 75.7k | RaftGroupStatePB state = state_.load(std::memory_order_acquire); |
442 | 75.7k | for (;;) { |
443 | 75.7k | if (state == RaftGroupStatePB::QUIESCING || state == RaftGroupStatePB::SHUTDOWN) { |
444 | 96 | return false; |
445 | 96 | } |
446 | 75.6k | if (state_.compare_exchange_strong( |
447 | 75.6k | state, RaftGroupStatePB::QUIESCING, std::memory_order_acq_rel)) { |
448 | 75.6k | LOG_WITH_PREFIX(INFO) << "Started shutdown from state: " << RaftGroupStatePB_Name(state); |
449 | 75.6k | break; |
450 | 75.6k | } |
451 | 75.6k | } |
452 | 75.7k | } |
453 | | |
454 | 75.6k | std::lock_guard<simple_spinlock> l(state_change_lock_); |
455 | | // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here |
456 | | // to ensure that any currently running operation finishes before we proceed with |
457 | | // the rest of the shutdown sequence. In particular, a maintenance operation could |
458 | | // indirectly end up calling into the log, which we are about to shut down. |
459 | 75.6k | UnregisterMaintenanceOps(); |
460 | | |
461 | 75.6k | std::shared_ptr<consensus::RaftConsensus> consensus; |
462 | 75.6k | { |
463 | 75.6k | std::lock_guard<decltype(lock_)> lock(lock_); |
464 | 75.6k | consensus = consensus_; |
465 | 75.6k | } |
466 | 75.6k | if (consensus) { |
467 | 75.6k | consensus->Shutdown(); |
468 | 75.6k | } |
469 | | |
470 | 75.6k | return true; |
471 | 75.7k | } |
472 | | |
473 | 75.6k | void TabletPeer::CompleteShutdown(IsDropTable is_drop_table) { |
474 | 75.6k | auto* strand = strand_.get(); |
475 | 75.6k | if (strand) { |
476 | 75.6k | strand->Shutdown(); |
477 | 75.6k | } |
478 | | |
479 | 75.6k | preparing_operations_counter_.Shutdown(); |
480 | | |
481 | | // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message. |
482 | 75.6k | LOG_SLOW_EXECUTION(WARNING, 1000, |
483 | 75.6k | Substitute("TabletPeer: tablet $0: Waiting for Operations to complete", tablet_id())) { |
484 | 75.6k | operation_tracker_.WaitForAllToFinish(); |
485 | 75.6k | } |
486 | | |
487 | 75.6k | if (prepare_thread_) { |
488 | 75.5k | prepare_thread_->Stop(); |
489 | 75.5k | } |
490 | | |
491 | 75.6k | if (log_) { |
492 | 75.5k | WARN_NOT_OK(log_->Close(), LogPrefix() + "Error closing the Log"); |
493 | 75.5k | } |
494 | | |
495 | 75.6k | VLOG_WITH_PREFIX224 (1) << "Shut down!"224 ; |
496 | | |
497 | 75.6k | if (tablet_) { |
498 | 75.2k | tablet_->CompleteShutdown(is_drop_table); |
499 | 75.2k | } |
500 | | |
501 | | // Only mark the peer as SHUTDOWN when all other components have shut down. |
502 | 75.6k | { |
503 | 75.6k | std::lock_guard<simple_spinlock> lock(lock_); |
504 | 75.6k | strand_.reset(); |
505 | | // Release mem tracker resources. |
506 | 75.6k | has_consensus_.store(false, std::memory_order_release); |
507 | 75.6k | consensus_.reset(); |
508 | 75.6k | prepare_thread_.reset(); |
509 | 75.6k | tablet_.reset(); |
510 | 75.6k | auto state = state_.load(std::memory_order_acquire); |
511 | 75.6k | LOG_IF_WITH_PREFIX151 (DFATAL, state != RaftGroupStatePB::QUIESCING) << |
512 | 151 | "Bad state when completing shutdown: " << RaftGroupStatePB_Name(state); |
513 | 75.6k | state_.store(RaftGroupStatePB::SHUTDOWN, std::memory_order_release); |
514 | | |
515 | 75.6k | if (metric_registry_) { |
516 | 18.4E | DVLOG_WITH_PREFIX(3) |
517 | 18.4E | << "Add to set of tablets that have been shutdown so as to avoid reporting metrics"; |
518 | 75.3k | metric_registry_->tablets_shutdown_insert(tablet_id()); |
519 | 75.3k | } |
520 | 75.6k | } |
521 | 75.6k | } |
522 | | |
523 | 95 | void TabletPeer::WaitUntilShutdown() { |
524 | 95 | const MonoDelta kSingleWait = 10ms; |
525 | 95 | const MonoDelta kReportInterval = 5s; |
526 | 95 | const MonoDelta kMaxWait = 30s; |
527 | | |
528 | 95 | MonoDelta waited = MonoDelta::kZero; |
529 | 95 | MonoDelta last_reported = MonoDelta::kZero; |
530 | 95 | while (state_.load(std::memory_order_acquire) != RaftGroupStatePB::SHUTDOWN) { |
531 | 0 | if (waited >= last_reported + kReportInterval) { |
532 | 0 | if (waited >= kMaxWait) { |
533 | 0 | LOG_WITH_PREFIX(DFATAL) |
534 | 0 | << "Wait for shutdown " << waited << " exceeded kMaxWait " << kMaxWait; |
535 | 0 | } else { |
536 | 0 | LOG_WITH_PREFIX(WARNING) << "Long wait for shutdown: " << waited; |
537 | 0 | } |
538 | 0 | last_reported = waited; |
539 | 0 | } |
540 | 0 | SleepFor(kSingleWait); |
541 | 0 | waited += kSingleWait; |
542 | 0 | } |
543 | | |
544 | 95 | if (metric_registry_) { |
545 | 95 | DVLOG_WITH_PREFIX0 (3) |
546 | 0 | << "Add to set of tablets that have been shutdown so as to avoid reporting metrics"; |
547 | 95 | metric_registry_->tablets_shutdown_insert(tablet_id()); |
548 | 95 | } |
549 | 95 | } |
550 | | |
551 | 75.5k | Status TabletPeer::Shutdown(IsDropTable is_drop_table) { |
552 | 75.5k | bool isShutdownInitiated = StartShutdown(is_drop_table); |
553 | | |
554 | 75.5k | RETURN_NOT_OK(AbortSQLTransactions()); |
555 | | |
556 | 75.5k | if (isShutdownInitiated) { |
557 | 75.4k | CompleteShutdown(is_drop_table); |
558 | 75.4k | } else { |
559 | 113 | WaitUntilShutdown(); |
560 | 113 | } |
561 | 75.5k | return Status::OK(); |
562 | 75.5k | } |
563 | | |
564 | 75.5k | Status TabletPeer::AbortSQLTransactions() { |
565 | | // Once raft group state enters QUIESCING state, |
566 | | // new queries cannot be processed from then onwards. |
567 | | // Aborting any remaining active transactions in the tablet. |
568 | 75.5k | if (tablet_ && tablet_->table_type() == TableType::PGSQL_TABLE_TYPE75.4k ) { |
569 | 35.2k | if (tablet_->transaction_participant()) { |
570 | 35.2k | HybridTime maxCutoff = HybridTime::kMax; |
571 | 35.2k | LOG(INFO) << "Aborting transactions that started prior to " << maxCutoff |
572 | 35.2k | << " for tablet id " << tablet_->tablet_id(); |
573 | 35.2k | CoarseTimePoint deadline = CoarseMonoClock::Now() + |
574 | 35.2k | MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_abort_timeout_ms); |
575 | 35.2k | WARN_NOT_OK(tablet_->transaction_participant()->StopActiveTxnsPriorTo(maxCutoff, deadline), |
576 | 35.2k | "Cannot abort transactions for tablet " + tablet_->tablet_id()); |
577 | 35.2k | } |
578 | 35.2k | } |
579 | 75.5k | return Status::OK(); |
580 | 75.5k | } |
581 | | |
582 | 69.1M | Status TabletPeer::CheckRunning() const { |
583 | 69.1M | auto state = state_.load(std::memory_order_acquire); |
584 | 69.1M | if (state != RaftGroupStatePB::RUNNING) { |
585 | 613 | if (state == RaftGroupStatePB::QUIESCING) { |
586 | 613 | return STATUS(ShutdownInProgress, "The tablet is shutting down"); |
587 | 613 | } |
588 | 0 | return STATUS_FORMAT(IllegalState, "The tablet is not in a running state: $0", |
589 | 613 | RaftGroupStatePB_Name(state)); |
590 | 613 | } |
591 | | |
592 | 69.1M | return Status::OK(); |
593 | 69.1M | } |
594 | | |
595 | 0 | bool TabletPeer::IsShutdownStarted() const { |
596 | 0 | auto state = state_.load(std::memory_order_acquire); |
597 | 0 | return state == RaftGroupStatePB::QUIESCING || state == RaftGroupStatePB::SHUTDOWN; |
598 | 0 | } |
599 | | |
600 | 153 | Status TabletPeer::CheckShutdownOrNotStarted() const { |
601 | 153 | RaftGroupStatePB value = state_.load(std::memory_order_acquire); |
602 | 153 | if (value != RaftGroupStatePB::SHUTDOWN && value != RaftGroupStatePB::NOT_STARTED0 ) { |
603 | 0 | return STATUS(IllegalState, Substitute("The tablet is not in a shutdown state: $0", |
604 | 0 | RaftGroupStatePB_Name(value))); |
605 | 0 | } |
606 | | |
607 | 153 | return Status::OK(); |
608 | 153 | } |
609 | | |
610 | 7.96k | Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) { |
611 | 7.96k | MonoTime start(MonoTime::Now()); |
612 | | |
613 | 7.96k | int backoff_exp = 0; |
614 | 7.96k | const int kMaxBackoffExp = 8; |
615 | 8.23k | while (true) { |
616 | 8.23k | RaftGroupStatePB cached_state = state_.load(std::memory_order_acquire); |
617 | 8.23k | if (cached_state == RaftGroupStatePB::QUIESCING || cached_state == RaftGroupStatePB::SHUTDOWN) { |
618 | 0 | return STATUS(IllegalState, |
619 | 0 | Substitute("The tablet is already shutting down or shutdown. State: $0", |
620 | 0 | RaftGroupStatePB_Name(cached_state))); |
621 | 0 | } |
622 | 8.23k | if (cached_state == RUNNING && has_consensus_.load(std::memory_order_acquire)7.96k && |
623 | 8.23k | consensus_->IsRunning()7.96k ) { |
624 | 7.96k | break; |
625 | 7.96k | } |
626 | 275 | MonoTime now(MonoTime::Now()); |
627 | 275 | MonoDelta elapsed(now.GetDeltaSince(start)); |
628 | 275 | if (elapsed.MoreThan(timeout)) { |
629 | 1 | return STATUS(TimedOut, Substitute("Consensus is not running after waiting for $0. State; $1", |
630 | 1 | elapsed.ToString(), RaftGroupStatePB_Name(cached_state))); |
631 | 1 | } |
632 | 274 | SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp)); |
633 | 274 | backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp); |
634 | 274 | } |
635 | 7.96k | return Status::OK(); |
636 | 7.96k | } |
637 | | |
638 | 3.16M | void TabletPeer::WriteAsync(std::unique_ptr<WriteQuery> query) { |
639 | 3.16M | ScopedOperation preparing_token(&preparing_operations_counter_); |
640 | 3.16M | auto status = CheckRunning(); |
641 | 3.16M | if (!status.ok()) { |
642 | 4 | query->Cancel(status); |
643 | 4 | return; |
644 | 4 | } |
645 | | |
646 | 3.16M | query->operation().set_preparing_token(std::move(preparing_token)); |
647 | 3.16M | tablet_->AcquireLocksAndPerformDocOperations(std::move(query)); |
648 | 3.16M | } |
649 | | |
650 | 0 | Result<HybridTime> TabletPeer::ReportReadRestart() { |
651 | 0 | tablet_->metrics()->restart_read_requests->Increment(); |
652 | 0 | return tablet_->SafeTime(RequireLease::kTrue); |
653 | 0 | } |
654 | | |
655 | 5.04M | void TabletPeer::Submit(std::unique_ptr<Operation> operation, int64_t term) { |
656 | 5.04M | auto status = CheckRunning(); |
657 | | |
658 | 5.04M | if (status.ok()) { |
659 | 5.04M | auto driver = NewLeaderOperationDriver(&operation, term); |
660 | 5.04M | if (driver.ok()5.04M ) { |
661 | 5.04M | (**driver).ExecuteAsync(); |
662 | 18.4E | } else { |
663 | 18.4E | status = driver.status(); |
664 | 18.4E | } |
665 | 5.04M | } |
666 | 5.04M | if (!status.ok()) { |
667 | 7 | operation->Aborted(status, /* was_pending= */ false); |
668 | 7 | } |
669 | 5.04M | } |
670 | | |
671 | | void TabletPeer::SubmitUpdateTransaction( |
672 | 1.50M | std::unique_ptr<UpdateTxnOperation> operation, int64_t term) { |
673 | 1.50M | if (!operation->tablet()) { |
674 | 0 | operation->SetTablet(tablet()); |
675 | 0 | } |
676 | 1.50M | Submit(std::move(operation), term); |
677 | 1.50M | } |
678 | | |
679 | 477k | HybridTime TabletPeer::SafeTimeForTransactionParticipant() { |
680 | 477k | return tablet_->mvcc_manager()->SafeTimeForFollower( |
681 | 477k | /* min_allowed= */ HybridTime::kMin, /* deadline= */ CoarseTimePoint::min()); |
682 | 477k | } |
683 | | |
684 | 26.6k | Result<HybridTime> TabletPeer::WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) { |
685 | 26.6k | return tablet_->SafeTime(RequireLease::kFallbackToFollower, safe_time, deadline); |
686 | 26.6k | } |
687 | | |
688 | 1.67M | Status TabletPeer::GetLastReplicatedData(RemoveIntentsData* data) { |
689 | 1.67M | std::shared_ptr<consensus::RaftConsensus> consensus; |
690 | 1.67M | TabletPtr tablet; |
691 | 1.67M | { |
692 | 1.67M | std::lock_guard<simple_spinlock> lock(lock_); |
693 | 1.67M | consensus = consensus_; |
694 | 1.67M | tablet = tablet_; |
695 | 1.67M | } |
696 | 1.67M | if (!consensus) { |
697 | 0 | return STATUS(IllegalState, "Consensus destroyed"); |
698 | 0 | } |
699 | 1.67M | if (!tablet) { |
700 | 0 | return STATUS(IllegalState, "Tablet destroyed"); |
701 | 0 | } |
702 | 1.67M | data->op_id = consensus->GetLastCommittedOpId(); |
703 | 1.67M | data->log_ht = tablet->mvcc_manager()->LastReplicatedHybridTime(); |
704 | 1.67M | return Status::OK(); |
705 | 1.67M | } |
706 | | |
707 | 1.91M | void TabletPeer::GetLastCDCedData(RemoveIntentsData* data) { |
708 | 1.91M | if (consensus_ != nullptr) { |
709 | 1.90M | data->op_id.index = consensus_->GetLastCDCedOpId().index; |
710 | 1.90M | data->op_id.term = consensus_->GetLastCDCedOpId().term; |
711 | 1.90M | } |
712 | | |
713 | 1.91M | if((tablet_ != nullptr) && (tablet_->mvcc_manager() != nullptr)1.91M ) { |
714 | | // for now use this hybrid time, ideally it should be of last_updated_time |
715 | 1.90M | data->log_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime(); |
716 | 1.90M | } |
717 | 1.91M | } |
718 | | |
719 | 1.19M | void TabletPeer::UpdateClock(HybridTime hybrid_time) { |
720 | 1.19M | clock_->Update(hybrid_time); |
721 | 1.19M | } |
722 | | |
723 | | std::unique_ptr<UpdateTxnOperation> TabletPeer::CreateUpdateTransaction( |
724 | 406k | TransactionStatePB* request) { |
725 | 406k | auto result = std::make_unique<UpdateTxnOperation>(tablet()); |
726 | 406k | result->TakeRequest(request); |
727 | 406k | return result; |
728 | 406k | } |
729 | | |
730 | 8.14k | void TabletPeer::GetTabletStatusPB(TabletStatusPB* status_pb_out) { |
731 | 8.14k | std::lock_guard<simple_spinlock> lock(lock_); |
732 | 8.14k | DCHECK(status_pb_out != nullptr); |
733 | 8.14k | DCHECK(status_listener_.get() != nullptr); |
734 | 8.14k | const auto disk_size_info = GetOnDiskSizeInfo(); |
735 | 8.14k | status_pb_out->set_tablet_id(status_listener_->tablet_id()); |
736 | 8.14k | status_pb_out->set_namespace_name(status_listener_->namespace_name()); |
737 | 8.14k | status_pb_out->set_table_name(status_listener_->table_name()); |
738 | 8.14k | status_pb_out->set_table_id(status_listener_->table_id()); |
739 | 8.14k | status_pb_out->set_last_status(status_listener_->last_status()); |
740 | 8.14k | status_listener_->partition()->ToPB(status_pb_out->mutable_partition()); |
741 | 8.14k | status_pb_out->set_state(state_); |
742 | 8.14k | status_pb_out->set_tablet_data_state(meta_->tablet_data_state()); |
743 | 8.14k | auto tablet = tablet_; |
744 | 8.14k | if (tablet) { |
745 | 8.07k | status_pb_out->set_table_type(tablet->table_type()); |
746 | 8.07k | } |
747 | 8.14k | disk_size_info.ToPB(status_pb_out); |
748 | | // Set hide status of the tablet. |
749 | 8.14k | status_pb_out->set_is_hidden(meta_->hidden()); |
750 | 8.14k | } |
751 | | |
752 | 20 | Status TabletPeer::RunLogGC() { |
753 | 20 | if (!CheckRunning().ok()) { |
754 | 0 | return Status::OK(); |
755 | 0 | } |
756 | 20 | auto s = reset_cdc_min_replicated_index_if_stale(); |
757 | 20 | if (!s.ok()) { |
758 | 0 | LOG_WITH_PREFIX(WARNING) << "Unable to reset cdc min replicated index " << s; |
759 | 0 | } |
760 | 20 | int64_t min_log_index; |
761 | 20 | if (VLOG_IS_ON(2)) { |
762 | 0 | std::string details; |
763 | 0 | min_log_index = VERIFY_RESULT(GetEarliestNeededLogIndex(&details)); |
764 | 0 | LOG_WITH_PREFIX(INFO) << __func__ << ": " << details; |
765 | 20 | } else { |
766 | 20 | min_log_index = VERIFY_RESULT(GetEarliestNeededLogIndex()); |
767 | 20 | } |
768 | 20 | int32_t num_gced = 0; |
769 | 20 | return log_->GC(min_log_index, &num_gced); |
770 | 20 | } |
771 | | |
772 | 141 | TabletDataState TabletPeer::data_state() const { |
773 | 141 | std::lock_guard<simple_spinlock> lock(lock_); |
774 | 141 | return meta_->tablet_data_state(); |
775 | 141 | } |
776 | | |
777 | 13 | string TabletPeer::HumanReadableState() const { |
778 | 13 | std::lock_guard<simple_spinlock> lock(lock_); |
779 | 13 | TabletDataState data_state = meta_->tablet_data_state(); |
780 | 13 | RaftGroupStatePB state = this->state(); |
781 | | // If failed, any number of things could have gone wrong. |
782 | 13 | if (state == RaftGroupStatePB::FAILED) { |
783 | 0 | return Substitute("$0 ($1): $2", RaftGroupStatePB_Name(state), |
784 | 0 | TabletDataState_Name(data_state), |
785 | 0 | error_.get()->ToString()); |
786 | | // If it's remotely bootstrapping, or tombstoned, that is the important thing |
787 | | // to show. |
788 | 13 | } else if (!CanServeTabletData(data_state)) { |
789 | 0 | return TabletDataState_Name(data_state); |
790 | 13 | } else if (data_state == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) { |
791 | 0 | return RaftGroupStatePB_Name(state) + " (split)"; |
792 | 0 | } |
793 | | // Otherwise, the tablet's data is in a "normal" state, so we just display |
794 | | // the runtime state (BOOTSTRAPPING, RUNNING, etc). |
795 | 13 | return RaftGroupStatePB_Name(state); |
796 | 13 | } |
797 | | |
798 | | namespace { |
799 | | |
800 | 0 | consensus::OperationType MapOperationTypeToPB(OperationType operation_type) { |
801 | 0 | switch (operation_type) { |
802 | 0 | case OperationType::kWrite: |
803 | 0 | return consensus::WRITE_OP; |
804 | | |
805 | 0 | case OperationType::kChangeMetadata: |
806 | 0 | return consensus::CHANGE_METADATA_OP; |
807 | | |
808 | 0 | case OperationType::kUpdateTransaction: |
809 | 0 | return consensus::UPDATE_TRANSACTION_OP; |
810 | | |
811 | 0 | case OperationType::kSnapshot: |
812 | 0 | return consensus::SNAPSHOT_OP; |
813 | | |
814 | 0 | case OperationType::kTruncate: |
815 | 0 | return consensus::TRUNCATE_OP; |
816 | | |
817 | 0 | case OperationType::kHistoryCutoff: |
818 | 0 | return consensus::HISTORY_CUTOFF_OP; |
819 | | |
820 | 0 | case OperationType::kSplit: |
821 | 0 | return consensus::SPLIT_OP; |
822 | | |
823 | 0 | case OperationType::kEmpty: |
824 | 0 | LOG(FATAL) << "OperationType::kEmpty cannot be converted to consensus::OperationType"; |
825 | 0 | } |
826 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, operation_type); |
827 | 0 | } |
828 | | |
829 | | } // namespace |
830 | | |
831 | | void TabletPeer::GetInFlightOperations(Operation::TraceType trace_type, |
832 | 0 | vector<consensus::OperationStatusPB>* out) const { |
833 | 0 | for (const auto& driver : operation_tracker_.GetPendingOperations()) { |
834 | 0 | if (driver->operation() == nullptr) { |
835 | 0 | continue; |
836 | 0 | } |
837 | 0 | auto op_type = driver->operation_type(); |
838 | 0 | if (op_type == OperationType::kEmpty) { |
839 | | // This is a special-purpose in-memory-only operation for updating propagated safe time on |
840 | | // a follower. |
841 | 0 | continue; |
842 | 0 | } |
843 | | |
844 | 0 | consensus::OperationStatusPB status_pb; |
845 | 0 | driver->GetOpId().ToPB(status_pb.mutable_op_id()); |
846 | 0 | status_pb.set_operation_type(MapOperationTypeToPB(op_type)); |
847 | 0 | status_pb.set_description(driver->ToString()); |
848 | 0 | int64_t running_for_micros = |
849 | 0 | MonoTime::Now().GetDeltaSince(driver->start_time()).ToMicroseconds(); |
850 | 0 | status_pb.set_running_for_micros(running_for_micros); |
851 | 0 | if (trace_type == Operation::TRACE_TXNS) { |
852 | 0 | status_pb.set_trace_buffer(driver->trace()->DumpToString(true)); |
853 | 0 | } |
854 | 0 | out->push_back(status_pb); |
855 | 0 | } |
856 | 0 | } |
857 | | |
858 | 50.4M | Result<int64_t> TabletPeer::GetEarliestNeededLogIndex(std::string* details) const { |
859 | | // First, we anchor on the last OpId in the Log to establish a lower bound |
860 | | // and avoid racing with the other checks. This limits the Log GC candidate |
861 | | // segments before we check the anchors. |
862 | 50.4M | auto latest_log_entry_op_id = log_->GetLatestEntryOpId(); |
863 | 50.4M | int64_t min_index = latest_log_entry_op_id.index; |
864 | 50.4M | if (details) { |
865 | 0 | *details += Format("Latest log entry op id: $0\n", latest_log_entry_op_id); |
866 | 0 | } |
867 | | |
868 | | // If we never have written to the log, no need to proceed. |
869 | 50.4M | if (min_index == 0) { |
870 | 310k | return min_index; |
871 | 310k | } |
872 | | |
873 | | // Next, we interrogate the anchor registry. |
874 | | // Returns OK if minimum known, NotFound if no anchors are registered. |
875 | 50.1M | { |
876 | 50.1M | int64_t min_anchor_index; |
877 | 50.1M | Status s = log_anchor_registry_->GetEarliestRegisteredLogIndex(&min_anchor_index); |
878 | 50.1M | if (PREDICT_FALSE(!s.ok())) { |
879 | 50.1M | DCHECK(s.IsNotFound()) << "Unexpected error calling LogAnchorRegistry: " << s.ToString()0 ; |
880 | 50.1M | } else { |
881 | 14.5k | min_index = std::min(min_index, min_anchor_index); |
882 | 14.5k | if (details) { |
883 | 0 | *details += Format("Min anchor index: $0\n", min_anchor_index); |
884 | 0 | } |
885 | 14.5k | } |
886 | 50.1M | } |
887 | | |
888 | | // Next, interrogate the OperationTracker. |
889 | 50.1M | int64_t min_pending_op_index = std::numeric_limits<int64_t>::max(); |
890 | 50.1M | for (const auto& driver : operation_tracker_.GetPendingOperations()) { |
891 | 963k | auto tx_op_id = driver->GetOpId(); |
892 | | // A operation which doesn't have an opid hasn't been submitted for replication yet and |
893 | | // thus has no need to anchor the log. |
894 | 963k | if (tx_op_id != yb::OpId::Invalid()) { |
895 | 929k | min_pending_op_index = std::min(min_pending_op_index, tx_op_id.index); |
896 | 929k | } |
897 | 963k | } |
898 | | |
899 | 50.1M | min_index = std::min(min_index, min_pending_op_index); |
900 | 50.1M | if (details && min_pending_op_index != std::numeric_limits<int64_t>::max()0 ) { |
901 | 0 | *details += Format("Min pending op id index: $0\n", min_pending_op_index); |
902 | 0 | } |
903 | | |
904 | 50.1M | auto min_retryable_request_op_id = consensus_->MinRetryableRequestOpId(); |
905 | 50.1M | min_index = std::min(min_index, min_retryable_request_op_id.index); |
906 | 50.1M | if (details) { |
907 | 0 | *details += Format("Min retryable request op id: $0\n", min_retryable_request_op_id); |
908 | 0 | } |
909 | | |
910 | 50.1M | auto* transaction_coordinator = tablet()->transaction_coordinator(); |
911 | 50.1M | if (transaction_coordinator) { |
912 | 10.4M | auto transaction_coordinator_min_op_index = transaction_coordinator->PrepareGC(details); |
913 | 10.4M | min_index = std::min(min_index, transaction_coordinator_min_op_index); |
914 | 10.4M | } |
915 | | |
916 | | // We keep at least one committed operation in the log so that we can always recover safe time |
917 | | // during bootstrap. |
918 | | // Last committed op id should be read before MaxPersistentOpId to avoid race condition |
919 | | // described in MaxPersistentOpIdForDb. |
920 | | // |
921 | | // If we read last committed op id AFTER reading last persistent op id (INCORRECT): |
922 | | // - We read max persistent op id and find there is no new data, so we ignore it. |
923 | | // - New data gets written and Raft-committed, but not yet flushed to an SSTable. |
924 | | // - We read the last committed op id, which is greater than what max persistent op id would have |
925 | | // returned. |
926 | | // - We garbage-collect the Raft log entries corresponding to the new data. |
927 | | // - Power is lost and the server reboots, losing committed data. |
928 | | // |
929 | | // If we read last committed op id BEFORE reading last persistent op id (CORRECT): |
930 | | // - We read the last committed op id. |
931 | | // - We read max persistent op id and find there is no new data, so we ignore it. |
932 | | // - New data gets written and Raft-committed, but not yet flushed to an SSTable. |
933 | | // - We still don't garbage-collect the logs containing the committed but unflushed data, |
934 | | // because the earlier value of the last committed op id that we read prevents us from doing so. |
935 | 50.1M | auto last_committed_op_id = consensus()->GetLastCommittedOpId(); |
936 | 50.1M | min_index = std::min(min_index, last_committed_op_id.index); |
937 | 50.1M | if (details) { |
938 | 0 | *details += Format("Last committed op id: $0\n", last_committed_op_id); |
939 | 0 | } |
940 | | |
941 | 50.1M | if (tablet_->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
942 | 39.7M | tablet_->FlushIntentsDbIfNecessary(latest_log_entry_op_id); |
943 | 39.7M | auto max_persistent_op_id = VERIFY_RESULT( |
944 | 39.7M | tablet_->MaxPersistentOpId(true /* invalid_if_no_new_data */)); |
945 | 39.7M | if (max_persistent_op_id.regular.valid()) { |
946 | 18.1M | min_index = std::min(min_index, max_persistent_op_id.regular.index); |
947 | 18.1M | if (details) { |
948 | 0 | *details += Format("Max persistent regular op id: $0\n", max_persistent_op_id.regular); |
949 | 0 | } |
950 | 18.1M | } |
951 | 39.7M | if (max_persistent_op_id.intents.valid()) { |
952 | 2.45M | min_index = std::min(min_index, max_persistent_op_id.intents.index); |
953 | 2.45M | if (details) { |
954 | 0 | *details += Format("Max persistent intents op id: $0\n", max_persistent_op_id.intents); |
955 | 0 | } |
956 | 2.45M | } |
957 | 39.7M | } |
958 | | |
959 | 50.1M | if (details) { |
960 | 0 | *details += Format("Earliest needed log index: $0\n", min_index); |
961 | 0 | } |
962 | | |
963 | 50.1M | return min_index; |
964 | 50.1M | } |
965 | | |
966 | 50.4M | Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const { |
967 | 50.4M | RETURN_NOT_OK(CheckRunning()); |
968 | 50.4M | int64_t min_op_idx = VERIFY_RESULT(GetEarliestNeededLogIndex()); |
969 | 50.4M | RETURN_NOT_OK(log_->GetGCableDataSize(min_op_idx, retention_size)); |
970 | 50.4M | return Status::OK(); |
971 | 50.4M | } |
972 | | |
973 | 3.47M | log::Log* TabletPeer::log() const { |
974 | 3.47M | Log* log = log_atomic_.load(std::memory_order_acquire); |
975 | 3.47M | LOG_IF_WITH_PREFIX69 (FATAL, !log) << "log() called before the log instance is initialized."69 ; |
976 | 3.47M | return log; |
977 | 3.47M | } |
978 | | |
979 | 77.5k | yb::OpId TabletPeer::GetLatestLogEntryOpId() const { |
980 | 77.5k | Log* log = log_atomic_.load(std::memory_order_acquire); |
981 | 77.5k | if (log) { |
982 | 77.5k | return log->GetLatestEntryOpId(); |
983 | 77.5k | } |
984 | 3 | return yb::OpId(); |
985 | 77.5k | } |
986 | | |
987 | 150k | Status TabletPeer::set_cdc_min_replicated_index_unlocked(int64_t cdc_min_replicated_index) { |
988 | 150k | LOG_WITH_PREFIX(INFO) << "Setting cdc min replicated index to " << cdc_min_replicated_index; |
989 | 150k | RETURN_NOT_OK(meta_->set_cdc_min_replicated_index(cdc_min_replicated_index)); |
990 | 150k | Log* log = log_atomic_.load(std::memory_order_acquire); |
991 | 150k | if (log) { |
992 | 150k | log->set_cdc_min_replicated_index(cdc_min_replicated_index); |
993 | 150k | } |
994 | 150k | cdc_min_replicated_index_refresh_time_ = MonoTime::Now(); |
995 | 150k | return Status::OK(); |
996 | 150k | } |
997 | | |
998 | 150k | Status TabletPeer::set_cdc_min_replicated_index(int64_t cdc_min_replicated_index) { |
999 | 150k | std::lock_guard<decltype(cdc_min_replicated_index_lock_)> l(cdc_min_replicated_index_lock_); |
1000 | 150k | return set_cdc_min_replicated_index_unlocked(cdc_min_replicated_index); |
1001 | 150k | } |
1002 | | |
1003 | 20 | Status TabletPeer::reset_cdc_min_replicated_index_if_stale() { |
1004 | 20 | std::lock_guard<decltype(cdc_min_replicated_index_lock_)> l(cdc_min_replicated_index_lock_); |
1005 | 20 | auto seconds_since_last_refresh = |
1006 | 20 | MonoTime::Now().GetDeltaSince(cdc_min_replicated_index_refresh_time_).ToSeconds(); |
1007 | 20 | if (seconds_since_last_refresh > FLAGS_cdc_min_replicated_index_considered_stale_secs) { |
1008 | 0 | LOG_WITH_PREFIX(INFO) << "Resetting cdc min replicated index. Seconds since last update: " |
1009 | 0 | << seconds_since_last_refresh; |
1010 | 0 | RETURN_NOT_OK(set_cdc_min_replicated_index_unlocked(std::numeric_limits<int64_t>::max())); |
1011 | 0 | } |
1012 | 20 | return Status::OK(); |
1013 | 20 | } |
1014 | | |
1015 | 9.28M | std::unique_ptr<Operation> TabletPeer::CreateOperation(consensus::ReplicateMsg* replicate_msg) { |
1016 | 9.28M | switch (replicate_msg->op_type()) { |
1017 | 5.53M | case consensus::WRITE_OP: |
1018 | 5.53M | DCHECK(replicate_msg->has_write()) << "WRITE_OP replica" |
1019 | 4.90k | " operation must receive a WriteRequestPB"; |
1020 | | // We use separate preparing token only on leader, so here it could be empty. |
1021 | 5.53M | return std::make_unique<WriteOperation>(tablet()); |
1022 | | |
1023 | 645k | case consensus::CHANGE_METADATA_OP: |
1024 | 645k | DCHECK(replicate_msg->has_change_metadata_request()) << "CHANGE_METADATA_OP replica" |
1025 | 352 | " operation must receive an ChangeMetadataRequestPB"; |
1026 | 645k | return std::make_unique<ChangeMetadataOperation>(tablet(), log()); |
1027 | | |
1028 | 2.99M | case consensus::UPDATE_TRANSACTION_OP: |
1029 | 2.99M | DCHECK(replicate_msg->has_transaction_state()) << "UPDATE_TRANSACTION_OP replica" |
1030 | 2.29k | " operation must receive an TransactionStatePB"; |
1031 | 2.99M | return std::make_unique<UpdateTxnOperation>(tablet()); |
1032 | | |
1033 | 114k | case consensus::TRUNCATE_OP: |
1034 | 114k | DCHECK(replicate_msg->has_truncate()) << "TRUNCATE_OP replica" |
1035 | 515 | " operation must receive an TruncateRequestPB"; |
1036 | 114k | return std::make_unique<TruncateOperation>(tablet()); |
1037 | | |
1038 | 1.15k | case consensus::SNAPSHOT_OP: |
1039 | 1.15k | DCHECK(replicate_msg->has_snapshot_request()) << "SNAPSHOT_OP replica" |
1040 | 0 | " operation must receive an TabletSnapshotOpRequestPB"; |
1041 | 1.15k | return std::make_unique<SnapshotOperation>(tablet()); |
1042 | | |
1043 | 0 | case consensus::HISTORY_CUTOFF_OP: |
1044 | 0 | DCHECK(replicate_msg->has_history_cutoff()) << "HISTORY_CUTOFF_OP replica" |
1045 | 0 | " transaction must receive an HistoryCutoffPB"; |
1046 | 0 | return std::make_unique<HistoryCutoffOperation>(tablet()); |
1047 | | |
1048 | 26 | case consensus::SPLIT_OP: |
1049 | 26 | DCHECK(replicate_msg->has_split_request()) << "SPLIT_OP replica" |
1050 | 0 | " operation must receive an SplitOpRequestPB"; |
1051 | 26 | return std::make_unique<SplitOperation>(tablet(), tablet_splitter_); |
1052 | | |
1053 | 0 | case consensus::UNKNOWN_OP: FALLTHROUGH_INTENDED; |
1054 | 0 | case consensus::NO_OP: FALLTHROUGH_INTENDED; |
1055 | 0 | case consensus::CHANGE_CONFIG_OP: |
1056 | 0 | FATAL_INVALID_ENUM_VALUE(consensus::OperationType, replicate_msg->op_type()); |
1057 | 9.28M | } |
1058 | 0 | FATAL_INVALID_ENUM_VALUE(consensus::OperationType, replicate_msg->op_type()); |
1059 | 0 | } |
1060 | | |
1061 | | Status TabletPeer::StartReplicaOperation( |
1062 | 9.28M | const scoped_refptr<ConsensusRound>& round, HybridTime propagated_safe_time) { |
1063 | 9.28M | RaftGroupStatePB value = state(); |
1064 | 9.28M | if (value != RaftGroupStatePB::RUNNING && value != RaftGroupStatePB::BOOTSTRAPPING122 ) { |
1065 | 0 | return STATUS(IllegalState, RaftGroupStatePB_Name(value)); |
1066 | 0 | } |
1067 | | |
1068 | 9.28M | consensus::ReplicateMsg* replicate_msg = round->replicate_msg().get(); |
1069 | 9.28M | DCHECK(replicate_msg->has_hybrid_time()); |
1070 | 9.28M | auto operation = CreateOperation(replicate_msg); |
1071 | | |
1072 | | // TODO(todd) Look at wiring the stuff below on the driver |
1073 | | // It's imperative that we set the round here on any type of operation, as this |
1074 | | // allows us to keep the reference to the request in the round instead of copying it. |
1075 | 9.28M | operation->set_consensus_round(round); |
1076 | 9.28M | HybridTime ht(replicate_msg->hybrid_time()); |
1077 | 9.28M | operation->set_hybrid_time(ht); |
1078 | 9.28M | clock_->Update(ht); |
1079 | | |
1080 | | // This sets the monotonic counter to at least replicate_msg.monotonic_counter() atomically. |
1081 | 9.28M | tablet_->UpdateMonotonicCounter(replicate_msg->monotonic_counter()); |
1082 | | |
1083 | 9.28M | auto* operation_ptr = operation.get(); |
1084 | 9.28M | OperationDriverPtr driver = VERIFY_RESULT9.28M (NewReplicaOperationDriver(&operation));9.28M |
1085 | | |
1086 | 0 | operation_ptr->consensus_round()->SetCallback(driver.get()); |
1087 | | |
1088 | 9.28M | if (propagated_safe_time) { |
1089 | 8.14M | driver->SetPropagatedSafeTime(propagated_safe_time, tablet_->mvcc_manager()); |
1090 | 8.14M | } |
1091 | | |
1092 | 9.28M | driver->ExecuteAsync(); |
1093 | 9.28M | return Status::OK(); |
1094 | 9.28M | } |
1095 | | |
1096 | 17.1M | void TabletPeer::SetPropagatedSafeTime(HybridTime ht) { |
1097 | 17.1M | auto driver = NewReplicaOperationDriver(nullptr); |
1098 | 17.1M | if (!driver.ok()) { |
1099 | 0 | LOG_WITH_PREFIX(ERROR) << "Failed to create operation driver to set propagated hybrid time"; |
1100 | 0 | return; |
1101 | 0 | } |
1102 | 17.1M | (**driver).SetPropagatedSafeTime(ht, tablet_->mvcc_manager()); |
1103 | 17.1M | (**driver).ExecuteAsync(); |
1104 | 17.1M | } |
1105 | | |
1106 | 5.53M | bool TabletPeer::ShouldApplyWrite() { |
1107 | 5.53M | return tablet_->ShouldApplyWrite(); |
1108 | 5.53M | } |
1109 | | |
1110 | 119M | consensus::Consensus* TabletPeer::consensus() const { |
1111 | 119M | return raft_consensus(); |
1112 | 119M | } |
1113 | | |
1114 | 122M | consensus::RaftConsensus* TabletPeer::raft_consensus() const { |
1115 | 122M | std::lock_guard<simple_spinlock> lock(lock_); |
1116 | 122M | return consensus_.get(); |
1117 | 122M | } |
1118 | | |
1119 | 27.2M | shared_ptr<consensus::Consensus> TabletPeer::shared_consensus() const { |
1120 | 27.2M | std::lock_guard<simple_spinlock> lock(lock_); |
1121 | 27.2M | return consensus_; |
1122 | 27.2M | } |
1123 | | |
1124 | 28.6M | shared_ptr<consensus::RaftConsensus> TabletPeer::shared_raft_consensus() const { |
1125 | 28.6M | std::lock_guard<simple_spinlock> lock(lock_); |
1126 | 28.6M | return consensus_; |
1127 | 28.6M | } |
1128 | | |
1129 | | Result<OperationDriverPtr> TabletPeer::NewLeaderOperationDriver( |
1130 | 5.04M | std::unique_ptr<Operation>* operation, int64_t term) { |
1131 | 5.04M | if (term == OpId::kUnknownTerm) { |
1132 | 0 | return STATUS(InvalidArgument, "Leader operation driver for unknown term"); |
1133 | 0 | } |
1134 | 5.04M | return NewOperationDriver(operation, term); |
1135 | 5.04M | } |
1136 | | |
1137 | | Result<OperationDriverPtr> TabletPeer::NewReplicaOperationDriver( |
1138 | 26.4M | std::unique_ptr<Operation>* operation) { |
1139 | 26.4M | return NewOperationDriver(operation, OpId::kUnknownTerm); |
1140 | 26.4M | } |
1141 | | |
1142 | | Result<OperationDriverPtr> TabletPeer::NewOperationDriver(std::unique_ptr<Operation>* operation, |
1143 | 31.4M | int64_t term) { |
1144 | 31.4M | auto operation_driver = CreateOperationDriver(); |
1145 | 31.4M | RETURN_NOT_OK(operation_driver->Init(operation, term)); |
1146 | 31.4M | return operation_driver; |
1147 | 31.4M | } |
1148 | | |
1149 | 150k | void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) { |
1150 | | // Taking state_change_lock_ ensures that we don't shut down concurrently with |
1151 | | // this last start-up task. |
1152 | | // Note that the state_change_lock_ is taken in Shutdown(), |
1153 | | // prior to calling UnregisterMaintenanceOps(). |
1154 | | |
1155 | 150k | std::lock_guard<simple_spinlock> l(state_change_lock_); |
1156 | | |
1157 | 150k | if (state() != RaftGroupStatePB::RUNNING) { |
1158 | 0 | LOG_WITH_PREFIX(WARNING) << "Not registering maintenance operations: tablet not RUNNING"; |
1159 | 0 | return; |
1160 | 0 | } |
1161 | | |
1162 | 150k | DCHECK(maintenance_ops_.empty()); |
1163 | | |
1164 | 150k | auto log_gc = std::make_unique<LogGCOp>(this); |
1165 | 150k | maint_mgr->RegisterOp(log_gc.get()); |
1166 | 150k | maintenance_ops_.push_back(std::move(log_gc)); |
1167 | 150k | LOG_WITH_PREFIX(INFO) << "Registered log gc"; |
1168 | 150k | } |
1169 | | |
1170 | 75.6k | void TabletPeer::UnregisterMaintenanceOps() { |
1171 | 75.6k | DCHECK(state_change_lock_.is_locked()); |
1172 | 75.6k | for (auto& op : maintenance_ops_) { |
1173 | 75.6k | op->Unregister(); |
1174 | 75.6k | } |
1175 | 75.6k | maintenance_ops_.clear(); |
1176 | 75.6k | } |
1177 | | |
1178 | 8.14k | TabletOnDiskSizeInfo TabletPeer::GetOnDiskSizeInfo() const { |
1179 | 8.14k | TabletOnDiskSizeInfo info; |
1180 | | |
1181 | 8.14k | if (consensus_) { |
1182 | 8.07k | info.consensus_metadata_disk_size = consensus_->OnDiskSize(); |
1183 | 8.07k | } |
1184 | | |
1185 | 8.14k | if (tablet_) { |
1186 | 8.07k | info.sst_files_disk_size = tablet_->GetCurrentVersionSstFilesSize(); |
1187 | 8.07k | info.uncompressed_sst_files_disk_size = |
1188 | 8.07k | tablet_->GetCurrentVersionSstFilesUncompressedSize(); |
1189 | 8.07k | } |
1190 | | |
1191 | 8.14k | auto log = log_atomic_.load(std::memory_order_acquire); |
1192 | 8.14k | if (log) { |
1193 | 8.12k | info.wal_files_disk_size = log->OnDiskSize(); |
1194 | 8.12k | } |
1195 | | |
1196 | 8.14k | info.RecomputeTotalSize(); |
1197 | 8.14k | return info; |
1198 | 8.14k | } |
1199 | | |
1200 | 4.41k | size_t TabletPeer::GetNumLogSegments() const { |
1201 | 4.41k | auto log = log_atomic_.load(std::memory_order_acquire); |
1202 | 4.41k | return log ? log->num_segments()4.39k : 012 ; |
1203 | 4.41k | } |
1204 | | |
1205 | 843k | std::string TabletPeer::LogPrefix() const { |
1206 | 843k | return Substitute("T $0 P $1 [state=$2]: ", |
1207 | 843k | tablet_id_, permanent_uuid_, RaftGroupStatePB_Name(state())); |
1208 | 843k | } |
1209 | | |
1210 | 31.4M | scoped_refptr<OperationDriver> TabletPeer::CreateOperationDriver() { |
1211 | 31.4M | return scoped_refptr<OperationDriver>(new OperationDriver( |
1212 | 31.4M | &operation_tracker_, |
1213 | 31.4M | consensus_.get(), |
1214 | 31.4M | prepare_thread_.get(), |
1215 | 31.4M | tablet_->table_type())); |
1216 | 31.4M | } |
1217 | | |
1218 | 5.81M | int64_t TabletPeer::LeaderTerm() const { |
1219 | 5.81M | shared_ptr<consensus::Consensus> consensus; |
1220 | 5.81M | { |
1221 | 5.81M | std::lock_guard<simple_spinlock> lock(lock_); |
1222 | 5.81M | consensus = consensus_; |
1223 | 5.81M | } |
1224 | 5.81M | return consensus ? consensus->LeaderTerm()5.81M : yb::OpId::kUnknownTerm5.40k ; |
1225 | 5.81M | } |
1226 | | |
1227 | 247k | Result<HybridTime> TabletPeer::LeaderSafeTime() const { |
1228 | 247k | return tablet_->SafeTime(); |
1229 | 247k | } |
1230 | | |
1231 | 33.3M | consensus::LeaderStatus TabletPeer::LeaderStatus(bool allow_stale) const { |
1232 | 33.3M | shared_ptr<consensus::Consensus> consensus; |
1233 | 33.3M | { |
1234 | 33.3M | std::lock_guard<simple_spinlock> lock(lock_); |
1235 | 33.3M | consensus = consensus_; |
1236 | 33.3M | } |
1237 | 33.3M | return consensus ? consensus->GetLeaderStatus(allow_stale)32.1M : consensus::LeaderStatus::NOT_LEADER1.18M ; |
1238 | 33.3M | } |
1239 | | |
1240 | 197k | HybridTime TabletPeer::HtLeaseExpiration() const { |
1241 | 197k | HybridTime result( |
1242 | 197k | CHECK_RESULT(consensus_->MajorityReplicatedHtLeaseExpiration(0, CoarseTimePoint::max())), 0); |
1243 | 197k | return std::max(result, tablet_->mvcc_manager()->LastReplicatedHybridTime()); |
1244 | 197k | } |
1245 | | |
1246 | 0 | TableType TabletPeer::table_type() EXCLUDES(lock_) { |
1247 | | // TODO: what if tablet is not set? |
1248 | 0 | return DCHECK_NOTNULL(tablet())->table_type(); |
1249 | 0 | } |
1250 | | |
1251 | 18 | void TabletPeer::SetFailed(const Status& error) { |
1252 | 18 | DCHECK(error_.get(std::memory_order_acquire) == nullptr); |
1253 | 18 | error_ = MakeAtomicUniquePtr<Status>(error); |
1254 | 18 | auto state = state_.load(std::memory_order_acquire); |
1255 | 18 | while (state != RaftGroupStatePB::FAILED && state != RaftGroupStatePB::QUIESCING && |
1256 | 18 | state != RaftGroupStatePB::SHUTDOWN) { |
1257 | 18 | if (state_.compare_exchange_weak(state, RaftGroupStatePB::FAILED, std::memory_order_acq_rel)) { |
1258 | 18 | LOG_WITH_PREFIX(INFO) << "Changed state from " << RaftGroupStatePB_Name(state) |
1259 | 18 | << " to FAILED"; |
1260 | 18 | break; |
1261 | 18 | } |
1262 | 18 | } |
1263 | 18 | } |
1264 | | |
1265 | | Status TabletPeer::UpdateState(RaftGroupStatePB expected, RaftGroupStatePB new_state, |
1266 | 300k | const std::string& error_message) { |
1267 | 300k | RaftGroupStatePB old = expected; |
1268 | 300k | if (!state_.compare_exchange_strong(old, new_state, std::memory_order_acq_rel)) { |
1269 | 0 | return STATUS_FORMAT( |
1270 | 0 | InvalidArgument, "$0 Expected state: $1, got: $2", |
1271 | 0 | error_message, RaftGroupStatePB_Name(expected), RaftGroupStatePB_Name(old)); |
1272 | 0 | } |
1273 | | |
1274 | 300k | LOG_WITH_PREFIX(INFO) << "Changed state from " << RaftGroupStatePB_Name(old) << " to " |
1275 | 300k | << RaftGroupStatePB_Name(new_state); |
1276 | 300k | return Status::OK(); |
1277 | 300k | } |
1278 | | |
1279 | 222k | void TabletPeer::Enqueue(rpc::ThreadPoolTask* task) { |
1280 | 222k | rpc::ThreadPool* thread_pool = service_thread_pool_.load(std::memory_order_acquire); |
1281 | 222k | if (!thread_pool) { |
1282 | 0 | task->Done(STATUS(Aborted, "Thread pool not ready")); |
1283 | 0 | return; |
1284 | 0 | } |
1285 | | |
1286 | 222k | thread_pool->Enqueue(task); |
1287 | 222k | } |
1288 | | |
1289 | 1.67M | void TabletPeer::StrandEnqueue(rpc::StrandTask* task) { |
1290 | 1.67M | rpc::Strand* strand = strand_.get(); |
1291 | 1.67M | if (!strand) { |
1292 | 0 | task->Done(STATUS(Aborted, "Thread pool not ready")); |
1293 | 0 | return; |
1294 | 0 | } |
1295 | | |
1296 | 1.67M | strand->Enqueue(task); |
1297 | 1.67M | } |
1298 | | |
1299 | 1.63M | bool TabletPeer::CanBeDeleted() { |
1300 | 1.63M | const auto consensus = shared_raft_consensus(); |
1301 | 1.63M | if (!consensus || consensus->LeaderTerm() == OpId::kUnknownTerm1.62M ) { |
1302 | 1.50M | return false; |
1303 | 1.50M | } |
1304 | | |
1305 | 131k | const auto tablet = shared_tablet(); |
1306 | 131k | if (!tablet) { |
1307 | 0 | return false; |
1308 | 0 | } |
1309 | | |
1310 | 131k | auto op_id = tablet->metadata()->GetOpIdToDeleteAfterAllApplied(); |
1311 | 131k | if (!op_id.valid()) { |
1312 | 131k | return false; |
1313 | 131k | } |
1314 | | |
1315 | 60 | const auto all_applied_op_id = consensus->GetAllAppliedOpId(); |
1316 | 60 | if (all_applied_op_id < op_id) { |
1317 | 46 | return false; |
1318 | 46 | } |
1319 | | |
1320 | 14 | LOG_WITH_PREFIX(INFO) << Format( |
1321 | 14 | "Marked tablet $0 as requiring cleanup due to all replicas have been split (all applied op " |
1322 | 14 | "id: $1, split op id: $2)", |
1323 | 14 | tablet_id(), all_applied_op_id, op_id); |
1324 | | |
1325 | 14 | return true; |
1326 | 60 | } |
1327 | | |
1328 | 56.9k | rpc::Scheduler& TabletPeer::scheduler() const { |
1329 | 56.9k | return messenger_->scheduler(); |
1330 | 56.9k | } |
1331 | | |
1332 | | } // namespace tablet |
1333 | | } // namespace yb |