YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation_tracker.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/operations/operation_tracker.h"
34
35
#include <algorithm>
36
#include <limits>
37
#include <vector>
38
39
#include "yb/gutil/map-util.h"
40
#include "yb/gutil/strings/substitute.h"
41
42
#include "yb/tablet/operations/operation_driver.h"
43
#include "yb/tablet/tablet.h"
44
45
#include "yb/util/flag_tags.h"
46
#include "yb/util/logging.h"
47
#include "yb/util/mem_tracker.h"
48
#include "yb/util/metrics.h"
49
#include "yb/util/monotime.h"
50
#include "yb/util/status_log.h"
51
#include "yb/util/tsan_util.h"
52
53
// Per-tablet cap on memory charged by in-flight operations. Read by
// OperationTracker::StartMemoryTracking(); -1 disables operation memory tracking.
DEFINE_int64(tablet_operation_memory_limit_mb, 1024,
             "Maximum amount of memory that may be consumed by all in-flight "
             "operations belonging to a particular tablet. When this limit "
             "is reached, new operations will be rejected and clients will "
             "be forced to retry them. If -1, operation memory tracking is "
             "disabled.");
TAG_FLAG(tablet_operation_memory_limit_mb, advanced);

// In-flight gauges: one aggregate gauge plus one gauge per OperationType.
// They are incremented in OperationTracker::IncrementCounters() and
// decremented in OperationTracker::DecrementCounters().
METRIC_DEFINE_gauge_uint64(tablet, all_operations_inflight,
                           "Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of operations currently in-flight, including any type.");
METRIC_DEFINE_gauge_uint64(tablet, write_operations_inflight,
                           "Write Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of write operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, alter_schema_operations_inflight,
                           "Change Metadata Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of change metadata operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, update_transaction_operations_inflight,
                           "Update Transaction Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of update transaction operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, snapshot_operations_inflight,
                           "Snapshot Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of snapshot operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, split_operations_inflight,
                           "Split Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of split operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, truncate_operations_inflight,
                           "Truncate Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of truncate operations currently in-flight");
// Tracks OperationType::kEmpty. The help text previously said "none", a stale name.
METRIC_DEFINE_gauge_uint64(tablet, empty_operations_inflight,
                           "Empty Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of empty operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, history_cutoff_operations_inflight,
                           "History Cutoff Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of history cutoff operations currently in-flight");

// Incremented by OperationTracker::Add() whenever the operation memory tracker
// refuses to admit a new operation.
METRIC_DEFINE_counter(tablet, operation_memory_pressure_rejections,
                      "Operation Memory Pressure Rejections",
                      yb::MetricUnit::kOperations,
                      "Number of operations rejected because the tablet's "
                      "operation memory limit was reached.");
