YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_peer.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/tablet_peer.h"
34
35
#include <algorithm>
36
#include <mutex>
37
#include <string>
38
#include <utility>
39
#include <vector>
40
41
#include <gflags/gflags.h>
42
43
#include "yb/consensus/consensus.h"
44
#include "yb/consensus/consensus.pb.h"
45
#include "yb/consensus/consensus_util.h"
46
#include "yb/consensus/log.h"
47
#include "yb/consensus/log_anchor_registry.h"
48
#include "yb/consensus/log_util.h"
49
#include "yb/consensus/opid_util.h"
50
#include "yb/consensus/raft_consensus.h"
51
#include "yb/consensus/retryable_requests.h"
52
#include "yb/consensus/state_change_context.h"
53
54
#include "yb/docdb/consensus_frontier.h"
55
56
#include "yb/gutil/casts.h"
57
#include "yb/gutil/strings/substitute.h"
58
#include "yb/gutil/sysinfo.h"
59
60
#include "yb/rocksdb/db/memtable.h"
61
62
#include "yb/rpc/messenger.h"
63
#include "yb/rpc/strand.h"
64
#include "yb/rpc/thread_pool.h"
65
66
#include "yb/tablet/operations/change_metadata_operation.h"
67
#include "yb/tablet/operations/history_cutoff_operation.h"
68
#include "yb/tablet/operations/operation_driver.h"
69
#include "yb/tablet/operations/snapshot_operation.h"
70
#include "yb/tablet/operations/split_operation.h"
71
#include "yb/tablet/operations/truncate_operation.h"
72
#include "yb/tablet/operations/update_txn_operation.h"
73
#include "yb/tablet/operations/write_operation.h"
74
#include "yb/tablet/tablet.h"
75
#include "yb/tablet/tablet.pb.h"
76
#include "yb/tablet/tablet_bootstrap_if.h"
77
#include "yb/tablet/tablet_metadata.h"
78
#include "yb/tablet/tablet_metrics.h"
79
#include "yb/tablet/tablet_peer_mm_ops.h"
80
#include "yb/tablet/tablet_retention_policy.h"
81
#include "yb/tablet/transaction_participant.h"
82
#include "yb/tablet/write_query.h"
83
84
#include "yb/util/debug-util.h"
85
#include "yb/util/flag_tags.h"
86
#include "yb/util/format.h"
87
#include "yb/util/logging.h"
88
#include "yb/util/metrics.h"
89
#include "yb/util/status_format.h"
90
#include "yb/util/status_log.h"
91
#include "yb/util/stopwatch.h"
92
#include "yb/util/threadpool.h"
93
#include "yb/util/trace.h"
94
95
using namespace std::literals;
96
using namespace std::placeholders;
97
using std::shared_ptr;
98
using std::string;
99
100
DEFINE_test_flag(int32, delay_init_tablet_peer_ms, 0,
101
                 "Wait before executing init tablet peer for specified amount of milliseconds.");
102
103
DEFINE_int32(cdc_min_replicated_index_considered_stale_secs, 900,
104
    "If cdc_min_replicated_index hasn't been replicated in this amount of time, we reset its"
105
    "value to max int64 to avoid retaining any logs");
