YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_peer.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/tablet_peer.h"
34
35
#include <algorithm>
36
#include <mutex>
37
#include <string>
38
#include <utility>
39
#include <vector>
40
41
#include <gflags/gflags.h>
42
43
#include "yb/consensus/consensus.h"
44
#include "yb/consensus/consensus.pb.h"
45
#include "yb/consensus/consensus_util.h"
46
#include "yb/consensus/log.h"
47
#include "yb/consensus/log_anchor_registry.h"
48
#include "yb/consensus/log_util.h"
49
#include "yb/consensus/opid_util.h"
50
#include "yb/consensus/raft_consensus.h"
51
#include "yb/consensus/retryable_requests.h"
52
#include "yb/consensus/state_change_context.h"
53
54
#include "yb/docdb/consensus_frontier.h"
55
56
#include "yb/gutil/casts.h"
57
#include "yb/gutil/strings/substitute.h"
58
#include "yb/gutil/sysinfo.h"
59
60
#include "yb/rocksdb/db/memtable.h"
61
62
#include "yb/rpc/messenger.h"
63
#include "yb/rpc/strand.h"
64
#include "yb/rpc/thread_pool.h"
65
66
#include "yb/tablet/operations/change_metadata_operation.h"
67
#include "yb/tablet/operations/history_cutoff_operation.h"
68
#include "yb/tablet/operations/operation_driver.h"
69
#include "yb/tablet/operations/snapshot_operation.h"
70
#include "yb/tablet/operations/split_operation.h"
71
#include "yb/tablet/operations/truncate_operation.h"
72
#include "yb/tablet/operations/update_txn_operation.h"
73
#include "yb/tablet/operations/write_operation.h"
74
#include "yb/tablet/tablet.h"
75
#include "yb/tablet/tablet.pb.h"
76
#include "yb/tablet/tablet_bootstrap_if.h"
77
#include "yb/tablet/tablet_metadata.h"
78
#include "yb/tablet/tablet_metrics.h"
79
#include "yb/tablet/tablet_peer_mm_ops.h"
80
#include "yb/tablet/tablet_retention_policy.h"
81
#include "yb/tablet/transaction_participant.h"
82
#include "yb/tablet/write_query.h"
83
84
#include "yb/util/debug-util.h"
85
#include "yb/util/flag_tags.h"
86
#include "yb/util/format.h"
87
#include "yb/util/logging.h"
88
#include "yb/util/metrics.h"
89
#include "yb/util/status_format.h"
90
#include "yb/util/status_log.h"
91
#include "yb/util/stopwatch.h"
92
#include "yb/util/threadpool.h"
93
#include "yb/util/trace.h"
94
95
using namespace std::literals;
96
using namespace std::placeholders;
97
using std::shared_ptr;
98
using std::string;
99
100
// Test-only: artificial delay at the start of TabletPeer::InitTabletPeer().
DEFINE_test_flag(int32, delay_init_tablet_peer_ms, 0,
                 "Wait before executing init tablet peer for specified amount of milliseconds.");

// Help text fix: the two literals below are concatenated, so the first must end with a space
// (previously it rendered as "...we reset itsvalue to max int64...").
DEFINE_int32(cdc_min_replicated_index_considered_stale_secs, 900,
    "If cdc_min_replicated_index hasn't been replicated in this amount of time, we reset its "
    "value to max int64 to avoid retaining any logs");

DEFINE_bool(propagate_safe_time, true, "Propagate safe time to read from leader to followers");

// Defined elsewhere; used by AbortSQLTransactions() as the abort deadline.
DECLARE_int32(ysql_transaction_abort_timeout_ms);
110
111
namespace yb {
112
namespace tablet {
113
114
// Table-level histograms for the operation prepare stage: how many operations are queued,
// how long they wait in the queue, and how long the prepare step itself runs.
METRIC_DEFINE_coarse_histogram(
    table, op_prepare_queue_length, "Operation Prepare Queue Length", MetricUnit::kTasks,
    "Number of operations waiting to be prepared within this tablet. High queue lengths "
    "indicate that the server is unable to process operations as fast as they are being "
    "written to the WAL.");

METRIC_DEFINE_coarse_histogram(
    table, op_prepare_queue_time, "Operation Prepare Queue Time", MetricUnit::kMicroseconds,
    "Time that operations spent waiting in the prepare queue before being processed. High "
    "queue times indicate that the server is unable to process operations as fast as they "
    "are being written to the WAL.");

METRIC_DEFINE_coarse_histogram(
    table, op_prepare_run_time, "Operation Prepare Run Time", MetricUnit::kMicroseconds,
    "Time that operations spent being prepared in the tablet. High values may indicate "
    "that the server is under-provisioned or that operations are experiencing high "
    "contention with one another for locks.");
132
133
using consensus::Consensus;
134
using consensus::ConsensusBootstrapInfo;
135
using consensus::ConsensusMetadata;
136
using consensus::ConsensusOptions;
137
using consensus::ConsensusRound;
138
using consensus::StateChangeContext;
139
using consensus::StateChangeReason;
140
using consensus::RaftConfigPB;
141
using consensus::RaftPeerPB;
142
using consensus::RaftConsensus;
143
using consensus::ReplicateMsg;
144
using consensus::OpIdType;
145
using log::Log;
146
using log::LogAnchorRegistry;
147
using rpc::Messenger;
148
using strings::Substitute;
149
using tserver::TabletServerErrorPB;
150
151
// ============================================================================
152
//  Tablet Peer
153
// ============================================================================
154
// Constructs a peer in the NOT_STARTED state. No tablet, log, or consensus is attached here;
// those are supplied later by InitTabletPeer(), which requires the peer to be BOOTSTRAPPING.
//
// NOTE: some initializers read members initialized earlier in this list (tablet_id_ is used by
// operation_tracker_, and operation_tracker_ by preparing_operations_counter_). C++ initializes
// members in header declaration order, so the header must declare them in this order.
TabletPeer::TabletPeer(
    const RaftGroupMetadataPtr& meta,
    const consensus::RaftPeerPB& local_peer_pb,
    const scoped_refptr<server::Clock>& clock,
    const std::string& permanent_uuid,
    Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
    MetricRegistry* metric_registry,
    TabletSplitter* tablet_splitter,
    const std::shared_future<client::YBClient*>& client_future)
    : meta_(meta),
      tablet_id_(meta->raft_group_id()),
      local_peer_pb_(local_peer_pb),
      state_(RaftGroupStatePB::NOT_STARTED),
      operation_tracker_(consensus::MakeTabletLogPrefix(tablet_id_, permanent_uuid)),
      status_listener_(new TabletStatusListener(meta)),
      clock_(clock),
      log_anchor_registry_(new LogAnchorRegistry()),
      mark_dirty_clbk_(std::move(mark_dirty_clbk)),
      permanent_uuid_(permanent_uuid),
      preparing_operations_counter_(operation_tracker_.LogPrefix()),
      metric_registry_(metric_registry),
      tablet_splitter_(tablet_splitter),
      client_future_(client_future) {}
177
178
74.1k
// Destroys the peer. CompleteShutdown() clears tablet_, so a tablet still attached here means
// either Shutdown() was never completed or InitTabletPeer() ran without a matching shutdown.
// That situation is reported as DFATAL.
TabletPeer::~TabletPeer() {
  std::lock_guard<simple_spinlock> guard(lock_);
  if (tablet_) {
    LOG_WITH_PREFIX(DFATAL) << "TabletPeer not fully shut down.";
  }
}
184
185
// Attaches a bootstrapped tablet and its log to this peer and creates the RaftConsensus instance.
// The peer must be in BOOTSTRAPPING; any other state yields IllegalState.
//
// Under lock_ this method:
//   - publishes tablet_, proxy_cache_, log_ (and log_atomic_), the service thread pool, a new
//     strand and messenger_;
//   - installs the memtable flush filter factory;
//   - loads consensus metadata and creates consensus_, the HT lease provider hook, the operation
//     tracker post-tracker, and the Preparer.
// After releasing the lock it starts the Preparer, operation-tracker instrumentation and memory
// tracking, the transaction coordinator/participant (when the tablet has them), and applies the
// cdc_min_replicated_index stored in the metadata.
//
// NOTE(review): lock_ is a simple_spinlock held across ConsensusMetadata::Load and
// RaftConsensus::Create. If those perform disk I/O, other lock_ waiters spin meanwhile.
Status TabletPeer::InitTabletPeer(
    const TabletPtr& tablet,
    const std::shared_ptr<MemTracker>& server_mem_tracker,
    Messenger* messenger,
    rpc::ProxyCache* proxy_cache,
    const scoped_refptr<Log>& log,
    const scoped_refptr<MetricEntity>& table_metric_entity,
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
    ThreadPool* raft_pool,
    ThreadPool* tablet_prepare_pool,
    consensus::RetryableRequests* retryable_requests,
    consensus::MultiRaftManager* multi_raft_manager) {
  DCHECK(tablet) << "A TabletPeer must be provided with a Tablet";
  DCHECK(log) << "A TabletPeer must be provided with a Log";

  // Test hook: delay initialization.
  if (FLAGS_TEST_delay_init_tablet_peer_ms > 0) {
    std::this_thread::sleep_for(FLAGS_TEST_delay_init_tablet_peer_ms * 1ms);
  }

  {
    std::lock_guard<simple_spinlock> lock(lock_);
    auto state = state_.load(std::memory_order_acquire);
    if (state != RaftGroupStatePB::BOOTSTRAPPING) {
      return STATUS_FORMAT(
          IllegalState, "Invalid tablet state for init: $0", RaftGroupStatePB_Name(state));
    }
    tablet_ = tablet;
    proxy_cache_ = proxy_cache;
    log_ = log;
    // "Publish" the log pointer so it can be retrieved using the log() accessor.
    log_atomic_ = log.get();
    service_thread_pool_ = &messenger->ThreadPool();
    strand_.reset(new rpc::Strand(&messenger->ThreadPool()));
    messenger_ = messenger;

    // Each time the factory is invoked it snapshots the log's latest entry index. The filter it
    // returns allows flushing a memtable only if every op in it is at or below that index.
    tablet->SetMemTableFlushFilterFactory([log] {
      auto largest_log_op_index = log->GetLatestEntryOpId().index;
      return [largest_log_op_index] (const rocksdb::MemTable& memtable) -> Result<bool> {
        auto frontiers = memtable.Frontiers();
        if (frontiers) {
          const auto largest_memtable_op_index =
              down_cast<const docdb::ConsensusFrontier&>(frontiers->Largest()).op_id().index;
          // We can only flush this memtable if all operations written to it have also been written
          // to the log (maybe not synced, if durable_wal_write is disabled, but that's OK).
          auto should_flush = largest_memtable_op_index <= largest_log_op_index;
          if (!should_flush) {
            LOG(WARNING)
              << "Skipping flush on memtable with ops ahead of log. "
              << "Memtable index: " << largest_memtable_op_index
              << " - log index: " << largest_log_op_index;
          }
          return should_flush;
        }

        // It is correct to not have frontiers when memtable is empty
        if (memtable.IsEmpty()) {
          return true;
        }

        // Degenerate case that should never occur: a NON-empty memtable with no frontiers (the
        // empty case was handled above). The filter does not allow the flush. It logs the stack
        // trace and returns an IllegalState error to the caller.
        static const char* error_msg =
            "A memtable with no frontiers set found when deciding what memtables to "
            "flush! This should not happen.";
        LOG(ERROR) << error_msg << " Stack trace:\n" << GetStackTrace();
        return STATUS(IllegalState, error_msg);
      };
    });

    tablet_->SetCleanupPool(raft_pool);

    ConsensusOptions options;
    options.tablet_id = meta_->raft_group_id();

    TRACE("Creating consensus instance");

    std::unique_ptr<ConsensusMetadata> cmeta;
    RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
                                          meta_->fs_manager()->uuid(), &cmeta));

