/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation_tracker.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/operations/operation_tracker.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <limits> |
37 | | #include <vector> |
38 | | |
39 | | #include "yb/gutil/map-util.h" |
40 | | #include "yb/gutil/strings/substitute.h" |
41 | | |
42 | | #include "yb/tablet/operations/operation_driver.h" |
43 | | #include "yb/tablet/tablet.h" |
44 | | |
45 | | #include "yb/util/flag_tags.h" |
46 | | #include "yb/util/logging.h" |
47 | | #include "yb/util/mem_tracker.h" |
48 | | #include "yb/util/metrics.h" |
49 | | #include "yb/util/monotime.h" |
50 | | #include "yb/util/status_log.h" |
51 | | #include "yb/util/tsan_util.h" |
52 | | |
// Per-tablet cap, in megabytes, on memory held by in-flight operations (default 1024).
// OperationTracker::StartMemoryTracking reads this flag and multiplies it by 1024 * 1024
// to obtain the byte limit of the tracker it creates.
// NOTE(review): only the exact value -1 skips tracker creation; any other negative value
// is passed through as a negative limit -- confirm how MemTracker treats that.
53 | | DEFINE_int64(tablet_operation_memory_limit_mb, 1024, |
54 | | "Maximum amount of memory that may be consumed by all in-flight " |
55 | | "operations belonging to a particular tablet. When this limit " |
56 | | "is reached, new operations will be rejected and clients will " |
57 | | "be forced to retry them. If -1, operation memory tracking is " |
58 | | "disabled."); |
59 | | TAG_FLAG(tablet_operation_memory_limit_mb, advanced); |
60 | | |
// Metric prototypes for the tablet entity. They are instantiated per tablet in the
// OperationTracker::Metrics constructor.
//
// all_operations_inflight is the aggregate gauge; it is incremented and decremented for
// every tracked operation regardless of type.
61 | | METRIC_DEFINE_gauge_uint64(tablet, all_operations_inflight, |
62 | | "Operations In Flight", |
63 | | yb::MetricUnit::kOperations, |
64 | | "Number of operations currently in-flight, including any type."); |
// One gauge per OperationType follows. Each is named <lower>_operations_inflight so that
// the INSTANTIATE macro can build the prototype name by token pasting.
65 | | METRIC_DEFINE_gauge_uint64(tablet, write_operations_inflight, |
66 | | "Write Operations In Flight", |
67 | | yb::MetricUnit::kOperations, |
68 | | "Number of write operations currently in-flight"); |
// The metric keeps the name "alter_schema" but is bound to OperationType::kChangeMetadata
// in the Metrics constructor; its label and description say "Change Metadata".
69 | | METRIC_DEFINE_gauge_uint64(tablet, alter_schema_operations_inflight, |
70 | | "Change Metadata Operations In Flight", |
71 | | yb::MetricUnit::kOperations, |
72 | | "Number of change metadata operations currently in-flight"); |
73 | | METRIC_DEFINE_gauge_uint64(tablet, update_transaction_operations_inflight, |
74 | | "Update Transaction Operations In Flight", |
75 | | yb::MetricUnit::kOperations, |
76 | | "Number of update transaction operations currently in-flight"); |
77 | | METRIC_DEFINE_gauge_uint64(tablet, snapshot_operations_inflight, |
78 | | "Snapshot Operations In Flight", |
79 | | yb::MetricUnit::kOperations, |
80 | | "Number of snapshot operations currently in-flight"); |
81 | | METRIC_DEFINE_gauge_uint64(tablet, split_operations_inflight, |
82 | | "Split Operations In Flight", |
83 | | yb::MetricUnit::kOperations, |
84 | | "Number of split operations currently in-flight"); |
85 | | METRIC_DEFINE_gauge_uint64(tablet, truncate_operations_inflight, |
86 | | "Truncate Operations In Flight", |
87 | | yb::MetricUnit::kOperations, |
88 | | "Number of truncate operations currently in-flight"); |
// Bound to OperationType::kEmpty. The description text calls these "none operations".
89 | | METRIC_DEFINE_gauge_uint64(tablet, empty_operations_inflight, |
90 | | "Empty Operations In Flight", |
91 | | yb::MetricUnit::kOperations, |
92 | | "Number of none operations currently in-flight"); |
93 | | METRIC_DEFINE_gauge_uint64(tablet, history_cutoff_operations_inflight, |
94 | | "History Cutoff Operations In Flight", |
95 | | yb::MetricUnit::kOperations, |
96 | | "Number of history cutoff operations currently in-flight"); |
97 | | |
// Counter incremented by OperationTracker::Add each time an operation is rejected because
// the memory tracker refused to account for its footprint.
98 | | METRIC_DEFINE_counter(tablet, operation_memory_pressure_rejections, |
99 | | "Operation Memory Pressure Rejections", |
100 | | yb::MetricUnit::kOperations, |
101 | | "Number of operations rejected because the tablet's " |
102 | | "operation memory limit was reached."); |
103 | | |
104 | | using namespace std::literals; |
105 | | using std::shared_ptr; |
106 | | using std::vector; |
107 | | |
108 | | namespace yb { |
109 | | namespace tablet { |
110 | | |
111 | | using strings::Substitute; |
112 | | |
// Helper macros used only by the Metrics constructor below; all three are #undef'ed
// right after it.
//   MINIT(x)       member initializer: x <- METRIC_x.Instantiate(entity).
//   GINIT(x)       member initializer: x <- METRIC_x.Instantiate(entity, 0). The second
//                  argument is presumably the gauge's initial value -- TODO confirm.
//   INSTANTIATE(upper, lower)
//                  statement: stores METRIC_<lower>_operations_inflight.Instantiate(entity, 0)
//                  into operations_inflight[] at the index given by the underlying value of
//                  OperationType::k<upper>.
113 | | #define MINIT(x) x(METRIC_##x.Instantiate(entity)) |
114 | | #define GINIT(x) x(METRIC_##x.Instantiate(entity, 0)) |
115 | | #define INSTANTIATE(upper, lower) \ |
116 | 1.20M | operations_inflight[to_underlying(OperationType::BOOST_PP_CAT(k, upper))] = \ |
117 | 1.20M | BOOST_PP_CAT(BOOST_PP_CAT(METRIC_, lower), _operations_inflight).Instantiate(entity, 0); |
// Instantiates every tracker metric on `entity`: the aggregate in-flight gauge and the
// rejection counter in the initializer list, then one gauge per operation type in the body.
118 | | OperationTracker::Metrics::Metrics(const scoped_refptr<MetricEntity>& entity) |
119 | | : GINIT(all_operations_inflight), |
120 | 150k | MINIT(operation_memory_pressure_rejections) { |
121 | 150k | INSTANTIATE(Write, write); |
122 | 150k | INSTANTIATE(ChangeMetadata, alter_schema); |
123 | 150k | INSTANTIATE(UpdateTransaction, update_transaction); |
124 | 150k | INSTANTIATE(Snapshot, snapshot); |
125 | 150k | INSTANTIATE(Split, split); |
126 | 150k | INSTANTIATE(Truncate, truncate); |
127 | 150k | INSTANTIATE(Empty, empty); |
128 | 150k | INSTANTIATE(HistoryCutoff, history_cutoff); |
// Eight INSTANTIATE lines above must match the number of OperationType values; this makes
// the build fail when the count changes so the list above gets revisited.
129 | 150k | static_assert(8 == kElementsInOperationType, "Init metrics for all operation types"); |
130 | 150k | } |
131 | | #undef INSTANTIATE |
132 | | #undef GINIT |
133 | | #undef MINIT |
134 | | |
// Constructs a tracker that only records `log_prefix`. Metrics and memory tracking are
// not set up here; they are enabled separately through StartInstrumentation() and
// StartMemoryTracking().
135 | | OperationTracker::OperationTracker(const std::string& log_prefix) |
136 | 150k | : log_prefix_(log_prefix) { |
137 | 150k | } |
138 | | |
// Destroys the tracker. Under mutex_, CHECK-fails if any operation is still registered,
// so callers must have released every driver first. If a memory tracker was created, it
// is unregistered from its parent.
139 | 74.1k | OperationTracker::~OperationTracker() { |
140 | 74.1k | std::lock_guard<std::mutex> lock(mutex_); |
141 | 74.1k | CHECK_EQ(pending_operations_.size(), 0); |
142 | 74.1k | if (mem_tracker_) { |
143 | 74.0k | mem_tracker_->UnregisterFromParent(); |
144 | 74.0k | } |
145 | 74.1k | } |
146 | | |
// Registers `driver` as a pending operation.
//
// When memory tracking is enabled, the driver's SpaceUsed() is first charged to
// mem_tracker_ via TryConsume(). If that fails, the rejection counter is bumped (when
// metrics are enabled), a throttled warning is logged, and ServiceUnavailable is returned
// without registering the driver.
//
// On success the in-flight gauges are incremented and the driver is inserted into
// pending_operations_ under mutex_, together with the footprint that was charged.
// CHECK-fails if the same driver is already present. Returns Status::OK().
147 | 31.4M | Status OperationTracker::Add(OperationDriver* driver) { |
148 | 31.4M | int64_t driver_mem_footprint = driver->SpaceUsed(); |
149 | 31.4M | if (mem_tracker_ && !mem_tracker_->TryConsume(driver_mem_footprint)31.4M ) { |
150 | 4 | if (metrics_) { |
151 | 4 | metrics_->operation_memory_pressure_rejections->Increment(); |
152 | 4 | } |
153 | | |
154 | | // May be null in unit tests. |
155 | 4 | Tablet* tablet = driver->operation()->tablet(); |
156 | | |
// The message reports the tracker's current consumption and limit; the tablet id falls
// back to "(unknown)" when no tablet is attached.
157 | 4 | string msg = Substitute( |
158 | 4 | "Operation failed, tablet $0 operation memory consumption ($1) " |
159 | 4 | "has exceeded its limit ($2) or the limit of an ancestral tracker", |
160 | 4 | tablet ? tablet->tablet_id() : "(unknown)"0 , |
161 | 4 | mem_tracker_->consumption(), mem_tracker_->limit()); |
162 | | |
163 | 4 | YB_LOG_EVERY_N_SECS(WARNING, 1) << msg << THROTTLE_MSG; |
164 | | |
165 | 4 | return STATUS(ServiceUnavailable, msg); |
166 | 4 | } |
167 | | |
// Gauges are updated before the lock is taken; only the map insertion is done under mutex_.
168 | 31.4M | IncrementCounters(*driver); |
169 | | |
170 | | // Cache the operation memory footprint so we needn't refer to the request |
171 | | // again, as it may disappear between now and then. |
172 | 31.4M | State st; |
173 | 31.4M | st.memory_footprint = driver_mem_footprint; |
174 | 31.4M | std::lock_guard<std::mutex> lock(mutex_); |
175 | 31.4M | CHECK(pending_operations_.emplace(driver, st).second); |
176 | 31.4M | return Status::OK(); |
177 | 31.4M | } |
178 | | |
// Increments the aggregate in-flight gauge and the gauge for `driver`'s operation type.
// Does nothing when metrics have not been enabled (metrics_ is null).
179 | 31.4M | void OperationTracker::IncrementCounters(const OperationDriver& driver) const { |
180 | 31.4M | if (!metrics_) { |
181 | 0 | return; |
182 | 0 | } |
183 | | |
184 | 31.4M | metrics_->all_operations_inflight->Increment(); |
// The per-type gauges are indexed by the underlying integer value of the operation type.
185 | 31.4M | metrics_->operations_inflight[to_underlying(driver.operation_type())]->Increment(); |
186 | 31.4M | } |
187 | | |
// Decrements the aggregate in-flight gauge and the gauge for `driver`'s operation type.
// Does nothing when metrics have not been enabled (metrics_ is null).
// Each gauge is DCHECKed to be greater than zero before it is decremented.
188 | 31.4M | void OperationTracker::DecrementCounters(const OperationDriver& driver) const { |
189 | 31.4M | if (!metrics_) { |
190 | 0 | return; |
191 | 0 | } |
192 | | |
193 | 31.4M | DCHECK_GT(metrics_->all_operations_inflight->value(), 0); |
194 | 31.4M | metrics_->all_operations_inflight->Decrement(); |
195 | 31.4M | auto index = to_underlying(driver.operation_type()); |
196 | 31.4M | DCHECK_GT(metrics_->operations_inflight[index]->value(), 0); |
197 | 31.4M | metrics_->operations_inflight[index]->Decrement(); |
198 | 31.4M | } |
199 | | |
// Unregisters `driver`, undoing what Add() did for it.
//
//   driver          the operation driver to remove; it must currently be registered
//                   (FindOrDie aborts otherwise).
//   applied_op_ids  optional output list. When non-null, the driver's op id is appended to
//                   it; when null, post_tracker_ is invoked with the op id if it is set.
//                   Neither happens for operations of type kEmpty.
//
// Steps: decrement the gauges, remove the map entry under mutex_, wake waiters if the map
// became empty, give back the cached memory footprint, then report the op id.
200 | 31.4M | void OperationTracker::Release(OperationDriver* driver, OpIds* applied_op_ids) { |
201 | 31.4M | DecrementCounters(*driver); |
202 | | |
// The op id and type are read from the driver up front; after the map entry is removed
// only these local copies are used.
203 | 31.4M | State st; |
204 | 31.4M | yb::OpId op_id = driver->GetOpId(); |
205 | 31.4M | OperationType operation_type = driver->operation_type(); |
206 | 31.4M | bool notify; |
207 | 31.4M | { |
208 | | // Remove the operation from the map, retaining the state for use |
209 | | // below. |
210 | 31.4M | std::lock_guard<std::mutex> lock(mutex_); |
// NOTE(review): the entry is looked up twice (FindOrDie, then erase by key). Behaviour is
// correct; the erase check can only fire if the entry vanished between the two calls.
211 | 31.4M | st = FindOrDie(pending_operations_, driver); |
212 | 31.4M | if (PREDICT_FALSE(pending_operations_.erase(driver) != 1)) { |
213 | 0 | LOG_WITH_PREFIX(FATAL) << "Could not remove pending operation from map: " |
214 | 0 | << driver->ToStringUnlocked(); |
215 | 0 | } |
216 | 31.4M | notify = pending_operations_.empty(); |
217 | 31.4M | } |
// Waiters in WaitForAllToFinish() are signalled only when the last pending operation was
// removed, and the notification is sent after mutex_ has been released.
218 | 31.4M | if (notify) { |
219 | 29.3M | cond_.notify_all(); |
220 | 29.3M | } |
221 | | |
// Return exactly the amount that Add() charged; a zero footprint is skipped.
222 | 31.4M | if (mem_tracker_ && st.memory_footprint31.4M ) { |
223 | 14.3M | mem_tracker_->Release(st.memory_footprint); |
224 | 14.3M | } |
225 | | |
226 | 31.4M | if (operation_type != OperationType::kEmpty) { |
227 | 14.3M | if (applied_op_ids) { |
228 | 14.3M | applied_op_ids->push_back(op_id); |
229 | 14.3M | } else if (546 post_tracker_546 ) { |
230 | 605 | post_tracker_(op_id); |
231 | 605 | } |
232 | 14.3M | } |
233 | 31.4M | } |
234 | | |
// Returns a snapshot of the drivers that are currently registered. Takes mutex_ for the
// duration of the copy and delegates to GetPendingOperationsUnlocked().
235 | 50.2M | std::vector<scoped_refptr<OperationDriver>> OperationTracker::GetPendingOperations() const { |
236 | 50.2M | std::lock_guard<std::mutex> lock(mutex_); |
237 | 50.2M | return GetPendingOperationsUnlocked(); |
238 | 50.2M | } |
239 | | |
// Copies every key of pending_operations_ into a vector of scoped_refptr and returns it.
// This function does not lock; both callers in this file invoke it while holding mutex_.
240 | 50.2M | std::vector<scoped_refptr<OperationDriver>> OperationTracker::GetPendingOperationsUnlocked() const { |
241 | 50.2M | std::vector<scoped_refptr<OperationDriver>> result; |
// Size the vector once so the loop below does not reallocate.
242 | 50.2M | result.reserve(pending_operations_.size()); |
243 | 50.2M | for (const auto& e : pending_operations_) { |
244 | 963k | result.push_back(e.first); |
245 | 963k | } |
246 | 50.2M | return result; |
247 | 50.2M | } |
248 | | |
249 | | |
// Returns the number of currently registered operations, read under mutex_.
// The TEST_ prefix suggests it is intended for tests only -- TODO confirm with callers.
250 | 0 | size_t OperationTracker::TEST_GetNumPending() const { |
251 | 0 | std::lock_guard<std::mutex> l(mutex_); |
252 | 0 | return pending_operations_.size(); |
253 | 0 | } |
254 | | |
// Blocks until no operations are pending. Implemented by calling the timed overload with
// the largest representable timeout (int64 max nanoseconds) and CHECK-failing if that call
// returns a non-OK status.
255 | 75.6k | void OperationTracker::WaitForAllToFinish() const { |
256 | | // Wait indefinitely. |
257 | 75.6k | CHECK_OK(WaitForAllToFinish(MonoDelta::FromNanoseconds(std::numeric_limits<int64_t>::max()))); |
258 | 75.6k | } |
259 | | |
// Waits until pending_operations_ is empty or `timeout` has elapsed.
//
//   timeout  maximum time to wait, measured from entry to this function.
//
// Returns Status::OK() once no operations remain, or TimedOut (with the number of
// operations in the last snapshot and the elapsed time) when the deadline is exceeded.
//
// NOTE(review): the deadline is only checked at the top of each loop iteration, so the
// call can overrun `timeout` by up to one wait interval (at most 1s).
// NOTE(review): NO_THREAD_SAFETY_ANALYSIS presumably opts this function out of Clang's
// thread-safety analysis -- confirm against the macro definition.
260 | | Status OperationTracker::WaitForAllToFinish(const MonoDelta& timeout) const |
261 | 78.6k | NO_THREAD_SAFETY_ANALYSIS { |
// Warnings are spaced by kComplainInterval; the condition-variable wait starts from
// 250ms and grows on every iteration. Both are scaled by kTimeMultiplier.
262 | 78.6k | const MonoDelta kComplainInterval = 1000ms * kTimeMultiplier; |
263 | 78.6k | MonoDelta wait_time = 250ms * kTimeMultiplier; |
264 | 78.6k | int num_complaints = 0; |
265 | 78.6k | MonoTime start_time = MonoTime::Now(); |
// Fast path: nothing is pending, so return without logging or waiting.
266 | 78.6k | auto operations = GetPendingOperations(); |
267 | 78.6k | if (operations.empty()) { |
268 | 78.6k | return Status::OK(); |
269 | 78.6k | } |
270 | 3 | for (;;) { |
271 | 0 | MonoDelta diff = MonoTime::Now().GetDeltaSince(start_time); |
272 | 0 | if (diff.MoreThan(timeout)) { |
273 | 0 | return STATUS(TimedOut, Substitute("Timed out waiting for all operations to finish. " |
274 | 0 | "$0 operations pending. Waited for $1", |
275 | 0 | operations.size(), diff.ToString())); |
276 | 0 | } |
// Emit the n-th warning only after n complain intervals have elapsed. With
// num_complaints == 0 the threshold is zero, so the first pass warns as soon as any time
// has elapsed.
277 | 0 | if (diff > kComplainInterval * num_complaints) { |
278 | 0 | LOG_WITH_PREFIX(WARNING) |
279 | 0 | << Format("OperationTracker waiting for $0 outstanding operations to" |
280 | 0 | " complete now for $1", operations.size(), diff); |
281 | 0 | num_complaints++; |
282 | 0 | } |
// Grow the wait by a factor of 5/4, capped at one second. The growth is applied before
// the first wait, so the first wait already uses the enlarged value.
283 | 0 | wait_time = std::min<MonoDelta>(wait_time * 5 / 4, 1s); |
284 | |
|
// NOTE(review): this dump sits outside the complaint check above, so every pending
// operation is logged at INFO on every iteration. The loop variable is taken by value,
// which copies each scoped_refptr.
285 | 0 | LOG_WITH_PREFIX(INFO) << "Dumping currently running operations: "; |
286 | 0 | for (scoped_refptr<OperationDriver> driver : operations) { |
287 | 0 | LOG_WITH_PREFIX(INFO) << driver->ToString(); |
288 | 0 | } |
289 | 0 | { |
290 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
// Re-check under the lock: the last operation may have been released while logging.
291 | 0 | if (pending_operations_.empty()) { |
292 | 0 | break; |
293 | 0 | } |
// Sleep until Release() signals cond_ or wait_time expires. Leave the loop only when the
// wait was signalled and the map is empty; a timed-out wait falls through to the snapshot
// refresh and is re-evaluated on the next iteration.
294 | 0 | if (cond_.wait_for(lock, wait_time.ToSteadyDuration()) == std::cv_status::no_timeout && |
295 | 0 | pending_operations_.empty()) { |
296 | 0 | break; |
297 | 0 | } |
// Still holding mutex_, refresh the snapshot used for the next round of logging.
298 | 0 | operations = GetPendingOperationsUnlocked(); |
299 | 0 | } |
300 | 0 | } |
301 | 3 | return Status::OK(); |
302 | 3 | } |
303 | | |
// Enables metrics: replaces metrics_ with a new Metrics object whose gauges and counter
// are instantiated on `metric_entity`. Until this is called, the counter helpers in this
// file return early because metrics_ is null.
304 | | void OperationTracker::StartInstrumentation( |
305 | 150k | const scoped_refptr<MetricEntity>& metric_entity) { |
306 | 150k | metrics_.reset(new Metrics(metric_entity)); |
307 | 150k | } |
308 | | |
// Enables per-tablet memory accounting for in-flight operations.
//
//   parent_mem_tracker  tracker under which the new "operation_tracker" tracker is created.
//
// The limit is FLAGS_tablet_operation_memory_limit_mb converted from megabytes to bytes.
// When the flag is -1 no tracker is created, mem_tracker_ stays unset, and Add()/Release()
// skip memory accounting.
// NOTE(review): only the exact value -1 is treated as "disabled"; other negative values
// produce a negative byte limit -- confirm how MemTracker::CreateTracker handles that.
309 | | void OperationTracker::StartMemoryTracking( |
310 | 150k | const shared_ptr<MemTracker>& parent_mem_tracker) { |
311 | 150k | if (FLAGS_tablet_operation_memory_limit_mb != -1) { |
312 | 150k | mem_tracker_ = MemTracker::CreateTracker( |
313 | 150k | FLAGS_tablet_operation_memory_limit_mb * 1024 * 1024, |
314 | 150k | "operation_tracker", |
315 | 150k | parent_mem_tracker); |
316 | 150k | } |
317 | 150k | } |
318 | | |
319 | | } // namespace tablet |
320 | | } // namespace yb |