YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/transactions/transaction_lock_mgr.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
23
#ifndef __STDC_FORMAT_MACROS
24
#define __STDC_FORMAT_MACROS
25
#endif
26
27
#include "yb/rocksdb/utilities/transactions/transaction_lock_mgr.h"
28
29
#include <inttypes.h>
30
31
#include <algorithm>
32
#include <condition_variable>
33
#include <functional>
34
#include <mutex>
35
#include <string>
36
#include <vector>
37
38
#include "yb/rocksdb/utilities/transaction_db_mutex.h"
39
#include "yb/rocksdb/util/autovector.h"
40
#include "yb/rocksdb/util/murmurhash.h"
41
#include "yb/rocksdb/util/thread_local.h"
42
#include "yb/rocksdb/util/timeout_error.h"
43
#include "yb/rocksdb/utilities/transactions/transaction_db_impl.h"
44
45
#include "yb/util/slice.h"
46
#include "yb/util/status_log.h"
47
48
namespace rocksdb {
49
50
struct LockInfo {
51
  TransactionID txn_id;
52
53
  // Transaction locks are not valid after this time in us
54
  uint64_t expiration_time;
55
56
  LockInfo(TransactionID id, uint64_t time)
57
3.33k
      : txn_id(id), expiration_time(time) {}
58
  LockInfo(const LockInfo& lock_info)
59
6.50k
      : txn_id(lock_info.txn_id), expiration_time(lock_info.expiration_time) {}
60
};
61
62
struct LockMapStripe {
63
912
  explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
64
912
    stripe_mutex = factory->AllocateMutex();
65
912
    stripe_cv = factory->AllocateCondVar();
66
912
    assert(stripe_mutex);
67
912
    assert(stripe_cv);
68
912
  }
69
70
  // Mutex must be held before modifying keys map
71
  std::shared_ptr<TransactionDBMutex> stripe_mutex;
72
73
  // Condition Variable per stripe for waiting on a lock
74
  std::shared_ptr<TransactionDBCondVar> stripe_cv;
75
76
  // Locked keys mapped to the info about the transactions that locked them.
77
  // TODO(agiardullo): Explore performance of other data structures.
78
  std::unordered_map<std::string, LockInfo> keys;
79
};
80
81
// Map of #num_stripes LockMapStripes
82
struct LockMap {
83
  explicit LockMap(size_t num_stripes,
84
                   std::shared_ptr<TransactionDBMutexFactory> factory)
85
57
      : num_stripes_(num_stripes) {
86
57
    lock_map_stripes_.reserve(num_stripes);
87
969
    for (size_t i = 0; i < num_stripes; i++) {
88
912
      LockMapStripe* stripe = new LockMapStripe(factory);
89
912
      lock_map_stripes_.push_back(stripe);
90
912
    }
91
57
  }
92
93
57
  ~LockMap() {
94
912
    for (auto stripe : lock_map_stripes_) {
95
912
      delete stripe;
96
912
    }
97
57
  }
98
99
  // Number of sepearate LockMapStripes to create, each with their own Mutex
100
  const size_t num_stripes_;
101
102
  // Count of keys that are currently locked in this column family.
103
  // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
104
  std::atomic<int64_t> lock_cnt{0};
105
106
  std::vector<LockMapStripe*> lock_map_stripes_;
107
108
  size_t GetStripe(const std::string& key) const;
109
};
110
111
namespace {
112
37
void UnrefLockMapsCache(void* ptr) {
113
  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
114
37
  auto lock_maps_cache =
115
37
      static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr);
116
37
  delete lock_maps_cache;
117
37
}
118
}  // anonymous namespace
119
120
// Builds a lock manager for `txn_db`, which must be a TransactionDBImpl
// (asserted). Each column family later gets `default_num_stripes` stripes; a
// positive `max_num_locks` caps the number of locked keys per column family.
TransactionLockMgr::TransactionLockMgr(
    TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
    : txn_db_impl_(dynamic_cast<TransactionDBImpl*>(txn_db)),
      default_num_stripes_(default_num_stripes),
      max_num_locks_(max_num_locks),
      mutex_factory_(mutex_factory),
      lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)) {
  // IsLockExpired() calls back into TransactionDBImpl, so the downcast must
  // have succeeded.
  assert(txn_db_impl_);
}
131
132
47
TransactionLockMgr::~TransactionLockMgr() = default;  // members clean up themselves
133
134
6.59k
size_t LockMap::GetStripe(const std::string& key) const {
135
6.59k
  assert(num_stripes_ > 0);
136
6.59k
  static murmur_hash hash;
137
6.59k
  size_t stripe = hash(key) % num_stripes_;
138
6.59k
  return stripe;
139
6.59k
}
140
141
57
void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
142
57
  InstrumentedMutexLock l(&lock_map_mutex_);
