YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/transaction_pool.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/transaction_pool.h"
15
16
#include <deque>
17
18
#include <gflags/gflags.h>
19
20
#include "yb/client/batcher.h"
21
#include "yb/client/client.h"
22
#include "yb/client/transaction.h"
23
#include "yb/client/transaction_manager.h"
24
25
#include "yb/gutil/thread_annotations.h"
26
27
#include "yb/rpc/messenger.h"
28
#include "yb/rpc/scheduler.h"
29
30
#include "yb/util/flag_tags.h"
31
#include "yb/util/metrics.h"
32
#include "yb/util/result.h"
33
34
using namespace std::literals;
35
using namespace std::placeholders;
36
37
DEFINE_int32(transaction_pool_cleanup_interval_ms, 5000,
38
             "How frequently we should cleanup transaction pool");
39
40
DEFINE_double(transaction_pool_reserve_factor, 2,
41
              "During cleanup we will preserve number of transactions in pool that equals to"
42
                  " average number or take requests during prepration multiplied by this factor");
43
44
DEFINE_bool(force_global_transactions, false,
45
            "Force all transactions to be global transactions");
46
47
DEFINE_test_flag(bool, track_last_transaction, false,
48
                 "Keep track of the last transaction taken from pool for testing");
49
50
METRIC_DEFINE_coarse_histogram(
51
    server, transaction_pool_cache, "Rate of hitting transaction pool cache",
52
    yb::MetricUnit::kCacheHits, "Rate of hitting transaction pool cache");
53
54
METRIC_DEFINE_counter(server, transaction_pool_cache_hits,
55
                      "Total number of hits in transaction pool cache", yb::MetricUnit::kCacheHits,
56
                      "Total number of hits in transaction pool cache");
57
METRIC_DEFINE_counter(server, transaction_pool_cache_queries,
58
                      "Total number of queries to transaction pool cache",
59
                      yb::MetricUnit::kCacheQueries,
60
                      "Total number of queries to transaction pool cache");
61
62
METRIC_DEFINE_gauge_uint32(
63
    server, transaction_pool_preparing, "Number of preparing transactions in pool",
64
    yb::MetricUnit::kTransactions, "Number of preparing transactions in pool");
65
66
METRIC_DEFINE_gauge_uint32(
67
    server, transaction_pool_prepared, "Number of prepared transactions in pool",
68
    yb::MetricUnit::kTransactions, "Number of prepared transactions in pool");
