/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/transactions/transaction_lock_mgr.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | |
23 | | #ifndef __STDC_FORMAT_MACROS |
24 | | #define __STDC_FORMAT_MACROS |
25 | | #endif |
26 | | |
27 | | #include "yb/rocksdb/utilities/transactions/transaction_lock_mgr.h" |
28 | | |
29 | | #include <inttypes.h> |
30 | | |
31 | | #include <algorithm> |
32 | | #include <condition_variable> |
33 | | #include <functional> |
34 | | #include <mutex> |
35 | | #include <string> |
36 | | #include <vector> |
37 | | |
38 | | #include "yb/rocksdb/utilities/transaction_db_mutex.h" |
39 | | #include "yb/rocksdb/util/autovector.h" |
40 | | #include "yb/rocksdb/util/murmurhash.h" |
41 | | #include "yb/rocksdb/util/thread_local.h" |
42 | | #include "yb/rocksdb/util/timeout_error.h" |
43 | | #include "yb/rocksdb/utilities/transactions/transaction_db_impl.h" |
44 | | |
45 | | #include "yb/util/slice.h" |
46 | | #include "yb/util/status_log.h" |
47 | | |
48 | | namespace rocksdb { |
49 | | |
50 | | struct LockInfo { |
51 | | TransactionID txn_id; |
52 | | |
53 | | // Transaction locks are not valid after this time in us |
54 | | uint64_t expiration_time; |
55 | | |
56 | | LockInfo(TransactionID id, uint64_t time) |
57 | 3.33k | : txn_id(id), expiration_time(time) {} |
58 | | LockInfo(const LockInfo& lock_info) |
59 | 6.50k | : txn_id(lock_info.txn_id), expiration_time(lock_info.expiration_time) {} |
60 | | }; |
61 | | |
62 | | struct LockMapStripe { |
63 | 912 | explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) { |
64 | 912 | stripe_mutex = factory->AllocateMutex(); |
65 | 912 | stripe_cv = factory->AllocateCondVar(); |
66 | 912 | assert(stripe_mutex); |
67 | 912 | assert(stripe_cv); |
68 | 912 | } |
69 | | |
70 | | // Mutex must be held before modifying keys map |
71 | | std::shared_ptr<TransactionDBMutex> stripe_mutex; |
72 | | |
73 | | // Condition Variable per stripe for waiting on a lock |
74 | | std::shared_ptr<TransactionDBCondVar> stripe_cv; |
75 | | |
76 | | // Locked keys mapped to the info about the transactions that locked them. |
77 | | // TODO(agiardullo): Explore performance of other data structures. |
78 | | std::unordered_map<std::string, LockInfo> keys; |
79 | | }; |
80 | | |
81 | | // Map of #num_stripes LockMapStripes |
82 | | struct LockMap { |
83 | | explicit LockMap(size_t num_stripes, |
84 | | std::shared_ptr<TransactionDBMutexFactory> factory) |
85 | 57 | : num_stripes_(num_stripes) { |
86 | 57 | lock_map_stripes_.reserve(num_stripes); |
87 | 969 | for (size_t i = 0; i < num_stripes; i++) { |
88 | 912 | LockMapStripe* stripe = new LockMapStripe(factory); |
89 | 912 | lock_map_stripes_.push_back(stripe); |
90 | 912 | } |
91 | 57 | } |
92 | | |
93 | 57 | ~LockMap() { |
94 | 912 | for (auto stripe : lock_map_stripes_) { |
95 | 912 | delete stripe; |
96 | 912 | } |
97 | 57 | } |
98 | | |
99 | | // Number of sepearate LockMapStripes to create, each with their own Mutex |
100 | | const size_t num_stripes_; |
101 | | |
102 | | // Count of keys that are currently locked in this column family. |
103 | | // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.) |
104 | | std::atomic<int64_t> lock_cnt{0}; |
105 | | |
106 | | std::vector<LockMapStripe*> lock_map_stripes_; |
107 | | |
108 | | size_t GetStripe(const std::string& key) const; |
109 | | }; |
110 | | |
111 | | namespace { |
112 | 37 | void UnrefLockMapsCache(void* ptr) { |
113 | | // Called when a thread exits or a ThreadLocalPtr gets destroyed. |
114 | 37 | auto lock_maps_cache = |
115 | 37 | static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr); |
116 | 37 | delete lock_maps_cache; |
117 | 37 | } |
118 | | } // anonymous namespace |
119 | | |
120 | | TransactionLockMgr::TransactionLockMgr( |
121 | | TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks, |
122 | | std::shared_ptr<TransactionDBMutexFactory> mutex_factory) |
123 | | : txn_db_impl_(nullptr), |
124 | | default_num_stripes_(default_num_stripes), |
125 | | max_num_locks_(max_num_locks), |
126 | | mutex_factory_(mutex_factory), |
127 | 47 | lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)) { |
128 | 47 | txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db); |
129 | 47 | assert(txn_db_impl_); |
130 | 47 | } |
131 | | |
132 | 47 | TransactionLockMgr::~TransactionLockMgr() {} |
133 | | |
134 | 6.59k | size_t LockMap::GetStripe(const std::string& key) const { |
135 | 6.59k | assert(num_stripes_ > 0); |
136 | 6.59k | static murmur_hash hash; |
137 | 6.59k | size_t stripe = hash(key) % num_stripes_; |
138 | 6.59k | return stripe; |
139 | 6.59k | } |
140 | | |
141 | 57 | void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) { |
142 | 57 | InstrumentedMutexLock l(&lock_map_mutex_); |
143 | | |
144 | 57 | if (lock_maps_.find(column_family_id) == lock_maps_.end()) { |
145 | 57 | lock_maps_.emplace(column_family_id, |
146 | 57 | std::shared_ptr<LockMap>( |
147 | 57 | new LockMap(default_num_stripes_, mutex_factory_))); |
148 | 0 | } else { |
149 | | // column_family already exists in lock map |
150 | 0 | assert(false); |
151 | 0 | } |
152 | 57 | } |
153 | | |
154 | 4 | void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) { |
155 | | // Remove lock_map for this column family. Since the lock map is stored |
156 | | // as a shared ptr, concurrent transactions can still keep keep using it |
157 | | // until they release their reference to it. |
158 | 4 | { |
159 | 4 | InstrumentedMutexLock l(&lock_map_mutex_); |
160 | | |
161 | 4 | auto lock_maps_iter = lock_maps_.find(column_family_id); |
162 | 4 | assert(lock_maps_iter != lock_maps_.end()); |
163 | | |
164 | 4 | lock_maps_.erase(lock_maps_iter); |
165 | 4 | } // lock_map_mutex_ |
166 | | |
167 | | // Clear all thread-local caches |
168 | 4 | autovector<void*> local_caches; |
169 | 4 | lock_maps_cache_->Scrape(&local_caches, nullptr); |
170 | 3 | for (auto cache : local_caches) { |
171 | 3 | delete static_cast<LockMaps*>(cache); |
172 | 3 | } |
173 | 4 | } |
174 | | |
175 | | // Look up the LockMap shared_ptr for a given column_family_id. |
176 | | // Note: The LockMap is only valid as long as the caller is still holding on |
177 | | // to the returned shared_ptr. |
178 | | std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap( |
179 | 6.54k | uint32_t column_family_id) { |
180 | | // First check thread-local cache |
181 | 6.54k | if (lock_maps_cache_->Get() == nullptr) { |
182 | 40 | lock_maps_cache_->Reset(new LockMaps()); |
183 | 40 | } |
184 | | |
185 | 6.54k | auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get()); |
186 | | |
187 | 6.54k | auto lock_map_iter = lock_maps_cache->find(column_family_id); |
188 | 6.54k | if (lock_map_iter != lock_maps_cache->end()) { |
189 | | // Found lock map for this column family. |
190 | 6.49k | return lock_map_iter->second; |
191 | 6.49k | } |
192 | | |
193 | | // Not found in local cache, grab mutex and check shared LockMaps |
194 | 46 | InstrumentedMutexLock l(&lock_map_mutex_); |
195 | | |
196 | 46 | lock_map_iter = lock_maps_.find(column_family_id); |
197 | 46 | if (lock_map_iter == lock_maps_.end()) { |
198 | 1 | return std::shared_ptr<LockMap>(nullptr); |
199 | 45 | } else { |
200 | | // Found lock map. Store in thread-local cache and return. |
201 | 45 | std::shared_ptr<LockMap>& lock_map = lock_map_iter->second; |
202 | 45 | lock_maps_cache->insert({column_family_id, lock_map}); |
203 | | |
204 | 45 | return lock_map; |
205 | 45 | } |
206 | 46 | } |
207 | | |
208 | | // Returns true if this lock has expired and can be acquired by another |
209 | | // transaction. |
210 | | // If false, sets *expire_time to the expiration time of the lock according |
211 | | // to Env->GetMicros() or 0 if no expiration. |
212 | | bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, |
213 | 124 | uint64_t* expire_time) { |
214 | 124 | auto now = env->NowMicros(); |
215 | | |
216 | 124 | bool expired = |
217 | 124 | (lock_info.expiration_time > 0 && lock_info.expiration_time <= now); |
218 | | |
219 | 124 | if (!expired && lock_info.expiration_time > 0) { |
220 | | // return how many microseconds until lock will be expired |
221 | 5 | *expire_time = lock_info.expiration_time; |
222 | 119 | } else { |
223 | 119 | bool success = |
224 | 119 | txn_db_impl_->TryStealingExpiredTransactionLocks(lock_info.txn_id); |
225 | 119 | if (!success) { |
226 | 1 | expired = false; |
227 | 1 | } |
228 | 119 | *expire_time = 0; |
229 | 119 | } |
230 | | |
231 | 124 | return expired; |
232 | 124 | } |
233 | | |
234 | | Status TransactionLockMgr::TryLock(const TransactionImpl* txn, |
235 | | uint32_t column_family_id, |
236 | 3.33k | const std::string& key, Env* env) { |
237 | | // Lookup lock map for this column family id |
238 | 3.33k | std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); |
239 | 3.33k | LockMap* lock_map = lock_map_ptr.get(); |
240 | 3.33k | if (lock_map == nullptr) { |
241 | 1 | char msg[255]; |
242 | 1 | snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32, |
243 | 1 | column_family_id); |
244 | | |
245 | 1 | return STATUS(InvalidArgument, msg); |
246 | 1 | } |
247 | | |
248 | | // Need to lock the mutex for the stripe that this key hashes to |
249 | 3.33k | size_t stripe_num = lock_map->GetStripe(key); |
250 | 3.33k | assert(lock_map->lock_map_stripes_.size() > stripe_num); |
251 | 3.33k | LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); |
252 | | |
253 | 3.33k | LockInfo lock_info(txn->GetTxnID(), txn->GetExpirationTime()); |
254 | 3.33k | int64_t timeout = txn->GetLockTimeout(); |
255 | | |
256 | 3.33k | return AcquireWithTimeout(lock_map, stripe, key, env, timeout, lock_info); |
257 | 3.33k | } |
258 | | |
259 | | // Helper function for TryLock(). |
260 | | Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, |
261 | | LockMapStripe* stripe, |
262 | | const std::string& key, Env* env, |
263 | | int64_t timeout, |
264 | 3.33k | const LockInfo& lock_info) { |
265 | 3.33k | Status result; |
266 | 3.33k | uint64_t start_time = 0; |
267 | 3.33k | uint64_t end_time = 0; |
268 | | |
269 | 3.33k | if (timeout > 0) { |
270 | 88 | start_time = env->NowMicros(); |
271 | 88 | end_time = start_time + timeout; |
272 | 88 | } |
273 | | |
274 | 3.33k | if (timeout < 0) { |
275 | | // If timeout is negative, we wait indefinitely to acquire the lock |
276 | 4 | result = stripe->stripe_mutex->Lock(); |
277 | 3.32k | } else { |
278 | 3.32k | result = stripe->stripe_mutex->TryLockFor(timeout); |
279 | 3.32k | } |
280 | | |
281 | 3.33k | if (!result.ok()) { |
282 | | // failed to acquire mutex |
283 | 0 | return result; |
284 | 0 | } |
285 | | |
286 | | // Acquire lock if we are able to |
287 | 3.33k | uint64_t expire_time_hint = 0; |
288 | 3.33k | result = |
289 | 3.33k | AcquireLocked(lock_map, stripe, key, env, lock_info, &expire_time_hint); |
290 | | |
291 | 3.33k | if (!result.ok() && timeout != 0) { |
292 | | // If we weren't able to acquire the lock, we will keep retrying as long |
293 | | // as the timeout allows. |
294 | 51 | bool timed_out = false; |
295 | 51 | do { |
296 | | // Decide how long to wait |
297 | 51 | int64_t cv_end_time = -1; |
298 | | |
299 | | // Check if held lock's expiration time is sooner than our timeout |
300 | 51 | if (expire_time_hint > 0 && |
301 | 4 | (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) { |
302 | | // expiration time is sooner than our timeout |
303 | 3 | cv_end_time = expire_time_hint; |
304 | 48 | } else if (timeout >= 0) { |
305 | 48 | cv_end_time = end_time; |
306 | 48 | } |
307 | | |
308 | 51 | if (cv_end_time < 0) { |
309 | | // Wait indefinitely |
310 | 0 | result = stripe->stripe_cv->Wait(stripe->stripe_mutex); |
311 | 51 | } else { |
312 | 51 | uint64_t now = env->NowMicros(); |
313 | 51 | if (static_cast<uint64_t>(cv_end_time) > now) { |
314 | 51 | result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex, |
315 | 51 | cv_end_time - now); |
316 | 51 | } |
317 | 51 | } |
318 | | |
319 | 51 | if (result.IsTimedOut()) { |
320 | 51 | timed_out = true; |
321 | | // Even though we timed out, we will still make one more attempt to |
322 | | // acquire lock below (it is possible the lock expired and we |
323 | | // were never signaled). |
324 | 51 | } |
325 | | |
326 | 51 | if (result.ok() || result.IsTimedOut()) { |
327 | 51 | result = AcquireLocked(lock_map, stripe, key, env, lock_info, |
328 | 51 | &expire_time_hint); |
329 | 51 | } |
330 | 51 | } while (!result.ok() && !timed_out); |
331 | 51 | } |
332 | | |
333 | 3.33k | stripe->stripe_mutex->UnLock(); |
334 | | |
335 | 3.33k | return result; |
336 | 3.33k | } |
337 | | |
338 | | // Try to lock this key after we have acquired the mutex. |
339 | | // Sets *expire_time to the expiration time in microseconds |
340 | | // or 0 if no expiration. |
341 | | // REQUIRED: Stripe mutex must be held. |
342 | | Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, |
343 | | LockMapStripe* stripe, |
344 | | const std::string& key, Env* env, |
345 | | const LockInfo& txn_lock_info, |
346 | 3.38k | uint64_t* expire_time) { |
347 | 3.38k | Status result; |
348 | | // Check if this key is already locked |
349 | 3.38k | if (stripe->keys.find(key) != stripe->keys.end()) { |
350 | | // Lock already held |
351 | | |
352 | 124 | LockInfo& lock_info = stripe->keys.at(key); |
353 | 124 | if (lock_info.txn_id != txn_lock_info.txn_id) { |
354 | | // locked by another txn. Check if it's expired |
355 | 124 | if (IsLockExpired(lock_info, env, expire_time)) { |
356 | | // lock is expired, can steal it |
357 | 4 | lock_info.txn_id = txn_lock_info.txn_id; |
358 | 4 | lock_info.expiration_time = txn_lock_info.expiration_time; |
359 | | // lock_cnt does not change |
360 | 120 | } else { |
361 | 120 | result = STATUS(TimedOut, TimeoutError(TimeoutCode::kLock)); |
362 | 120 | } |
363 | 124 | } |
364 | 3.25k | } else { // Lock not held. |
365 | | // Check lock limit |
366 | 3.25k | if (max_num_locks_ > 0 && |
367 | 11 | lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) { |
368 | 5 | result = STATUS(Busy, TimeoutError(TimeoutCode::kLockLimit)); |
369 | 3.25k | } else { |
370 | | // acquire lock |
371 | 3.25k | stripe->keys.insert({key, txn_lock_info}); |
372 | | |
373 | | // Maintain lock count if there is a limit on the number of locks |
374 | 3.25k | if (max_num_locks_) { |
375 | 3.25k | lock_map->lock_cnt++; |
376 | 3.25k | } |
377 | 3.25k | } |
378 | 3.25k | } |
379 | | |
380 | 3.38k | return result; |
381 | 3.38k | } |
382 | | |
383 | | void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, |
384 | 31 | const std::string& key, Env* env) { |
385 | 31 | std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); |
386 | 31 | LockMap* lock_map = lock_map_ptr.get(); |
387 | 31 | if (lock_map == nullptr) { |
388 | | // Column Family must have been dropped. |
389 | 0 | return; |
390 | 0 | } |
391 | | |
392 | | // Lock the mutex for the stripe that this key hashes to |
393 | 31 | size_t stripe_num = lock_map->GetStripe(key); |
394 | 31 | assert(lock_map->lock_map_stripes_.size() > stripe_num); |
395 | 31 | LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); |
396 | | |
397 | 31 | TransactionID txn_id = txn->GetTxnID(); |
398 | | |
399 | 31 | CHECK_OK(stripe->stripe_mutex->Lock()); |
400 | | |
401 | 31 | const auto& iter = stripe->keys.find(key); |
402 | 31 | if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { |
403 | | // Found the key we locked. unlock it. |
404 | 31 | stripe->keys.erase(iter); |
405 | 31 | if (max_num_locks_ > 0) { |
406 | | // Maintain lock count if there is a limit on the number of locks. |
407 | 0 | assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); |
408 | 0 | lock_map->lock_cnt--; |
409 | 0 | } |
410 | 0 | } else { |
411 | | // This key is either not locked or locked by someone else. This should |
412 | | // only happen if the unlocking transaction has expired. |
413 | 0 | assert(txn->GetExpirationTime() > 0 && |
414 | 0 | txn->GetExpirationTime() < env->NowMicros()); |
415 | 0 | } |
416 | | |
417 | 31 | stripe->stripe_mutex->UnLock(); |
418 | | |
419 | | // Signal waiting threads to retry locking |
420 | 31 | stripe->stripe_cv->NotifyAll(); |
421 | 31 | } |
422 | | |
423 | | void TransactionLockMgr::UnLock(const TransactionImpl* txn, |
424 | 6.37k | const TransactionKeyMap* key_map, Env* env) { |
425 | 6.37k | TransactionID txn_id = txn->GetTxnID(); |
426 | | |
427 | 3.17k | for (auto& key_map_iter : *key_map) { |
428 | 3.17k | uint32_t column_family_id = key_map_iter.first; |
429 | 3.17k | auto& keys = key_map_iter.second; |
430 | | |
431 | 3.17k | std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); |
432 | 3.17k | LockMap* lock_map = lock_map_ptr.get(); |
433 | | |
434 | 3.17k | if (lock_map == nullptr) { |
435 | | // Column Family must have been dropped. |
436 | 0 | return; |
437 | 0 | } |
438 | | |
439 | | // Bucket keys by lock_map_ stripe |
440 | 3.17k | std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe( |
441 | 3.17k | std::max(keys.size(), lock_map->num_stripes_)); |
442 | | |
443 | 3.22k | for (auto& key_iter : keys) { |
444 | 3.22k | const std::string& key = key_iter.first; |
445 | | |
446 | 3.22k | size_t stripe_num = lock_map->GetStripe(key); |
447 | 3.22k | keys_by_stripe[stripe_num].push_back(&key); |
448 | 3.22k | } |
449 | | |
450 | | // For each stripe, grab the stripe mutex and unlock all keys in this stripe |
451 | 3.22k | for (auto& stripe_iter : keys_by_stripe) { |
452 | 3.22k | size_t stripe_num = stripe_iter.first; |
453 | 3.22k | auto& stripe_keys = stripe_iter.second; |
454 | | |
455 | 3.22k | assert(lock_map->lock_map_stripes_.size() > stripe_num); |
456 | 3.22k | LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); |
457 | | |
458 | 3.22k | CHECK_OK(stripe->stripe_mutex->Lock()); |
459 | | |
460 | 3.22k | for (const std::string* key : stripe_keys) { |
461 | 3.22k | const auto& iter = stripe->keys.find(*key); |
462 | 3.22k | if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { |
463 | | // Found the key we locked. unlock it. |
464 | 3.22k | stripe->keys.erase(iter); |
465 | 3.22k | if (max_num_locks_ > 0) { |
466 | | // Maintain lock count if there is a limit on the number of locks. |
467 | 6 | assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); |
468 | 6 | lock_map->lock_cnt--; |
469 | 6 | } |
470 | 4 | } else { |
471 | | // This key is either not locked or locked by someone else. This |
472 | | // should only |
473 | | // happen if the unlocking transaction has expired. |
474 | 4 | assert(txn->GetExpirationTime() > 0 && |
475 | 4 | txn->GetExpirationTime() < env->NowMicros()); |
476 | 4 | } |
477 | 3.22k | } |
478 | | |
479 | 3.22k | stripe->stripe_mutex->UnLock(); |
480 | | |
481 | | // Signal waiting threads to retry locking |
482 | 3.22k | stripe->stripe_cv->NotifyAll(); |
483 | 3.22k | } |
484 | 3.17k | } |
485 | 6.37k | } |
486 | | |
487 | | } // namespace rocksdb |
488 | | #endif // ROCKSDB_LITE |