143
144
57
  if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
145
57
    lock_maps_.emplace(column_family_id,
146
57
                       std::shared_ptr<LockMap>(
147
57
                           new LockMap(default_num_stripes_, mutex_factory_)));
148
0
  } else {
149
    // column_family already exists in lock map
150
0
    assert(false);
151
0
  }
152
57
}
153
154
4
void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
155
  // Remove lock_map for this column family.  Since the lock map is stored
156
  // as a shared ptr, concurrent transactions can still keep keep using it
157
  // until they release their reference to it.
158
4
  {
159
4
    InstrumentedMutexLock l(&lock_map_mutex_);
160
161
4
    auto lock_maps_iter = lock_maps_.find(column_family_id);
162
4
    assert(lock_maps_iter != lock_maps_.end());
163
164
4
    lock_maps_.erase(lock_maps_iter);
165
4
  }  // lock_map_mutex_
166
167
  // Clear all thread-local caches
168
4
  autovector<void*> local_caches;
169
4
  lock_maps_cache_->Scrape(&local_caches, nullptr);
170
3
  for (auto cache : local_caches) {
171
3
    delete static_cast<LockMaps*>(cache);
172
3
  }
173
4
}
174
175
// Look up the LockMap shared_ptr for a given column_family_id.
176
// Note:  The LockMap is only valid as long as the caller is still holding on
177
//   to the returned shared_ptr.
178
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
179
6.54k
    uint32_t column_family_id) {
180
  // First check thread-local cache
181
6.54k
  if (lock_maps_cache_->Get() == nullptr) {
182
40
    lock_maps_cache_->Reset(new LockMaps());
183
40
  }
184
185
6.54k
  auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());
186
187
6.54k
  auto lock_map_iter = lock_maps_cache->find(column_family_id);
188
6.54k
  if (lock_map_iter != lock_maps_cache->end()) {
189
    // Found lock map for this column family.
190
6.49k
    return lock_map_iter->second;
191
6.49k
  }
192
193
  // Not found in local cache, grab mutex and check shared LockMaps
194
46
  InstrumentedMutexLock l(&lock_map_mutex_);
195
196
46
  lock_map_iter = lock_maps_.find(column_family_id);
197
46
  if (lock_map_iter == lock_maps_.end()) {
198
1
    return std::shared_ptr<LockMap>(nullptr);
199
45
  } else {
200
    // Found lock map.  Store in thread-local cache and return.
201
45
    std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
202
45
    lock_maps_cache->insert({column_family_id, lock_map});
203
204
45
    return lock_map;
205
45
  }
206
46
}
207
208
// Returns true if this lock has expired and can be acquired by another
209
// transaction.
210
// If false, sets *expire_time to the expiration time of the lock according
211
// to Env->GetMicros() or 0 if no expiration.
212
bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env,
213
124
                                       uint64_t* expire_time) {
214
124
  auto now = env->NowMicros();
215
216
124
  bool expired =
217
124
      (lock_info.expiration_time > 0 && lock_info.expiration_time <= now);
218
219
124
  if (!expired && lock_info.expiration_time > 0) {
220
    // return how many microseconds until lock will be expired
221
5
    *expire_time = lock_info.expiration_time;
222
119
  } else {
223
119
    bool success =
224
119
        txn_db_impl_->TryStealingExpiredTransactionLocks(lock_info.txn_id);
225
119
    if (!success) {
226
1
      expired = false;
227
1
    }
228
119
    *expire_time = 0;
229
119
  }
230
231
124
  return expired;
232
124
}
233
234
// Attempts to lock `key` in `column_family_id` on behalf of `txn`, honoring
// the transaction's lock timeout and expiration time. Returns InvalidArgument
// if the column family is unknown, otherwise AcquireWithTimeout()'s status.
Status TransactionLockMgr::TryLock(const TransactionImpl* txn,
                                   uint32_t column_family_id,
                                   const std::string& key, Env* env) {
  // Holding the shared_ptr for the whole call keeps the LockMap alive even if
  // the column family is removed concurrently.
  const std::shared_ptr<LockMap> lock_map_ref = GetLockMap(column_family_id);
  if (!lock_map_ref) {
    char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return STATUS(InvalidArgument, msg);
  }
  LockMap* const lock_map = lock_map_ref.get();

  // Need to lock the mutex for the stripe that this key hashes to
  const size_t stripe_index = lock_map->GetStripe(key);
  assert(stripe_index < lock_map->lock_map_stripes_.size());
  LockMapStripe* const stripe = lock_map->lock_map_stripes_.at(stripe_index);

  const LockInfo requested(txn->GetTxnID(), txn->GetExpirationTime());
  return AcquireWithTimeout(lock_map, stripe, key, env, txn->GetLockTimeout(),
                            requested);
}
258
259
// Helper function for TryLock().
260
Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map,
261
                                              LockMapStripe* stripe,
