/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_retention_policy.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/tablet_retention_policy.h" |
15 | | |
16 | | #include <iosfwd> |
17 | | #include <map> |
18 | | #include <memory> |
19 | | #include <mutex> |
20 | | #include <string> |
21 | | #include <vector> |
22 | | |
23 | | #include "yb/common/common_fwd.h" |
24 | | #include "yb/common/schema.h" |
25 | | #include "yb/common/snapshot.h" |
26 | | #include "yb/common/transaction_error.h" |
27 | | |
28 | | #include "yb/docdb/doc_ttl_util.h" |
29 | | #include "yb/docdb/docdb_compaction_filter.h" |
30 | | |
31 | | #include "yb/gutil/ref_counted.h" |
32 | | |
33 | | #include "yb/rocksdb/options.h" |
34 | | #include "yb/rocksdb/types.h" |
35 | | |
36 | | #include "yb/server/hybrid_clock.h" |
37 | | |
38 | | #include "yb/tablet/tablet_fwd.h" |
39 | | #include "yb/tablet/tablet_metadata.h" |
40 | | |
41 | | #include "yb/util/enums.h" |
42 | | #include "yb/util/logging.h" |
43 | | #include "yb/util/strongly_typed_bool.h" |
44 | | |
45 | | using namespace std::literals; |
46 | | |
47 | | DEFINE_int32(timestamp_history_retention_interval_sec, 900, |
48 | | "The time interval in seconds to retain DocDB history for. Point-in-time reads at a " |
49 | | "hybrid time further than this in the past might not be allowed after a compaction. " |
50 | | "Set this to be higher than the expected maximum duration of any single transaction " |
51 | | "in your application."); |
52 | | |
53 | | DEFINE_bool(enable_history_cutoff_propagation, false, |
54 | | "Should we use history cutoff propagation (true) or calculate it locally (false)."); |
55 | | |
56 | | DEFINE_int32(history_cutoff_propagation_interval_ms, 180000, |
57 | | "History cutoff propagation interval in milliseconds."); |
58 | | |
59 | | namespace yb { |
60 | | namespace tablet { |
61 | | |
62 | | using docdb::TableTTL; |
63 | | using docdb::HistoryRetentionDirective; |
64 | | |
65 | | TabletRetentionPolicy::TabletRetentionPolicy( |
66 | | server::ClockPtr clock, const AllowedHistoryCutoffProvider& allowed_history_cutoff_provider, |
67 | | RaftGroupMetadata* metadata) |
68 | | : clock_(std::move(clock)), allowed_history_cutoff_provider_(allowed_history_cutoff_provider), |
69 | 150k | metadata_(*metadata), log_prefix_(metadata->LogPrefix()) { |
70 | 150k | } |
71 | | |
72 | 3.05k | HybridTime TabletRetentionPolicy::UpdateCommittedHistoryCutoff(HybridTime value) { |
73 | 3.05k | std::lock_guard<std::mutex> lock(mutex_); |
74 | 3.05k | if (!value) { |
75 | 2.97k | return committed_history_cutoff_; |
76 | 2.97k | } |
77 | | |
78 | 83 | VLOG_WITH_PREFIX2 (4) << __func__ << "(" << value << ")"2 ; |
79 | | |
80 | 83 | committed_history_cutoff_ = std::max(committed_history_cutoff_, value); |
81 | 83 | return committed_history_cutoff_; |
82 | 3.05k | } |
83 | | |
84 | 494 | HistoryRetentionDirective TabletRetentionPolicy::GetRetentionDirective() { |
85 | 494 | HybridTime history_cutoff; |
86 | 494 | { |
87 | 494 | std::lock_guard<std::mutex> lock(mutex_); |
88 | 494 | if (FLAGS_enable_history_cutoff_propagation) { |
89 | 0 | history_cutoff = SanitizeHistoryCutoff(committed_history_cutoff_); |
90 | 494 | } else { |
91 | 494 | history_cutoff = EffectiveHistoryCutoff(); |
92 | 494 | committed_history_cutoff_ = std::max(history_cutoff, committed_history_cutoff_); |
93 | 494 | } |
94 | 494 | } |
95 | | |
96 | 494 | auto deleted_before_history_cutoff = std::make_shared<docdb::ColumnIds>(); |
97 | 494 | for (const auto& deleted_col : *metadata_.deleted_cols()) { |
98 | 0 | if (deleted_col.ht < history_cutoff) { |
99 | 0 | deleted_before_history_cutoff->insert(deleted_col.id); |
100 | 0 | } |
101 | 0 | } |
102 | | |
103 | 494 | return {history_cutoff, std::move(deleted_before_history_cutoff), |
104 | 494 | TableTTL(*metadata_.schema()), |
105 | 494 | docdb::ShouldRetainDeleteMarkersInMajorCompaction( |
106 | 494 | ShouldRetainDeleteMarkersInMajorCompaction())}; |
107 | 494 | } |
108 | | |
109 | 9.78M | Status TabletRetentionPolicy::RegisterReaderTimestamp(HybridTime timestamp) { |
110 | 9.78M | std::lock_guard<std::mutex> lock(mutex_); |
111 | 9.78M | if (timestamp < committed_history_cutoff_) { |
112 | 0 | return STATUS( |
113 | 0 | SnapshotTooOld, |
114 | 0 | Format( |
115 | 0 | "Snapshot too old. Read point: $0, earliest read time allowed: $1, delta (usec): $2", |
116 | 0 | timestamp, |
117 | 0 | committed_history_cutoff_, |
118 | 0 | committed_history_cutoff_.PhysicalDiff(timestamp)), |
119 | 0 | TransactionError(TransactionErrorCode::kSnapshotTooOld)); |
120 | 0 | } |
121 | 9.78M | active_readers_.insert(timestamp); |
122 | 9.78M | return Status::OK(); |
123 | 9.78M | } |
124 | | |
125 | 9.73M | void TabletRetentionPolicy::UnregisterReaderTimestamp(HybridTime timestamp) { |
126 | 9.73M | std::lock_guard<std::mutex> lock(mutex_); |
127 | 9.73M | active_readers_.erase(timestamp); |
128 | 9.73M | } |
129 | | |
130 | 494 | bool TabletRetentionPolicy::ShouldRetainDeleteMarkersInMajorCompaction() const { |
131 | | // If the index table is in the process of being backfilled, then we |
132 | | // want to retain delete markers until the backfill process is complete. |
133 | 494 | return metadata_.schema()->table_properties().retain_delete_markers(); |
134 | 494 | } |
135 | | |
136 | 25.4M | HybridTime TabletRetentionPolicy::HistoryCutoffToPropagate(HybridTime last_write_ht) { |
137 | 25.4M | std::lock_guard<std::mutex> lock(mutex_); |
138 | | |
139 | 25.4M | auto now = CoarseMonoClock::now(); |
140 | | |
141 | 25.4M | VLOG_WITH_PREFIX5.25k (4) << __func__ << "(" << last_write_ht << "), left to wait: " |
142 | 5.25k | << MonoDelta(next_history_cutoff_propagation_ - now); |
143 | | |
144 | 25.4M | if (disable_counter_ != 0 || !FLAGS_enable_history_cutoff_propagation25.4M || |
145 | 25.4M | now < next_history_cutoff_propagation_0 || last_write_ht <= committed_history_cutoff_0 ) { |
146 | 25.4M | return HybridTime(); |
147 | 25.4M | } |
148 | | |
149 | 13.7k | next_history_cutoff_propagation_ = |
150 | 13.7k | now + ANNOTATE_UNPROTECTED_READ(FLAGS_history_cutoff_propagation_interval_ms) * 1ms; |
151 | | |
152 | 13.7k | return EffectiveHistoryCutoff(); |
153 | 25.4M | } |
154 | | |
155 | 494 | HybridTime TabletRetentionPolicy::EffectiveHistoryCutoff() { |
156 | 494 | auto retention_delta = |
157 | 494 | -ANNOTATE_UNPROTECTED_READ(FLAGS_timestamp_history_retention_interval_sec) * 1s; |
158 | | // We try to garbage-collect history older than current time minus the configured retention |
159 | | // interval, but we might not be able to do so if there are still read operations reading at an |
160 | | // older snapshot. |
161 | 494 | return SanitizeHistoryCutoff(clock_->Now().AddDelta(retention_delta)); |
162 | 494 | } |
163 | | |
164 | 494 | HybridTime TabletRetentionPolicy::SanitizeHistoryCutoff(HybridTime proposed_cutoff) { |
165 | 494 | HybridTime allowed_cutoff; |
166 | 494 | if (active_readers_.empty()) { |
167 | | // There are no readers restricting our garbage collection of old records. |
168 | 425 | allowed_cutoff = proposed_cutoff; |
169 | 425 | } else { |
170 | | // Cannot garbage-collect any records that are still being read. |
171 | 69 | allowed_cutoff = std::min(proposed_cutoff, *active_readers_.begin()); |
172 | 69 | } |
173 | | |
174 | 494 | HybridTime provided_allowed_cutoff; |
175 | 494 | if (allowed_history_cutoff_provider_) { |
176 | 464 | provided_allowed_cutoff = allowed_history_cutoff_provider_(&metadata_); |
177 | 464 | allowed_cutoff = std::min(provided_allowed_cutoff, allowed_cutoff); |
178 | 464 | } |
179 | | |
180 | 494 | VLOG_WITH_PREFIX0 (4) << __func__ << ", result: " << allowed_cutoff |
181 | 0 | << ", active readers: " << active_readers_.size() |
182 | 0 | << ", provided_allowed_cutoff: " << provided_allowed_cutoff |
183 | 0 | << ", schedules: " << AsString(metadata_.SnapshotSchedules()); |
184 | | |
185 | 494 | return allowed_cutoff; |
186 | 494 | } |
187 | | |
188 | 6.04k | void TabletRetentionPolicy::EnableHistoryCutoffPropagation(bool value) { |
189 | 6.04k | std::lock_guard<std::mutex> lock(mutex_); |
190 | 6.04k | if (value) { |
191 | 3.02k | --disable_counter_; |
192 | 3.02k | } else { |
193 | 3.02k | ++disable_counter_; |
194 | 3.02k | } |
195 | 6.04k | } |
196 | | |
197 | | } // namespace tablet |
198 | | } // namespace yb |