YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/transaction_pool.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/transaction_pool.h"
15
16
#include <deque>
17
18
#include <gflags/gflags.h>
19
20
#include "yb/client/batcher.h"
21
#include "yb/client/client.h"
22
#include "yb/client/transaction.h"
23
#include "yb/client/transaction_manager.h"
24
25
#include "yb/gutil/thread_annotations.h"
26
27
#include "yb/rpc/messenger.h"
28
#include "yb/rpc/scheduler.h"
29
30
#include "yb/util/flag_tags.h"
31
#include "yb/util/metrics.h"
32
#include "yb/util/result.h"
33
34
using namespace std::literals;
35
using namespace std::placeholders;
36
37
DEFINE_int32(transaction_pool_cleanup_interval_ms, 5000,
38
             "How frequently we should cleanup transaction pool");
39
40
DEFINE_double(transaction_pool_reserve_factor, 2,
41
              "During cleanup we will preserve number of transactions in pool that equals to"
42
                  " average number or take requests during prepration multiplied by this factor");
43
44
DEFINE_bool(force_global_transactions, false,
45
            "Force all transactions to be global transactions");
46
47
DEFINE_test_flag(bool, track_last_transaction, false,
48
                 "Keep track of the last transaction taken from pool for testing");
49
50
METRIC_DEFINE_coarse_histogram(
51
    server, transaction_pool_cache, "Rate of hitting transaction pool cache",
52
    yb::MetricUnit::kCacheHits, "Rate of hitting transaction pool cache");
53
54
METRIC_DEFINE_counter(server, transaction_pool_cache_hits,
55
                      "Total number of hits in transaction pool cache", yb::MetricUnit::kCacheHits,
56
                      "Total number of hits in transaction pool cache");
57
METRIC_DEFINE_counter(server, transaction_pool_cache_queries,
58
                      "Total number of queries to transaction pool cache",
59
                      yb::MetricUnit::kCacheQueries,
60
                      "Total number of queries to transaction pool cache");
61
62
METRIC_DEFINE_gauge_uint32(
63
    server, transaction_pool_preparing, "Number of preparing transactions in pool",
64
    yb::MetricUnit::kTransactions, "Number of preparing transactions in pool");
65
66
METRIC_DEFINE_gauge_uint32(
67
    server, transaction_pool_prepared, "Number of prepared transactions in pool",
68
    yb::MetricUnit::kTransactions, "Number of prepared transactions in pool");
