YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_peer.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_TABLET_TABLET_PEER_H_
34
#define YB_TABLET_TABLET_PEER_H_
35
36
#include <atomic>
37
#include <future>
38
#include <map>
39
#include <memory>
40
#include <mutex>
41
#include <string>
42
#include <vector>
43
44
#include "yb/consensus/consensus_fwd.h"
45
#include "yb/consensus/consensus_context.h"
46
#include "yb/consensus/consensus_meta.h"
47
#include "yb/consensus/consensus_types.h"
48
#include "yb/gutil/callback.h"
49
#include "yb/gutil/ref_counted.h"
50
#include "yb/gutil/thread_annotations.h"
51
#include "yb/rpc/rpc_fwd.h"
52
53
#include "yb/tablet/tablet_fwd.h"
54
#include "yb/tablet/metadata.pb.h"
55
#include "yb/tablet/mvcc.h"
56
#include "yb/tablet/transaction_coordinator.h"
57
#include "yb/tablet/transaction_participant_context.h"
58
#include "yb/tablet/operations/operation_tracker.h"
59
#include "yb/tablet/preparer.h"
60
#include "yb/tablet/tablet_options.h"
61
#include "yb/tablet/write_query_context.h"
62
63
#include "yb/util/atomic.h"
64
#include "yb/util/semaphore.h"
65
66
using yb::consensus::StateChangeContext;
67
68
namespace yb {
69
70
namespace tserver {
71
class CatchUpServiceTest;
72
class UpdateTransactionResponsePB;
73
}
74
75
class MaintenanceManager;
76
class MaintenanceOp;
77
class ThreadPool;
78
79
namespace tablet {
80
81
// Breakdown of the on-disk footprint of a tablet replica, in bytes.
//
// The struct is a plain aggregate of byte counters that can be converted to and from any
// protobuf-like type exposing the matching accessors (see FromPB / ToPB).
struct TabletOnDiskSizeInfo {
  int64_t consensus_metadata_disk_size = 0;
  int64_t wal_files_disk_size = 0;
  int64_t sst_files_disk_size = 0;
  int64_t uncompressed_sst_files_disk_size = 0;
  // Cached total; refreshed by RecomputeTotalSize().
  int64_t sum_on_disk_size = 0;

  // Builds a TabletOnDiskSizeInfo from `pb`.
  //
  // PB must provide the getters consensus_metadata_disk_size(), wal_files_disk_size(),
  // sst_files_disk_size(), uncompressed_sst_files_disk_size() and estimated_on_disk_size().
  // The latter is stored into sum_on_disk_size as-is (it is not recomputed here).
  template <class PB>
  static TabletOnDiskSizeInfo FromPB(const PB& pb) {
    // Plain member assignment (in declaration order) instead of designated initializers, so the
    // header does not depend on a C++20 feature / compiler extension.
    TabletOnDiskSizeInfo info;
    info.consensus_metadata_disk_size = pb.consensus_metadata_disk_size();
    info.wal_files_disk_size = pb.wal_files_disk_size();
    info.sst_files_disk_size = pb.sst_files_disk_size();
    info.uncompressed_sst_files_disk_size = pb.uncompressed_sst_files_disk_size();
    info.sum_on_disk_size = pb.estimated_on_disk_size();
    return info;
  }

  // Copies every counter into `pb` through the corresponding set_*() mutator.
  // sum_on_disk_size is written via set_estimated_on_disk_size().
  // `pb` must not be null.
  template <class PB>
  void ToPB(PB* pb) const {
    pb->set_consensus_metadata_disk_size(consensus_metadata_disk_size);
    pb->set_wal_files_disk_size(wal_files_disk_size);
    pb->set_sst_files_disk_size(sst_files_disk_size);
    pb->set_uncompressed_sst_files_disk_size(uncompressed_sst_files_disk_size);
    pb->set_estimated_on_disk_size(sum_on_disk_size);
  }

