/Users/deen/code/yugabyte-db/src/yb/client/transaction_pool.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/transaction_pool.h" |
15 | | |
16 | | #include <deque> |
17 | | |
18 | | #include <gflags/gflags.h> |
19 | | |
20 | | #include "yb/client/batcher.h" |
21 | | #include "yb/client/client.h" |
22 | | #include "yb/client/transaction.h" |
23 | | #include "yb/client/transaction_manager.h" |
24 | | |
25 | | #include "yb/gutil/thread_annotations.h" |
26 | | |
27 | | #include "yb/rpc/messenger.h" |
28 | | #include "yb/rpc/scheduler.h" |
29 | | |
30 | | #include "yb/util/flag_tags.h" |
31 | | #include "yb/util/metrics.h" |
32 | | #include "yb/util/result.h" |
33 | | |
34 | | using namespace std::literals; |
35 | | using namespace std::placeholders; |
36 | | |
37 | | DEFINE_int32(transaction_pool_cleanup_interval_ms, 5000, |
38 | | "How frequently we should cleanup transaction pool"); |
39 | | |
40 | | DEFINE_double(transaction_pool_reserve_factor, 2, |
41 | | "During cleanup we will preserve number of transactions in pool that equals to" |
42 | | " average number or take requests during prepration multiplied by this factor"); |
43 | | |
44 | | DEFINE_bool(force_global_transactions, false, |
45 | | "Force all transactions to be global transactions"); |
46 | | |
47 | | DEFINE_test_flag(bool, track_last_transaction, false, |
48 | | "Keep track of the last transaction taken from pool for testing"); |
49 | | |
// Histogram recording, per Take() call, whether the pool cache was hit
// (value 100) or missed (value 0) -- so its mean reads as a hit-rate percent.
METRIC_DEFINE_coarse_histogram(
    server, transaction_pool_cache, "Rate of hitting transaction pool cache",
    yb::MetricUnit::kCacheHits, "Rate of hitting transaction pool cache");

// Absolute counters backing the same cache statistics: total hits and total
// Take() queries.
METRIC_DEFINE_counter(server, transaction_pool_cache_hits,
                      "Total number of hits in transaction pool cache", yb::MetricUnit::kCacheHits,
                      "Total number of hits in transaction pool cache");
METRIC_DEFINE_counter(server, transaction_pool_cache_queries,
                      "Total number of queries to transaction pool cache",
                      yb::MetricUnit::kCacheQueries,
                      "Total number of queries to transaction pool cache");

// Gauges tracking how many pooled transactions are currently mid-preparation
// vs already prepared and ready to hand out.
METRIC_DEFINE_gauge_uint32(
    server, transaction_pool_preparing, "Number of preparing transactions in pool",
    yb::MetricUnit::kTransactions, "Number of preparing transactions in pool");

METRIC_DEFINE_gauge_uint32(
    server, transaction_pool_prepared, "Number of prepared transactions in pool",
    yb::MetricUnit::kTransactions, "Number of prepared transactions in pool");