69
70
namespace yb {
71
namespace client {
72
73
namespace {
74
75
// Transaction pool where all transactions have a specific locality (LOCAL or GLOBAL).
76
class SingleLocalityPool {
77
 public:
78
  SingleLocalityPool(TransactionManager* manager,
79
                     MetricEntity* metric_entity,
80
                     TransactionLocality locality)
81
1.31k
      : manager_(*manager), locality_(locality) {
82
1.31k
    if (metric_entity) {
83
1.31k
      cache_histogram_ = METRIC_transaction_pool_cache.Instantiate(metric_entity);
84
1.31k
      cache_hits_ = METRIC_transaction_pool_cache_hits.Instantiate(metric_entity);
85
1.31k
      cache_queries_ = METRIC_transaction_pool_cache_queries.Instantiate(metric_entity);
86
1.31k
      gauge_preparing_ = METRIC_transaction_pool_preparing.Instantiate(metric_entity, 0);
87
1.31k
      gauge_prepared_ = METRIC_transaction_pool_prepared.Instantiate(metric_entity, 0);
88
1.31k
    }
89
1.31k
  }
90
91
0
  ~SingleLocalityPool() {
92
0
    std::unique_lock<std::mutex> lock(mutex_);
93
0
    closing_ = true;
94
0
    if (scheduled_task_ != rpc::kUninitializedScheduledTaskId) {
95
0
      manager_.client()->messenger()->scheduler().Abort(scheduled_task_);
96
0
    }
97
    // Have to use while, since GUARDED_BY does not understand cond wait with predicate.
98
0
    while (!Idle()) {
99
0
      cond_.wait(lock);
100
0
    }
101
0
  }
102
103
238k
  YBTransactionPtr Take() {
104
238k
    YBTransactionPtr result, new_txn;
105
238k
    uint64_t old_taken;
106
238k
    IncrementCounter(cache_queries_);
107
238k
    {
108
238k
      std::lock_guard<std::mutex> lock(mutex_);
109
238k
      old_taken = taken_transactions_;
110
238k
      ++taken_transactions_;
111
      // We create new transaction on each take request, does not matter whether is was
112
      // newly created or not. So number of transactions in pool will near average number of take
113
      // requests during transaction preparation.
114
238k
      if (transactions_.empty()) {
115
        // Transaction is automatically prepared when batcher is executed, so we don't have to
116
        // prepare newly created transaction, since it is anyway too late.
117
4.15k
        result = std::make_shared<YBTransaction>(&manager_, locality_);
118
4.15k
        IncrementHistogram(cache_histogram_, 0);
119
234k
      } else {
120
234k
        result = Pop();
121
        // Cache histogram should show number of cache hits in percents, so we put 100 in case of
122
        // hit.
123
234k
        IncrementHistogram(cache_histogram_, 100);
124
234k
        IncrementCounter(cache_hits_);
125
234k
      }
126
238k
      new_txn = std::make_shared<YBTransaction>(&manager_, locality_);
127
238k
      ++preparing_transactions_;
128
238k
    }
129
238k
    IncrementGauge(gauge_preparing_);
130
238k
    internal::InFlightOpsGroupsWithMetadata ops_info;
131
238k
    if (new_txn->batcher_if().Prepare(
132
238k
        &ops_info, ForceConsistentRead::kFalse, TransactionRpcDeadline(), Initial::kFalse,
133
0
        std::bind(&SingleLocalityPool::TransactionReady, this, _1, new_txn, old_taken))) {
134
0
      TransactionReady(Status::OK(), new_txn, old_taken);
135
0
    }
136
238k
    return result;
137
238k
  }
138
139
 private:
140
  void TransactionReady(
141
238k
      const Status& status, const YBTransactionPtr& txn, uint64_t taken_before_creation) {
142
238k
    if (status.ok()) {
143
238k
      IncrementGauge(gauge_prepared_);
144
238k
    }
145
238k
    DecrementGauge(gauge_preparing_);
146
147
238k
    std::lock_guard<std::mutex> lock(mutex_);
148
238k
    if (status.ok()) {
149
238k
      uint64_t taken_during_preparation = taken_transactions_ - taken_before_creation;
150
238k
      taken_during_preparation_sum_ += taken_during_preparation;
151
238k
      transactions_.push_back({ txn, taken_during_preparation });
152
238k
    }
153
238k
    --preparing_transactions_;
154
238k
    if (CheckClosing()) {
155
0
      return;
156
0
    }
157
238k
    if (transactions_.size() == 1 && scheduled_task_ == rpc::kUninitializedScheduledTaskId) {
158
787
      ScheduleCleanup();
159
787
    }
160
238k
  }
161
162
2.05k
  void ScheduleCleanup() REQUIRES(mutex_) {
163
2.05k
    scheduled_task_ = manager_.client()->messenger()->scheduler().Schedule(
164
2.05k
        std::bind(&SingleLocalityPool::Cleanup, this, _1),
165
2.05k
        FLAGS_transaction_pool_cleanup_interval_ms * 1ms);
166
2.05k
  }
167
168
1.47k
  void Cleanup(const Status& status) {
169
1.47k
    std::lock_guard<std::mutex> lock(mutex_);
170
1.47k
    scheduled_task_ = rpc::kUninitializedScheduledTaskId;
171
1.47k
    if (CheckClosing()) {
172
0
      return;
173
0
    }
174
175
1.47k
    if (taken_transactions_at_last_cleanup_ == taken_transactions_) {
176
      // No transactions were taken since last cleanup, could abort all transactions.
177
730
      while (!transactions_.empty()) {
178
540
        Pop()->Abort();
179
540
      }
180
190
      return;
181
190
    }
182
1.28k
    taken_transactions_at_last_cleanup_ = taken_transactions_;
183
184
1.28k
#ifndef NDEBUG
185
    // Check that taken_during_preparation_sum_ reflects actual sum of taken_during_preparation,
186
    // of transactions in pool.
187
1.28k
    {
188
1.28k
      uint64_t taken_during_preparation_sum = 0;
189
4.44k
      for (const auto& entry : transactions_) {
190
4.44k
        taken_during_preparation_sum += entry.taken_during_preparation;
191
4.44k
      }
192
1.28k
      CHECK_EQ(taken_during_preparation_sum, taken_during_preparation_sum_);
193
1.28k
    }
194
1.28k
#endif
195
    // For each prepared transaction we know number of take requests that happened while this
196
    // transaction were prepared. So we try to keep number of prepared transactions
197
    // as average of this value + 50%.
198
    // taken_during_preparation_sum_ is sum of this values so we should divide it by size
199
    // to get average value.
200
201
1.28k
    size_t size = transactions_.size();
202
203
    // Skip cleanup if we have too small amount of prepared transactions.
204
1.28k
    if (preparing_transactions_ < size) {
205
1.24k
      size_t min_size = size * 4 / 5;
206
1.69k
      while (size > min_size &&
207
1.56k
          ((size + preparing_transactions_) * size >
208
454
              taken_during_preparation_sum_ * FLAGS_transaction_pool_reserve_factor)) {
209
454
        Pop()->Abort();
210
454
        --size;
211
454
      }
212
1.24k
    }
213
1.28k
    if (!transactions_.empty()) {
214
1.27k
      ScheduleCleanup();
215
1.27k
    }
216
1.28k
  }
217
218
236k
  YBTransactionPtr Pop() REQUIRES(mutex_) {
219
236k
    DecrementGauge(gauge_prepared_);
220
236k
    YBTransactionPtr result = std::move(transactions_.front().transaction);
221
236k
    taken_during_preparation_sum_ -= transactions_.front().taken_during_preparation;
222
236k
    transactions_.pop_front();
223
236k
    return result;
224
236k
  }
225
226
240k
  bool CheckClosing() REQUIRES(mutex_) {
227
240k
    if (!closing_) {
228
240k
      return false;
229
240k
    }
230
0
    if (Idle()) {
231
0
      cond_.notify_all();
232
0
    }
233
0
    return true;
234
0
  }
235
236
0
  bool Idle() const REQUIRES(mutex_) {
237
0
    LOG(INFO) << "preparing_transactions: " << preparing_transactions_
238
0
              << ", scheduled_task: " << scheduled_task_;
239
0
    return preparing_transactions_ == 0 && scheduled_task_ == rpc::kUninitializedScheduledTaskId;
240
0
  }
241
242
  struct TransactionEntry {
243
    YBTransactionPtr transaction;
244
    uint64_t taken_during_preparation;
245
  };
246
247
  TransactionManager& manager_;
248
  TransactionLocality locality_;
249
  scoped_refptr<Histogram> cache_histogram_;
250
  scoped_refptr<Counter> cache_hits_;
251
  scoped_refptr<Counter> cache_queries_;
252
  scoped_refptr<AtomicGauge<uint32_t>> gauge_preparing_;
253
  scoped_refptr<AtomicGauge<uint32_t>> gauge_prepared_;
254
  std::mutex mutex_;
255
  std::condition_variable cond_;
256
  std::deque<TransactionEntry> transactions_ GUARDED_BY(mutex_);
257
  bool closing_ GUARDED_BY(mutex_) = false;
258
  size_t preparing_transactions_ GUARDED_BY(mutex_) = 0;
259
  rpc::ScheduledTaskId scheduled_task_ GUARDED_BY(mutex_) = rpc::kUninitializedScheduledTaskId;
260
  uint64_t taken_transactions_ GUARDED_BY(mutex_) = 0;
261
  uint64_t taken_during_preparation_sum_ GUARDED_BY(mutex_) = 0;
262
  uint64_t taken_transactions_at_last_cleanup_ GUARDED_BY(mutex_) = 0;
263
};
264
} // namespace
265
266
// Implementation of TransactionPool: one SingleLocalityPool per locality plus optional
// test-only tracking of the last transaction handed out.
class TransactionPool::Impl {
 public:
  // Creates the GLOBAL and LOCAL pools on top of the same manager and metric entity.
  Impl(TransactionManager* manager, MetricEntity* metric_entity)
      : manager_(manager),
        global_pool_(manager, metric_entity, TransactionLocality::GLOBAL),
        local_pool_(manager, metric_entity, TransactionLocality::LOCAL) {
  }