103
104
using namespace std::literals;
105
using std::shared_ptr;
106
using std::vector;
107
108
namespace yb {
109
namespace tablet {
110
111
using strings::Substitute;
112
113
#define MINIT(x) x(METRIC_##x.Instantiate(entity))
114
#define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))
115
#define INSTANTIATE(upper, lower) \
116
709k
  operations_inflight[to_underlying(OperationType::BOOST_PP_CAT(k, upper))] = \
117
709k
      BOOST_PP_CAT(BOOST_PP_CAT(METRIC_, lower), _operations_inflight).Instantiate(entity, 0);
118
OperationTracker::Metrics::Metrics(const scoped_refptr<MetricEntity>& entity)
119
    : GINIT(all_operations_inflight),
120
88.7k
      MINIT(operation_memory_pressure_rejections) {
121
88.7k
  INSTANTIATE(Write, write);
122
88.7k
  INSTANTIATE(ChangeMetadata, alter_schema);
123
88.7k
  INSTANTIATE(UpdateTransaction, update_transaction);
124
88.7k
  INSTANTIATE(Snapshot, snapshot);
125
88.7k
  INSTANTIATE(Split, split);
126
88.7k
  INSTANTIATE(Truncate, truncate);
127
88.7k
  INSTANTIATE(Empty, empty);
128
88.7k
  INSTANTIATE(HistoryCutoff, history_cutoff);
129
88.7k
  static_assert(8 == kElementsInOperationType, "Init metrics for all operation types");
130
88.7k
}
131
#undef INSTANTIATE
132
#undef GINIT
133
#undef MINIT
134
135
OperationTracker::OperationTracker(const std::string& log_prefix)
136
88.8k
    : log_prefix_(log_prefix) {
137
88.8k
}
138
139
47.0k
// Destroys the tracker. Every tracked operation must already have been released
// (the process crashes otherwise). The operation memory tracker, if one was
// created, is detached from its parent.
OperationTracker::~OperationTracker() {
  std::lock_guard<std::mutex> guard(mutex_);
  CHECK_EQ(pending_operations_.size(), 0);
  if (!mem_tracker_) {
    return;
  }
  mem_tracker_->UnregisterFromParent();
}
146
147
13.4M
// Registers `driver` as a pending operation.
//
// When memory tracking is enabled, the driver's SpaceUsed() is charged to
// mem_tracker_. If that charge is refused, the rejection counter is bumped, a
// throttled warning is logged, and ServiceUnavailable is returned without
// tracking the operation. Otherwise the in-flight gauges are incremented and
// the footprint is remembered, so Release() can return exactly that amount.
Status OperationTracker::Add(OperationDriver* driver) {
  const int64_t footprint = driver->SpaceUsed();

  if (mem_tracker_ && !mem_tracker_->TryConsume(footprint)) {
    if (metrics_) {
      metrics_->operation_memory_pressure_rejections->Increment();
    }

    // The tablet may be null in unit tests.
    Tablet* const tablet = driver->operation()->tablet();
    const string msg = Substitute(
        "Operation failed, tablet $0 operation memory consumption ($1) "
        "has exceeded its limit ($2) or the limit of an ancestral tracker",
        tablet != nullptr ? tablet->tablet_id() : "(unknown)",
        mem_tracker_->consumption(), mem_tracker_->limit());

    YB_LOG_EVERY_N_SECS(WARNING, 1) << msg << THROTTLE_MSG;
    return STATUS(ServiceUnavailable, msg);
  }

  IncrementCounters(*driver);

  // Record the footprint now: the request it was computed from may be gone by
  // the time Release() runs.
  State st;
  st.memory_footprint = footprint;

  std::lock_guard<std::mutex> lock(mutex_);
  CHECK(pending_operations_.emplace(driver, st).second);
  return Status::OK();
}
178
179
13.5M
void OperationTracker::IncrementCounters(const OperationDriver& driver) const {
180
13.5M
  if (!metrics_) {
181
0
    return;
182
0
  }
183
184
13.5M
  metrics_->all_operations_inflight->Increment();
185
13.5M
  metrics_->operations_inflight[to_underlying(driver.operation_type())]->Increment();
186
13.5M
}
187
188
13.5M
void OperationTracker::DecrementCounters(const OperationDriver& driver) const {
189
13.5M
  if (!metrics_) {
190
0
    return;
191
0
  }
192
193
13.5M
  DCHECK_GT(metrics_->all_operations_inflight->value(), 0);
194
13.5M
  metrics_->all_operations_inflight->Decrement();
195
13.5M
  auto index = to_underlying(driver.operation_type());
196
13.5M
  DCHECK_GT(metrics_->operations_inflight[index]->value(), 0);
197
13.5M
  metrics_->operations_inflight[index]->Decrement();
198
13.5M
}
199
200
13.5M
// Stops tracking `driver` and undoes the bookkeeping performed by Add():
//   * decrements the in-flight gauges;
//   * removes the driver from pending_operations_ (crashing if it is absent);
//   * wakes WaitForAllToFinish() waiters when no operations remain pending;
//   * returns the operation's recorded memory footprint to mem_tracker_.
// For non-empty operations, the driver's OpId is appended to `applied_op_ids`
// when that is non-null. Otherwise it is passed to post_tracker_, if one is set.
void OperationTracker::Release(OperationDriver* driver, OpIds* applied_op_ids) {
  DecrementCounters(*driver);

  const yb::OpId op_id = driver->GetOpId();
  const OperationType operation_type = driver->operation_type();
  State st;
  bool notify;
  {
    std::lock_guard<std::mutex> lock(mutex_);
    // Use a single lookup: take the retained state and erase through the same
    // iterator. Previously FindOrDie() aborted with a generic message before the
    // descriptive FATAL below could ever fire.
    auto it = pending_operations_.find(driver);
    if (PREDICT_FALSE(it == pending_operations_.end())) {
      LOG_WITH_PREFIX(FATAL) << "Could not remove pending operation from map: "
          << driver->ToStringUnlocked();
    }
    st = it->second;
    pending_operations_.erase(it);
    notify = pending_operations_.empty();
  }
  // Notify outside the lock so woken waiters do not immediately block on mutex_.
  if (notify) {
    cond_.notify_all();
  }

  if (mem_tracker_ && st.memory_footprint) {
    mem_tracker_->Release(st.memory_footprint);
  }

  if (operation_type != OperationType::kEmpty) {
    if (applied_op_ids) {
      applied_op_ids->push_back(op_id);
    } else if (post_tracker_) {
      post_tracker_(op_id);
    }
  }
}
234
235
6.45M
// Returns a snapshot of the currently pending operation drivers, taken under mutex_.
std::vector<scoped_refptr<OperationDriver>> OperationTracker::GetPendingOperations() const {
  std::lock_guard<std::mutex> guard(mutex_);
  return GetPendingOperationsUnlocked();
}
239
240
6.45M
// Copies the keys of pending_operations_ into a vector (order follows the map's
// iteration order). The caller is expected to hold mutex_.
std::vector<scoped_refptr<OperationDriver>> OperationTracker::GetPendingOperationsUnlocked() const {
  std::vector<scoped_refptr<OperationDriver>> drivers;
  drivers.reserve(pending_operations_.size());
  for (const auto& entry : pending_operations_) {
    drivers.emplace_back(entry.first);
  }
  return drivers;
}
248
249
250
0
size_t OperationTracker::TEST_GetNumPending() const {
251
0
  std::lock_guard<std::mutex> l(mutex_);
252
0
  return pending_operations_.size();
253
0
}
254
255
47.7k
void OperationTracker::WaitForAllToFinish() const {
256
  // Wait indefinitely.
257
47.7k
  CHECK_OK(WaitForAllToFinish(MonoDelta::FromNanoseconds(std::numeric_limits<int64_t>::max())));
258
47.7k
}
259
260
// Waits until no operations are pending, or until `timeout` has elapsed.
//
// Returns OK immediately when nothing is pending. While waiting, it logs a
// warning roughly every kComplainInterval and dumps the currently pending
// drivers on each iteration. The wait step grows by 5/4 per iteration, capped at
// 1s. Each individual wait is also capped at the time remaining before
// `timeout`, so the caller's deadline is not overshot by up to a full wait step.
// Returns TimedOut if operations are still pending once `timeout` has passed.
Status OperationTracker::WaitForAllToFinish(const MonoDelta& timeout) const
    NO_THREAD_SAFETY_ANALYSIS {
  const MonoDelta kComplainInterval = 1000ms * kTimeMultiplier;
  MonoDelta wait_time = 250ms * kTimeMultiplier;
  int num_complaints = 0;
  MonoTime start_time = MonoTime::Now();
  auto operations = GetPendingOperations();
  if (operations.empty()) {
    return Status::OK();
  }
  for (;;) {
    MonoDelta diff = MonoTime::Now().GetDeltaSince(start_time);
    if (diff.MoreThan(timeout)) {
      return STATUS(TimedOut, Substitute("Timed out waiting for all operations to finish. "
                                         "$0 operations pending. Waited for $1",
                                         operations.size(), diff.ToString()));
    }
    if (diff > kComplainInterval * num_complaints) {
      LOG_WITH_PREFIX(WARNING)
          << Format("OperationTracker waiting for $0 outstanding operations to"
                        " complete now for $1", operations.size(), diff);
      num_complaints++;
    }
    wait_time = std::min<MonoDelta>(wait_time * 5 / 4, 1s);

    LOG_WITH_PREFIX(INFO) << "Dumping currently running operations: ";
    // Bind by reference: copying scoped_refptr would cost an atomic refcount
    // round trip per driver.
    for (const auto& driver : operations) {
      LOG_WITH_PREFIX(INFO) << driver->ToString();
    }

    // Never sleep past the caller's deadline. diff <= timeout holds here, so the
    // remaining time is non-negative.
    const MonoDelta this_wait = std::min<MonoDelta>(wait_time, timeout - diff);
    {
      std::unique_lock<std::mutex> lock(mutex_);
      if (pending_operations_.empty()) {
        break;
      }
      if (cond_.wait_for(lock, this_wait.ToSteadyDuration()) == std::cv_status::no_timeout &&
          pending_operations_.empty()) {
        break;
      }
      operations = GetPendingOperationsUnlocked();
    }
  }
  return Status::OK();
}
303
304
void OperationTracker::StartInstrumentation(
305
88.7k
    const scoped_refptr<MetricEntity>& metric_entity) {
306
88.7k
  metrics_.reset(new Metrics(metric_entity));
307
88.7k
}
308
309
void OperationTracker::StartMemoryTracking(
310
88.7k
    const shared_ptr<MemTracker>& parent_mem_tracker) {
311
88.7k
  if (FLAGS_tablet_operation_memory_limit_mb != -1) {
312
88.7k
    mem_tracker_ = MemTracker::CreateTracker(
313
88.7k
        FLAGS_tablet_operation_memory_limit_mb * 1024 * 1024,
314
88.7k
        "operation_tracker",
315
88.7k
        parent_mem_tracker);
316
88.7k
  }
317
88.7k
}
318
319
}  // namespace tablet
320
}  // namespace yb