    if (retryable_requests) {
      retryable_requests->SetMetricEntity(tablet->GetTabletMetricsEntity());
    }

    consensus_ = RaftConsensus::Create(
        options,
        std::move(cmeta),
        local_peer_pb_,
        table_metric_entity,
        tablet_metric_entity,
        clock_,
        this,
        messenger,
        proxy_cache_,
        log_.get(),
        server_mem_tracker,
        tablet_->mem_tracker(),
        mark_dirty_clbk_,
        tablet_->table_type(),
        raft_pool,
        retryable_requests,
        multi_raft_manager);
    // Readers such as WaitUntilConsensusRunning() check has_consensus_ before using consensus_.
    has_consensus_.store(true, std::memory_order_release);

    tablet_->SetHybridTimeLeaseProvider(std::bind(&TabletPeer::HybridTimeLease, this, _1, _2));
    operation_tracker_.SetPostTracker(
        std::bind(&RaftConsensus::TrackOperationMemory, consensus_.get(), _1));

    prepare_thread_ = std::make_unique<Preparer>(consensus_.get(), tablet_prepare_pool);

    ChangeConfigReplicated(RaftConfig()); // Set initial flag value.
  }

  RETURN_NOT_OK(prepare_thread_->Start());

  if (tablet_->metrics() != nullptr) {
    TRACE("Starting instrumentation");
    operation_tracker_.StartInstrumentation(tablet_->GetTabletMetricsEntity());
  }
  operation_tracker_.StartMemoryTracking(tablet_->mem_tracker());

  if (tablet_->transaction_coordinator()) {
    tablet_->transaction_coordinator()->Start();
  }

  if (tablet_->transaction_participant()) {
    tablet_->transaction_participant()->Start();
  }

  RETURN_NOT_OK(set_cdc_min_replicated_index(meta_->cdc_min_replicated_index()));

  TRACE("TabletPeer::Init() finished");
  VLOG_WITH_PREFIX(2) << "Peer Initted";