  ~Impl() = default;

  // Takes a transaction from the pool of the appropriate locality.
  //
  // The GLOBAL pool is used when the caller forces it, when FLAGS_force_global_transactions
  // is set, or when the manager reports that placement local transactions are not possible.
  // Otherwise the LOCAL pool is used.
  YBTransactionPtr Take(ForceGlobalTransaction force_global_transaction) EXCLUDES(mutex_) {
    const auto is_global = force_global_transaction ||
                           FLAGS_force_global_transactions ||
                           !manager_->PlacementLocalTransactionsPossible();
    SingleLocalityPool& pool = is_global ? global_pool_ : local_pool_;
    auto taken = pool.Take();
    if (FLAGS_TEST_track_last_transaction) {
      // Test-only bookkeeping, see TEST_GetLastTransaction().
      std::lock_guard<std::mutex> lock(mutex_);
      last_transaction_ = taken;
    }
    return taken;
  }

  // Returns the transaction recorded by the most recent Take() that ran with
  // FLAGS_TEST_track_last_transaction enabled, or a null pointer if there was none.
  YBTransactionPtr TEST_GetLastTransaction() EXCLUDES(mutex_) {
    std::lock_guard<std::mutex> lock(mutex_);
    return last_transaction_;
  }

 private:
  TransactionManager* manager_;
  SingleLocalityPool global_pool_;
  SingleLocalityPool local_pool_;