262
                                              const std::string& key, Env* env,
263
                                              int64_t timeout,
264
3.33k
                                              const LockInfo& lock_info) {
265
3.33k
  Status result;
266
3.33k
  uint64_t start_time = 0;
267
3.33k
  uint64_t end_time = 0;
268
269
3.33k
  if (timeout > 0) {
270
88
    start_time = env->NowMicros();
271
88
    end_time = start_time + timeout;
272
88
  }
273
274
3.33k
  if (timeout < 0) {
275
    // If timeout is negative, we wait indefinitely to acquire the lock
276
4
    result = stripe->stripe_mutex->Lock();
277
3.32k
  } else {
278
3.32k
    result = stripe->stripe_mutex->TryLockFor(timeout);
279
3.32k
  }
280
281
3.33k
  if (!result.ok()) {
282
    // failed to acquire mutex
283
0
    return result;
284
0
  }
285
286
  // Acquire lock if we are able to
287
3.33k
  uint64_t expire_time_hint = 0;
288
3.33k
  result =
289
3.33k
      AcquireLocked(lock_map, stripe, key, env, lock_info, &expire_time_hint);
290
291
3.33k
  if (!result.ok() && timeout != 0) {
292
    // If we weren't able to acquire the lock, we will keep retrying as long
293
    // as the timeout allows.
294
51
    bool timed_out = false;
295
51
    do {
296
      // Decide how long to wait
297
51
      int64_t cv_end_time = -1;
298
299
      // Check if held lock's expiration time is sooner than our timeout
300
51
      if (expire_time_hint > 0 &&
301
4
          (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) {
302
        // expiration time is sooner than our timeout
303
3
        cv_end_time = expire_time_hint;
304
48
      } else if (timeout >= 0) {
305
48
        cv_end_time = end_time;
306
48
      }
307
308
51
      if (cv_end_time < 0) {
309
        // Wait indefinitely
310
0
        result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
311
51
      } else {
312
51
        uint64_t now = env->NowMicros();
313
51
        if (static_cast<uint64_t>(cv_end_time) > now) {
314
51
          result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
315
51
                                              cv_end_time - now);
316
51
        }
317
51
      }
318
319
51
      if (result.IsTimedOut()) {
320
51
          timed_out = true;
321
          // Even though we timed out, we will still make one more attempt to
322
          // acquire lock below (it is possible the lock expired and we
323
          // were never signaled).
324
51
      }
325
326
51
      if (result.ok() || result.IsTimedOut()) {
327
51
        result = AcquireLocked(lock_map, stripe, key, env, lock_info,
328
51
                               &expire_time_hint);
329
51
      }
330
51
    } while (!result.ok() && !timed_out);
331
51
  }
332
333
3.33k
  stripe->stripe_mutex->UnLock();
334
335
3.33k
  return result;
336
3.33k
}
337
338
// Try to lock this key after we have acquired the mutex.
339
// Sets *expire_time to the expiration time in microseconds
340
//  or 0 if no expiration.
341
// REQUIRED:  Stripe mutex must be held.
342
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
343
                                         LockMapStripe* stripe,
344
                                         const std::string& key, Env* env,
345
                                         const LockInfo& txn_lock_info,