  // Adds every counter of `other` (including the cached total) to this object.
  // Returns *this so the operator composes like the built-in compound assignment.
  TabletOnDiskSizeInfo& operator+=(const TabletOnDiskSizeInfo& other) {
    consensus_metadata_disk_size += other.consensus_metadata_disk_size;
    wal_files_disk_size += other.wal_files_disk_size;
    sst_files_disk_size += other.sst_files_disk_size;
    uncompressed_sst_files_disk_size += other.uncompressed_sst_files_disk_size;
    sum_on_disk_size += other.sum_on_disk_size;
    return *this;
  }

  // Refreshes sum_on_disk_size from the individual counters.
  // Note: uncompressed_sst_files_disk_size is not part of the total.
  void RecomputeTotalSize() {
    sum_on_disk_size =
        consensus_metadata_disk_size +
        sst_files_disk_size +
        wal_files_disk_size;
  }
};
123
124
// A peer is a tablet consensus configuration, which coordinates writes to tablets.
125
// Each time Write() is called this class appends a new entry to a replicated
126
// state machine through a consensus algorithm, which makes sure that other
127
// peers see the same updates in the same order. In addition to this, this
128
// class also splits the work and coordinates multi-threaded execution.
129
class TabletPeer : public consensus::ConsensusContext,
130
                   public TransactionParticipantContext,
131
                   public TransactionCoordinatorContext,
132
                   public WriteQueryContext {
133
 public:
134
  typedef std::map<int64_t, int64_t> MaxIdxToSegmentSizeMap;
135
136
  // Creates TabletPeer.
137
  // `tablet_splitter` will be used for applying split tablet Raft operation.
138
  TabletPeer(
139
      const RaftGroupMetadataPtr& meta,
140
      const consensus::RaftPeerPB& local_peer_pb,
141
      const scoped_refptr<server::Clock>& clock,
142
      const std::string& permanent_uuid,
143
      Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
144
      MetricRegistry* metric_registry,
145
      TabletSplitter* tablet_splitter,
146
      const std::shared_future<client::YBClient*>& client_future);
147
148
  ~TabletPeer();
149
150
  // Initializes the TabletPeer, namely creating the Log and initializing
151
  // Consensus.
152
  // split_op_id is the ID of split tablet Raft operation requesting split of this tablet or unset.
153
  CHECKED_STATUS InitTabletPeer(
154
      const TabletPtr& tablet,
155
      const std::shared_ptr<MemTracker>& server_mem_tracker,
156
      rpc::Messenger* messenger,
157
      rpc::ProxyCache* proxy_cache,
158
      const scoped_refptr<log::Log>& log,
159
      const scoped_refptr<MetricEntity>& table_metric_entity,
160
      const scoped_refptr<MetricEntity>& tablet_metric_entity,
161
      ThreadPool* raft_pool,
162
      ThreadPool* tablet_prepare_pool,
163
      consensus::RetryableRequests* retryable_requests,
164
      consensus::MultiRaftManager* multi_raft_manager);
165
166
  // Starts the TabletPeer, making it available for Write()s. If this
167
  // TabletPeer is part of a consensus configuration this will connect it to other peers
168
  // in the consensus configuration.
169
  CHECKED_STATUS Start(const consensus::ConsensusBootstrapInfo& info);
170
171
  // Starts shutdown process.
172
  // Returns true if shutdown was just initiated, false if shutdown was already running.
173
  MUST_USE_RESULT bool StartShutdown(IsDropTable is_drop_table = IsDropTable::kFalse);
174
  // Completes shutdown process and waits for it's completeness.
175
  void CompleteShutdown(IsDropTable is_drop_table = IsDropTable::kFalse);
176
177
  // Abort active transactions on the tablet after shutdown is initiated.
178
  CHECKED_STATUS AbortSQLTransactions();
179
180
  CHECKED_STATUS Shutdown(IsDropTable is_drop_table = IsDropTable::kFalse);
181
182
  // Check that the tablet is in a RUNNING state.
183
  CHECKED_STATUS CheckRunning() const;
184
185
  // Returns whether shutdown started. If shutdown already completed returns true as well.
186
  bool IsShutdownStarted() const;
187
188
  // Check that the tablet is in a SHUTDOWN/NOT_STARTED state.
189
  CHECKED_STATUS CheckShutdownOrNotStarted() const;
190
191
  // Wait until the tablet is in a RUNNING state or if there's a timeout.
192
  // TODO have a way to wait for any state?
193
  CHECKED_STATUS WaitUntilConsensusRunning(const MonoDelta& timeout);
194
195
  // Submits a write to a tablet and executes it asynchronously.
196
  // The caller is expected to build and pass a WriteOperation that points
197
  // to the RPC WriteRequest, WriteResponse, RpcContext and to the tablet's
198
  // MvccManager.
199
  // The operation_state is deallocated after use by this function.
200
  void WriteAsync(std::unique_ptr<WriteQuery> query);
201
202
  void Submit(std::unique_ptr<Operation> operation, int64_t term) override;
203
204
  void UpdateClock(HybridTime hybrid_time) override;
205
206
  std::unique_ptr<UpdateTxnOperation> CreateUpdateTransaction(
207
      TransactionStatePB* request) override;
208
209
  void SubmitUpdateTransaction(
210
      std::unique_ptr<UpdateTxnOperation> operation, int64_t term) override;
211
212
  HybridTime SafeTimeForTransactionParticipant() override;
213
  Result<HybridTime> WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) override;
214
215
  void GetLastReplicatedData(RemoveIntentsData* data) override;
216
217
  void GetLastCDCedData(RemoveIntentsData* data) override;
218
219
  void GetTabletStatusPB(TabletStatusPB* status_pb_out);
220
221
  // Used by consensus to create and start a new ReplicaOperation.
222
  CHECKED_STATUS StartReplicaOperation(
223
      const scoped_refptr<consensus::ConsensusRound>& round,
224
      HybridTime propagated_safe_time) override;
225
226
  // This is an override of a ConsensusContext method. This is called from
227
  // UpdateReplica -> EnqueuePreparesUnlocked on Raft heartbeats.
228
  void SetPropagatedSafeTime(HybridTime ht) override;
229
230
  // Returns false if it is preferable to don't apply write operation.
231
  bool ShouldApplyWrite() override;
232
233
  consensus::Consensus* consensus() const;
234
  consensus::RaftConsensus* raft_consensus() const;
235
236
  std::shared_ptr<consensus::Consensus> shared_consensus() const;
237
  std::shared_ptr<consensus::RaftConsensus> shared_raft_consensus() const;
238
239
22.4M
  Tablet* tablet() const EXCLUDES(lock_) {
240
22.4M
    std::lock_guard<simple_spinlock> lock(lock_);
241
22.4M
    return tablet_.get();
242
22.4M
  }
243
244
33.0M
  TabletPtr shared_tablet() const {
245
33.0M
    std::lock_guard<simple_spinlock> lock(lock_);
246
33.0M
    return tablet_;
247
33.0M
  }
248
249
31.3M
  RaftGroupStatePB state() const {
250
31.3M
    return state_.load(std::memory_order_acquire);
251
31.3M
  }
252
253
  TabletDataState data_state() const;
254
255
  // Returns the current Raft configuration.
256
  consensus::RaftConfigPB RaftConfig() const;
257
258
140k
  TabletStatusListener* status_listener() const {
259
140k
    return status_listener_.get();
260
140k
  }
261
262
  // Sets the tablet to a BOOTSTRAPPING state, indicating it is starting up.
263
88.7k
  CHECKED_STATUS SetBootstrapping() {
264
88.7k
    return UpdateState(RaftGroupStatePB::NOT_STARTED, RaftGroupStatePB::BOOTSTRAPPING, "");
265
88.7k
  }
266
267
  CHECKED_STATUS UpdateState(RaftGroupStatePB expected, RaftGroupStatePB new_state,
268
                             const std::string& error_message);
269
270
  // sets the tablet state to FAILED additionally setting the error to the provided
271
  // one.
272
  void SetFailed(const Status& error);
273
274
  // Returns the error that occurred, when state is FAILED.
275
957
  CHECKED_STATUS error() const {
276
957
    Status *error;
277
957
    if ((error = error_.get(std::memory_order_acquire)) != nullptr) {
278
      // Once the error_ is set, we do not reset it to nullptr
279
1
      return *error;
280
1
    }
281
956
    return Status::OK();
282
956
  }
283
284
  // Returns a human-readable string indicating the state of the tablet.
285
  // Typically this looks like "NOT_STARTED", "TABLET_DATA_COPYING",
286
  // etc. For use in places like the Web UI.
287
  std::string HumanReadableState() const;
288
289
  // Adds list of transactions in-flight at the time of the call to
290
  // 'out'. OperationStatusPB objects are used to allow this method
291
  // to be used by both the web-UI and ts-cli.
292
  void GetInFlightOperations(Operation::TraceType trace_type,
293
                             std::vector<consensus::OperationStatusPB>* out) const;
294
295
  // Returns the minimum known log index that is in-memory or in-flight.
296
  // Used for selection of log segments to delete during Log GC.
297
  // If details is specified then this function appends explanation of how index was calculated
298
  // to it.
299
  Result<int64_t> GetEarliestNeededLogIndex(std::string* details = nullptr) const;
300
301
  // Returns the amount of bytes that would be GC'd if RunLogGC() was called.
302
  //
303
  // Returns a non-ok status if the tablet isn't running.
304
  CHECKED_STATUS GetGCableDataSize(int64_t* retention_size) const;
305
306
  // Returns true if it is safe to retrieve the log pointer using the log() function from this
307
  // tablet peer. Once the log pointer is initialized, it will stay valid for the lifetime of the
308
  // TabletPeer.
309
297k
  bool log_available() const {
310
297k
    return log_atomic_.load(std::memory_order_acquire) != nullptr;
311
297k
  }
312
313
  // Return a pointer to the Log. TabletPeer keeps a reference to Log after Init(). This function
314
  // will crash if the log has not been initialized yet.
315
  log::Log* log() const;
316
317
  // Returns the OpId of the latest entry in the log, or a zero OpId if the log has not been
318
  // initialized.
319
  yb::OpId GetLatestLogEntryOpId() const;
320
321
13.6M
  const server::ClockPtr& clock_ptr() const override {
322
13.6M
    return clock_;
323
13.6M
  }
324
325
  void Enqueue(rpc::ThreadPoolTask* task);
326
  void StrandEnqueue(rpc::StrandTask* task) override;
327
328
483k
  const std::shared_future<client::YBClient*>& client_future() const override {
329
483k
    return client_future_;
330
483k
  }
331
332
  int64_t LeaderTerm() const override;
333
  consensus::LeaderStatus LeaderStatus(bool allow_stale = false) const;
334
  Result<HybridTime> LeaderSafeTime() const override;
335
336
  HybridTime HtLeaseExpiration() const override;
337
338
94.1k
  const scoped_refptr<log::LogAnchorRegistry>& log_anchor_registry() const {
339
94.1k
    return log_anchor_registry_;
340
94.1k
  }
341
342
  // Returns the tablet_id of the tablet managed by this TabletPeer.
343
  // Returns the correct tablet_id even if the underlying tablet is not available
344
  // yet.
345
2.78M
  const std::string& tablet_id() const override { return tablet_id_; }
346
347
  // Convenience method to return the permanent_uuid of this peer.
348
439k
  const std::string& permanent_uuid() const override {
349
439k
    return permanent_uuid_;
350
439k
  }
351
352
  Result<OperationDriverPtr> NewOperationDriver(std::unique_ptr<Operation>* operation,
353
                                                int64_t term);
354
355
  Result<OperationDriverPtr> NewLeaderOperationDriver(
356
      std::unique_ptr<Operation>* operation, int64_t term);
357
  Result<OperationDriverPtr> NewReplicaOperationDriver(std::unique_ptr<Operation>* operation);
358
359
  // Tells the tablet's log to garbage collect.
360
  CHECKED_STATUS RunLogGC();
361
362
  // Register the maintenance ops associated with this peer's tablet, also invokes
363
  // Tablet::RegisterMaintenanceOps().
364
  void RegisterMaintenanceOps(MaintenanceManager* maintenance_manager);
365
366
  // Unregister the maintenance ops associated with this peer's tablet.
367
  // This method is not thread safe.
368
  void UnregisterMaintenanceOps();
369
370
  // Return pointer to the transaction tracker for this peer.
371
2.01k
  const OperationTracker* operation_tracker() const { return &operation_tracker_; }
372
373
18.1M
  const RaftGroupMetadataPtr& tablet_metadata() const {
374
18.1M
    return meta_;
375
18.1M
  }
376
377
  CHECKED_STATUS set_cdc_min_replicated_index(int64_t cdc_min_replicated_index);
378
379
  CHECKED_STATUS set_cdc_min_replicated_index_unlocked(int64_t cdc_min_replicated_index);
380
381
  CHECKED_STATUS reset_cdc_min_replicated_index_if_stale();
382
383
  TableType table_type();
384
385
  // Returns the number of segments in log_.
386
  size_t GetNumLogSegments() const;
387
388
  // Might update the can_be_deleted_.
389
  bool CanBeDeleted();
390
391
  std::string LogPrefix() const;
392
393
 protected:
394
  friend class RefCountedThreadSafe<TabletPeer>;
395
  friend class TabletPeerTest;
396
  FRIEND_TEST(TabletPeerTest, TestDMSAnchorPreventsLogGC);
397
  FRIEND_TEST(TabletPeerTest, TestActiveOperationPreventsLogGC);
398
399
  // Wait until the TabletPeer is fully in SHUTDOWN state.
400
  void WaitUntilShutdown();
401
402
  // After bootstrap is complete and consensus is setup this initiates the transactions
403
  // that were not complete on bootstrap.
404
  // Not implemented yet. See .cc file.
405
  CHECKED_STATUS StartPendingOperations(PeerRole my_role,
406
                                        const consensus::ConsensusBootstrapInfo& bootstrap_info);
407
408
  scoped_refptr<OperationDriver> CreateOperationDriver();
409
410
  virtual std::unique_ptr<Operation> CreateOperation(consensus::ReplicateMsg* replicate_msg);
411
412
  const RaftGroupMetadataPtr meta_;
413
414
  const std::string tablet_id_;
415
416
  const consensus::RaftPeerPB local_peer_pb_;
417
418
  // The atomics state_, error_ and has_consensus_ maintain information about the tablet peer.
419
  // While modifying the other fields in tablet peer, state_ is modified last.
420
  // error_ is set before state_ is set to an error state.
421
  std::atomic<enum RaftGroupStatePB> state_;
422
  AtomicUniquePtr<Status> error_;
423
  std::atomic<bool> has_consensus_ = {false};
424
425
  OperationTracker operation_tracker_;
426
427
  scoped_refptr<log::Log> log_;
428
  std::atomic<log::Log*> log_atomic_{nullptr};
429
430
  TabletPtr tablet_;
431
  rpc::ProxyCache* proxy_cache_;
432
  std::shared_ptr<consensus::RaftConsensus> consensus_;
433
  std::unique_ptr<TabletStatusListener> status_listener_;
434
  simple_spinlock prepare_replicate_lock_;
435
436
  // Lock protecting state_ as well as smart pointers to collaborating
437
  // classes such as tablet_ and consensus_.
438
  mutable simple_spinlock lock_;
439
440
  // Lock taken during Init/Shutdown which ensures that only a single thread
441
  // attempts to perform major lifecycle operations (Init/Shutdown) at once.
442
  // This must be acquired before acquiring lock_ if they are acquired together.
443
  // We don't just use lock_ since the lifecycle operations may take a while
444
  // and we'd like other threads to be able to quickly poll the state_ variable
445
  // during them in order to reject RPCs, etc.
446
  mutable simple_spinlock state_change_lock_;
447
448
  std::unique_ptr<Preparer> prepare_thread_;
449
450
  scoped_refptr<server::Clock> clock_;
451
452
  scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
453
454
  // Function to mark this TabletPeer's tablet as dirty in the TSTabletManager.
455
  // This function must be called any time the cluster membership or cluster
456
  // leadership changes. Note that this function is called synchronously on the followers
457
  // or leader via the consensus round completion callback of NonTrackedRoundReplicationFinished.
458
  // Hence this should be a relatively lightweight function - e.g., update in-memory only state
459
  // and defer any other heavy duty operations to a thread pool.
460
  Callback<void(std::shared_ptr<consensus::StateChangeContext> context)> mark_dirty_clbk_;
461
462
  // List of maintenance operations for the tablet that need information that only the peer
463
  // can provide.
464
  std::vector<std::unique_ptr<MaintenanceOp>> maintenance_ops_;
465
466
  // Cache the permanent of the tablet UUID to retrieve it without a lock in the common case.
467
  const std::string permanent_uuid_;
468
469
  std::atomic<rpc::ThreadPool*> service_thread_pool_{nullptr};
470
  AtomicUniquePtr<rpc::Strand> strand_;
471
472
  OperationCounter preparing_operations_counter_;
473
474
  // Serializes access to set_cdc_min_replicated_index and reset_cdc_min_replicated_index_if_stale
475
  // and protects cdc_min_replicated_index_refresh_time_ for reads and writes.
476
  mutable simple_spinlock cdc_min_replicated_index_lock_;
477
  MonoTime cdc_min_replicated_index_refresh_time_ = MonoTime::Min();
478
479
 private:
480
  Result<HybridTime> ReportReadRestart() override;
481
482
  Result<FixedHybridTimeLease> HybridTimeLease(HybridTime min_allowed, CoarseTimePoint deadline);
483
  Result<HybridTime> PreparePeerRequest() override;
484
  void MajorityReplicated() override;
485
  void ChangeConfigReplicated(const consensus::RaftConfigPB& config) override;
486
  uint64_t NumSSTFiles() override;
487
  void ListenNumSSTFilesChanged(std::function<void()> listener) override;
488
  rpc::Scheduler& scheduler() const override;
489
  CHECKED_STATUS CheckOperationAllowed(
490
      const OpId& op_id, consensus::OperationType op_type) override;
491
492
  // Return granular types of on-disk size of this tablet replica, in bytes.
493
  TabletOnDiskSizeInfo GetOnDiskSizeInfo() const REQUIRES(lock_);
494
495
  MetricRegistry* metric_registry_;
496
497
0
  bool IsLeader() override {
498
0
    return LeaderTerm() != OpId::kUnknownTerm;
499
0
  }
500
501
  TabletSplitter* tablet_splitter_;
502
503
  std::shared_future<client::YBClient*> client_future_;
504
505
  rpc::Messenger* messenger_;
506
507
  DISALLOW_COPY_AND_ASSIGN(TabletPeer);
508
};
509
510
}  // namespace tablet
511
}  // namespace yb
512
513
#endif /* YB_TABLET_TABLET_PEER_H_ */