  // Protects last_transaction_ only; each pool has its own mutex.
  std::mutex mutex_;
  YBTransactionPtr last_transaction_ GUARDED_BY(mutex_);
};
300
301
// Builds the pool implementation on top of the given manager; metric_entity is forwarded to
// the implementation and may be null.
TransactionPool::TransactionPool(TransactionManager* manager, MetricEntity* metric_entity)
    : impl_(new Impl(manager, metric_entity)) {}
304
305
0
// Defined in this translation unit, where Impl is a complete type.
TransactionPool::~TransactionPool() = default;
307
308
233k
// Takes a transaction from the pool; forwards to the implementation, which picks the GLOBAL
// or LOCAL pool based on force_global_transaction and the current configuration.
YBTransactionPtr TransactionPool::Take(ForceGlobalTransaction force_global_transaction) {
  auto& impl = *impl_;
  return impl.Take(force_global_transaction);
}
311
312
// Takes a global transaction from the pool and initializes it with the given isolation level
// and read time. Returns the initialized transaction, or the error reported by Init().
Result<YBTransactionPtr> TransactionPool::TakeAndInit(
    IsolationLevel isolation, const ReadHybridTime& read_time) {
  auto txn = impl_->Take(ForceGlobalTransaction::kTrue);
  RETURN_NOT_OK(txn->Init(isolation, read_time));
  return txn;
}
318
319
0
// Takes a transaction with the same locality as source and fills it as the restarted
// version of source.
//
// Returns an error if the metadata of source cannot be obtained or if
// FillRestartedTransaction() fails.
Result<YBTransactionPtr> TransactionPool::TakeRestarted(const YBTransactionPtr& source) {
  const auto& metadata = source->GetMetadata().get();
  RETURN_NOT_OK(metadata);

  // The restarted transaction is taken from the GLOBAL pool only when the source is global.
  const bool source_is_global = metadata->locality == TransactionLocality::GLOBAL;
  const auto force_global =
      source_is_global ? ForceGlobalTransaction::kTrue : ForceGlobalTransaction::kFalse;

  auto restarted = impl_->Take(force_global);
  RETURN_NOT_OK(source->FillRestartedTransaction(restarted));
  return restarted;
}
329
330
0
// Test-only accessor: returns the last transaction recorded by the implementation, which
// records one only while FLAGS_TEST_track_last_transaction is enabled.
YBTransactionPtr TransactionPool::TEST_GetLastTransaction() {
  auto& impl = *impl_;
  return impl.TEST_GetLastTransaction();
}
333
334
} // namespace client
335
} // namespace yb