346
3.38k
                                         uint64_t* expire_time) {
347
3.38k
  Status result;
348
  // Check if this key is already locked
349
3.38k
  if (stripe->keys.find(key) != stripe->keys.end()) {
350
    // Lock already held
351
352
124
    LockInfo& lock_info = stripe->keys.at(key);
353
124
    if (lock_info.txn_id != txn_lock_info.txn_id) {
354
      // locked by another txn.  Check if it's expired
355
124
      if (IsLockExpired(lock_info, env, expire_time)) {
356
        // lock is expired, can steal it
357
4
        lock_info.txn_id = txn_lock_info.txn_id;
358
4
        lock_info.expiration_time = txn_lock_info.expiration_time;
359
        // lock_cnt does not change
360
120
      } else {
361
120
        result = STATUS(TimedOut, TimeoutError(TimeoutCode::kLock));
362
120
      }
363
124
    }
364
3.25k
  } else {  // Lock not held.
365
    // Check lock limit
366
3.25k
    if (max_num_locks_ > 0 &&
367
11
        lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {
368
5
      result = STATUS(Busy, TimeoutError(TimeoutCode::kLockLimit));
369
3.25k
    } else {
370
      // acquire lock
371
3.25k
      stripe->keys.insert({key, txn_lock_info});
372
373
      // Maintain lock count if there is a limit on the number of locks
374
3.25k
      if (max_num_locks_) {
375
3.25k
        lock_map->lock_cnt++;
376
3.25k
      }
377
3.25k
    }
378
3.25k
  }
379
380
3.38k
  return result;
381
3.38k
}
382
383
void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id,
384
31
                                const std::string& key, Env* env) {
385
31
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
386
31
  LockMap* lock_map = lock_map_ptr.get();
387
31
  if (lock_map == nullptr) {
388
    // Column Family must have been dropped.
389
0
    return;
390
0
  }
391
392
  // Lock the mutex for the stripe that this key hashes to
393
31
  size_t stripe_num = lock_map->GetStripe(key);
394
31
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
395
31
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
396
397
31
  TransactionID txn_id = txn->GetTxnID();
398
399
31
  CHECK_OK(stripe->stripe_mutex->Lock());
400
401
31
  const auto& iter = stripe->keys.find(key);
402
31
  if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) {
403
    // Found the key we locked.  unlock it.
404
31
    stripe->keys.erase(iter);
405
31
    if (max_num_locks_ > 0) {
406
      // Maintain lock count if there is a limit on the number of locks.
407
0
      assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
408
0
      lock_map->lock_cnt--;
409
0
    }
410
0
  } else {
411
    // This key is either not locked or locked by someone else.  This should
412
    // only happen if the unlocking transaction has expired.
413
0
    assert(txn->GetExpirationTime() > 0 &&
414
0
           txn->GetExpirationTime() < env->NowMicros());
415
0
  }
416
417
31
  stripe->stripe_mutex->UnLock();
418
419
  // Signal waiting threads to retry locking
420
31
  stripe->stripe_cv->NotifyAll();
421
31
}
422
423
void TransactionLockMgr::UnLock(const TransactionImpl* txn,
424
6.37k
                                const TransactionKeyMap* key_map, Env* env) {
425
6.37k
  TransactionID txn_id = txn->GetTxnID();
426
427
3.17k
  for (auto& key_map_iter : *key_map) {
428
3.17k
    uint32_t column_family_id = key_map_iter.first;
429
3.17k
    auto& keys = key_map_iter.second;
430
431
3.17k
    std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
432
3.17k
    LockMap* lock_map = lock_map_ptr.get();
433
434
3.17k
    if (lock_map == nullptr) {
435
      // Column Family must have been dropped.
436
0
      return;
437
0
    }
438
439
    // Bucket keys by lock_map_ stripe
440
3.17k
    std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
441
3.17k
        std::max(keys.size(), lock_map->num_stripes_));
442
443
3.22k
    for (auto& key_iter : keys) {
444
3.22k
      const std::string& key = key_iter.first;
445
446
3.22k
      size_t stripe_num = lock_map->GetStripe(key);
447
3.22k
      keys_by_stripe[stripe_num].push_back(&key);
448
3.22k
    }
449
450
    // For each stripe, grab the stripe mutex and unlock all keys in this stripe
451
3.22k
    for (auto& stripe_iter : keys_by_stripe) {
452
3.22k
      size_t stripe_num = stripe_iter.first;
453
3.22k
      auto& stripe_keys = stripe_iter.second;
454
455
3.22k
      assert(lock_map->lock_map_stripes_.size() > stripe_num);
456
3.22k
      LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
457
458
3.22k
      CHECK_OK(stripe->stripe_mutex->Lock());
459
460
3.22k
      for (const std::string* key : stripe_keys) {
461
3.22k
        const auto& iter = stripe->keys.find(*key);
462
3.22k
        if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) {
463
          // Found the key we locked.  unlock it.
464
3.22k
          stripe->keys.erase(iter);
465
3.22k
          if (max_num_locks_ > 0) {
466
            // Maintain lock count if there is a limit on the number of locks.
467
6
            assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
468
6
            lock_map->lock_cnt--;
469
6
          }
470
4
        } else {
471
          // This key is either not locked or locked by someone else.  This
472
          // should only
473
          // happen if the unlocking transaction has expired.
474
4
          assert(txn->GetExpirationTime() > 0 &&
475
4
                 txn->GetExpirationTime() < env->NowMicros());
476
4
        }
477
3.22k
      }
478
479
3.22k
      stripe->stripe_mutex->UnLock();
480
481
      // Signal waiting threads to retry locking
482
3.22k
      stripe->stripe_cv->NotifyAll();
483
3.22k
    }
484
3.17k
  }
485
6.37k
}
486
487
}  //  namespace rocksdb
488
#endif  // ROCKSDB_LITE