106
107
DEFINE_bool(propagate_safe_time, true, "Propagate safe time to read from leader to followers");
108
109
DECLARE_int32(ysql_transaction_abort_timeout_ms);
110
111
namespace yb {
112
namespace tablet {
113
114
METRIC_DEFINE_coarse_histogram(table, op_prepare_queue_length, "Operation Prepare Queue Length",
115
                        MetricUnit::kTasks,
116
                        "Number of operations waiting to be prepared within this tablet. "
117
                        "High queue lengths indicate that the server is unable to process "
118
                        "operations as fast as they are being written to the WAL.");
119
120
METRIC_DEFINE_coarse_histogram(table, op_prepare_queue_time, "Operation Prepare Queue Time",
121
                        MetricUnit::kMicroseconds,
122
                        "Time that operations spent waiting in the prepare queue before being "
123
                        "processed. High queue times indicate that the server is unable to "
124
                        "process operations as fast as they are being written to the WAL.");
125
126
METRIC_DEFINE_coarse_histogram(table, op_prepare_run_time, "Operation Prepare Run Time",
127
                        MetricUnit::kMicroseconds,
128
                        "Time that operations spent being prepared in the tablet. "
129
                        "High values may indicate that the server is under-provisioned or "
130
                        "that operations are experiencing high contention with one another for "
131
                        "locks.");
132
133
using consensus::Consensus;
134
using consensus::ConsensusBootstrapInfo;
135
using consensus::ConsensusMetadata;
136
using consensus::ConsensusOptions;
137
using consensus::ConsensusRound;
138
using consensus::StateChangeContext;
139
using consensus::StateChangeReason;
140
using consensus::RaftConfigPB;
141
using consensus::RaftPeerPB;
142
using consensus::RaftConsensus;
143
using consensus::ReplicateMsg;
144
using consensus::OpIdType;
145
using log::Log;
146
using log::LogAnchorRegistry;
147
using rpc::Messenger;
148
using strings::Substitute;
149
using tserver::TabletServerErrorPB;
150
151
// ============================================================================
152
//  Tablet Peer
153
// ============================================================================
154
TabletPeer::TabletPeer(
155
    const RaftGroupMetadataPtr& meta,
156
    const consensus::RaftPeerPB& local_peer_pb,
157
    const scoped_refptr<server::Clock>& clock,
158
    const std::string& permanent_uuid,
159
    Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
160
    MetricRegistry* metric_registry,
161
    TabletSplitter* tablet_splitter,
162
    const std::shared_future<client::YBClient*>& client_future)
163
    : meta_(meta),
164
      tablet_id_(meta->raft_group_id()),
165
      local_peer_pb_(local_peer_pb),
166
      state_(RaftGroupStatePB::NOT_STARTED),
167
      operation_tracker_(consensus::MakeTabletLogPrefix(tablet_id_, permanent_uuid)),
168
      status_listener_(new TabletStatusListener(meta)),
169
      clock_(clock),
170
      log_anchor_registry_(new LogAnchorRegistry()),
171
      mark_dirty_clbk_(std::move(mark_dirty_clbk)),
172
      permanent_uuid_(permanent_uuid),
173
      preparing_operations_counter_(operation_tracker_.LogPrefix()),
174
      metric_registry_(metric_registry),
175
      tablet_splitter_(tablet_splitter),
176
88.6k
      client_future_(client_future) {}
177
178
47.0k
TabletPeer::~TabletPeer() {
179
47.0k
  std::lock_guard<simple_spinlock> lock(lock_);
180
  // We should either have called Shutdown(), or we should have never called
181
  // Init().
182
0
  LOG_IF_WITH_PREFIX(DFATAL, tablet_) << "TabletPeer not fully shut down.";
183
47.0k
}
184
185
Status TabletPeer::InitTabletPeer(
186
    const TabletPtr& tablet,
187
    const std::shared_ptr<MemTracker>& server_mem_tracker,
188
    Messenger* messenger,
189
    rpc::ProxyCache* proxy_cache,
190
    const scoped_refptr<Log>& log,
191
    const scoped_refptr<MetricEntity>& table_metric_entity,
192
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
193
    ThreadPool* raft_pool,
194
    ThreadPool* tablet_prepare_pool,
195
    consensus::RetryableRequests* retryable_requests,
196
88.7k
    consensus::MultiRaftManager* multi_raft_manager) {
197
21
  DCHECK(tablet) << "A TabletPeer must be provided with a Tablet";
198
17
  DCHECK(log) << "A TabletPeer must be provided with a Log";
199
200
88.7k
  if (FLAGS_TEST_delay_init_tablet_peer_ms > 0) {
201
0
    std::this_thread::sleep_for(FLAGS_TEST_delay_init_tablet_peer_ms * 1ms);
202
0
  }
203
204
88.7k
  {
205
88.7k
    std::lock_guard<simple_spinlock> lock(lock_);
206
88.7k
    auto state = state_.load(std::memory_order_acquire);
207
88.7k
    if (state != RaftGroupStatePB::BOOTSTRAPPING) {
208
0
      return STATUS_FORMAT(
209
0
          IllegalState, "Invalid tablet state for init: $0", RaftGroupStatePB_Name(state));
210
0
    }
211
88.7k
    tablet_ = tablet;
212
88.7k
    proxy_cache_ = proxy_cache;
213
88.7k
    log_ = log;
214
    // "Publish" the log pointer so it can be retrieved using the log() accessor.
215
88.7k
    log_atomic_ = log.get();
216
88.7k
    service_thread_pool_ = &messenger->ThreadPool();
217
88.7k
    strand_.reset(new rpc::Strand(&messenger->ThreadPool()));
218
88.7k
    messenger_ = messenger;
219
220
2.09k
    tablet->SetMemTableFlushFilterFactory([log] {
221
2.09k
      auto largest_log_op_index = log->GetLatestEntryOpId().index;
222
2.27k
      return [largest_log_op_index] (const rocksdb::MemTable& memtable) -> Result<bool> {
223
2.27k
        auto frontiers = memtable.Frontiers();
224
2.27k
        if (frontiers) {
225
2.27k
          const auto largest_memtable_op_index =
226
2.27k
              down_cast<const docdb::ConsensusFrontier&>(frontiers->Largest()).op_id().index;
227
          // We can only flush this memtable if all operations written to it have also been written
228
          // to the log (maybe not synced, if durable_wal_write is disabled, but that's OK).
229
2.27k
          auto should_flush = largest_memtable_op_index <= largest_log_op_index;
230
2.27k
          if (!should_flush) {
231
26
            LOG(WARNING)
232
26
              << "Skipping flush on memtable with ops ahead of log. "
233
26
              << "Memtable index: " << largest_memtable_op_index
234
26
              << " - log index: " << largest_log_op_index;
235
26
          }
236
2.27k
          return should_flush;
237
2.27k
        }
238
239
        // It is correct to not have frontiers when memtable is empty
240
5
        if (memtable.IsEmpty()) {
241
1
          return true;
242
1
        }
243
244
        // This is a degenerate case that should ideally never occur. An empty memtable got into the
245
        // list of immutable memtables. We say it is OK to flush it and move on.
246
4
        static const char* error_msg =
247
4
            "A memtable with no frontiers set found when deciding what memtables to "
248
4
            "flush! This should not happen.";
249
4
        LOG(ERROR) << error_msg << " Stack trace:\n" << GetStackTrace();
250
4
        return STATUS(IllegalState, error_msg);
251
4
      };
252
2.09k
    });
253
254
88.7k
    tablet_->SetCleanupPool(raft_pool);
255
256
88.7k
    ConsensusOptions options;
257
88.7k
    options.tablet_id = meta_->raft_group_id();
258
259
88.7k
    TRACE("Creating consensus instance");
260
261
88.7k
    std::unique_ptr<ConsensusMetadata> cmeta;
262
88.7k
    RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
263
88.7k
                                          meta_->fs_manager()->uuid(), &cmeta));
264
265
88.7k
    if (retryable_requests) {
266
83.4k
      retryable_requests->SetMetricEntity(tablet->GetTabletMetricsEntity());
267
83.4k
    }
268
269
88.7k
    consensus_ = RaftConsensus::Create(
270
88.7k
        options,
271
88.7k
        std::move(cmeta),
272
88.7k
        local_peer_pb_,
273
88.7k
        table_metric_entity,
274
88.7k
        tablet_metric_entity,
275
88.7k
        clock_,
276
88.7k
        this,
277
88.7k
        messenger,
278
88.7k
        proxy_cache_,
279
88.7k
        log_.get(),
280
88.7k
        server_mem_tracker,
281
88.7k
        tablet_->mem_tracker(),
282
88.7k
        mark_dirty_clbk_,
283
88.7k
        tablet_->table_type(),
284
88.7k
        raft_pool,
285
88.7k
        retryable_requests,
286
88.7k
        multi_raft_manager);
287
88.7k
    has_consensus_.store(true, std::memory_order_release);
288
289
88.7k
    tablet_->SetHybridTimeLeaseProvider(std::bind(&TabletPeer::HybridTimeLease, this, _1, _2));
290
88.7k
    operation_tracker_.SetPostTracker(
291
88.7k
        std::bind(&RaftConsensus::TrackOperationMemory, consensus_.get(), _1));
292
293
88.7k
    prepare_thread_ = std::make_unique<Preparer>(consensus_.get(), tablet_prepare_pool);
294
295
88.7k
    ChangeConfigReplicated(RaftConfig()); // Set initial flag value.
296
88.7k
  }
297
298
88.7k
  RETURN_NOT_OK(prepare_thread_->Start());
299
300
88.7k
  if (tablet_->metrics() != nullptr) {
301
88.7k
    TRACE("Starting instrumentation");
302
88.7k
    operation_tracker_.StartInstrumentation(tablet_->GetTabletMetricsEntity());
303
88.7k
  }
304
88.7k
  operation_tracker_.StartMemoryTracking(tablet_->mem_tracker());
305
306
88.7k
  if (tablet_->transaction_coordinator()) {
307
29.1k
    tablet_->transaction_coordinator()->Start();
308
29.1k
  }
309
310
88.7k
  if (tablet_->transaction_participant()) {
311
25.8k
    tablet_->transaction_participant()->Start();
312
25.8k
  }
313
314
88.7k
  RETURN_NOT_OK(set_cdc_min_replicated_index(meta_->cdc_min_replicated_index()));
315
316
88.7k
  TRACE("TabletPeer::Init() finished");
317
153
  VLOG_WITH_PREFIX(2) << "Peer Initted";
318
319
88.7k
  return Status::OK();
320
88.7k
}
321
322
Result<FixedHybridTimeLease> TabletPeer::HybridTimeLease(
323
33.2M
    HybridTime min_allowed, CoarseTimePoint deadline) {
324
33.2M
  auto time = VERIFY_RESULT(WaitUntil(clock_.get(), min_allowed, deadline));
325
  // min_allowed could contain non zero logical part, so we add one microsecond to be sure that
326
  // the resulting ht_lease is at least min_allowed.
327
33.2M
  auto min_allowed_micros = min_allowed.CeilPhysicalValueMicros();
328
33.2M
  MicrosTime lease_micros = VERIFY_RESULT(consensus_->MajorityReplicatedHtLeaseExpiration(
329
33.2M
      min_allowed_micros, deadline));
330
33.2M
  if (lease_micros >= kMaxHybridTimePhysicalMicros) {
331
    // This could happen when leader leases are disabled.
332
383k
    return FixedHybridTimeLease();
333
383k
  }
334
32.8M
  return FixedHybridTimeLease {
335
32.8M
    .time = time,
336
32.8M
    .lease = HybridTime(lease_micros, /* logical */ 0)
337
32.8M
  };
338
32.8M
}
339
340
13.3M
Result<HybridTime> TabletPeer::PreparePeerRequest() {
341
13.3M
  auto leader_term = consensus_->GetLeaderState(/* allow_stale= */ true).term;
342
13.3M
  if (leader_term >= 0) {
343
10.2M
    auto last_write_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime();
344
10.2M
    auto propagated_history_cutoff =
345
10.2M
        tablet_->RetentionPolicy()->HistoryCutoffToPropagate(last_write_ht);
346
347
10.2M
    if (propagated_history_cutoff) {
348
0
      VLOG_WITH_PREFIX(2) << "Propagate history cutoff: " << propagated_history_cutoff;
349
350
0
      auto operation = std::make_unique<HistoryCutoffOperation>(tablet_.get());
351
0
      auto request = operation->AllocateRequest();
352
0
      request->set_history_cutoff(propagated_history_cutoff.ToUint64());
353
354
0
      Submit(std::move(operation), leader_term);
355
0
    }
356
10.2M
  }
357
358
13.3M
  if (!FLAGS_propagate_safe_time) {
359
6
    return HybridTime::kInvalid;
360
6
  }
361
362
  // Get the current majority-replicated HT leader lease without any waiting.
363
13.3M
  auto ht_lease = VERIFY_RESULT(HybridTimeLease(
364
13.3M
      /* min_allowed= */ HybridTime::kMin, /* deadline */ CoarseTimePoint::max()));
365
13.3M
  return tablet_->mvcc_manager()->SafeTime(ht_lease);
366
13.3M
}
367
368
15.1M
void TabletPeer::MajorityReplicated() {
369
15.1M
  auto ht_lease = HybridTimeLease(
370
15.1M
      /* min_allowed= */ HybridTime::kMin, /* deadline */ CoarseTimePoint::max());
371
15.1M
  if (!ht_lease.ok()) {
372
0
    LOG_WITH_PREFIX(DFATAL) << "Failed to get current lease: " << ht_lease.status();
373
0
    return;
374
0
  }
375
376
15.1M
  tablet_->mvcc_manager()->UpdatePropagatedSafeTimeOnLeader(*ht_lease);
377
15.1M
}
378
379
98.8k
void TabletPeer::ChangeConfigReplicated(const RaftConfigPB& config) {
380
98.8k
  tablet_->mvcc_manager()->SetLeaderOnlyMode(config.peers_size() == 1);
381
98.8k
}
382
383
13.1M
uint64_t TabletPeer::NumSSTFiles() {
384
13.1M
  return tablet_->GetCurrentVersionNumSSTFiles();
385
13.1M
}
386
387
136k
void TabletPeer::ListenNumSSTFilesChanged(std::function<void()> listener) {
388
136k
  tablet_->ListenNumSSTFilesChanged(std::move(listener));
389
136k
}
390
391
2.73M
Status TabletPeer::CheckOperationAllowed(const OpId& op_id, consensus::OperationType op_type) {
392
2.73M
  return tablet_->CheckOperationAllowed(op_id, op_type);
393
2.73M
}
394
395
88.7k
Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) {
396
88.7k
  {
397
88.7k
    std::lock_guard<simple_spinlock> l(state_change_lock_);
398
88.7k
    TRACE("Starting consensus");
399
400
18.4E
    VLOG_WITH_PREFIX(2) << "Peer starting";
401
402
11
    VLOG(2) << "RaftConfig before starting: " << consensus_->CommittedConfig().DebugString();
403
404
    // If tablet was previously considered shutdown w.r.t. metrics,
405
    // fix that for a tablet now being reinstated.
406
41
    DVLOG_WITH_PREFIX(3)
407
41
      << "Remove from set of tablets that have been shutdown so as to allow reporting metrics";
408
88.7k
    metric_registry_->tablets_shutdown_erase(tablet_id());
409
410
88.7k
    RETURN_NOT_OK(consensus_->Start(bootstrap_info));
411
88.7k
    RETURN_NOT_OK(UpdateState(RaftGroupStatePB::BOOTSTRAPPING, RaftGroupStatePB::RUNNING,
412
88.7k
                              "Incorrect state to start TabletPeer, "));
413
88.7k
  }
414
  // The context tracks that the current caller does not hold the lock for consensus state.
415
  // So mark dirty callback, e.g., consensus->ConsensusState() for master consensus callback of
416
  // SysCatalogStateChanged, can get the lock when needed.
417
88.7k
  auto context =
418
88.7k
      std::make_shared<StateChangeContext>(StateChangeReason::TABLET_PEER_STARTED, false);
419
  // Because we changed the tablet state, we need to re-report the tablet to the master.
420
88.7k
  mark_dirty_clbk_.Run(context);
421
422
88.7k
  return tablet_->EnableCompactions(/* non_abortable_ops_pause */ nullptr);
423
88.7k
}
424
425
89.7k
consensus::RaftConfigPB TabletPeer::RaftConfig() const {
426
0
  CHECK(consensus_) << "consensus is null";
427
89.7k
  return consensus_->CommittedConfig();
428
89.7k
}
429
430
47.8k
bool TabletPeer::StartShutdown(IsDropTable is_drop_table) {
431
47.8k
  LOG_WITH_PREFIX(INFO) << "Initiating TabletPeer shutdown";
432
433
47.8k
  {
434
47.8k
    std::lock_guard<decltype(lock_)> lock(lock_);
435
47.8k
    if (tablet_) {
436
47.7k
      tablet_->StartShutdown(is_drop_table);
437
47.7k
    }
438
47.8k
  }
439
440
47.8k
  {
441
47.8k
    RaftGroupStatePB state = state_.load(std::memory_order_acquire);
442
47.8k
    for (;;) {
443
47.8k
      if (state == RaftGroupStatePB::QUIESCING || state == RaftGroupStatePB::SHUTDOWN) {
444
24
        return false;
445
24
      }
446
47.7k
      if (state_.compare_exchange_strong(
447
47.7k
          state, RaftGroupStatePB::QUIESCING, std::memory_order_acq_rel)) {
448
47.7k
        LOG_WITH_PREFIX(INFO) << "Started shutdown from state: " << RaftGroupStatePB_Name(state);
449
47.7k
        break;
450
47.7k
      }
451
47.7k
    }
452
47.8k
  }
453
454
47.7k
  std::lock_guard<simple_spinlock> l(state_change_lock_);
455
  // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here
456
  // to ensure that any currently running operation finishes before we proceed with
457
  // the rest of the shutdown sequence. In particular, a maintenance operation could
458
  // indirectly end up calling into the log, which we are about to shut down.
459
47.7k
  UnregisterMaintenanceOps();
460
461
47.7k
  std::shared_ptr<consensus::RaftConsensus> consensus;
462
47.7k
  {
463
47.7k
    std::lock_guard<decltype(lock_)> lock(lock_);
464
47.7k
    consensus = consensus_;
465
47.7k
  }
466
47.7k
  if (consensus) {
467
47.7k
    consensus->Shutdown();
468
47.7k
  }
469
470
47.7k
  return true;
471
47.8k
}
472
473
47.7k
void TabletPeer::CompleteShutdown(IsDropTable is_drop_table) {
474
47.7k
  auto* strand = strand_.get();
475
47.7k
  if (strand) {
476
47.7k
    strand->Shutdown();
477
47.7k
  }
478
479
47.7k
  preparing_operations_counter_.Shutdown();
480
481
  // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
482
47.7k
  LOG_SLOW_EXECUTION(WARNING, 1000,
483
47.7k
      Substitute("TabletPeer: tablet $0: Waiting for Operations to complete", tablet_id())) {
484
47.7k
    operation_tracker_.WaitForAllToFinish();
485
47.7k
  }
486
487
47.7k
  if (prepare_thread_) {
488
47.7k
    prepare_thread_->Stop();
489
47.7k
  }
490
491
47.7k
  if (log_) {
492
47.7k
    WARN_NOT_OK(log_->Close(), LogPrefix() + "Error closing the Log");
493
47.7k
  }
494
495
99
  VLOG_WITH_PREFIX(1) << "Shut down!";
496
497
47.7k
  if (tablet_) {
498
47.6k
    tablet_->CompleteShutdown(is_drop_table);
499
47.6k
  }
500
501
  // Only mark the peer as SHUTDOWN when all other components have shut down.
502
47.7k
  {
503
47.7k
    std::lock_guard<simple_spinlock> lock(lock_);
504
47.7k
    strand_.reset();
505
    // Release mem tracker resources.
506
47.7k
    has_consensus_.store(false, std::memory_order_release);
507
47.7k
    consensus_.reset();
508
47.7k
    prepare_thread_.reset();
509
47.7k
    tablet_.reset();
510
47.7k
    auto state = state_.load(std::memory_order_acquire);
511
83
    LOG_IF_WITH_PREFIX(DFATAL, state != RaftGroupStatePB::QUIESCING) <<
512
83
        "Bad state when completing shutdown: " << RaftGroupStatePB_Name(state);
513
47.7k
    state_.store(RaftGroupStatePB::SHUTDOWN, std::memory_order_release);
514
515
47.7k
    if (metric_registry_) {
516
18.4E
      DVLOG_WITH_PREFIX(3)
517
18.4E
        << "Add to set of tablets that have been shutdown so as to avoid reporting metrics";
518
47.6k
      metric_registry_->tablets_shutdown_insert(tablet_id());
519
47.6k
    }
520
47.7k
  }
521
47.7k
}
522
523
23
void TabletPeer::WaitUntilShutdown() {
524
23
  const MonoDelta kSingleWait = 10ms;
525
23
  const MonoDelta kReportInterval = 5s;
526
23
  const MonoDelta kMaxWait = 30s;
527
528
23
  MonoDelta waited = MonoDelta::kZero;
529
23
  MonoDelta last_reported = MonoDelta::kZero;
530
23
  while (state_.load(std::memory_order_acquire) != RaftGroupStatePB::SHUTDOWN) {
531
0
    if (waited >= last_reported + kReportInterval) {
532
0
      if (waited >= kMaxWait) {
533
0
        LOG_WITH_PREFIX(DFATAL)
534
0
            << "Wait for shutdown " << waited << " exceeded kMaxWait " << kMaxWait;
535
0
      } else {
536
0
        LOG_WITH_PREFIX(WARNING) << "Long wait for shutdown: " << waited;
537
0
      }
538
0
      last_reported = waited;
539
0
    }
540
0
    SleepFor(kSingleWait);
541
0
    waited += kSingleWait;
542
0
  }
543
544
23
  if (metric_registry_) {
545
0
    DVLOG_WITH_PREFIX(3)
546
0
      << "Add to set of tablets that have been shutdown so as to avoid reporting metrics";
547
23
    metric_registry_->tablets_shutdown_insert(tablet_id());
548
23
  }
549
23
}
550
551
47.6k
Status TabletPeer::Shutdown(IsDropTable is_drop_table) {
552
47.6k
  bool isShutdownInitiated = StartShutdown(is_drop_table);
553
554
47.6k
  RETURN_NOT_OK(AbortSQLTransactions());
555
556
47.6k
  if (isShutdownInitiated) {
557
47.5k
    CompleteShutdown(is_drop_table);
558
33
  } else {
559
33
    WaitUntilShutdown();
560
33
  }
561
47.6k
  return Status::OK();
562
47.6k
}
563
564
47.6k
Status TabletPeer::AbortSQLTransactions() {
565
  // Once raft group state enters QUIESCING state,
566
  // new queries cannot be processed from then onwards.
567
  // Aborting any remaining active transactions in the tablet.
568
47.6k
  if (tablet_ && tablet_->table_type() == TableType::PGSQL_TABLE_TYPE) {
569
9.90k
    if (tablet_->transaction_participant()) {
570
9.90k
      HybridTime maxCutoff = HybridTime::kMax;
571
9.90k
      LOG(INFO) << "Aborting transactions that started prior to " << maxCutoff
572
9.90k
                << " for tablet id " << tablet_->tablet_id();
573
9.90k
      CoarseTimePoint deadline = CoarseMonoClock::Now() +
574
9.90k
          MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_abort_timeout_ms);
575
9.90k
      WARN_NOT_OK(tablet_->transaction_participant()->StopActiveTxnsPriorTo(maxCutoff, deadline),
576
9.90k
                  "Cannot abort transactions for tablet " + tablet_->tablet_id());
577
9.90k
    }
578
9.90k
  }