69 | | |
70 | | namespace yb { |
71 | | namespace client { |
72 | | |
73 | | namespace { |
74 | | |
// Transaction pool where all transactions have a specific locality (LOCAL or GLOBAL).
//
// The pool keeps a deque of already-prepared transactions so Take() can usually
// return one immediately.  Every Take() also starts preparation of a replacement
// transaction, so the pool size naturally tracks the average number of Take()
// requests arriving while one transaction is being prepared.  A periodically
// scheduled Cleanup() aborts surplus prepared transactions.
class SingleLocalityPool {
 public:
  SingleLocalityPool(TransactionManager* manager,
                     MetricEntity* metric_entity,
                     TransactionLocality locality)
      : manager_(*manager), locality_(locality) {
    // Metrics are optional: with a null entity the scoped_refptr members stay
    // null and the Increment*/Decrement* helpers must tolerate that.
    if (metric_entity) {
      cache_histogram_ = METRIC_transaction_pool_cache.Instantiate(metric_entity);
      cache_hits_ = METRIC_transaction_pool_cache_hits.Instantiate(metric_entity);
      cache_queries_ = METRIC_transaction_pool_cache_queries.Instantiate(metric_entity);
      gauge_preparing_ = METRIC_transaction_pool_preparing.Instantiate(metric_entity, 0);
      gauge_prepared_ = METRIC_transaction_pool_prepared.Instantiate(metric_entity, 0);
    }
  }

  // Aborts the pending cleanup task (if any) and blocks until the pool is
  // idle: no preparation in flight and no cleanup task scheduled (see Idle()).
  ~SingleLocalityPool() {
    std::unique_lock<std::mutex> lock(mutex_);
    closing_ = true;
    if (scheduled_task_ != rpc::kUninitializedScheduledTaskId) {
      manager_.client()->messenger()->scheduler().Abort(scheduled_task_);
    }
    // Have to use while, since GUARDED_BY does not understand cond wait with predicate.
    while (!Idle()) {
      cond_.wait(lock);
    }
  }

  // Returns a transaction: a prepared one from the pool when available,
  // otherwise a freshly created (not yet prepared) one.  In both cases a new
  // replacement transaction is created and its preparation is started.
  YBTransactionPtr Take() {
    YBTransactionPtr result, new_txn;
    uint64_t old_taken;
    IncrementCounter(cache_queries_);
    {
      std::lock_guard<std::mutex> lock(mutex_);
      old_taken = taken_transactions_;
      ++taken_transactions_;
      // We create new transaction on each take request, does not matter whether it was
      // newly created or not. So number of transactions in pool will near average number of take
      // requests during transaction preparation.
      if (transactions_.empty()) {
        // Transaction is automatically prepared when batcher is executed, so we don't have to
        // prepare newly created transaction, since it is anyway too late.
        result = std::make_shared<YBTransaction>(&manager_, locality_);
        IncrementHistogram(cache_histogram_, 0);
      } else {
        result = Pop();
        // Cache histogram should show number of cache hits in percents, so we put 100 in case of
        // hit.
        IncrementHistogram(cache_histogram_, 100);
        IncrementCounter(cache_hits_);
      }
      new_txn = std::make_shared<YBTransaction>(&manager_, locality_);
      ++preparing_transactions_;
    }
    IncrementGauge(gauge_preparing_);
    internal::InFlightOpsGroupsWithMetadata ops_info;
    // When Prepare() returns true it completed synchronously and the callback
    // is not invoked by the batcher, so invoke TransactionReady here instead.
    if (new_txn->batcher_if().Prepare(
        &ops_info, ForceConsistentRead::kFalse, TransactionRpcDeadline(), Initial::kFalse,
        std::bind(&SingleLocalityPool::TransactionReady, this, _1, new_txn, old_taken))) {
      TransactionReady(Status::OK(), new_txn, old_taken);
    }
    return result;
  }

 private:
  // Invoked when preparation of txn finishes.  On success the transaction is
  // added to the pool together with the number of Take() calls that happened
  // while it was being prepared; on failure it is simply dropped.  Also starts
  // the cleanup task when the pool transitions from empty to non-empty.
  void TransactionReady(
      const Status& status, const YBTransactionPtr& txn, uint64_t taken_before_creation) {
    // Gauge updates happen before taking mutex_ on purpose: the metric
    // helpers do not need the lock.
    if (status.ok()) {
      IncrementGauge(gauge_prepared_);
    }
    DecrementGauge(gauge_preparing_);

    std::lock_guard<std::mutex> lock(mutex_);
    if (status.ok()) {
      uint64_t taken_during_preparation = taken_transactions_ - taken_before_creation;
      taken_during_preparation_sum_ += taken_during_preparation;
      transactions_.push_back({ txn, taken_during_preparation });
    }
    --preparing_transactions_;
    if (CheckClosing()) {
      return;
    }
    if (transactions_.size() == 1 && scheduled_task_ == rpc::kUninitializedScheduledTaskId) {
      ScheduleCleanup();
    }
  }

  // Schedules the next Cleanup() run on the messenger's scheduler.
  void ScheduleCleanup() REQUIRES(mutex_) {
    scheduled_task_ = manager_.client()->messenger()->scheduler().Schedule(
        std::bind(&SingleLocalityPool::Cleanup, this, _1),
        FLAGS_transaction_pool_cleanup_interval_ms * 1ms);
  }

  // Periodic task that shrinks the pool towards the demand observed since the
  // previous run.  NOTE(review): `status` (e.g. aborted-on-shutdown) is never
  // inspected; CheckClosing() is what detects shutdown here -- confirm that is
  // intentional.
  void Cleanup(const Status& status) {
    std::lock_guard<std::mutex> lock(mutex_);
    scheduled_task_ = rpc::kUninitializedScheduledTaskId;
    if (CheckClosing()) {
      return;
    }

    if (taken_transactions_at_last_cleanup_ == taken_transactions_) {
      // No transactions were taken since last cleanup, could abort all transactions.
      while (!transactions_.empty()) {
        Pop()->Abort();
      }
      return;
    }
    taken_transactions_at_last_cleanup_ = taken_transactions_;

#ifndef NDEBUG
    // Check that taken_during_preparation_sum_ reflects actual sum of taken_during_preparation,
    // of transactions in pool.
    {
      uint64_t taken_during_preparation_sum = 0;
      for (const auto& entry : transactions_) {
        taken_during_preparation_sum += entry.taken_during_preparation;
      }
      CHECK_EQ(taken_during_preparation_sum, taken_during_preparation_sum_);
    }
#endif
    // For each prepared transaction we know number of take requests that happened while this
    // transaction were prepared. So we try to keep number of prepared transactions
    // as average of this value + 50%.
    // taken_during_preparation_sum_ is sum of this values so we should divide it by size
    // to get average value.

    size_t size = transactions_.size();

    // Skip cleanup if we have too small amount of prepared transactions.
    if (preparing_transactions_ < size) {
      // Abort at most 20% of the pool per round, and only while the pool is
      // still larger than demand * FLAGS_transaction_pool_reserve_factor.
      size_t min_size = size * 4 / 5;
      while (size > min_size &&
             ((size + preparing_transactions_) * size >
                  taken_during_preparation_sum_ * FLAGS_transaction_pool_reserve_factor)) {
        Pop()->Abort();
        --size;
      }
    }
    if (!transactions_.empty()) {
      ScheduleCleanup();
    }
  }

  // Removes and returns the oldest prepared transaction, keeping
  // taken_during_preparation_sum_ consistent with the deque contents.
  YBTransactionPtr Pop() REQUIRES(mutex_) {
    DecrementGauge(gauge_prepared_);
    YBTransactionPtr result = std::move(transactions_.front().transaction);
    taken_during_preparation_sum_ -= transactions_.front().taken_during_preparation;
    transactions_.pop_front();
    return result;
  }

  // Returns true when the pool is shutting down; wakes the waiting destructor
  // once the pool becomes idle.
  bool CheckClosing() REQUIRES(mutex_) {
    if (!closing_) {
      return false;
    }
    if (Idle()) {
      cond_.notify_all();
    }
    return true;
  }

  // Idle means: no preparation in flight and no cleanup task scheduled.
  // NOTE(review): the unconditional LOG(INFO) looks like leftover shutdown
  // debugging -- it only fires on the close path, but consider removing it.
  bool Idle() const REQUIRES(mutex_) {
    LOG(INFO) << "preparing_transactions: " << preparing_transactions_
              << ", scheduled_task: " << scheduled_task_;
    return preparing_transactions_ == 0 && scheduled_task_ == rpc::kUninitializedScheduledTaskId;
  }

  // A pooled prepared transaction plus the number of Take() requests that
  // arrived while it was being prepared (used by Cleanup() to size the pool).
  struct TransactionEntry {
    YBTransactionPtr transaction;
    uint64_t taken_during_preparation;
  };

  TransactionManager& manager_;
  TransactionLocality locality_;
  scoped_refptr<Histogram> cache_histogram_;
  scoped_refptr<Counter> cache_hits_;
  scoped_refptr<Counter> cache_queries_;
  scoped_refptr<AtomicGauge<uint32_t>> gauge_preparing_;
  scoped_refptr<AtomicGauge<uint32_t>> gauge_prepared_;
  std::mutex mutex_;
  std::condition_variable cond_;
  // Prepared transactions ready to be handed out, oldest first.
  std::deque<TransactionEntry> transactions_ GUARDED_BY(mutex_);
  bool closing_ GUARDED_BY(mutex_) = false;
  // Number of transactions whose Prepare() callback has not fired yet.
  size_t preparing_transactions_ GUARDED_BY(mutex_) = 0;
  rpc::ScheduledTaskId scheduled_task_ GUARDED_BY(mutex_) = rpc::kUninitializedScheduledTaskId;
  // Total Take() calls ever; used to measure demand between events.
  uint64_t taken_transactions_ GUARDED_BY(mutex_) = 0;
  // Sum of taken_during_preparation over entries currently in transactions_.
  uint64_t taken_during_preparation_sum_ GUARDED_BY(mutex_) = 0;
  uint64_t taken_transactions_at_last_cleanup_ GUARDED_BY(mutex_) = 0;
};
264 | | } // namespace |
265 | | |
266 | | class TransactionPool::Impl { |
267 | | public: |
268 | | Impl(TransactionManager* manager, MetricEntity* metric_entity) |
269 | | : manager_(manager), |
270 | | global_pool_(manager, metric_entity, TransactionLocality::GLOBAL), |
271 | 658 | local_pool_(manager, metric_entity, TransactionLocality::LOCAL) { |
272 | 658 | } |
273 | | |
274 | 0 | ~Impl() = default; |
275 | | |
276 | 238k | YBTransactionPtr Take(ForceGlobalTransaction force_global_transaction) EXCLUDES(mutex_) { |
277 | 238k | const auto is_global = force_global_transaction || |
278 | 96.2k | FLAGS_force_global_transactions || |
279 | 96.2k | !manager_->PlacementLocalTransactionsPossible(); |
280 | 238k | auto transaction = (is_global ? &global_pool_ : &local_pool_)->Take(); |
281 | 238k | if (FLAGS_TEST_track_last_transaction) { |
282 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
283 | 0 | last_transaction_ = transaction; |
284 | 0 | } |
285 | 238k | return transaction; |
286 | 238k | } |
287 | | |
288 | 0 | YBTransactionPtr TEST_GetLastTransaction() EXCLUDES(mutex_) { |
289 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
290 | 0 | return last_transaction_; |
291 | 0 | } |
292 | | private: |
293 | | TransactionManager* manager_; |
294 | | SingleLocalityPool global_pool_; |
295 | | SingleLocalityPool local_pool_; |
296 | | |
297 | | std::mutex mutex_; |
298 | | YBTransactionPtr last_transaction_ GUARDED_BY(mutex_); |
299 | | }; |
300 | | |
// Constructs the pool; all state lives in the pimpl.
TransactionPool::TransactionPool(TransactionManager* manager, MetricEntity* metric_entity)
    : impl_(new Impl(manager, metric_entity)) {
}
304 | | |
// Out-of-line destructor -- presumably required so impl_'s smart-pointer
// deleter sees the complete Impl definition (pimpl idiom); confirm against the
// header.
TransactionPool::~TransactionPool() {
}
307 | | |
// Forwards to Impl::Take; see that method for pool-selection rules.
YBTransactionPtr TransactionPool::Take(ForceGlobalTransaction force_global_transaction) {
  return impl_->Take(force_global_transaction);
}
311 | | |
312 | | Result<YBTransactionPtr> TransactionPool::TakeAndInit( |
313 | 5.82k | IsolationLevel isolation, const ReadHybridTime& read_time) { |
314 | 5.82k | auto result = impl_->Take(ForceGlobalTransaction::kTrue); |
315 | 5.82k | RETURN_NOT_OK(result->Init(isolation, read_time)); |
316 | 5.82k | return result; |
317 | 5.82k | } |
318 | | |
319 | 0 | Result<YBTransactionPtr> TransactionPool::TakeRestarted(const YBTransactionPtr& source) { |
320 | 0 | const auto &metadata = source->GetMetadata().get(); |
321 | 0 | RETURN_NOT_OK(metadata); |
322 | 0 | const auto force_global = |
323 | 0 | metadata->locality == TransactionLocality::GLOBAL ? ForceGlobalTransaction::kTrue |
324 | 0 | : ForceGlobalTransaction::kFalse; |
325 | 0 | auto result = impl_->Take(force_global); |
326 | 0 | RETURN_NOT_OK(source->FillRestartedTransaction(result)); |
327 | 0 | return result; |
328 | 0 | } |
329 | | |
// Test-only: forwards to Impl::TEST_GetLastTransaction.
YBTransactionPtr TransactionPool::TEST_GetLastTransaction() {
  return impl_->TEST_GetLastTransaction();
}
333 | | |
334 | | } // namespace client |
335 | | } // namespace yb |