  return Status::OK();
}
321
322
// Waits via WaitUntil() (bounded by `deadline`), presumably until the clock reaches
// `min_allowed`. The resulting time is paired with the majority-replicated hybrid-time leader
// lease expiration. If the expiration is at or beyond kMaxHybridTimePhysicalMicros (which
// happens when leader leases are disabled), a default-constructed FixedHybridTimeLease is
// returned instead.
Result<FixedHybridTimeLease> TabletPeer::HybridTimeLease(
    HybridTime min_allowed, CoarseTimePoint deadline) {
  auto now = VERIFY_RESULT(WaitUntil(clock_.get(), min_allowed, deadline));

  // min_allowed may carry a non-zero logical component, so its physical part is rounded up to a
  // whole microsecond. That keeps the resulting ht_lease from being smaller than min_allowed.
  const auto lease_floor_micros = min_allowed.CeilPhysicalValueMicros();
  MicrosTime lease_expiration_micros = VERIFY_RESULT(
      consensus_->MajorityReplicatedHtLeaseExpiration(lease_floor_micros, deadline));

  if (lease_expiration_micros < kMaxHybridTimePhysicalMicros) {
    return FixedHybridTimeLease {
      .time = now,
      .lease = HybridTime(lease_expiration_micros, /* logical */ 0)
    };
  }
  // No finite lease, e.g. leader leases are disabled.
  return FixedHybridTimeLease();
}
339
340
28.8M
// Presumably invoked by consensus before it sends a request to peers.
// While this peer sees a non-negative (possibly stale) leader term, it submits a
// HistoryCutoffOperation in that term whenever the retention policy reports a history cutoff to
// propagate. It returns the safe time to propagate, computed from the current lease, or
// HybridTime::kInvalid when FLAGS_propagate_safe_time is off.
Result<HybridTime> TabletPeer::PreparePeerRequest() {
  const auto leader_term = consensus_->GetLeaderState(/* allow_stale= */ true).term;
  if (leader_term >= 0) {
    auto last_write_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime();
    auto cutoff = tablet_->RetentionPolicy()->HistoryCutoffToPropagate(last_write_ht);
    if (cutoff) {
      VLOG_WITH_PREFIX(2) << "Propagate history cutoff: " << cutoff;

      auto cutoff_operation = std::make_unique<HistoryCutoffOperation>(tablet_.get());
      cutoff_operation->AllocateRequest()->set_history_cutoff(cutoff.ToUint64());
      Submit(std::move(cutoff_operation), leader_term);
    }
  }

  if (FLAGS_propagate_safe_time) {
    // Get the current majority-replicated HT leader lease without any waiting.
    auto ht_lease = VERIFY_RESULT(HybridTimeLease(
        /* min_allowed= */ HybridTime::kMin, /* deadline */ CoarseTimePoint::max()));
    return tablet_->mvcc_manager()->SafeTime(ht_lease);
  }
  return HybridTime::kInvalid;
}
367
368
34.6M
void TabletPeer::MajorityReplicated() {
369
34.6M
  auto ht_lease = HybridTimeLease(
370
34.6M
      /* min_allowed= */ HybridTime::kMin, /* deadline */ CoarseTimePoint::max());
371
34.6M
  if (!ht_lease.ok()) {
372
0
    LOG_WITH_PREFIX(DFATAL) << "Failed to get current lease: " << ht_lease.status();
373
0
    return;
374
0
  }
375
376
34.6M
  tablet_->mvcc_manager()->UpdatePropagatedSafeTimeOnLeader(*ht_lease);
377
34.6M
}
378
379
169k
void TabletPeer::ChangeConfigReplicated(const RaftConfigPB& config) {
380
169k
  tablet_->mvcc_manager()->SetLeaderOnlyMode(config.peers_size() == 1);
381
169k
}
382
383
24.3M
uint64_t TabletPeer::NumSSTFiles() {
384
24.3M
  return tablet_->GetCurrentVersionNumSSTFiles();
385
24.3M
}
386
387
225k
void TabletPeer::ListenNumSSTFilesChanged(std::function<void()> listener) {
388
225k
  tablet_->ListenNumSSTFilesChanged(std::move(listener));
389
225k
}
390
391
5.11M
// Delegates the admission check for an operation of `op_type` at `op_id` to the tablet.
Status TabletPeer::CheckOperationAllowed(const OpId& op_id, consensus::OperationType op_type) {
  auto* const tablet = tablet_.get();
  return tablet->CheckOperationAllowed(op_id, op_type);
}
394
395
150k
// Starts consensus from `bootstrap_info` and moves the peer from BOOTSTRAPPING to RUNNING, both
// under state_change_lock_. It also removes the tablet from the metric registry's shutdown set so
// its metrics are reported again. After releasing the lock it runs the mark-dirty callback so the
// tablet is re-reported to the master, then enables compactions on the tablet.
Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) {
  {
    std::lock_guard<simple_spinlock> l(state_change_lock_);
    TRACE("Starting consensus");

    VLOG_WITH_PREFIX(2) << "Peer starting";

    // Use the prefixed variant (as every other log statement in this class does) so this line can
    // be attributed to a specific tablet/peer.
    VLOG_WITH_PREFIX(2) << "RaftConfig before starting: "
                        << consensus_->CommittedConfig().DebugString();

    // If tablet was previously considered shutdown w.r.t. metrics,
    // fix that for a tablet now being reinstated.
    DVLOG_WITH_PREFIX(3)
      << "Remove from set of tablets that have been shutdown so as to allow reporting metrics";
    metric_registry_->tablets_shutdown_erase(tablet_id());

    RETURN_NOT_OK(consensus_->Start(bootstrap_info));
    RETURN_NOT_OK(UpdateState(RaftGroupStatePB::BOOTSTRAPPING, RaftGroupStatePB::RUNNING,
                              "Incorrect state to start TabletPeer, "));
  }
  // The context tracks that the current caller does not hold the lock for consensus state.
  // So mark dirty callback, e.g., consensus->ConsensusState() for master consensus callback of
  // SysCatalogStateChanged, can get the lock when needed.
  auto context =
      std::make_shared<StateChangeContext>(StateChangeReason::TABLET_PEER_STARTED, false);
  // Because we changed the tablet state, we need to re-report the tablet to the master.
  mark_dirty_clbk_.Run(context);

  return tablet_->EnableCompactions(/* non_abortable_ops_pause */ nullptr);
}
424
425
152k
// Returns a copy of the committed Raft configuration. It CHECK-fails if consensus has not been
// created yet or has already been released by CompleteShutdown().
consensus::RaftConfigPB TabletPeer::RaftConfig() const {
  const auto& consensus = consensus_;
  CHECK(consensus) << "consensus is null";
  return consensus->CommittedConfig();
}
429
430
75.7k
// Begins shutting down this peer.
//
// Returns true if this call moved state_ to QUIESCING; the caller must then run
// CompleteShutdown(). Returns false if shutdown had already been started (state was already
// QUIESCING or SHUTDOWN).
//
// Order of steps:
//   1. Under lock_, forward StartShutdown() to the tablet if one is attached. This happens even
//      when the method later returns false.
//   2. Atomically move state_ to QUIESCING using a compare-exchange loop.
//   3. Under state_change_lock_, unregister maintenance ops and shut down consensus. The
//      consensus pointer is copied under lock_ first.
bool TabletPeer::StartShutdown(IsDropTable is_drop_table) {
  LOG_WITH_PREFIX(INFO) << "Initiating TabletPeer shutdown";

  {
    std::lock_guard<decltype(lock_)> lock(lock_);
    if (tablet_) {
      tablet_->StartShutdown(is_drop_table);
    }
  }

  {
    RaftGroupStatePB state = state_.load(std::memory_order_acquire);
    for (;;) {
      if (state == RaftGroupStatePB::QUIESCING || state == RaftGroupStatePB::SHUTDOWN) {
        return false;
      }
      // On failure compare_exchange_strong stores the current value into `state`. The next
      // iteration therefore re-checks the terminal states against the latest value.
      if (state_.compare_exchange_strong(
          state, RaftGroupStatePB::QUIESCING, std::memory_order_acq_rel)) {
        LOG_WITH_PREFIX(INFO) << "Started shutdown from state: " << RaftGroupStatePB_Name(state);
        break;
      }
    }
  }

  std::lock_guard<simple_spinlock> l(state_change_lock_);
  // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here
  // to ensure that any currently running operation finishes before we proceed with
  // the rest of the shutdown sequence. In particular, a maintenance operation could
  // indirectly end up calling into the log, which we are about to shut down.
  UnregisterMaintenanceOps();

  // Take a reference under lock_ so Shutdown() is called on a pointer that cannot be reset
  // concurrently.
  std::shared_ptr<consensus::RaftConsensus> consensus;
  {
    std::lock_guard<decltype(lock_)> lock(lock_);
    consensus = consensus_;
  }
  if (consensus) {
    consensus->Shutdown();
  }

  return true;
}
472
473
75.6k
// Finishes a shutdown begun by a StartShutdown() call that returned true.
//
// Order of steps:
//   1. Shut down the strand and the preparing-operations counter.
//   2. Wait for all tracked operations to finish. LOG_SLOW_EXECUTION presumably logs a WARNING if
//      this exceeds 1000 (ms -- confirm).
//   3. Stop the Preparer, close the log (errors only logged), complete the tablet's shutdown.
//   4. Under lock_, drop strand/consensus/preparer/tablet references and move state_ from
//      QUIESCING to SHUTDOWN. Any other starting state is reported as DFATAL. The tablet is also
//      added to the metric registry's shutdown set.
void TabletPeer::CompleteShutdown(IsDropTable is_drop_table) {
  auto* strand = strand_.get();
  if (strand) {
    strand->Shutdown();
  }

  preparing_operations_counter_.Shutdown();

  // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
  LOG_SLOW_EXECUTION(WARNING, 1000,
      Substitute("TabletPeer: tablet $0: Waiting for Operations to complete", tablet_id())) {
    operation_tracker_.WaitForAllToFinish();
  }

  if (prepare_thread_) {
    prepare_thread_->Stop();
  }

  if (log_) {
    WARN_NOT_OK(log_->Close(), LogPrefix() + "Error closing the Log");
  }

  VLOG_WITH_PREFIX(1) << "Shut down!";

  if (tablet_) {
    tablet_->CompleteShutdown(is_drop_table);
  }

  // Only mark the peer as SHUTDOWN when all other components have shut down.
  {
    std::lock_guard<simple_spinlock> lock(lock_);
    strand_.reset();
    // Release mem tracker resources.
    // has_consensus_ is cleared before consensus_ is reset. WaitUntilConsensusRunning() checks
    // has_consensus_ before dereferencing consensus_.
    has_consensus_.store(false, std::memory_order_release);
    consensus_.reset();
    prepare_thread_.reset();
    tablet_.reset();
    auto state = state_.load(std::memory_order_acquire);
    LOG_IF_WITH_PREFIX(DFATAL, state != RaftGroupStatePB::QUIESCING) <<
        "Bad state when completing shutdown: " << RaftGroupStatePB_Name(state);
    state_.store(RaftGroupStatePB::SHUTDOWN, std::memory_order_release);

    if (metric_registry_) {
      DVLOG_WITH_PREFIX(3)
        << "Add to set of tablets that have been shutdown so as to avoid reporting metrics";
      metric_registry_->tablets_shutdown_insert(tablet_id());
    }
  }
}
522
523
95
void TabletPeer::WaitUntilShutdown() {
524
95
  const MonoDelta kSingleWait = 10ms;
525
95
  const MonoDelta kReportInterval = 5s;
526
95
  const MonoDelta kMaxWait = 30s;
527
528
95
  MonoDelta waited = MonoDelta::kZero;
529
95
  MonoDelta last_reported = MonoDelta::kZero;
530
95
  while (state_.load(std::memory_order_acquire) != RaftGroupStatePB::SHUTDOWN) {
531
0
    if (waited >= last_reported + kReportInterval) {
532
0
      if (waited >= kMaxWait) {
533
0
        LOG_WITH_PREFIX(DFATAL)
534
0
            << "Wait for shutdown " << waited << " exceeded kMaxWait " << kMaxWait;
535
0
      } else {
536
0
        LOG_WITH_PREFIX(WARNING) << "Long wait for shutdown: " << waited;
537
0
      }
538
0
      last_reported = waited;
539
0
    }
540
0
    SleepFor(kSingleWait);
541
0
    waited += kSingleWait;
542
0
  }
543
544
95
  if (metric_registry_) {
545
95
    
DVLOG_WITH_PREFIX0
(3)
546
0
      << "Add to set of tablets that have been shutdown so as to avoid reporting metrics";
547
95
    metric_registry_->tablets_shutdown_insert(tablet_id());
548
95
  }
549
95
}
550
551
75.5k
// Shuts the peer down, then returns the status of aborting outstanding YSQL transactions.
//
// If this call is the one that started shutdown, it also completes it. Otherwise it waits for the
// concurrent shutdown to finish.
//
// The shutdown is always carried through before reporting an abort failure. Previously an error
// from AbortSQLTransactions() returned early. That left the peer stuck in QUIESCING (consensus
// stopped, log still open), and any later Shutdown() would block in WaitUntilShutdown() waiting
// for a SHUTDOWN state that never arrives.
Status TabletPeer::Shutdown(IsDropTable is_drop_table) {
  const bool is_shutdown_initiated = StartShutdown(is_drop_table);

  Status abort_status = AbortSQLTransactions();

  if (is_shutdown_initiated) {
    CompleteShutdown(is_drop_table);
  } else {
    WaitUntilShutdown();
  }
  return abort_status;
}
563
564
75.5k
// Aborts the remaining active transactions on a YSQL (PGSQL table type) tablet that has a
// transaction participant. Once the raft group state has entered QUIESCING, no new queries are
// processed, so this is run during shutdown. Abort failures are logged as warnings and not
// propagated, so this always returns OK.
Status TabletPeer::AbortSQLTransactions() {
  const bool is_ysql_tablet =
      tablet_ && tablet_->table_type() == TableType::PGSQL_TABLE_TYPE;
  if (!is_ysql_tablet || !tablet_->transaction_participant()) {
    return Status::OK();
  }

  HybridTime max_cutoff = HybridTime::kMax;
  LOG(INFO) << "Aborting transactions that started prior to " << max_cutoff
            << " for tablet id " << tablet_->tablet_id();
  CoarseTimePoint deadline = CoarseMonoClock::Now() +
      MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_abort_timeout_ms);
  WARN_NOT_OK(tablet_->transaction_participant()->StopActiveTxnsPriorTo(max_cutoff, deadline),
              "Cannot abort transactions for tablet " + tablet_->tablet_id());
  return Status::OK();
}
581
582
69.1M
// OK if the peer is RUNNING; ShutdownInProgress if it is QUIESCING; IllegalState (naming the
// state) for anything else.
Status TabletPeer::CheckRunning() const {
  const auto current = state_.load(std::memory_order_acquire);
  if (current == RaftGroupStatePB::RUNNING) {
    return Status::OK();
  }
  if (current == RaftGroupStatePB::QUIESCING) {
    return STATUS(ShutdownInProgress, "The tablet is shutting down");
  }
  return STATUS_FORMAT(IllegalState, "The tablet is not in a running state: $0",
                       RaftGroupStatePB_Name(current));
}
594
595
0
bool TabletPeer::IsShutdownStarted() const {
596
0
  auto state = state_.load(std::memory_order_acquire);
597
0
  return state == RaftGroupStatePB::QUIESCING || state == RaftGroupStatePB::SHUTDOWN;
598
0
}
599
600
153
// OK when the peer is SHUTDOWN or NOT_STARTED; otherwise IllegalState naming the current state.
Status TabletPeer::CheckShutdownOrNotStarted() const {
  const RaftGroupStatePB current = state_.load(std::memory_order_acquire);
  if (current == RaftGroupStatePB::SHUTDOWN || current == RaftGroupStatePB::NOT_STARTED) {
    return Status::OK();
  }
  return STATUS(IllegalState, Substitute("The tablet is not in a shutdown state: $0",
                                         RaftGroupStatePB_Name(current)));
}
609
610
7.96k
// Polls until the peer is RUNNING with a live, running consensus instance. Sleeps use
// exponential backoff (1ms, 2ms, ... capped at 2^kMaxBackoffExp ms).
// Returns IllegalState as soon as the peer is QUIESCING or SHUTDOWN, and TimedOut once more than
// `timeout` has elapsed.
Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) {
  MonoTime start(MonoTime::Now());

  int backoff_exp = 0;
  constexpr int kMaxBackoffExp = 8;
  while (true) {
    RaftGroupStatePB cached_state = state_.load(std::memory_order_acquire);
    if (cached_state == RaftGroupStatePB::QUIESCING || cached_state == RaftGroupStatePB::SHUTDOWN) {
      return STATUS(IllegalState,
          Substitute("The tablet is already shutting down or shutdown. State: $0",
                     RaftGroupStatePB_Name(cached_state)));
    }
    // has_consensus_ is checked before dereferencing consensus_ (CompleteShutdown() clears it
    // before resetting consensus_).
    if (cached_state == RaftGroupStatePB::RUNNING &&
        has_consensus_.load(std::memory_order_acquire) &&
        consensus_->IsRunning()) {
      break;
    }
    MonoTime now(MonoTime::Now());
    MonoDelta elapsed(now.GetDeltaSince(start));
    if (elapsed.MoreThan(timeout)) {
      // Message fix: "State:" (was "State;"), matching the IllegalState message above.
      return STATUS(TimedOut, Substitute("Consensus is not running after waiting for $0. State: $1",
                                         elapsed.ToString(), RaftGroupStatePB_Name(cached_state)));
    }
    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
    backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
  }
  return Status::OK();
}
637
638
3.16M
void TabletPeer::WriteAsync(std::unique_ptr<WriteQuery> query) {
639
3.16M
  ScopedOperation preparing_token(&preparing_operations_counter_);
640
3.16M
  auto status = CheckRunning();
641
3.16M
  if (!status.ok()) {
642
4
    query->Cancel(status);
643
4
    return;
644
4
  }
645
646
3.16M
  query->operation().set_preparing_token(std::move(preparing_token));
647
3.16M
  tablet_->AcquireLocksAndPerformDocOperations(std::move(query));
648
3.16M
}
649
650
0
// Records a read restart in the tablet metrics and returns the tablet's safe time, requiring
// a leader lease.
Result<HybridTime> TabletPeer::ReportReadRestart() {
  auto& restart_counter = tablet_->metrics()->restart_read_requests;
  restart_counter->Increment();
  return tablet_->SafeTime(RequireLease::kTrue);
}
654
655
5.04M
void TabletPeer::Submit(std::unique_ptr<Operation> operation, int64_t term) {
656
5.04M
  auto status = CheckRunning();
657
658
5.04M
  if (status.ok()) {
659
5.04M
    auto driver = NewLeaderOperationDriver(&operation, term);
660
5.04M
    if (
driver.ok()5.04M
) {
661
5.04M
      (**driver).ExecuteAsync();
662
18.4E
    } else {
663
18.4E
      status = driver.status();
664
18.4E
    }
665
5.04M
  }
666
5.04M
  if (!status.ok()) {
667
7
    operation->Aborted(status, /* was_pending= */ false);
668
7
  }
669
5.04M
}
670
671
void TabletPeer::SubmitUpdateTransaction(
672
1.50M
    std::unique_ptr<UpdateTxnOperation> operation, int64_t term) {
673
1.50M
  if (!operation->tablet()) {
674
0
    operation->SetTablet(tablet());
675
0
  }
676
1.50M
  Submit(std::move(operation), term);
677
1.50M
}
678
679
477k
// Follower safe time for the transaction participant. min_allowed is kMin and the deadline is
// CoarseTimePoint::min(), so this presumably never waits.
HybridTime TabletPeer::SafeTimeForTransactionParticipant() {
  const auto no_wait_deadline = CoarseTimePoint::min();
  return tablet_->mvcc_manager()->SafeTimeForFollower(
      /* min_allowed= */ HybridTime::kMin, /* deadline= */ no_wait_deadline);
}
683
684
26.6k
// Waits until `deadline` for the tablet's safe time to reach `safe_time`, using the leader lease
// when available and falling back to follower safe time otherwise.
Result<HybridTime> TabletPeer::WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) {
  auto* const tablet = tablet_.get();
  return tablet->SafeTime(RequireLease::kFallbackToFollower, safe_time, deadline);
}
687
688
1.67M
// Fills `data` with the last committed op id and the last replicated hybrid time.
// Both pointers are copied under lock_ so a concurrent CompleteShutdown() cannot reset them while
// in use. Returns IllegalState if either has already been released.
Status TabletPeer::GetLastReplicatedData(RemoveIntentsData* data) {
  std::shared_ptr<consensus::RaftConsensus> consensus_snapshot;
  TabletPtr tablet_snapshot;
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    consensus_snapshot = consensus_;
    tablet_snapshot = tablet_;
  }

  if (!consensus_snapshot) {
    return STATUS(IllegalState, "Consensus destroyed");
  }
  if (!tablet_snapshot) {
    return STATUS(IllegalState, "Tablet destroyed");
  }

  data->op_id = consensus_snapshot->GetLastCommittedOpId();
  data->log_ht = tablet_snapshot->mvcc_manager()->LastReplicatedHybridTime();
  return Status::OK();
}
706
707
1.91M
// Fills `data` with the last CDC'ed op id (when consensus exists) and the last replicated hybrid
// time (when a tablet with an MVCC manager exists). Fields whose source is gone are left as-is.
//
// Like GetLastReplicatedData(), the consensus/tablet pointers are copied under lock_. Reading
// them unlocked races with CompleteShutdown() resetting the shared_ptrs. The CDC op id is read
// once, so index and term always come from the same op id.
void TabletPeer::GetLastCDCedData(RemoveIntentsData* data) {
  std::shared_ptr<consensus::RaftConsensus> consensus;
  TabletPtr tablet;
  {
    std::lock_guard<simple_spinlock> lock(lock_);
    consensus = consensus_;
    tablet = tablet_;
  }

  if (consensus) {
    const auto last_cdc_op_id = consensus->GetLastCDCedOpId();
    data->op_id.index = last_cdc_op_id.index;
    data->op_id.term = last_cdc_op_id.term;
  }

  if (tablet && tablet->mvcc_manager() != nullptr) {
    // for now use this hybrid time, ideally it should be of last_updated_time
    data->log_ht = tablet->mvcc_manager()->LastReplicatedHybridTime();
  }
}
718
719
1.19M
// Advances the server clock with an observed hybrid time.
void TabletPeer::UpdateClock(HybridTime hybrid_time) {
  auto* const clock = clock_.get();
  clock->Update(hybrid_time);
}
722
723
std::unique_ptr<UpdateTxnOperation> TabletPeer::CreateUpdateTransaction(
724
406k
    TransactionStatePB* request) {
725
406k
  auto result = std::make_unique<UpdateTxnOperation>(tablet());
726
406k
  result->TakeRequest(request);
727
406k
  return result;
728
406k
}
729
730
8.14k
// Populates `status_pb_out` with this peer's identity (taken from status_listener_), its Raft
// group state, data state, table type (only when a tablet is set), on-disk sizes and hidden
// flag. lock_ is held for the whole call.
void TabletPeer::GetTabletStatusPB(TabletStatusPB* status_pb_out) {
  std::lock_guard<simple_spinlock> guard(lock_);
  DCHECK(status_pb_out != nullptr);
  DCHECK(status_listener_.get() != nullptr);
  // GetOnDiskSizeInfo() reads consensus_/tablet_ itself; it is called here with lock_ held.
  const auto disk_size_info = GetOnDiskSizeInfo();

  const auto& listener = status_listener_;
  status_pb_out->set_tablet_id(listener->tablet_id());
  status_pb_out->set_namespace_name(listener->namespace_name());
  status_pb_out->set_table_name(listener->table_name());
  status_pb_out->set_table_id(listener->table_id());
  status_pb_out->set_last_status(listener->last_status());
  listener->partition()->ToPB(status_pb_out->mutable_partition());
  status_pb_out->set_state(state_);
  status_pb_out->set_tablet_data_state(meta_->tablet_data_state());
  if (auto current_tablet = tablet_) {
    status_pb_out->set_table_type(current_tablet->table_type());
  }
  disk_size_info.ToPB(status_pb_out);
  // Set hide status of the tablet.
  status_pb_out->set_is_hidden(meta_->hidden());
}
751
752
20
// Garbage-collects log segments that are no longer needed.
// Returns OK without doing anything when the peer is not running. A failure to reset a stale
// CDC min replicated index is only logged. With VLOG level 2 enabled, the per-source breakdown
// from GetEarliestNeededLogIndex() is logged.
Status TabletPeer::RunLogGC() {
  if (!CheckRunning().ok()) {
    return Status::OK();
  }

  const auto reset_status = reset_cdc_min_replicated_index_if_stale();
  if (!reset_status.ok()) {
    LOG_WITH_PREFIX(WARNING) << "Unable to reset cdc min replicated index " << reset_status;
  }

  int64_t min_log_index;
  if (!VLOG_IS_ON(2)) {
    min_log_index = VERIFY_RESULT(GetEarliestNeededLogIndex());
  } else {
    std::string details;
    min_log_index = VERIFY_RESULT(GetEarliestNeededLogIndex(&details));
    LOG_WITH_PREFIX(INFO) << __func__ << ": " << details;
  }

  int32_t num_gced = 0;
  return log_->GC(min_log_index, &num_gced);
}
771
772
141
// Returns the tablet data state recorded in the metadata, read while holding lock_.
TabletDataState TabletPeer::data_state() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  const TabletDataState current_data_state = meta_->tablet_data_state();
  return current_data_state;
}
776
777
13
// Returns a short human-readable description of the peer's state:
//  - FAILED: "<state> (<data state>): <recorded error>";
//  - data that cannot be served (e.g. remote bootstrap, tombstoned): the data state name;
//  - split-completed tablets: "<state> (split)";
//  - otherwise: the Raft group state name.
string TabletPeer::HumanReadableState() const {
  std::lock_guard<simple_spinlock> lock(lock_);
  TabletDataState data_state = meta_->tablet_data_state();
  RaftGroupStatePB state = this->state();
  // If failed, any number of things could have gone wrong.
  if (state == RaftGroupStatePB::FAILED) {
    // SetFailed() publishes error_ before switching to FAILED, but UpdateState() can also
    // produce FAILED without recording an error, so do not dereference a null pointer.
    const auto* error = error_.get();
    return Substitute("$0 ($1): $2", RaftGroupStatePB_Name(state),
                      TabletDataState_Name(data_state),
                      error != nullptr ? error->ToString() : std::string("(no error recorded)"));
  }
  // If it's remotely bootstrapping, or tombstoned, that is the important thing
  // to show.
  if (!CanServeTabletData(data_state)) {
    return TabletDataState_Name(data_state);
  }
  if (data_state == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
    return RaftGroupStatePB_Name(state) + " (split)";
  }
  // Otherwise, the tablet's data is in a "normal" state, so we just display
  // the runtime state (BOOTSTRAPPING, RUNNING, etc).
  return RaftGroupStatePB_Name(state);
}
797
798
namespace {
799
800
0
consensus::OperationType MapOperationTypeToPB(OperationType operation_type) {
801
0
  switch (operation_type) {
802
0
    case OperationType::kWrite:
803
0
      return consensus::WRITE_OP;
804
805
0
    case OperationType::kChangeMetadata:
806
0
      return consensus::CHANGE_METADATA_OP;
807
808
0
    case OperationType::kUpdateTransaction:
809
0
      return consensus::UPDATE_TRANSACTION_OP;
810
811
0
    case OperationType::kSnapshot:
812
0
      return consensus::SNAPSHOT_OP;
813
814
0
    case OperationType::kTruncate:
815
0
      return consensus::TRUNCATE_OP;
816
817
0
    case OperationType::kHistoryCutoff:
818
0
      return consensus::HISTORY_CUTOFF_OP;
819
820
0
    case OperationType::kSplit:
821
0
      return consensus::SPLIT_OP;
822
823
0
    case OperationType::kEmpty:
824
0
      LOG(FATAL) << "OperationType::kEmpty cannot be converted to consensus::OperationType";
825
0
  }
826
0
  FATAL_INVALID_ENUM_VALUE(OperationType, operation_type);
827
0
}
828
829
} // namespace
830
831
void TabletPeer::GetInFlightOperations(Operation::TraceType trace_type,
832
0
                                       vector<consensus::OperationStatusPB>* out) const {
833
0
  for (const auto& driver : operation_tracker_.GetPendingOperations()) {
834
0
    if (driver->operation() == nullptr) {
835
0
      continue;
836
0
    }
837
0
    auto op_type = driver->operation_type();
838
0
    if (op_type == OperationType::kEmpty) {
839
      // This is a special-purpose in-memory-only operation for updating propagated safe time on
840
      // a follower.
841
0
      continue;
842
0
    }
843
844
0
    consensus::OperationStatusPB status_pb;
845
0
    driver->GetOpId().ToPB(status_pb.mutable_op_id());
846
0
    status_pb.set_operation_type(MapOperationTypeToPB(op_type));
847
0
    status_pb.set_description(driver->ToString());
848
0
    int64_t running_for_micros =
849
0
        MonoTime::Now().GetDeltaSince(driver->start_time()).ToMicroseconds();
850
0
    status_pb.set_running_for_micros(running_for_micros);
851
0
    if (trace_type == Operation::TRACE_TXNS) {
852
0
      status_pb.set_trace_buffer(driver->trace()->DumpToString(true));
853
0
    }
854
0
    out->push_back(status_pb);
855
0
  }
856
0
}
857
858
50.4M
// Computes the smallest log index that must still be retained. RunLogGC() and
// GetGCableDataSize() use it as the GC bound. The result is the minimum of:
//  - the latest log entry index,
//  - the earliest registered log anchor,
//  - the earliest pending operation,
//  - the min retryable request op id,
//  - the transaction coordinator's GC bound (if any),
//  - the last committed op id,
//  - for non-transaction-status tablets, the max persistent regular/intents op ids.
//
// If `details` is non-null, a human-readable breakdown of the contributing values is appended.
// Returns 0 if nothing was ever written to the log, and IllegalState if consensus is gone.
Result<int64_t> TabletPeer::GetEarliestNeededLogIndex(std::string* details) const {
  // First, we anchor on the last OpId in the Log to establish a lower bound
  // and avoid racing with the other checks. This limits the Log GC candidate
  // segments before we check the anchors.
  auto latest_log_entry_op_id = log_->GetLatestEntryOpId();
  int64_t min_index = latest_log_entry_op_id.index;
  if (details) {
    *details += Format("Latest log entry op id: $0\n", latest_log_entry_op_id);
  }

  // If we never have written to the log, no need to proceed.
  if (min_index == 0) {
    return min_index;
  }

  // Take one owning reference to consensus (copied under lock_ by shared_raft_consensus()) and
  // use it for every consensus query below, instead of mixing unlocked reads of consensus_ with
  // the locking consensus() accessor.
  const auto consensus = shared_raft_consensus();
  if (!consensus) {
    return STATUS(IllegalState, "Consensus destroyed");
  }

  // Next, we interrogate the anchor registry.
  // Returns OK if minimum known, NotFound if no anchors are registered.
  {
    int64_t min_anchor_index;
    Status s = log_anchor_registry_->GetEarliestRegisteredLogIndex(&min_anchor_index);
    // "No anchors registered" (NotFound) is the usual outcome, so no branch-prediction hint is
    // applied to this check.
    if (s.ok()) {
      min_index = std::min(min_index, min_anchor_index);
      if (details) {
        *details += Format("Min anchor index: $0\n", min_anchor_index);
      }
    } else {
      DCHECK(s.IsNotFound()) << "Unexpected error calling LogAnchorRegistry: " << s.ToString();
    }
  }

  // Next, interrogate the OperationTracker.
  int64_t min_pending_op_index = std::numeric_limits<int64_t>::max();
  for (const auto& driver : operation_tracker_.GetPendingOperations()) {
    auto tx_op_id = driver->GetOpId();
    // A operation which doesn't have an opid hasn't been submitted for replication yet and
    // thus has no need to anchor the log.
    if (tx_op_id != yb::OpId::Invalid()) {
      min_pending_op_index = std::min(min_pending_op_index, tx_op_id.index);
    }
  }

  min_index = std::min(min_index, min_pending_op_index);
  if (details && min_pending_op_index != std::numeric_limits<int64_t>::max()) {
    *details += Format("Min pending op id index: $0\n", min_pending_op_index);
  }

  auto min_retryable_request_op_id = consensus->MinRetryableRequestOpId();
  min_index = std::min(min_index, min_retryable_request_op_id.index);
  if (details) {
    *details += Format("Min retryable request op id: $0\n", min_retryable_request_op_id);
  }

  auto* transaction_coordinator = tablet()->transaction_coordinator();
  if (transaction_coordinator) {
    auto transaction_coordinator_min_op_index = transaction_coordinator->PrepareGC(details);
    min_index = std::min(min_index, transaction_coordinator_min_op_index);
  }

  // We keep at least one committed operation in the log so that we can always recover safe time
  // during bootstrap.
  // Last committed op id should be read before MaxPersistentOpId to avoid race condition
  // described in MaxPersistentOpIdForDb.
  //
  // If we read last committed op id AFTER reading last persistent op id (INCORRECT):
  // - We read max persistent op id and find there is no new data, so we ignore it.
  // - New data gets written and Raft-committed, but not yet flushed to an SSTable.
  // - We read the last committed op id, which is greater than what max persistent op id would have
  //   returned.
  // - We garbage-collect the Raft log entries corresponding to the new data.
  // - Power is lost and the server reboots, losing committed data.
  //
  // If we read last committed op id BEFORE reading last persistent op id (CORRECT):
  // - We read the last committed op id.
  // - We read max persistent op id and find there is no new data, so we ignore it.
  // - New data gets written and Raft-committed, but not yet flushed to an SSTable.
  // - We still don't garbage-collect the logs containing the committed but unflushed data,
  //   because the earlier value of the last committed op id that we read prevents us from doing so.
  auto last_committed_op_id = consensus->GetLastCommittedOpId();
  min_index = std::min(min_index, last_committed_op_id.index);
  if (details) {
    *details += Format("Last committed op id: $0\n", last_committed_op_id);
  }

  if (tablet_->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE) {
    tablet_->FlushIntentsDbIfNecessary(latest_log_entry_op_id);
    auto max_persistent_op_id = VERIFY_RESULT(
        tablet_->MaxPersistentOpId(true /* invalid_if_no_new_data */));
    if (max_persistent_op_id.regular.valid()) {
      min_index = std::min(min_index, max_persistent_op_id.regular.index);
      if (details) {
        *details += Format("Max persistent regular op id: $0\n", max_persistent_op_id.regular);
      }
    }
    if (max_persistent_op_id.intents.valid()) {
      min_index = std::min(min_index, max_persistent_op_id.intents.index);
      if (details) {
        *details += Format("Max persistent intents op id: $0\n", max_persistent_op_id.intents);
      }
    }
  }

  if (details) {
    *details += Format("Earliest needed log index: $0\n", min_index);
  }

  return min_index;
}
965
966
50.4M
// Stores in `retention_size` the amount of log data that could be garbage-collected, given the
// earliest log index still needed. Fails if the peer is not running.
Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const {
  RETURN_NOT_OK(CheckRunning());
  const int64_t earliest_needed_index = VERIFY_RESULT(GetEarliestNeededLogIndex());
  return log_->GetGCableDataSize(earliest_needed_index, retention_size);
}
972
973
3.47M
// Returns the peer's log. Fatal if the log has not been initialized yet.
log::Log* TabletPeer::log() const {
  Log* const current_log = log_atomic_.load(std::memory_order_acquire);
  if (!current_log) {
    LOG_WITH_PREFIX(FATAL) << "log() called before the log instance is initialized.";
  }
  return current_log;
}
978
979
77.5k
// Returns the op id of the latest log entry, or a default-constructed OpId when no log has
// been set yet.
yb::OpId TabletPeer::GetLatestLogEntryOpId() const {
  Log* const current_log = log_atomic_.load(std::memory_order_acquire);
  if (!current_log) {
    return yb::OpId();
  }
  return current_log->GetLatestEntryOpId();
}
986
987
150k
// Records a new CDC min replicated index, without taking cdc_min_replicated_index_lock_ (the
// caller is expected to hold it).
// The tablet metadata is updated first. Only if that succeeds are the log's copy (when a log
// exists) and the refresh timestamp updated.
Status TabletPeer::set_cdc_min_replicated_index_unlocked(int64_t cdc_min_replicated_index) {
  LOG_WITH_PREFIX(INFO) << "Setting cdc min replicated index to " << cdc_min_replicated_index;
  RETURN_NOT_OK(meta_->set_cdc_min_replicated_index(cdc_min_replicated_index));
  if (auto* current_log = log_atomic_.load(std::memory_order_acquire)) {
    current_log->set_cdc_min_replicated_index(cdc_min_replicated_index);
  }
  cdc_min_replicated_index_refresh_time_ = MonoTime::Now();
  return Status::OK();
}
997
998
150k
// Locking wrapper around set_cdc_min_replicated_index_unlocked(). Serializes with
// reset_cdc_min_replicated_index_if_stale(), which takes the same lock.
Status TabletPeer::set_cdc_min_replicated_index(int64_t cdc_min_replicated_index) {
  std::lock_guard<decltype(cdc_min_replicated_index_lock_)> guard(
      cdc_min_replicated_index_lock_);
  return set_cdc_min_replicated_index_unlocked(cdc_min_replicated_index);
}
1002
1003
20
// Resets the CDC min replicated index to int64 max when it has not been refreshed for more
// than FLAGS_cdc_min_replicated_index_considered_stale_secs seconds. Otherwise does nothing.
Status TabletPeer::reset_cdc_min_replicated_index_if_stale() {
  std::lock_guard<decltype(cdc_min_replicated_index_lock_)> guard(
      cdc_min_replicated_index_lock_);
  const auto since_refresh =
      MonoTime::Now().GetDeltaSince(cdc_min_replicated_index_refresh_time_);
  auto seconds_since_last_refresh = since_refresh.ToSeconds();
  if (seconds_since_last_refresh > FLAGS_cdc_min_replicated_index_considered_stale_secs) {
    LOG_WITH_PREFIX(INFO) << "Resetting cdc min replicated index. Seconds since last update: "
                          << seconds_since_last_refresh;
    return set_cdc_min_replicated_index_unlocked(std::numeric_limits<int64_t>::max());
  }
  return Status::OK();
}
1014
1015
9.28M
// Creates the replica-side Operation that matches `replicate_msg`'s op type, bound to this
// peer's tablet (plus the log or the tablet splitter where the operation needs them).
// The DCHECKs verify that the message carries the matching request payload.
// UNKNOWN_OP, NO_OP, CHANGE_CONFIG_OP and unrecognized values are fatal.
std::unique_ptr<Operation> TabletPeer::CreateOperation(consensus::ReplicateMsg* replicate_msg) {
  const auto op_type = replicate_msg->op_type();
  switch (op_type) {
    case consensus::WRITE_OP:
      DCHECK(replicate_msg->has_write())
          << "WRITE_OP replica operation must receive a WriteRequestPB";
      // We use separate preparing token only on leader, so here it could be empty.
      return std::make_unique<WriteOperation>(tablet());

    case consensus::CHANGE_METADATA_OP:
      DCHECK(replicate_msg->has_change_metadata_request())
          << "CHANGE_METADATA_OP replica operation must receive an ChangeMetadataRequestPB";
      return std::make_unique<ChangeMetadataOperation>(tablet(), log());

    case consensus::UPDATE_TRANSACTION_OP:
      DCHECK(replicate_msg->has_transaction_state())
          << "UPDATE_TRANSACTION_OP replica operation must receive an TransactionStatePB";
      return std::make_unique<UpdateTxnOperation>(tablet());

    case consensus::TRUNCATE_OP:
      DCHECK(replicate_msg->has_truncate())
          << "TRUNCATE_OP replica operation must receive an TruncateRequestPB";
      return std::make_unique<TruncateOperation>(tablet());

    case consensus::SNAPSHOT_OP:
      DCHECK(replicate_msg->has_snapshot_request())
          << "SNAPSHOT_OP replica operation must receive an TabletSnapshotOpRequestPB";
      return std::make_unique<SnapshotOperation>(tablet());

    case consensus::HISTORY_CUTOFF_OP:
      DCHECK(replicate_msg->has_history_cutoff())
          << "HISTORY_CUTOFF_OP replica transaction must receive an HistoryCutoffPB";
      return std::make_unique<HistoryCutoffOperation>(tablet());

    case consensus::SPLIT_OP:
      DCHECK(replicate_msg->has_split_request())
          << "SPLIT_OP replica operation must receive an SplitOpRequestPB";
      return std::make_unique<SplitOperation>(tablet(), tablet_splitter_);

    case consensus::UNKNOWN_OP: FALLTHROUGH_INTENDED;
    case consensus::NO_OP: FALLTHROUGH_INTENDED;
    case consensus::CHANGE_CONFIG_OP:
      // These types never produce a replica Operation; handled by the fatal path below.
      break;
  }
  FATAL_INVALID_ENUM_VALUE(consensus::OperationType, op_type);
}
1060
1061
// Starts applying a replicated operation on this replica. Only allowed while RUNNING or
// BOOTSTRAPPING; otherwise returns IllegalState carrying the current state's name.
// Builds the Operation for the round's message, attaches the round and hybrid time, updates
// the clock and monotonic counter, then wraps the operation in a replica driver and executes it
// asynchronously. If `propagated_safe_time` is set, it is passed to the driver.
Status TabletPeer::StartReplicaOperation(
    const scoped_refptr<ConsensusRound>& round, HybridTime propagated_safe_time) {
  const RaftGroupStatePB current_state = state();
  const bool accepting_operations = current_state == RaftGroupStatePB::RUNNING ||
                                    current_state == RaftGroupStatePB::BOOTSTRAPPING;
  if (!accepting_operations) {
    return STATUS(IllegalState, RaftGroupStatePB_Name(current_state));
  }

  consensus::ReplicateMsg* msg = round->replicate_msg().get();
  DCHECK(msg->has_hybrid_time());
  auto operation = CreateOperation(msg);

  // TODO(todd) Look at wiring the stuff below on the driver
  // It's imperative that we set the round here on any type of operation, as this
  // allows us to keep the reference to the request in the round instead of copying it.
  operation->set_consensus_round(round);
  const HybridTime msg_hybrid_time(msg->hybrid_time());
  operation->set_hybrid_time(msg_hybrid_time);
  clock_->Update(msg_hybrid_time);

  // This sets the monotonic counter to at least replicate_msg.monotonic_counter() atomically.
  tablet_->UpdateMonotonicCounter(msg->monotonic_counter());

  // Remember the raw pointer first: NewReplicaOperationDriver() receives `operation` by pointer
  // and may take it over.
  Operation* const raw_operation = operation.get();
  OperationDriverPtr driver = VERIFY_RESULT(NewReplicaOperationDriver(&operation));
  raw_operation->consensus_round()->SetCallback(driver.get());

  if (propagated_safe_time) {
    driver->SetPropagatedSafeTime(propagated_safe_time, tablet_->mvcc_manager());
  }

  driver->ExecuteAsync();
  return Status::OK();
}
1095
1096
17.1M
// Propagates safe time `ht` by executing a replica driver created without an Operation.
// If the driver cannot be created, an error is logged and nothing else happens.
void TabletPeer::SetPropagatedSafeTime(HybridTime ht) {
  auto driver_result = NewReplicaOperationDriver(nullptr);
  if (driver_result.ok()) {
    auto& driver = *driver_result;
    driver->SetPropagatedSafeTime(ht, tablet_->mvcc_manager());
    driver->ExecuteAsync();
    return;
  }
  LOG_WITH_PREFIX(ERROR) << "Failed to create operation driver to set propagated hybrid time";
}
1105
1106
5.53M
bool TabletPeer::ShouldApplyWrite() {
1107
5.53M
  return tablet_->ShouldApplyWrite();
1108
5.53M
}
1109
1110
119M
// Returns the peer's consensus through the generic Consensus interface (a non-owning pointer
// obtained from raft_consensus()).
consensus::Consensus* TabletPeer::consensus() const {
  consensus::RaftConsensus* raft = raft_consensus();
  return raft;
}
1113
1114
122M
// Returns a non-owning pointer to the RaftConsensus instance, read while holding lock_.
consensus::RaftConsensus* TabletPeer::raft_consensus() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  auto* raw_consensus = consensus_.get();
  return raw_consensus;
}
1118
1119
27.2M
// Returns an owning reference to consensus (as the Consensus interface), copied under lock_.
shared_ptr<consensus::Consensus> TabletPeer::shared_consensus() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  shared_ptr<consensus::Consensus> owned = consensus_;
  return owned;
}
1123
1124
28.6M
// Returns an owning reference to the RaftConsensus instance, copied under lock_.
shared_ptr<consensus::RaftConsensus> TabletPeer::shared_raft_consensus() const {
  shared_ptr<consensus::RaftConsensus> snapshot;
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    snapshot = consensus_;
  }
  return snapshot;
}
1128
1129
// Creates a driver for an operation started on the leader in `term`.
// Returns InvalidArgument when `term` is OpId::kUnknownTerm.
Result<OperationDriverPtr> TabletPeer::NewLeaderOperationDriver(
    std::unique_ptr<Operation>* operation, int64_t term) {
  if (term != OpId::kUnknownTerm) {
    return NewOperationDriver(operation, term);
  }
  return STATUS(InvalidArgument, "Leader operation driver for unknown term");
}
1136
1137
// Creates a driver for a replica-side operation. Replica drivers are initialized with
// OpId::kUnknownTerm. `operation` may be null (see SetPropagatedSafeTime()).
Result<OperationDriverPtr> TabletPeer::NewReplicaOperationDriver(
    std::unique_ptr<Operation>* operation) {
  return NewOperationDriver(operation, /* term= */ OpId::kUnknownTerm);
}
1141
1142
// Creates a fresh OperationDriver and initializes it with `operation` and `term`.
// Returns the Init() failure, if any.
Result<OperationDriverPtr> TabletPeer::NewOperationDriver(std::unique_ptr<Operation>* operation,
                                                          int64_t term) {
  auto fresh_driver = CreateOperationDriver();
  RETURN_NOT_OK(fresh_driver->Init(operation, term));
  return fresh_driver;
}
1148
1149
150k
// Registers this peer's maintenance operations (currently only log GC) with `maint_mgr`.
// Does nothing but log a warning if the tablet is not RUNNING.
void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
  // Taking state_change_lock_ ensures that we don't shut down concurrently with
  // this last start-up task.
  // Note that the state_change_lock_ is taken in Shutdown(),
  // prior to calling UnregisterMaintenanceOps().
  std::lock_guard<simple_spinlock> state_guard(state_change_lock_);

  if (state() == RaftGroupStatePB::RUNNING) {
    DCHECK(maintenance_ops_.empty());
    auto log_gc_op = std::make_unique<LogGCOp>(this);
    maint_mgr->RegisterOp(log_gc_op.get());
    maintenance_ops_.push_back(std::move(log_gc_op));
    LOG_WITH_PREFIX(INFO) << "Registered log gc";
    return;
  }

  LOG_WITH_PREFIX(WARNING) << "Not registering maintenance operations: tablet not RUNNING";
}
1169
1170
75.6k
void TabletPeer::UnregisterMaintenanceOps() {
1171
75.6k
  DCHECK(state_change_lock_.is_locked());
1172
75.6k
  for (auto& op : maintenance_ops_) {
1173
75.6k
    op->Unregister();
1174
75.6k
  }
1175
75.6k
  maintenance_ops_.clear();
1176
75.6k
}
1177
1178
8.14k
// Collects the on-disk sizes of consensus metadata, SST files (compressed and uncompressed) and
// WAL files, for whichever components are currently set, then recomputes the total.
// Reads consensus_ and tablet_ without taking lock_; GetTabletStatusPB() calls this with lock_
// held.
TabletOnDiskSizeInfo TabletPeer::GetOnDiskSizeInfo() const {
  TabletOnDiskSizeInfo size_info;

  if (consensus_) {
    size_info.consensus_metadata_disk_size = consensus_->OnDiskSize();
  }

  if (tablet_) {
    size_info.sst_files_disk_size = tablet_->GetCurrentVersionSstFilesSize();
    size_info.uncompressed_sst_files_disk_size =
        tablet_->GetCurrentVersionSstFilesUncompressedSize();
  }

  if (auto* current_log = log_atomic_.load(std::memory_order_acquire)) {
    size_info.wal_files_disk_size = current_log->OnDiskSize();
  }

  size_info.RecomputeTotalSize();
  return size_info;
}
1199
1200
4.41k
size_t TabletPeer::GetNumLogSegments() const {
1201
4.41k
  auto log = log_atomic_.load(std::memory_order_acquire);
1202
4.41k
  return log ? 
log->num_segments()4.39k
:
012
;
1203
4.41k
}
1204
1205
843k
std::string TabletPeer::LogPrefix() const {
1206
843k
  return Substitute("T $0 P $1 [state=$2]: ",
1207
843k
      tablet_id_, permanent_uuid_, RaftGroupStatePB_Name(state()));
1208
843k
}
1209
1210
31.4M
// Constructs a new OperationDriver wired to this peer's operation tracker, consensus, prepare
// thread and the tablet's table type. Ownership goes to the returned scoped_refptr.
scoped_refptr<OperationDriver> TabletPeer::CreateOperationDriver() {
  const auto tablet_table_type = tablet_->table_type();
  return scoped_refptr<OperationDriver>(new OperationDriver(
      &operation_tracker_, consensus_.get(), prepare_thread_.get(), tablet_table_type));
}
1217
1218
5.81M
int64_t TabletPeer::LeaderTerm() const {
1219
5.81M
  shared_ptr<consensus::Consensus> consensus;
1220
5.81M
  {
1221
5.81M
    std::lock_guard<simple_spinlock> lock(lock_);
1222
5.81M
    consensus = consensus_;
1223
5.81M
  }
1224
5.81M
  return consensus ? 
consensus->LeaderTerm()5.81M
:
yb::OpId::kUnknownTerm5.40k
;
1225
5.81M
}
1226
1227
247k
// Returns the tablet's safe time, using Tablet::SafeTime()'s default arguments.
Result<HybridTime> TabletPeer::LeaderSafeTime() const {
  const auto& current_tablet = tablet_;
  return current_tablet->SafeTime();
}
1230
1231
33.3M
// Returns consensus' leader status (see Consensus::GetLeaderStatus for `allow_stale`), or
// NOT_LEADER when consensus is not set.
consensus::LeaderStatus TabletPeer::LeaderStatus(bool allow_stale) const {
  const auto consensus_ref = shared_consensus();
  if (consensus_ref == nullptr) {
    return consensus::LeaderStatus::NOT_LEADER;
  }
  return consensus_ref->GetLeaderStatus(allow_stale);
}
1239
1240
197k
// Returns the later of:
//  - the majority-replicated hybrid-time lease expiration (CHECK-fails on error);
//  - the last replicated hybrid time from MVCC.
HybridTime TabletPeer::HtLeaseExpiration() const {
  const auto majority_lease = CHECK_RESULT(
      consensus_->MajorityReplicatedHtLeaseExpiration(0, CoarseTimePoint::max()));
  const HybridTime lease_ht(majority_lease, 0);
  return std::max(lease_ht, tablet_->mvcc_manager()->LastReplicatedHybridTime());
}
1245
1246
0
// Returns the table type of the peer's tablet.
TableType TabletPeer::table_type() EXCLUDES(lock_) {
  // TODO: what if tablet is not set? DCHECK_NOTNULL only checks this in debug builds.
  const auto current_tablet = DCHECK_NOTNULL(tablet());
  return current_tablet->table_type();
}
1250
1251
18
void TabletPeer::SetFailed(const Status& error) {
1252
18
  DCHECK(error_.get(std::memory_order_acquire) == nullptr);
1253
18
  error_ = MakeAtomicUniquePtr<Status>(error);
1254
18
  auto state = state_.load(std::memory_order_acquire);
1255
18
  while (state != RaftGroupStatePB::FAILED && state != RaftGroupStatePB::QUIESCING &&
1256
18
         state != RaftGroupStatePB::SHUTDOWN) {
1257
18
    if (state_.compare_exchange_weak(state, RaftGroupStatePB::FAILED, std::memory_order_acq_rel)) {
1258
18
      LOG_WITH_PREFIX(INFO) << "Changed state from " << RaftGroupStatePB_Name(state)
1259
18
                            << " to FAILED";
1260
18
      break;
1261
18
    }
1262
18
  }
1263
18
}
1264
1265
// Atomically changes the state from `expected` to `new_state`. If the current state is not
// `expected`, returns InvalidArgument prefixed with `error_message` and naming the state seen.
Status TabletPeer::UpdateState(RaftGroupStatePB expected, RaftGroupStatePB new_state,
                               const std::string& error_message) {
  RaftGroupStatePB observed = expected;
  const bool swapped =
      state_.compare_exchange_strong(observed, new_state, std::memory_order_acq_rel);
  if (swapped) {
    LOG_WITH_PREFIX(INFO) << "Changed state from " << RaftGroupStatePB_Name(observed) << " to "
                          << RaftGroupStatePB_Name(new_state);
    return Status::OK();
  }

  return STATUS_FORMAT(
      InvalidArgument, "$0 Expected state: $1, got: $2",
      error_message, RaftGroupStatePB_Name(expected), RaftGroupStatePB_Name(observed));
}
1278
1279
222k
// Submits `task` to the service thread pool. If no pool has been set, the task is completed
// immediately with an Aborted status.
void TabletPeer::Enqueue(rpc::ThreadPoolTask* task) {
  if (auto* pool = service_thread_pool_.load(std::memory_order_acquire)) {
    pool->Enqueue(task);
    return;
  }
  task->Done(STATUS(Aborted, "Thread pool not ready"));
}
1288
1289
1.67M
// Submits `task` to this peer's strand. If no strand exists, the task is completed immediately
// with an Aborted status.
void TabletPeer::StrandEnqueue(rpc::StrandTask* task) {
  if (rpc::Strand* const peer_strand = strand_.get()) {
    peer_strand->Enqueue(task);
    return;
  }
  task->Done(STATUS(Aborted, "Thread pool not ready"));
}
1298
1299
1.63M
bool TabletPeer::CanBeDeleted() {
1300
1.63M
  const auto consensus = shared_raft_consensus();
1301
1.63M
  if (!consensus || 
consensus->LeaderTerm() == OpId::kUnknownTerm1.62M
) {
1302
1.50M
    return false;
1303
1.50M
  }
1304
1305
131k
  const auto tablet = shared_tablet();
1306
131k
  if (!tablet) {
1307
0
    return false;
1308
0
  }
1309
1310
131k
  auto op_id = tablet->metadata()->GetOpIdToDeleteAfterAllApplied();
1311
131k
  if (!op_id.valid()) {
1312
131k
    return false;
1313
131k
  }
1314
1315
60
  const auto all_applied_op_id = consensus->GetAllAppliedOpId();
1316
60
  if (all_applied_op_id < op_id) {
1317
46
    return false;
1318
46
  }
1319
1320
14
  LOG_WITH_PREFIX(INFO) << Format(
1321
14
      "Marked tablet $0 as requiring cleanup due to all replicas have been split (all applied op "
1322
14
      "id: $1, split op id: $2)",
1323
14
      tablet_id(), all_applied_op_id, op_id);
1324
1325
14
  return true;
1326
60
}
1327
1328
56.9k
// Returns the scheduler owned by this peer's messenger.
rpc::Scheduler& TabletPeer::scheduler() const {
  auto& messenger_scheduler = messenger_->scheduler();
  return messenger_scheduler;
}
1331
1332
}  // namespace tablet
1333
}  // namespace yb