579
47.6k
  return Status::OK();
580
47.6k
}
581
582
16.0M
Status TabletPeer::CheckRunning() const {
583
16.0M
  auto state = state_.load(std::memory_order_acquire);
584
16.0M
  if (state != RaftGroupStatePB::RUNNING) {
585
955
    if (state == RaftGroupStatePB::QUIESCING) {
586
955
      return STATUS(ShutdownInProgress, "The tablet is shutting down");
587
955
    }
588
0
    return STATUS_FORMAT(IllegalState, "The tablet is not in a running state: $0",
589
0
                         RaftGroupStatePB_Name(state));
590
0
  }
591
592
16.0M
  return Status::OK();
593
16.0M
}
594
595
0
bool TabletPeer::IsShutdownStarted() const {
596
0
  auto state = state_.load(std::memory_order_acquire);
597
0
  return state == RaftGroupStatePB::QUIESCING || state == RaftGroupStatePB::SHUTDOWN;
598
0
}
599
600
102
Status TabletPeer::CheckShutdownOrNotStarted() const {
601
102
  RaftGroupStatePB value = state_.load(std::memory_order_acquire);
602
102
  if (value != RaftGroupStatePB::SHUTDOWN && value != RaftGroupStatePB::NOT_STARTED) {
603
0
    return STATUS(IllegalState, Substitute("The tablet is not in a shutdown state: $0",
604
0
                                           RaftGroupStatePB_Name(value)));
605
0
  }
606
607
102
  return Status::OK();
608
102
}
609
610
5.39k
Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) {
611
5.39k
  MonoTime start(MonoTime::Now());
612
613
5.39k
  int backoff_exp = 0;
614
5.39k
  const int kMaxBackoffExp = 8;
615
5.67k
  while (true) {
616
5.67k
    RaftGroupStatePB cached_state = state_.load(std::memory_order_acquire);
617
5.67k
    if (cached_state == RaftGroupStatePB::QUIESCING || cached_state == RaftGroupStatePB::SHUTDOWN) {
618
0
      return STATUS(IllegalState,
619
0
          Substitute("The tablet is already shutting down or shutdown. State: $0",
620
0
                     RaftGroupStatePB_Name(cached_state)));
621
0
    }
622
5.67k
    if (cached_state == RUNNING && has_consensus_.load(std::memory_order_acquire) &&
623
5.39k
        consensus_->IsRunning()) {
624
5.39k
      break;
625
5.39k
    }
626
284
    MonoTime now(MonoTime::Now());
627
284
    MonoDelta elapsed(now.GetDeltaSince(start));
628
284
    if (elapsed.MoreThan(timeout)) {
629
1
      return STATUS(TimedOut, Substitute("Consensus is not running after waiting for $0. State; $1",
630
1
                                         elapsed.ToString(), RaftGroupStatePB_Name(cached_state)));
631
1
    }
632
283
    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
633
283
    backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
634
283
  }
635
5.39k
  return Status::OK();
636
5.39k
}
637
638
1.65M
void TabletPeer::WriteAsync(std::unique_ptr<WriteQuery> query) {
639
1.65M
  ScopedOperation preparing_token(&preparing_operations_counter_);
640
1.65M
  auto status = CheckRunning();
641
1.65M
  if (!status.ok()) {
642
1
    query->Cancel(status);
643
1
    return;
644
1
  }
645
646
1.65M
  query->operation().set_preparing_token(std::move(preparing_token));
647
1.65M
  tablet_->AcquireLocksAndPerformDocOperations(std::move(query));
648
1.65M
}
649
650
0
Result<HybridTime> TabletPeer::ReportReadRestart() {
651
0
  tablet_->metrics()->restart_read_requests->Increment();
652
0
  return tablet_->SafeTime(RequireLease::kTrue);
653
0
}
654
655
2.69M
void TabletPeer::Submit(std::unique_ptr<Operation> operation, int64_t term) {
656
2.69M
  auto status = CheckRunning();
657
658
2.69M
  if (status.ok()) {
659
2.69M
    auto driver = NewLeaderOperationDriver(&operation, term);
660
2.69M
    if (driver.ok()) {
661
2.69M
      (**driver).ExecuteAsync();
662
18.4E
    } else {
663
18.4E
      status = driver.status();
664
18.4E
    }
665
2.69M
  }
666
2.69M
  if (!status.ok()) {
667
6
    operation->Aborted(status, /* was_pending= */ false);
668
6
  }
669
2.69M
}
670
671
void TabletPeer::SubmitUpdateTransaction(
672
873k
    std::unique_ptr<UpdateTxnOperation> operation, int64_t term) {
673
873k
  if (!operation->tablet()) {
674
0
    operation->SetTablet(tablet());
675
0
  }
676
873k
  Submit(std::move(operation), term);
677
873k
}
678
679
321k
HybridTime TabletPeer::SafeTimeForTransactionParticipant() {
680
321k
  return tablet_->mvcc_manager()->SafeTimeForFollower(
681
321k
      /* min_allowed= */ HybridTime::kMin, /* deadline= */ CoarseTimePoint::min());
682
321k
}
683
684
10.2k
Result<HybridTime> TabletPeer::WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) {
685
10.2k
  return tablet_->SafeTime(RequireLease::kFallbackToFollower, safe_time, deadline);
686
10.2k
}
687
688
969k
void TabletPeer::GetLastReplicatedData(RemoveIntentsData* data) {
689
969k
  data->op_id = consensus_->GetLastCommittedOpId();
690
969k
  data->log_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime();
691
969k
}
692
693
984k
void TabletPeer::GetLastCDCedData(RemoveIntentsData* data) {
694
984k
  if (consensus_ != nullptr) {
695
983k
    data->op_id.index = consensus_->GetLastCDCedOpId().index;
696
983k
    data->op_id.term = consensus_->GetLastCDCedOpId().term;
697
983k
  }
698
699
984k
  if((tablet_ != nullptr) && (tablet_->mvcc_manager() != nullptr)) {
700
    // for now use this hybrid time, ideally it should be of last_updated_time
701
983k
    data->log_ht = tablet_->mvcc_manager()->LastReplicatedHybridTime();
702
983k
  }
703
984k
}
704
705
728k
void TabletPeer::UpdateClock(HybridTime hybrid_time) {
706
728k
  clock_->Update(hybrid_time);
707
728k
}
708
709
std::unique_ptr<UpdateTxnOperation> TabletPeer::CreateUpdateTransaction(
710
239k
    TransactionStatePB* request) {
711
239k
  auto result = std::make_unique<UpdateTxnOperation>(tablet());
712
239k
  result->TakeRequest(request);
713
239k
  return result;
714
239k
}
715
716
7.35k
void TabletPeer::GetTabletStatusPB(TabletStatusPB* status_pb_out) {
717
7.35k
  std::lock_guard<simple_spinlock> lock(lock_);
718
7.35k
  DCHECK(status_pb_out != nullptr);
719
7.35k
  DCHECK(status_listener_.get() != nullptr);
720
7.35k
  const auto disk_size_info = GetOnDiskSizeInfo();
721
7.35k
  status_pb_out->set_tablet_id(status_listener_->tablet_id());
722
7.35k
  status_pb_out->set_namespace_name(status_listener_->namespace_name());
723
7.35k
  status_pb_out->set_table_name(status_listener_->table_name());
724
7.35k
  status_pb_out->set_table_id(status_listener_->table_id());
725
7.35k
  status_pb_out->set_last_status(status_listener_->last_status());
726
7.35k
  status_listener_->partition()->ToPB(status_pb_out->mutable_partition());
727
7.35k
  status_pb_out->set_state(state_);
728
7.35k
  status_pb_out->set_tablet_data_state(meta_->tablet_data_state());
729
7.35k
  auto tablet = tablet_;
730
7.35k
  if (tablet) {
731
7.29k
    status_pb_out->set_table_type(tablet->table_type());
732
7.29k
  }
733
7.35k
  disk_size_info.ToPB(status_pb_out);
734
  // Set hide status of the tablet.
735
7.35k
  status_pb_out->set_is_hidden(meta_->hidden());
736
7.35k
}
737
738
6
Status TabletPeer::RunLogGC() {
739
6
  if (!CheckRunning().ok()) {
740
0
    return Status::OK();
741
0
  }
742
6
  auto s = reset_cdc_min_replicated_index_if_stale();
743
6
  if (!s.ok()) {
744
0
    LOG_WITH_PREFIX(WARNING) << "Unable to reset cdc min replicated index " << s;
745
0
  }
746
6
  int64_t min_log_index;
747
0
  if (VLOG_IS_ON(2)) {
748
0
    std::string details;
749
0
    min_log_index = VERIFY_RESULT(GetEarliestNeededLogIndex(&details));
750
0
    LOG_WITH_PREFIX(INFO) << __func__ << ": " << details;
751
6
  } else {
752
6
     min_log_index = VERIFY_RESULT(GetEarliestNeededLogIndex());
753
6
  }
754
6
  int32_t num_gced = 0;
755
6
  return log_->GC(min_log_index, &num_gced);
756
6
}
757
758
44
TabletDataState TabletPeer::data_state() const {
759
44
  std::lock_guard<simple_spinlock> lock(lock_);
760
44
  return meta_->tablet_data_state();
761
44
}
762
763
7
string TabletPeer::HumanReadableState() const {
764
7
  std::lock_guard<simple_spinlock> lock(lock_);
765
7
  TabletDataState data_state = meta_->tablet_data_state();
766
7
  RaftGroupStatePB state = this->state();
767
  // If failed, any number of things could have gone wrong.
768
7
  if (state == RaftGroupStatePB::FAILED) {
769
0
    return Substitute("$0 ($1): $2", RaftGroupStatePB_Name(state),
770
0
                      TabletDataState_Name(data_state),
771
0
                      error_.get()->ToString());
772
  // If it's remotely bootstrapping, or tombstoned, that is the important thing
773
  // to show.
774
7
  } else if (!CanServeTabletData(data_state)) {
775
0
    return TabletDataState_Name(data_state);
776
7
  } else if (data_state == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
777
0
    return RaftGroupStatePB_Name(state) + " (split)";
778
0
  }
779
  // Otherwise, the tablet's data is in a "normal" state, so we just display
780
  // the runtime state (BOOTSTRAPPING, RUNNING, etc).
781
7
  return RaftGroupStatePB_Name(state);
782
7
}
783
784
namespace {
785
786
0
consensus::OperationType MapOperationTypeToPB(OperationType operation_type) {
787
0
  switch (operation_type) {
788
0
    case OperationType::kWrite:
789
0
      return consensus::WRITE_OP;
790
791
0
    case OperationType::kChangeMetadata:
792
0
      return consensus::CHANGE_METADATA_OP;
793
794
0
    case OperationType::kUpdateTransaction:
795
0
      return consensus::UPDATE_TRANSACTION_OP;
796
797
0
    case OperationType::kSnapshot:
798
0
      return consensus::SNAPSHOT_OP;
799
800
0
    case OperationType::kTruncate:
801
0
      return consensus::TRUNCATE_OP;
802
803
0
    case OperationType::kHistoryCutoff:
804
0
      return consensus::HISTORY_CUTOFF_OP;
805
806
0
    case OperationType::kSplit:
807
0
      return consensus::SPLIT_OP;
808
809
0
    case OperationType::kEmpty:
810
0
      LOG(FATAL) << "OperationType::kEmpty cannot be converted to consensus::OperationType";
811
0
  }
812
0
  FATAL_INVALID_ENUM_VALUE(OperationType, operation_type);
813
0
}
814
815
} // namespace
816
817
void TabletPeer::GetInFlightOperations(Operation::TraceType trace_type,
818
0
                                       vector<consensus::OperationStatusPB>* out) const {
819
0
  for (const auto& driver : operation_tracker_.GetPendingOperations()) {
820
0
    if (driver->operation() == nullptr) {
821
0
      continue;
822
0
    }
823
0
    auto op_type = driver->operation_type();
824
0
    if (op_type == OperationType::kEmpty) {
825
      // This is a special-purpose in-memory-only operation for updating propagated safe time on
826
      // a follower.
827
0
      continue;
828
0
    }
829
830
0
    consensus::OperationStatusPB status_pb;
831
0
    driver->GetOpId().ToPB(status_pb.mutable_op_id());
832
0
    status_pb.set_operation_type(MapOperationTypeToPB(op_type));
833
0
    status_pb.set_description(driver->ToString());
834
0
    int64_t running_for_micros =
835
0
        MonoTime::Now().GetDeltaSince(driver->start_time()).ToMicroseconds();
836
0
    status_pb.set_running_for_micros(running_for_micros);
837
0
    if (trace_type == Operation::TRACE_TXNS) {
838
0
      status_pb.set_trace_buffer(driver->trace()->DumpToString(true));
839
0
    }
840
0
    out->push_back(status_pb);
841
0
  }
842
0
}
843
844
6.61M
Result<int64_t> TabletPeer::GetEarliestNeededLogIndex(std::string* details) const {
845
  // First, we anchor on the last OpId in the Log to establish a lower bound
846
  // and avoid racing with the other checks. This limits the Log GC candidate
847
  // segments before we check the anchors.
848
6.61M
  auto latest_log_entry_op_id = log_->GetLatestEntryOpId();
849
6.61M
  int64_t min_index = latest_log_entry_op_id.index;
850
6.61M
  if (details) {
851
0
    *details += Format("Latest log entry op id: $0\n", latest_log_entry_op_id);
852
0
  }
853
854
  // If we never have written to the log, no need to proceed.
855
6.61M
  if (min_index == 0) {
856
208k
    return min_index;
857
208k
  }
858
859
  // Next, we interrogate the anchor registry.
860
  // Returns OK if minimum known, NotFound if no anchors are registered.
861
6.40M
  {
862
6.40M
    int64_t min_anchor_index;
863
6.40M
    Status s = log_anchor_registry_->GetEarliestRegisteredLogIndex(&min_anchor_index);
864
6.40M
    if (PREDICT_FALSE(!s.ok())) {
865
0
      DCHECK(s.IsNotFound()) << "Unexpected error calling LogAnchorRegistry: " << s.ToString();
866
9.04k
    } else {
867
9.04k
      min_index = std::min(min_index, min_anchor_index);
868
9.04k
      if (details) {
869
0
        *details += Format("Min anchor index: $0\n", min_anchor_index);
870
0
      }
871
9.04k
    }
872
6.40M
  }
873
874
  // Next, interrogate the OperationTracker.
875
6.40M
  int64_t min_pending_op_index = std::numeric_limits<int64_t>::max();
876
278k
  for (const auto& driver : operation_tracker_.GetPendingOperations()) {
877
278k
    auto tx_op_id = driver->GetOpId();
878
    // A operation which doesn't have an opid hasn't been submitted for replication yet and
879
    // thus has no need to anchor the log.
880
278k
    if (tx_op_id != yb::OpId::Invalid()) {
881
250k
      min_pending_op_index = std::min(min_pending_op_index, tx_op_id.index);
882
250k
    }
883
278k
  }
884
885
6.40M
  min_index = std::min(min_index, min_pending_op_index);
886
6.40M
  if (details && min_pending_op_index != std::numeric_limits<int64_t>::max()) {
887
0
    *details += Format("Min pending op id index: $0\n", min_pending_op_index);
888
0
  }
889
890
6.40M
  auto min_retryable_request_op_id = consensus_->MinRetryableRequestOpId();
891
6.40M
  min_index = std::min(min_index, min_retryable_request_op_id.index);
892
6.40M
  if (details) {
893
0
    *details += Format("Min retryable request op id: $0\n", min_retryable_request_op_id);
894
0
  }
895
896
6.40M
  auto* transaction_coordinator = tablet()->transaction_coordinator();
897
6.40M
  if (transaction_coordinator) {
898
2.68M
    auto transaction_coordinator_min_op_index = transaction_coordinator->PrepareGC(details);
899
2.68M
    min_index = std::min(min_index, transaction_coordinator_min_op_index);
900
2.68M
  }
901
902
  // We keep at least one committed operation in the log so that we can always recover safe time
903
  // during bootstrap.
904
  // Last committed op id should be read before MaxPersistentOpId to avoid race condition
905
  // described in MaxPersistentOpIdForDb.
906
  //
907
  // If we read last committed op id AFTER reading last persistent op id (INCORRECT):
908
  // - We read max persistent op id and find there is no new data, so we ignore it.
909
  // - New data gets written and Raft-committed, but not yet flushed to an SSTable.
910
  // - We read the last committed op id, which is greater than what max persistent op id would have
911
  //   returned.
912
  // - We garbage-collect the Raft log entries corresponding to the new data.
913
  // - Power is lost and the server reboots, losing committed data.
914
  //
915
  // If we read last committed op id BEFORE reading last persistent op id (CORRECT):
916
  // - We read the last committed op id.
917
  // - We read max persistent op id and find there is no new data, so we ignore it.
918
  // - New data gets written and Raft-committed, but not yet flushed to an SSTable.
919
  // - We still don't garbage-collect the logs containing the committed but unflushed data,
920
  //   because the earlier value of the last committed op id that we read prevents us from doing so.
921
6.40M
  auto last_committed_op_id = consensus()->GetLastCommittedOpId();
922
6.40M
  min_index = std::min(min_index, last_committed_op_id.index);
923
6.40M
  if (details) {
924
0
    *details += Format("Last committed op id: $0\n", last_committed_op_id);
925
0
  }
926
927
6.40M
  if (tablet_->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE) {
928
3.71M
    tablet_->FlushIntentsDbIfNecessary(latest_log_entry_op_id);
929
3.71M
    auto max_persistent_op_id = VERIFY_RESULT(
930
3.71M
        tablet_->MaxPersistentOpId(true /* invalid_if_no_new_data */));
931
3.71M
    if (max_persistent_op_id.regular.valid()) {
932
1.56M
      min_index = std::min(min_index, max_persistent_op_id.regular.index);
933
1.56M
      if (details) {
934
0
        *details += Format("Max persistent regular op id: $0\n", max_persistent_op_id.regular);
935
0
      }
936
1.56M
    }
937
3.71M
    if (max_persistent_op_id.intents.valid()) {
938
427k
      min_index = std::min(min_index, max_persistent_op_id.intents.index);
939
427k
      if (details) {
940
0
        *details += Format("Max persistent intents op id: $0\n", max_persistent_op_id.intents);
941
0
      }
942
427k
    }
943
3.71M
  }
944
945
6.40M
  if (details) {
946
0
    *details += Format("Earliest needed log index: $0\n", min_index);
947
0
  }
948
949
6.40M
  return min_index;
950
6.40M
}
951
952
6.61M
Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const {
953
6.61M
  RETURN_NOT_OK(CheckRunning());
954
6.61M
  int64_t min_op_idx = VERIFY_RESULT(GetEarliestNeededLogIndex());
955
6.61M
  RETURN_NOT_OK(log_->GetGCableDataSize(min_op_idx, retention_size));
956
6.61M
  return Status::OK();
957
6.61M
}
958
959
846k
log::Log* TabletPeer::log() const {
960
846k
  Log* log = log_atomic_.load(std::memory_order_acquire);
961
39
  LOG_IF_WITH_PREFIX(FATAL, !log) << "log() called before the log instance is initialized.";
962
846k
  return log;
963
846k
}
964
965
49.0k
yb::OpId TabletPeer::GetLatestLogEntryOpId() const {
966
49.0k
  Log* log = log_atomic_.load(std::memory_order_acquire);
967
49.0k
  if (log) {
968
49.0k
    return log->GetLatestEntryOpId();
969
49.0k
  }
970
5
  return yb::OpId();
971
5
}
972
973
89.0k
Status TabletPeer::set_cdc_min_replicated_index_unlocked(int64_t cdc_min_replicated_index) {
974
89.0k
  LOG_WITH_PREFIX(INFO) << "Setting cdc min replicated index to " << cdc_min_replicated_index;
975
89.0k
  RETURN_NOT_OK(meta_->set_cdc_min_replicated_index(cdc_min_replicated_index));
976
89.0k
  Log* log = log_atomic_.load(std::memory_order_acquire);
977
89.0k
  if (log) {
978
88.9k
    log->set_cdc_min_replicated_index(cdc_min_replicated_index);
979
88.9k
  }
980
89.0k
  cdc_min_replicated_index_refresh_time_ = MonoTime::Now();
981
89.0k
  return Status::OK();
982
89.0k
}
983
984
89.0k
Status TabletPeer::set_cdc_min_replicated_index(int64_t cdc_min_replicated_index) {
985
89.0k
  std::lock_guard<decltype(cdc_min_replicated_index_lock_)> l(cdc_min_replicated_index_lock_);
986
89.0k
  return set_cdc_min_replicated_index_unlocked(cdc_min_replicated_index);
987
89.0k
}
988
989
6
Status TabletPeer::reset_cdc_min_replicated_index_if_stale() {
990
6
  std::lock_guard<decltype(cdc_min_replicated_index_lock_)> l(cdc_min_replicated_index_lock_);
991
6
  auto seconds_since_last_refresh =
992
6
      MonoTime::Now().GetDeltaSince(cdc_min_replicated_index_refresh_time_).ToSeconds();
993
6
  if (seconds_since_last_refresh > FLAGS_cdc_min_replicated_index_considered_stale_secs) {
994
0
    LOG_WITH_PREFIX(INFO) << "Resetting cdc min replicated index. Seconds since last update: "
995
0
                          << seconds_since_last_refresh;
996
0
    RETURN_NOT_OK(set_cdc_min_replicated_index_unlocked(std::numeric_limits<int64_t>::max()));
997
0
  }
998
6
  return Status::OK();
999
6
}
1000
1001
5.09M
std::unique_ptr<Operation> TabletPeer::CreateOperation(consensus::ReplicateMsg* replicate_msg) {
1002
5.09M
  switch (replicate_msg->op_type()) {
1003
2.93M
    case consensus::WRITE_OP:
1004
1.57k
      DCHECK(replicate_msg->has_write()) << "WRITE_OP replica"
1005
1.57k
          " operation must receive a WriteRequestPB";
1006
      // We use separate preparing token only on leader, so here it could be empty.
1007
2.93M
      return std::make_unique<WriteOperation>(tablet());
1008
1009
320k
    case consensus::CHANGE_METADATA_OP:
1010
300
      DCHECK(replicate_msg->has_change_metadata_request()) << "CHANGE_METADATA_OP replica"
1011
300
          " operation must receive an ChangeMetadataRequestPB";
1012
320k
      return std::make_unique<ChangeMetadataOperation>(tablet(), log());
1013
1014
1.73M
    case consensus::UPDATE_TRANSACTION_OP:
1015
1.24k
      DCHECK(replicate_msg->has_transaction_state()) << "UPDATE_TRANSACTION_OP replica"
1016
1.24k
          " operation must receive an TransactionStatePB";
1017
1.73M
      return std::make_unique<UpdateTxnOperation>(tablet());
1018
1019
106k
    case consensus::TRUNCATE_OP:
1020
354
      DCHECK(replicate_msg->has_truncate()) << "TRUNCATE_OP replica"
1021
354
          " operation must receive an TruncateRequestPB";
1022
106k
      return std::make_unique<TruncateOperation>(tablet());
1023
1024
500
    case consensus::SNAPSHOT_OP:
1025
0
       DCHECK(replicate_msg->has_snapshot_request()) << "SNAPSHOT_OP replica"
1026
0
          " operation must receive an TabletSnapshotOpRequestPB";
1027
500
      return std::make_unique<SnapshotOperation>(tablet());
1028
1029
0
    case consensus::HISTORY_CUTOFF_OP:
1030
0
       DCHECK(replicate_msg->has_history_cutoff()) << "HISTORY_CUTOFF_OP replica"
1031
0
          " transaction must receive an HistoryCutoffPB";
1032
0
      return std::make_unique<HistoryCutoffOperation>(tablet());
1033
1034
31
    case consensus::SPLIT_OP:
1035
0
       DCHECK(replicate_msg->has_split_request()) << "SPLIT_OP replica"
1036
0
          " operation must receive an SplitOpRequestPB";
1037
31
      return std::make_unique<SplitOperation>(tablet(), tablet_splitter_);
1038
1039
0
    case consensus::UNKNOWN_OP: FALLTHROUGH_INTENDED;
1040
0
    case consensus::NO_OP: FALLTHROUGH_INTENDED;
1041
0
    case consensus::CHANGE_CONFIG_OP:
1042
0
      FATAL_INVALID_ENUM_VALUE(consensus::OperationType, replicate_msg->op_type());
1043
5.09M
  }
1044
0
  FATAL_INVALID_ENUM_VALUE(consensus::OperationType, replicate_msg->op_type());
1045
0
}
1046
1047
Status TabletPeer::StartReplicaOperation(
1048
5.09M
    const scoped_refptr<ConsensusRound>& round, HybridTime propagated_safe_time) {
1049
5.09M
  RaftGroupStatePB value = state();
1050
5.09M
  if (value != RaftGroupStatePB::RUNNING && value != RaftGroupStatePB::BOOTSTRAPPING) {
1051
4
    return STATUS(IllegalState, RaftGroupStatePB_Name(value));
1052
4
  }
1053
1054
5.09M
  consensus::ReplicateMsg* replicate_msg = round->replicate_msg().get();
1055
5.09M
  DCHECK(replicate_msg->has_hybrid_time());
1056
5.09M
  auto operation = CreateOperation(replicate_msg);
1057
1058
  // TODO(todd) Look at wiring the stuff below on the driver
1059
  // It's imperative that we set the round here on any type of operation, as this
1060
  // allows us to keep the reference to the request in the round instead of copying it.
1061
5.09M
  operation->set_consensus_round(round);
1062
5.09M
  HybridTime ht(replicate_msg->hybrid_time());
1063
5.09M
  operation->set_hybrid_time(ht);
1064
5.09M
  clock_->Update(ht);
1065
1066
  // This sets the monotonic counter to at least replicate_msg.monotonic_counter() atomically.
1067
5.09M
  tablet_->UpdateMonotonicCounter(replicate_msg->monotonic_counter());
1068
1069
5.09M
  auto* operation_ptr = operation.get();
1070
5.09M
  OperationDriverPtr driver = VERIFY_RESULT(NewReplicaOperationDriver(&operation));
1071
1072
5.09M
  operation_ptr->consensus_round()->SetCallback(driver.get());
1073
1074
5.09M
  if (propagated_safe_time) {
1075
4.42M
    driver->SetPropagatedSafeTime(propagated_safe_time, tablet_->mvcc_manager());
1076
4.42M
  }
1077
1078
5.09M
  driver->ExecuteAsync();
1079
5.09M
  return Status::OK();
1080
5.09M
}
1081
1082
5.71M
void TabletPeer::SetPropagatedSafeTime(HybridTime ht) {
1083
5.71M
  auto driver = NewReplicaOperationDriver(nullptr);
1084
5.71M
  if (!driver.ok()) {
1085
0
    LOG_WITH_PREFIX(ERROR) << "Failed to create operation driver to set propagated hybrid time";
1086
0
    return;
1087
0
  }
1088
5.71M
  (**driver).SetPropagatedSafeTime(ht, tablet_->mvcc_manager());
1089
5.71M
  (**driver).ExecuteAsync();
1090
5.71M
}
1091
1092
2.93M
bool TabletPeer::ShouldApplyWrite() {
1093
2.93M
  return tablet_->ShouldApplyWrite();
1094
2.93M
}
1095
1096
16.7M
consensus::Consensus* TabletPeer::consensus() const {
1097
16.7M
  return raft_consensus();
1098
16.7M
}
1099
1100
18.2M
consensus::RaftConsensus* TabletPeer::raft_consensus() const {
1101
18.2M
  std::lock_guard<simple_spinlock> lock(lock_);
1102
18.2M
  return consensus_.get();
1103
18.2M
}
1104
1105
11.6M
shared_ptr<consensus::Consensus> TabletPeer::shared_consensus() const {
1106
11.6M
  std::lock_guard<simple_spinlock> lock(lock_);
1107
11.6M
  return consensus_;
1108
11.6M
}
1109
1110
10.4M
shared_ptr<consensus::RaftConsensus> TabletPeer::shared_raft_consensus() const {
1111
10.4M
  std::lock_guard<simple_spinlock> lock(lock_);
1112
10.4M
  return consensus_;
1113
10.4M
}
1114
1115
Result<OperationDriverPtr> TabletPeer::NewLeaderOperationDriver(
1116
2.69M
    std::unique_ptr<Operation>* operation, int64_t term) {
1117
2.69M
  if (term == OpId::kUnknownTerm) {
1118
0
    return STATUS(InvalidArgument, "Leader operation driver for unknown term");
1119
0
  }
1120
2.69M
  return NewOperationDriver(operation, term);
1121
2.69M
}
1122
1123
Result<OperationDriverPtr> TabletPeer::NewReplicaOperationDriver(
1124
10.8M
    std::unique_ptr<Operation>* operation) {
1125
10.8M
  return NewOperationDriver(operation, OpId::kUnknownTerm);
1126
10.8M
}
1127
1128
Result<OperationDriverPtr> TabletPeer::NewOperationDriver(std::unique_ptr<Operation>* operation,
1129
13.4M
                                                          int64_t term) {
1130
13.4M
  auto operation_driver = CreateOperationDriver();
1131
13.4M
  RETURN_NOT_OK(operation_driver->Init(operation, term));
1132
13.4M
  return operation_driver;
1133
13.4M
}
1134
1135
88.6k
void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
1136
  // Taking state_change_lock_ ensures that we don't shut down concurrently with