69
70
namespace yb {
71
namespace client {
72
73
namespace {
74
75
// Transaction pool where all transactions have a specific locality (LOCAL or GLOBAL).
76
class SingleLocalityPool {
77
 public:
78
  SingleLocalityPool(TransactionManager* manager,
79
                     MetricEntity* metric_entity,
80
                     TransactionLocality locality)
81
2.09k
      : manager_(*manager), locality_(locality) {
82
2.09k
    if (metric_entity) {
83
2.09k
      cache_histogram_ = METRIC_transaction_pool_cache.Instantiate(metric_entity);
84
2.09k
      cache_hits_ = METRIC_transaction_pool_cache_hits.Instantiate(metric_entity);
85
2.09k
      cache_queries_ = METRIC_transaction_pool_cache_queries.Instantiate(metric_entity);
86
2.09k
      gauge_preparing_ = METRIC_transaction_pool_preparing.Instantiate(metric_entity, 0);
87
2.09k
      gauge_prepared_ = METRIC_transaction_pool_prepared.Instantiate(metric_entity, 0);
88
2.09k
    }
89
2.09k
  }
90
91
0
  ~SingleLocalityPool() {
92
0
    std::unique_lock<std::mutex> lock(mutex_);
93
0
    closing_ = true;
94
0
    if (scheduled_task_ != rpc::kUninitializedScheduledTaskId) {
95
0
      manager_.client()->messenger()->scheduler().Abort(scheduled_task_);
96
0
    }
97
    // Have to use while, since GUARDED_BY does not understand cond wait with predicate.
98
0
    while (!Idle()) {
99
0
      cond_.wait(lock);
100
0
    }
101
0
  }
102
103
405k
  YBTransactionPtr Take(CoarseTimePoint deadline) {
104
405k
    YBTransactionPtr result, new_txn;
105
405k
    uint64_t old_taken;
106
405k
    IncrementCounter(cache_queries_);
107
405k
    {
108
405k
      std::lock_guard<std::mutex> lock(mutex_);
109
405k
      old_taken = taken_transactions_;
110
405k
      ++taken_transactions_;
111
      // We create new transaction on each take request, does not matter whether is was
112
      // newly created or not. So number of transactions in pool will near average number of take
113
      // requests during transaction preparation.
114
405k
      if (transactions_.empty()) {
115
        // Transaction is automatically prepared when batcher is executed, so we don't have to
116
        // prepare newly created transaction, since it is anyway too late.
117
4.82k
        result = std::make_shared<YBTransaction>(&manager_, locality_);
118
4.82k
        IncrementHistogram(cache_histogram_, 0);
119
400k
      } else {
120
400k
        result = Pop();
121
        // Cache histogram should show number of cache hits in percents, so we put 100 in case of
122
        // hit.
123
400k
        IncrementHistogram(cache_histogram_, 100);
124
400k
        IncrementCounter(cache_hits_);
125
400k
      }
126
405k
      new_txn = std::make_shared<YBTransaction>(&manager_, locality_);
127
405k
      ++preparing_transactions_;
128
405k
    }
129
405k
    IncrementGauge(gauge_preparing_);
130
405k
    internal::InFlightOpsGroupsWithMetadata ops_info;
131
405k
    if (new_txn->batcher_if().Prepare(
132
405k
        &ops_info, ForceConsistentRead::kFalse, deadline, Initial::kFalse,
133
405k
        std::bind(&SingleLocalityPool::TransactionReady, this, _1, new_txn, old_taken))) {
134
0
      TransactionReady(Status::OK(), new_txn, old_taken);
135
0
    }
136
405k
    return result;
137
405k
  }
138
139
 private:
140
  void TransactionReady(
141
404k
      const Status& status, const YBTransactionPtr& txn, uint64_t taken_before_creation) {
142
404k
    if (status.ok()) {
143
404k
      IncrementGauge(gauge_prepared_);
144
404k
    }
145
404k
    DecrementGauge(gauge_preparing_);
146
147
404k
    std::lock_guard<std::mutex> lock(mutex_);
148
404k
    if (
status.ok()404k
) {
149
404k
      uint64_t taken_during_preparation = taken_transactions_ - taken_before_creation;
150
404k
      taken_during_preparation_sum_ += taken_during_preparation;
151
404k
      transactions_.push_back({ txn, taken_during_preparation });
152
404k
    }
153
404k
    --preparing_transactions_;
154
404k
    if (CheckClosing()) {
155
0
      return;
156
0
    }
157
404k
    if (transactions_.size() == 1 && 
scheduled_task_ == rpc::kUninitializedScheduledTaskId52.7k
) {
158
1.44k
      ScheduleCleanup();
159
1.44k
    }
160
404k
  }
161
162
4.69k
  void ScheduleCleanup() REQUIRES(mutex_) {
163
4.69k
    scheduled_task_ = manager_.client()->messenger()->scheduler().Schedule(
164
4.69k
        std::bind(&SingleLocalityPool::Cleanup, this, _1),
165
4.69k
        FLAGS_transaction_pool_cleanup_interval_ms * 1ms);
166
4.69k
  }
167
168
3.77k
  void Cleanup(const Status& status) {
169
3.77k
    std::lock_guard<std::mutex> lock(mutex_);
170
3.77k
    scheduled_task_ = rpc::kUninitializedScheduledTaskId;
171
3.77k
    if (CheckClosing()) {
172
0
      return;
173
0
    }
174
175
3.77k
    if (taken_transactions_at_last_cleanup_ == taken_transactions_) {
176
      // No transactions were taken since last cleanup, could abort all transactions.
177
1.03k
      while (!transactions_.empty()) {
178
538
        Pop()->Abort();
179
538
      }
180
496
      return;
181
496
    }
182
3.28k
    taken_transactions_at_last_cleanup_ = taken_transactions_;
183
184
3.28k
#ifndef NDEBUG
185
    // Check that taken_during_preparation_sum_ reflects actual sum of taken_during_preparation,
186
    // of transactions in pool.
187
3.28k
    {
188
3.28k
      uint64_t taken_during_preparation_sum = 0;
189
5.58k
      for (const auto& entry : transactions_) {
190
5.58k
        taken_during_preparation_sum += entry.taken_during_preparation;
191
5.58k
      }
192
3.28k
      CHECK_EQ(taken_during_preparation_sum, taken_during_preparation_sum_);
193
3.28k
    }
194
3.28k
#endif
195
    // For each prepared transaction we know number of take requests that happened while this
196
    // transaction were prepared. So we try to keep number of prepared transactions
197
    // as average of this value + 50%.
198
    // taken_during_preparation_sum_ is sum of this values so we should divide it by size
199
    // to get average value.
200
201
3.28k
    size_t size = transactions_.size();
202
203
    // Skip cleanup if we have too small amount of prepared transactions.
204
3.28k
    if (preparing_transactions_ < size) {
205
3.19k
      size_t min_size = size * 4 / 5;
206
3.61k
      while (size > min_size &&
207
3.61k
          ((size + preparing_transactions_) * size >
208
3.32k
              taken_during_preparation_sum_ * FLAGS_transaction_pool_reserve_factor)) {
209
426
        Pop()->Abort();
210
426
        --size;
211
426
      }
212
3.19k
    }
213
3.28k
    if (!transactions_.empty()) {
214
3.25k
      ScheduleCleanup();
215
3.25k
    }
216
3.28k
  }
217
218
402k
  YBTransactionPtr Pop() REQUIRES(mutex_) {
219
402k
    DecrementGauge(gauge_prepared_);
220
402k
    YBTransactionPtr result = std::move(transactions_.front().transaction);
221
402k
    taken_during_preparation_sum_ -= transactions_.front().taken_during_preparation;
222
402k
    transactions_.pop_front();
223
402k
    return result;
224
402k
  }
225
226
408k
  bool CheckClosing() REQUIRES(mutex_) {
227
408k
    if (!closing_) {
228
408k
      return false;
229
408k
    }
230
0
    if (Idle()) {
231
0
      cond_.notify_all();
232
0
    }
233
0
    return true;
234
408k
  }
235
236
0
  bool Idle() const REQUIRES(mutex_) {
237
0
    LOG(INFO) << "preparing_transactions: " << preparing_transactions_
238
0
              << ", scheduled_task: " << scheduled_task_;
239
0
    return preparing_transactions_ == 0 && scheduled_task_ == rpc::kUninitializedScheduledTaskId;
240
0
  }
241
242
  struct TransactionEntry {
243
    YBTransactionPtr transaction;
244
    uint64_t taken_during_preparation;
245
  };
246
247
  TransactionManager& manager_;
248
  TransactionLocality locality_;
249
  scoped_refptr<Histogram> cache_histogram_;
250
  scoped_refptr<Counter> cache_hits_;
251
  scoped_refptr<Counter> cache_queries_;
252
  scoped_refptr<AtomicGauge<uint32_t>> gauge_preparing_;
253
  scoped_refptr<AtomicGauge<uint32_t>> gauge_prepared_;
254
  std::mutex mutex_;
255
  std::condition_variable cond_;
256
  std::deque<TransactionEntry> transactions_ GUARDED_BY(mutex_);
257
  bool closing_ GUARDED_BY(mutex_) = false;
258
  size_t preparing_transactions_ GUARDED_BY(mutex_) = 0;
259
  rpc::ScheduledTaskId scheduled_task_ GUARDED_BY(mutex_) = rpc::kUninitializedScheduledTaskId;
260
  uint64_t taken_transactions_ GUARDED_BY(mutex_) = 0;
261
  uint64_t taken_during_preparation_sum_ GUARDED_BY(mutex_) = 0;
262
  uint64_t taken_transactions_at_last_cleanup_ GUARDED_BY(mutex_) = 0;
263
};
264
} // namespace
265
266
class TransactionPool::Impl {
267
 public:
268
  Impl(TransactionManager* manager, MetricEntity* metric_entity)
269
      : manager_(manager),
270
        global_pool_(manager, metric_entity, TransactionLocality::GLOBAL),
271
1.04k
        local_pool_(manager, metric_entity, TransactionLocality::LOCAL) {
272
1.04k
  }
273
274
0
  ~Impl() = default;
275
276
  YBTransactionPtr Take(
277
404k
      ForceGlobalTransaction force_global_transaction, CoarseTimePoint deadline) EXCLUDES(mutex_) {
278
404k
    const auto is_global = force_global_transaction ||
279
404k
                           
FLAGS_force_global_transactions265k
||
280
404k
                           
!manager_->PlacementLocalTransactionsPossible()266k
;
281
18.4E
    auto transaction = (
is_global404k
?
&global_pool_405k
: &local_pool_)->Take(deadline);
282
404k
    if (FLAGS_TEST_track_last_transaction) {
283
0
      std::lock_guard<std::mutex> lock(mutex_);
284
0
      last_transaction_ = transaction;
285
0
    }
286
404k
    return transaction;
287
404k
  }
288
289
0
  YBTransactionPtr TEST_GetLastTransaction() EXCLUDES(mutex_) {
290
0
    std::lock_guard<std::mutex> lock(mutex_);
291
0
    return last_transaction_;
292
0
  }
293
 private:
294
  TransactionManager* manager_;
295
  SingleLocalityPool global_pool_;
296
  SingleLocalityPool local_pool_;
297
298
  std::mutex mutex_;
299
  YBTransactionPtr last_transaction_ GUARDED_BY(mutex_);
300
};
301
302
TransactionPool::TransactionPool(TransactionManager* manager, MetricEntity* metric_entity)
303
1.04k
    : impl_(new Impl(manager, metric_entity)) {
304
1.04k
}
305
306
0
TransactionPool::~TransactionPool() {
307
0
}
308
309
YBTransactionPtr TransactionPool::Take(
310
387k
    ForceGlobalTransaction force_global_transaction, CoarseTimePoint deadline) {
311
387k
  return impl_->Take(force_global_transaction, deadline);
312
387k
}
313
314
Result<YBTransactionPtr> TransactionPool::TakeAndInit(
315
18.8k
    IsolationLevel isolation, CoarseTimePoint deadline, const ReadHybridTime& read_time) {
316
18.8k
  auto result = impl_->Take(ForceGlobalTransaction::kTrue, deadline);
317
18.8k
  RETURN_NOT_OK(result->Init(isolation, read_time));
318
18.8k
  return result;
319
18.8k
}
320
321
Result<YBTransactionPtr> TransactionPool::TakeRestarted(
322
0
    const YBTransactionPtr& source, CoarseTimePoint deadline) {
323
0
  const auto &metadata = source->GetMetadata(deadline).get();
324
0
  RETURN_NOT_OK(metadata);
325
0
  const auto force_global =
326
0
      metadata->locality == TransactionLocality::GLOBAL ? ForceGlobalTransaction::kTrue
327
0
                                                        : ForceGlobalTransaction::kFalse;
328
0
  auto result = impl_->Take(force_global, deadline);
329
0
  RETURN_NOT_OK(source->FillRestartedTransaction(result));
330
0
  return result;
331
0
}
332
333
0
YBTransactionPtr TransactionPool::TEST_GetLastTransaction() {
334
0
  return impl_->TEST_GetLastTransaction();
335
0
}
336
337
} // namespace client
338
} // namespace yb