YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation_tracker.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/operations/operation_tracker.h"
34
35
#include <algorithm>
36
#include <limits>
37
#include <vector>
38
39
#include "yb/gutil/map-util.h"
40
#include "yb/gutil/strings/substitute.h"
41
42
#include "yb/tablet/operations/operation_driver.h"
43
#include "yb/tablet/tablet.h"
44
45
#include "yb/util/flag_tags.h"
46
#include "yb/util/logging.h"
47
#include "yb/util/mem_tracker.h"
48
#include "yb/util/metrics.h"
49
#include "yb/util/monotime.h"
50
#include "yb/util/status_log.h"
51
#include "yb/util/tsan_util.h"
52
53
// Per-tablet cap, expressed in megabytes, on the memory held by in-flight operations.
// OperationTracker::StartMemoryTracking() in this file multiplies the value by 1024 * 1024 to
// obtain the byte limit of the "operation_tracker" MemTracker, and creates no tracker at all
// when the flag is -1.
DEFINE_int64(tablet_operation_memory_limit_mb, 1024,
             "Maximum amount of memory that may be consumed by all in-flight "
             "operations belonging to a particular tablet. When this limit "
             "is reached, new operations will be rejected and clients will "
             "be forced to retry them. If -1, operation memory tracking is "
             "disabled.");
TAG_FLAG(tablet_operation_memory_limit_mb, advanced);
60
61
// Metric prototypes for the tablet entity. They are instantiated per tablet by
// OperationTracker::Metrics::Metrics() further down in this file.
//
// all_operations_inflight counts every tracked operation; each of the remaining gauges is bound
// to one OperationType value in the Metrics constructor.
METRIC_DEFINE_gauge_uint64(tablet, all_operations_inflight,
                           "Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of operations currently in-flight, including any type.");
METRIC_DEFINE_gauge_uint64(tablet, write_operations_inflight,
                           "Write Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of write operations currently in-flight");
// Keeps the "alter_schema" metric name although the Metrics constructor binds it to
// OperationType::kChangeMetadata.
METRIC_DEFINE_gauge_uint64(tablet, alter_schema_operations_inflight,
                           "Change Metadata Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of change metadata operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, update_transaction_operations_inflight,
                           "Update Transaction Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of update transaction operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, snapshot_operations_inflight,
                           "Snapshot Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of snapshot operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, split_operations_inflight,
                           "Split Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of split operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, truncate_operations_inflight,
                           "Truncate Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of truncate operations currently in-flight");
// Bound to OperationType::kEmpty by the Metrics constructor.
METRIC_DEFINE_gauge_uint64(tablet, empty_operations_inflight,
                           "Empty Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of none operations currently in-flight");
METRIC_DEFINE_gauge_uint64(tablet, history_cutoff_operations_inflight,
                           "History Cutoff Operations In Flight",
                           yb::MetricUnit::kOperations,
                           "Number of history cutoff operations currently in-flight");

// Incremented by OperationTracker::Add() each time the memory tracker refuses an operation.
METRIC_DEFINE_counter(tablet, operation_memory_pressure_rejections,
                      "Operation Memory Pressure Rejections",
                      yb::MetricUnit::kOperations,
                      "Number of operations rejected because the tablet's "
                      "operation memory limit was reached.");