1137
  // this last start-up task.
1138
  // Note that the state_change_lock_ is taken in Shutdown(),
1139
  // prior to calling UnregisterMaintenanceOps().
1140
1141
88.6k
  std::lock_guard<simple_spinlock> l(state_change_lock_);
1142
1143
88.6k
  if (state() != RaftGroupStatePB::RUNNING) {
1144
0
    LOG_WITH_PREFIX(WARNING) << "Not registering maintenance operations: tablet not RUNNING";
1145
0
    return;
1146
0
  }
1147
1148
88.6k
  DCHECK(maintenance_ops_.empty());
1149
1150
88.6k
  auto log_gc = std::make_unique<LogGCOp>(this);
1151
88.6k
  maint_mgr->RegisterOp(log_gc.get());
1152
88.6k
  maintenance_ops_.push_back(std::move(log_gc));
1153
88.6k
  LOG_WITH_PREFIX(INFO) << "Registered log gc";
1154
88.6k
}
1155
1156
47.7k
void TabletPeer::UnregisterMaintenanceOps() {
1157
47.7k
  DCHECK(state_change_lock_.is_locked());
1158
47.7k
  for (auto& op : maintenance_ops_) {
1159
47.7k
    op->Unregister();
1160
47.7k
  }
1161
47.7k
  maintenance_ops_.clear();
1162
47.7k
}
1163
1164
7.35k
TabletOnDiskSizeInfo TabletPeer::GetOnDiskSizeInfo() const {
1165
7.35k
  TabletOnDiskSizeInfo info;
1166
1167
7.35k
  if (consensus_) {
1168
7.29k
    info.consensus_metadata_disk_size = consensus_->OnDiskSize();
1169
7.29k
  }
1170
1171
7.35k
  if (tablet_) {
1172
7.29k
    info.sst_files_disk_size = tablet_->GetCurrentVersionSstFilesSize();
1173
7.29k
    info.uncompressed_sst_files_disk_size =
1174
7.29k
        tablet_->GetCurrentVersionSstFilesUncompressedSize();
1175
7.29k
  }
1176
1177
7.35k
  auto log = log_atomic_.load(std::memory_order_acquire);
1178
7.35k
  if (log) {
1179
7.32k
    info.wal_files_disk_size = log->OnDiskSize();
1180
7.32k
  }
1181
1182
7.35k
  info.RecomputeTotalSize();
1183
7.35k
  return info;
1184
7.35k
}
1185
1186
3.77k
size_t TabletPeer::GetNumLogSegments() const {
1187
3.77k
  auto log = log_atomic_.load(std::memory_order_acquire);
1188
3.74k
  return log ? log->num_segments() : 0;
1189
3.77k
}
1190
1191
531k
std::string TabletPeer::LogPrefix() const {
1192
531k
  return Substitute("T $0 P $1 [state=$2]: ",
1193
531k
      tablet_id_, permanent_uuid_, RaftGroupStatePB_Name(state()));
1194
531k
}
1195
1196
13.5M
scoped_refptr<OperationDriver> TabletPeer::CreateOperationDriver() {
1197
13.5M
  return scoped_refptr<OperationDriver>(new OperationDriver(
1198
13.5M
      &operation_tracker_,
1199
13.5M
      consensus_.get(),
1200
13.5M
      prepare_thread_.get(),
1201
13.5M
      tablet_->table_type()));
1202
13.5M
}
1203
1204
1.63M
int64_t TabletPeer::LeaderTerm() const {
1205
1.63M
  shared_ptr<consensus::Consensus> consensus;
1206
1.63M
  {
1207
1.63M
    std::lock_guard<simple_spinlock> lock(lock_);
1208
1.63M
    consensus = consensus_;
1209
1.63M
  }
1210
1.63M
  return consensus ? consensus->LeaderTerm() : yb::OpId::kUnknownTerm;
1211
1.63M
}
1212
1213
165k
Result<HybridTime> TabletPeer::LeaderSafeTime() const {
1214
165k
  return tablet_->SafeTime();
1215
165k
}
1216
1217
11.8M
consensus::LeaderStatus TabletPeer::LeaderStatus(bool allow_stale) const {
1218
11.8M
  shared_ptr<consensus::Consensus> consensus;
1219
11.8M
  {
1220
11.8M
    std::lock_guard<simple_spinlock> lock(lock_);
1221
11.8M
    consensus = consensus_;
1222
11.8M
  }
1223
11.4M
  return consensus ? consensus->GetLeaderStatus(allow_stale) : consensus::LeaderStatus::NOT_LEADER;
1224
11.8M
}
1225
1226
139k
HybridTime TabletPeer::HtLeaseExpiration() const {
1227
139k
  HybridTime result(
1228
139k
      CHECK_RESULT(consensus_->MajorityReplicatedHtLeaseExpiration(0, CoarseTimePoint::max())), 0);
1229
139k
  return std::max(result, tablet_->mvcc_manager()->LastReplicatedHybridTime());
1230
139k
}
1231
1232
0
TableType TabletPeer::table_type() EXCLUDES(lock_) {
1233
  // TODO: what if tablet is not set?
1234
0
  return DCHECK_NOTNULL(tablet())->table_type();
1235
0
}
1236
1237
7
void TabletPeer::SetFailed(const Status& error) {
1238
7
  DCHECK(error_.get(std::memory_order_acquire) == nullptr);
1239
7
  error_ = MakeAtomicUniquePtr<Status>(error);
1240
7
  auto state = state_.load(std::memory_order_acquire);
1241
7
  while (state != RaftGroupStatePB::FAILED && state != RaftGroupStatePB::QUIESCING &&
1242
7
         state != RaftGroupStatePB::SHUTDOWN) {
1243
7
    if (state_.compare_exchange_weak(state, RaftGroupStatePB::FAILED, std::memory_order_acq_rel)) {
1244
7
      LOG_WITH_PREFIX(INFO) << "Changed state from " << RaftGroupStatePB_Name(state)
1245
7
                            << " to FAILED";
1246
7
      break;
1247
7
    }
1248
7
  }
1249
7
}
1250
1251
Status TabletPeer::UpdateState(RaftGroupStatePB expected, RaftGroupStatePB new_state,
1252
177k
                               const std::string& error_message) {
1253
177k
  RaftGroupStatePB old = expected;
1254
177k
  if (!state_.compare_exchange_strong(old, new_state, std::memory_order_acq_rel)) {
1255
0
    return STATUS_FORMAT(
1256
0
        InvalidArgument, "$0 Expected state: $1, got: $2",
1257
0
        error_message, RaftGroupStatePB_Name(expected), RaftGroupStatePB_Name(old));
1258
0
  }
1259
1260
177k
  LOG_WITH_PREFIX(INFO) << "Changed state from " << RaftGroupStatePB_Name(old) << " to "
1261
177k
                        << RaftGroupStatePB_Name(new_state);
1262
177k
  return Status::OK();
1263
177k
}
1264
1265
97.6k
void TabletPeer::Enqueue(rpc::ThreadPoolTask* task) {
1266
97.6k
  rpc::ThreadPool* thread_pool = service_thread_pool_.load(std::memory_order_acquire);
1267
97.6k
  if (!thread_pool) {
1268
0
    task->Done(STATUS(Aborted, "Thread pool not ready"));
1269
0
    return;
1270
0
  }
1271
1272
97.6k
  thread_pool->Enqueue(task);
1273
97.6k
}
1274
1275
968k
void TabletPeer::StrandEnqueue(rpc::StrandTask* task) {
1276
968k
  rpc::Strand* strand = strand_.get();
1277
968k
  if (!strand) {
1278
0
    task->Done(STATUS(Aborted, "Thread pool not ready"));
1279
0
    return;
1280
0
  }
1281
1282
968k
  strand->Enqueue(task);
1283
968k
}
1284
1285
24.1k
bool TabletPeer::CanBeDeleted() {
1286
24.1k
  const auto consensus = shared_raft_consensus();
1287
24.1k
  if (!consensus || consensus->LeaderTerm() == OpId::kUnknownTerm) {
1288
16.6k
    return false;
1289
16.6k
  }
1290
1291
7.51k
  const auto tablet = shared_tablet();
1292
7.51k
  if (!tablet) {
1293
0
    return false;
1294
0
  }
1295
1296
7.51k
  auto op_id = tablet->metadata()->GetOpIdToDeleteAfterAllApplied();
1297
7.51k
  if (!op_id.valid()) {
1298
7.45k
    return false;
1299
7.45k
  }
1300
1301
62
  const auto all_applied_op_id = consensus->GetAllAppliedOpId();
1302
62
  if (all_applied_op_id < op_id) {
1303
47
    return false;
1304
47
  }
1305
1306
15
  LOG_WITH_PREFIX(INFO) << Format(
1307
15
      "Marked tablet $0 as requiring cleanup due to all replicas have been split (all applied op "
1308
15
      "id: $1, split op id: $2)",
1309
15
      tablet_id(), all_applied_op_id, op_id);
1310
1311
15
  return true;
1312
15
}
1313
1314
25.8k
rpc::Scheduler& TabletPeer::scheduler() const {
1315
25.8k
  return messenger_->scheduler();
1316
25.8k
}
1317
1318
}  // namespace tablet
1319
}  // namespace yb