/Users/deen/code/yugabyte-db/src/yb/client/transaction_manager.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/client/transaction_manager.h" |
17 | | |
18 | | #include "yb/client/client.h" |
19 | | #include "yb/client/meta_cache.h" |
20 | | #include "yb/client/table.h" |
21 | | #include "yb/client/yb_table_name.h" |
22 | | |
23 | | #include "yb/master/catalog_manager.h" |
24 | | |
25 | | #include "yb/rpc/tasks_pool.h" |
26 | | |
27 | | #include "yb/server/server_base_options.h" |
28 | | |
29 | | #include "yb/util/format.h" |
30 | | #include "yb/util/status_format.h" |
31 | | #include "yb/util/status_log.h" |
32 | | #include "yb/util/string_util.h" |
33 | | #include "yb/util/thread_restrictions.h" |
34 | | |
35 | | DEFINE_uint64(transaction_manager_workers_limit, 50, |
36 | | "Max number of workers used by transaction manager"); |
37 | | |
38 | | DEFINE_uint64(transaction_manager_queue_limit, 500, |
39 | | "Max number of tasks used by transaction manager"); |
40 | | |
41 | | namespace yb { |
42 | | namespace client { |
43 | | |
44 | | namespace { |
45 | | |
46 | | // Cache of tablet ids of the global transaction table and any transaction tables with |
47 | | // the same placement. |
48 | | class TransactionTableState { |
49 | | public: |
50 | | explicit TransactionTableState(LocalTabletFilter local_tablet_filter) |
51 | 4.73k | : local_tablet_filter_(local_tablet_filter) { |
52 | 4.73k | } |
53 | | |
54 | | void InvokeCallback(const PickStatusTabletCallback& callback, |
55 | 243k | TransactionLocality locality) EXCLUDES(mutex_) { |
56 | 243k | SharedLock<yb::RWMutex> lock(mutex_); |
57 | 243k | const auto& tablets = PickTabletList(locality); |
58 | 243k | if (tablets.empty()) { |
59 | 0 | callback(STATUS_FORMAT( |
60 | 0 | IllegalState, "No $0 transaction tablets found", TransactionLocality_Name(locality))); |
61 | 0 | return; |
62 | 0 | } |
63 | 243k | if (PickStatusTabletId(tablets, callback)) { |
64 | 221k | return; |
65 | 221k | } |
66 | 21.6k | YB_LOG_EVERY_N_SECS(WARNING, 1) << "No placement local transaction status tablet found"; |
67 | 21.6k | callback(RandomElement(tablets)); |
68 | 21.6k | } |
69 | | |
70 | 243k | bool IsInitialized() { |
71 | 243k | return initialized_.load(); |
72 | 243k | } |
73 | | |
74 | | void UpdateStatusTablets(uint64_t new_version, |
75 | 1.99k | TransactionStatusTablets&& tablets) EXCLUDES(mutex_) { |
76 | 1.99k | std::lock_guard<yb::RWMutex> lock(mutex_); |
77 | 1.99k | if (!initialized_.load() || status_tablets_version_ < new_version) { |
78 | 1.31k | tablets_ = std::move(tablets); |
79 | 1.31k | has_placement_local_tablets_.store(!tablets_.placement_local_tablets.empty()); |
80 | 1.31k | status_tablets_version_ = new_version; |
81 | 1.31k | initialized_.store(true); |
82 | 1.31k | } |
83 | 1.99k | } |
84 | | |
85 | 96.3k | bool HasAnyPlacementLocalStatusTablets() { |
86 | 96.3k | return has_placement_local_tablets_.load(); |
87 | 96.3k | } |
88 | | |
89 | 38.8k | uint64_t GetStatusTabletsVersion() EXCLUDES(mutex_) { |
90 | 38.8k | std::lock_guard<yb::RWMutex> lock(mutex_); |
91 | 38.8k | return status_tablets_version_; |
92 | 38.8k | } |
93 | | |
94 | | private: |
95 | | // Picks a status tablet id from 'tablets' filtered by 'filter'. Returns true if a |
96 | | // tablet id was picked successfully, and false if there were no applicable tablet ids. |
97 | | bool PickStatusTabletId(const std::vector<TabletId>& tablets, |
98 | 243k | const PickStatusTabletCallback& callback) REQUIRES_SHARED(mutex_) { |
99 | 243k | if (tablets.empty()) { |
100 | 0 | return false; |
101 | 0 | } |
102 | 243k | if (local_tablet_filter_) { |
103 | 243k | std::vector<const TabletId*> ids; |
104 | 243k | ids.reserve(tablets.size()); |
105 | 4.86M | for (const auto& id : tablets) { |
106 | 4.86M | ids.push_back(&id); |
107 | 4.86M | } |
108 | 243k | local_tablet_filter_(&ids); |
109 | 243k | if (!ids.empty()) { |
110 | 221k | callback(*RandomElement(ids)); |
111 | 221k | return true; |
112 | 221k | } |
113 | 21.5k | return false; |
114 | 21.5k | } |
115 | 15 | callback(RandomElement(tablets)); |
116 | 15 | return true; |
117 | 15 | } |
118 | | |
119 | | const std::vector<TabletId>& PickTabletList(TransactionLocality locality) |
120 | 243k | REQUIRES_SHARED(mutex_) { |
121 | 243k | if (tablets_.placement_local_tablets.empty()) { |
122 | 243k | return tablets_.global_tablets; |
123 | 243k | } |
124 | 0 | switch (locality) { |
125 | 0 | case TransactionLocality::GLOBAL: |
126 | 0 | return tablets_.global_tablets; |
127 | 0 | case TransactionLocality::LOCAL: |
128 | 0 | return tablets_.placement_local_tablets; |
129 | 0 | } |
130 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionLocality, locality); |
131 | 0 | } |
132 | | |
133 | | LocalTabletFilter local_tablet_filter_; |
134 | | |
135 | | // Set to true once transaction tablets have been loaded at least once. global_tablets |
136 | | // is assumed to have at least one entry in it if this is true. |
137 | | std::atomic<bool> initialized_{false}; |
138 | | |
139 | | // Set to true if there are any placement local transaction tablets. |
140 | | std::atomic<bool> has_placement_local_tablets_{false}; |
141 | | |
142 | | // Locks the version/tablet lists. A read lock is acquired when picking |
143 | | // tablets, and a write lock is acquired when updating tablet lists. |
144 | | RWMutex mutex_; |
145 | | |
146 | | uint64_t status_tablets_version_ GUARDED_BY(mutex_) = 0; |
147 | | |
148 | | TransactionStatusTablets tablets_ GUARDED_BY(mutex_); |
149 | | }; |
150 | | |
151 | | // Loads transaction tablets list to cache. |
152 | | class LoadStatusTabletsTask { |
153 | | public: |
154 | | LoadStatusTabletsTask(YBClient* client, |
155 | | TransactionTableState* table_state, |
156 | | uint64_t version, |
157 | | PickStatusTabletCallback callback = PickStatusTabletCallback(), |
158 | | TransactionLocality locality = TransactionLocality::GLOBAL) |
159 | | : client_(client), table_state_(table_state), version_(version), callback_(callback), |
160 | 2.01k | locality_(locality) { |
161 | 2.01k | } |
162 | | |
163 | 2.01k | void Run() { |
164 | | // TODO(dtxn) async |
165 | 2.01k | auto tablets = GetTransactionStatusTablets(); |
166 | 2.01k | if (!tablets.ok()) { |
167 | 0 | YB_LOG_EVERY_N_SECS(ERROR, 1) << "Failed to get tablets of txn status tables: " |
168 | 0 | << tablets.status(); |
169 | 0 | if (callback_) { |
170 | 0 | callback_(tablets.status()); |
171 | 0 | } |
172 | 0 | return; |
173 | 0 | } |
174 | | |
175 | 2.01k | table_state_->UpdateStatusTablets(version_, std::move(*tablets)); |
176 | | |
177 | 2.01k | if (callback_) { |
178 | 1.33k | table_state_->InvokeCallback(callback_, locality_); |
179 | 1.33k | } |
180 | 2.01k | } |
181 | | |
182 | 2.01k | void Done(const Status& status) { |
183 | 2.01k | if (!status.ok()) { |
184 | 0 | callback_(status); |
185 | 0 | } |
186 | 2.01k | callback_ = PickStatusTabletCallback(); |
187 | 2.01k | client_ = nullptr; |
188 | 2.01k | } |
189 | | |
190 | | private: |
191 | 2.01k | Result<TransactionStatusTablets> GetTransactionStatusTablets() { |
192 | 2.01k | CloudInfoPB this_pb = yb::server::GetPlacementFromGFlags(); |
193 | 2.01k | return client_->GetTransactionStatusTablets(this_pb); |
194 | 2.01k | } |
195 | | |
196 | | YBClient* client_; |
197 | | TransactionTableState* table_state_; |
198 | | uint64_t version_; |
199 | | PickStatusTabletCallback callback_; |
200 | | TransactionLocality locality_; |
201 | | }; |
202 | | |
203 | | class InvokeCallbackTask { |
204 | | public: |
205 | | InvokeCallbackTask(TransactionTableState* table_state, |
206 | | PickStatusTabletCallback callback, |
207 | | TransactionLocality locality) |
208 | 0 | : table_state_(table_state), callback_(std::move(callback)), locality_(locality) { |
209 | 0 | } |
210 | | |
211 | 0 | void Run() { |
212 | 0 | table_state_->InvokeCallback(callback_, locality_); |
213 | 0 | } |
214 | | |
215 | 0 | void Done(const Status& status) { |
216 | 0 | if (!status.ok()) { |
217 | 0 | callback_(status); |
218 | 0 | } |
219 | 0 | callback_ = PickStatusTabletCallback(); |
220 | 0 | } |
221 | | |
222 | | private: |
223 | | TransactionTableState* table_state_; |
224 | | PickStatusTabletCallback callback_; |
225 | | TransactionLocality locality_; |
226 | | }; |
227 | | } // namespace |
228 | | |
229 | | class TransactionManager::Impl { |
230 | | public: |
231 | | explicit Impl(YBClient* client, const scoped_refptr<ClockBase>& clock, |
232 | | LocalTabletFilter local_tablet_filter) |
233 | | : client_(client), |
234 | | clock_(clock), |
235 | | table_state_{std::move(local_tablet_filter)}, |
236 | | thread_pool_( |
237 | | "TransactionManager", FLAGS_transaction_manager_queue_limit, |
238 | | FLAGS_transaction_manager_workers_limit), |
239 | | tasks_pool_(FLAGS_transaction_manager_queue_limit), |
240 | 4.73k | invoke_callback_tasks_(FLAGS_transaction_manager_queue_limit) { |
241 | 4.73k | CHECK(clock); |
242 | 4.73k | } |
243 | | |
244 | 3.66k | ~Impl() { |
245 | 3.66k | Shutdown(); |
246 | 3.66k | } |
247 | | |
248 | 38.8k | void UpdateTransactionTablesVersion(uint64_t version) { |
249 | 38.8k | if (table_state_.GetStatusTabletsVersion() >= version) { |
250 | 38.1k | return; |
251 | 38.1k | } |
252 | | |
253 | 684 | if (!tasks_pool_.Enqueue(&thread_pool_, client_, &table_state_, version)) { |
254 | 0 | YB_LOG_EVERY_N_SECS(ERROR, 1) << "Update tasks overflow, number of tasks: " |
255 | 0 | << tasks_pool_.size(); |
256 | 0 | } |
257 | 684 | } |
258 | | |
259 | 243k | void PickStatusTablet(PickStatusTabletCallback callback, TransactionLocality locality) { |
260 | 243k | if (table_state_.IsInitialized()) { |
261 | 241k | if (ThreadRestrictions::IsWaitAllowed()) { |
262 | 241k | table_state_.InvokeCallback(callback, locality); |
263 | 14 | } else if (!invoke_callback_tasks_.Enqueue( |
264 | 0 | &thread_pool_, &table_state_, callback, locality)) { |
265 | 0 | callback(STATUS_FORMAT(ServiceUnavailable, |
266 | 0 | "Invoke callback queue overflow, number of tasks: $0", |
267 | 0 | invoke_callback_tasks_.size())); |
268 | 0 | } |
269 | 241k | return; |
270 | 241k | } |
271 | | |
272 | 1.33k | if (!tasks_pool_.Enqueue( |
273 | 1.33k | &thread_pool_, client_, &table_state_, 0 /* version */, callback, |
274 | 0 | locality)) { |
275 | 0 | callback(STATUS_FORMAT(ServiceUnavailable, "Tasks overflow, exists: $0", tasks_pool_.size())); |
276 | 0 | } |
277 | 1.33k | } |
278 | | |
279 | 393k | const scoped_refptr<ClockBase>& clock() const { |
280 | 393k | return clock_; |
281 | 393k | } |
282 | | |
283 | 1.18M | YBClient* client() const { |
284 | 1.18M | return client_; |
285 | 1.18M | } |
286 | | |
287 | 2.07M | rpc::Rpcs& rpcs() { |
288 | 2.07M | return rpcs_; |
289 | 2.07M | } |
290 | | |
291 | 522k | HybridTime Now() const { |
292 | 522k | return clock_->Now(); |
293 | 522k | } |
294 | | |
295 | 0 | HybridTimeRange NowRange() const { |
296 | 0 | return clock_->NowRange(); |
297 | 0 | } |
298 | | |
299 | 518k | void UpdateClock(HybridTime time) { |
300 | 518k | clock_->Update(time); |
301 | 518k | } |
302 | | |
303 | 3.67k | void Shutdown() { |
304 | 3.67k | rpcs_.Shutdown(); |
305 | 3.67k | thread_pool_.Shutdown(); |
306 | 3.67k | } |
307 | | |
308 | 96.3k | bool PlacementLocalTransactionsPossible() { |
309 | 96.3k | return table_state_.HasAnyPlacementLocalStatusTablets(); |
310 | 96.3k | } |
311 | | |
312 | 0 | uint64_t GetLoadedStatusTabletsVersion() { |
313 | 0 | return table_state_.GetStatusTabletsVersion(); |
314 | 0 | } |
315 | | |
316 | | private: |
317 | | YBClient* const client_; |
318 | | scoped_refptr<ClockBase> clock_; |
319 | | TransactionTableState table_state_; |
320 | | std::atomic<bool> closed_{false}; |
321 | | |
322 | | yb::rpc::ThreadPool thread_pool_; // TODO async operations instead of pool |
323 | | yb::rpc::TasksPool<LoadStatusTabletsTask> tasks_pool_; |
324 | | yb::rpc::TasksPool<InvokeCallbackTask> invoke_callback_tasks_; |
325 | | yb::rpc::Rpcs rpcs_; |
326 | | }; |
327 | | |
328 | | TransactionManager::TransactionManager( |
329 | | YBClient* client, const scoped_refptr<ClockBase>& clock, |
330 | | LocalTabletFilter local_tablet_filter) |
331 | 4.72k | : impl_(new Impl(client, clock, std::move(local_tablet_filter))) {} |
332 | | |
333 | 3.66k | TransactionManager::~TransactionManager() = default; |
334 | | |
335 | 38.8k | void TransactionManager::UpdateTransactionTablesVersion(uint64_t version) { |
336 | 38.8k | impl_->UpdateTransactionTablesVersion(version); |
337 | 38.8k | } |
338 | | |
339 | | void TransactionManager::PickStatusTablet( |
340 | 243k | PickStatusTabletCallback callback, TransactionLocality locality) { |
341 | 243k | impl_->PickStatusTablet(std::move(callback), locality); |
342 | 243k | } |
343 | | |
344 | 1.18M | YBClient* TransactionManager::client() const { |
345 | 1.18M | return impl_->client(); |
346 | 1.18M | } |
347 | | |
348 | 2.07M | rpc::Rpcs& TransactionManager::rpcs() { |
349 | 2.07M | return impl_->rpcs(); |
350 | 2.07M | } |
351 | | |
352 | 393k | const scoped_refptr<ClockBase>& TransactionManager::clock() const { |
353 | 393k | return impl_->clock(); |
354 | 393k | } |
355 | | |
356 | 522k | HybridTime TransactionManager::Now() const { |
357 | 522k | return impl_->Now(); |
358 | 522k | } |
359 | | |
360 | 0 | HybridTimeRange TransactionManager::NowRange() const { |
361 | 0 | return impl_->NowRange(); |
362 | 0 | } |
363 | | |
364 | 518k | void TransactionManager::UpdateClock(HybridTime time) { |
365 | 518k | impl_->UpdateClock(time); |
366 | 518k | } |
367 | | |
368 | 96.3k | bool TransactionManager::PlacementLocalTransactionsPossible() { |
369 | 96.3k | return impl_->PlacementLocalTransactionsPossible(); |
370 | 96.3k | } |
371 | | |
372 | 0 | uint64_t TransactionManager::GetLoadedStatusTabletsVersion() { |
373 | 0 | return impl_->GetLoadedStatusTabletsVersion(); |
374 | 0 | } |
375 | | |
376 | 0 | TransactionManager::TransactionManager(TransactionManager&& rhs) = default; |
377 | 0 | TransactionManager& TransactionManager::operator=(TransactionManager&& rhs) = default; |
378 | | |
379 | | } // namespace client |
380 | | } // namespace yb |