103
104
using namespace std::literals;
105
using std::shared_ptr;
106
using std::vector;
107
108
namespace yb {
109
namespace tablet {
110
111
using strings::Substitute;
112
113
#define MINIT(x) x(METRIC_##x.Instantiate(entity))
114
#define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))
115
#define INSTANTIATE(upper, lower) \
116
1.20M
  operations_inflight[to_underlying(OperationType::BOOST_PP_CAT(k, upper))] = \
117
1.20M
      BOOST_PP_CAT(BOOST_PP_CAT(METRIC_, lower), _operations_inflight).Instantiate(entity, 0);
118
OperationTracker::Metrics::Metrics(const scoped_refptr<MetricEntity>& entity)
119
    : GINIT(all_operations_inflight),
120
150k
      MINIT(operation_memory_pressure_rejections) {
121
150k
  INSTANTIATE(Write, write);
122
150k
  INSTANTIATE(ChangeMetadata, alter_schema);
123
150k
  INSTANTIATE(UpdateTransaction, update_transaction);
124
150k
  INSTANTIATE(Snapshot, snapshot);
125
150k
  INSTANTIATE(Split, split);
126
150k
  INSTANTIATE(Truncate, truncate);
127
150k
  INSTANTIATE(Empty, empty);
128
150k
  INSTANTIATE(HistoryCutoff, history_cutoff);
129
150k
  static_assert(8 == kElementsInOperationType, "Init metrics for all operation types");
130
150k
}
131
#undef INSTANTIATE
132
#undef GINIT
133
#undef MINIT
134
135
OperationTracker::OperationTracker(const std::string& log_prefix)
136
150k
    : log_prefix_(log_prefix) {
137
150k
}
138
139
74.1k
// Destroys the tracker. Aborts (CHECK) if any operation is still pending, then detaches the
// memory tracker from its parent when memory tracking was enabled.
OperationTracker::~OperationTracker() {
  std::lock_guard<std::mutex> guard(mutex_);
  CHECK_EQ(pending_operations_.size(), 0);
  if (!mem_tracker_) {
    return;
  }
  mem_tracker_->UnregisterFromParent();
}
146
147
31.4M
// Registers a driver as a pending operation.
//
// When memory tracking is enabled, the driver's footprint (driver->SpaceUsed()) is first charged
// to the memory tracker. If the tracker refuses, the rejection counter is bumped, a throttled
// warning is logged and ServiceUnavailable is returned without registering the driver.
//
// Otherwise the in-flight gauges are incremented and the driver is inserted into
// pending_operations_ together with its cached footprint. Inserting a driver that is already
// present is a fatal error (CHECK).
Status OperationTracker::Add(OperationDriver* driver) {
  const int64_t footprint = driver->SpaceUsed();

  const bool rejected = mem_tracker_ && !mem_tracker_->TryConsume(footprint);
  if (rejected) {
    if (metrics_) {
      metrics_->operation_memory_pressure_rejections->Increment();
    }

    // May be null in unit tests.
    Tablet* tablet = driver->operation()->tablet();

    string msg = Substitute(
        "Operation failed, tablet $0 operation memory consumption ($1) "
        "has exceeded its limit ($2) or the limit of an ancestral tracker",
        tablet ? tablet->tablet_id() : "(unknown)",
        mem_tracker_->consumption(), mem_tracker_->limit());

    YB_LOG_EVERY_N_SECS(WARNING, 1) << msg << THROTTLE_MSG;

    return STATUS(ServiceUnavailable, msg);
  }

  IncrementCounters(*driver);

  // Remember the footprint now: the request it was computed from may be gone by the time the
  // operation is released.
  State pending_state;
  pending_state.memory_footprint = footprint;

  std::lock_guard<std::mutex> guard(mutex_);
  CHECK(pending_operations_.emplace(driver, pending_state).second);
  return Status::OK();
}
178
179
31.4M
void OperationTracker::IncrementCounters(const OperationDriver& driver) const {
180
31.4M
  if (!metrics_) {
181
0
    return;
182
0
  }
183
184
31.4M
  metrics_->all_operations_inflight->Increment();
185
31.4M
  metrics_->operations_inflight[to_underlying(driver.operation_type())]->Increment();
186
31.4M
}
187
188
31.4M
void OperationTracker::DecrementCounters(const OperationDriver& driver) const {
189
31.4M
  if (!metrics_) {
190
0
    return;
191
0
  }
192
193
31.4M
  DCHECK_GT(metrics_->all_operations_inflight->value(), 0);
194
31.4M
  metrics_->all_operations_inflight->Decrement();
195
31.4M
  auto index = to_underlying(driver.operation_type());
196
31.4M
  DCHECK_GT(metrics_->operations_inflight[index]->value(), 0);
197
31.4M
  metrics_->operations_inflight[index]->Decrement();
198
31.4M
}
199
200
31.4M
// Removes a previously added driver from the set of pending operations.
//
// Steps, in order:
//   1. Decrement the in-flight gauges.
//   2. Under mutex_, look up the driver's cached state (fatal if absent) and erase the entry.
//   3. If the map became empty, wake every thread blocked on cond_ (WaitForAllToFinish).
//   4. Return the cached memory footprint to the memory tracker, if there is one and the
//      footprint is non-zero.
//   5. For every operation type except kEmpty, report the op id: appended to applied_op_ids when
//      the caller supplied it, otherwise passed to post_tracker_ when that callback is set.
void OperationTracker::Release(OperationDriver* driver, OpIds* applied_op_ids) {
  DecrementCounters(*driver);

  // Read everything needed from the driver up front, before its entry is dropped.
  yb::OpId released_op_id = driver->GetOpId();
  OperationType released_type = driver->operation_type();

  State released_state;
  bool tracker_drained = false;
  {
    // Remove the operation from the map, keeping a copy of its state for the steps below.
    std::lock_guard<std::mutex> guard(mutex_);
    released_state = FindOrDie(pending_operations_, driver);
    const auto erased_count = pending_operations_.erase(driver);
    if (PREDICT_FALSE(erased_count != 1)) {
      LOG_WITH_PREFIX(FATAL) << "Could not remove pending operation from map: "
          << driver->ToStringUnlocked();
    }
    tracker_drained = pending_operations_.empty();
  }

  // Notify outside of the critical section.
  if (tracker_drained) {
    cond_.notify_all();
  }

  if (mem_tracker_ && released_state.memory_footprint) {
    mem_tracker_->Release(released_state.memory_footprint);
  }

  if (released_type == OperationType::kEmpty) {
    return;
  }

  if (applied_op_ids) {
    applied_op_ids->push_back(released_op_id);
  } else if (post_tracker_) {
    post_tracker_(released_op_id);
  }
}
234
235
50.2M
// Returns a snapshot of the drivers that are currently pending. Acquires mutex_ for the duration
// of the copy and delegates to GetPendingOperationsUnlocked().
std::vector<scoped_refptr<OperationDriver>> OperationTracker::GetPendingOperations() const {
  std::lock_guard<std::mutex> guard(mutex_);
  auto snapshot = GetPendingOperationsUnlocked();
  return snapshot;
}
239
240
50.2M
// Copies the key of every entry in pending_operations_ into a vector of driver references.
// Takes no lock itself: GetPendingOperations() and WaitForAllToFinish() call it while holding
// mutex_.
std::vector<scoped_refptr<OperationDriver>> OperationTracker::GetPendingOperationsUnlocked() const {
  std::vector<scoped_refptr<OperationDriver>> drivers;
  drivers.reserve(pending_operations_.size());
  for (const auto& entry : pending_operations_) {
    drivers.emplace_back(entry.first);
  }
  return drivers;
}
248
249
250
0
size_t OperationTracker::TEST_GetNumPending() const {
251
0
  std::lock_guard<std::mutex> l(mutex_);
252
0
  return pending_operations_.size();
253
0
}
254
255
75.6k
void OperationTracker::WaitForAllToFinish() const {
256
  // Wait indefinitely.
257
75.6k
  CHECK_OK(WaitForAllToFinish(MonoDelta::FromNanoseconds(std::numeric_limits<int64_t>::max())));
258
75.6k
}
259
260
// Waits until there are no pending operations or until `timeout` has elapsed.
//
// Returns OK as soon as pending_operations_ is observed empty, and TimedOut when the elapsed time
// exceeds `timeout` while operations are still pending. The timeout is evaluated once per loop
// iteration, so the call can overshoot it by up to one wait interval.
//
// While waiting, a warning is emitted roughly once per kComplainInterval and the currently
// pending drivers are dumped at INFO level on every iteration. The wait interval grows by 25%
// per iteration and is capped at one second.
Status OperationTracker::WaitForAllToFinish(const MonoDelta& timeout) const
    NO_THREAD_SAFETY_ANALYSIS {
  const MonoDelta kComplainInterval = 1000ms * kTimeMultiplier;
  MonoDelta wait_time = 250ms * kTimeMultiplier;
  int num_complaints = 0;
  const MonoTime start_time = MonoTime::Now();
  auto operations = GetPendingOperations();
  if (operations.empty()) {
    return Status::OK();
  }
  for (;;) {
    const MonoDelta diff = MonoTime::Now().GetDeltaSince(start_time);
    if (diff.MoreThan(timeout)) {
      return STATUS(TimedOut, Substitute("Timed out waiting for all operations to finish. "
                                         "$0 operations pending. Waited for $1",
                                         operations.size(), diff.ToString()));
    }
    if (diff > kComplainInterval * num_complaints) {
      LOG_WITH_PREFIX(WARNING)
          << Format("OperationTracker waiting for $0 outstanding operations to"
                        " complete now for $1", operations.size(), diff);
      num_complaints++;
    }
    wait_time = std::min<MonoDelta>(wait_time * 5 / 4, 1s);

    LOG_WITH_PREFIX(INFO) << "Dumping currently running operations: ";
    // Iterate by reference: copying a scoped_refptr only to log it costs a needless
    // reference-count round trip per driver.
    for (const scoped_refptr<OperationDriver>& driver : operations) {
      LOG_WITH_PREFIX(INFO) << driver->ToString();
    }
    {
      std::unique_lock<std::mutex> lock(mutex_);
      // The predicate overload checks emptiness before blocking, after every wakeup and once
      // more when the interval expires. Deciding on the predicate rather than on the cv_status
      // matters: the last operation can be released right as the wait times out, and treating
      // that as "still pending" would loop back and could report TimedOut with nothing pending.
      // It also keeps a spurious wakeup from cutting the interval short.
      if (cond_.wait_for(lock, wait_time.ToSteadyDuration(),
                         [this] { return pending_operations_.empty(); })) {
        break;
      }
      operations = GetPendingOperationsUnlocked();
    }
  }
  return Status::OK();
}
303
304
// Enables metrics for this tracker by constructing a Metrics instance bound to metric_entity
// and storing it in metrics_, replacing whatever metrics_ held before. Until this is called,
// IncrementCounters()/DecrementCounters() are no-ops and Add() does not count rejections.
void OperationTracker::StartInstrumentation(
    const scoped_refptr<MetricEntity>& metric_entity) {
  metrics_.reset(new Metrics(metric_entity));
}
308
309
void OperationTracker::StartMemoryTracking(
310
150k
    const shared_ptr<MemTracker>& parent_mem_tracker) {
311
150k
  if (FLAGS_tablet_operation_memory_limit_mb != -1) {
312
150k
    mem_tracker_ = MemTracker::CreateTracker(
313
150k
        FLAGS_tablet_operation_memory_limit_mb * 1024 * 1024,
314
150k
        "operation_tracker",
315
150k
        parent_mem_tracker);
316
150k
  }
317
150k
}
318
319
}  // namespace tablet
320
}  // namespace yb