YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/concurrent_arena.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#ifndef YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H
25
#define YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H
26
27
#pragma once
28
29
#include <atomic>
30
#include <mutex>
31
32
#include "yb/rocksdb/util/allocator.h"
33
#include "yb/rocksdb/util/arena.h"
34
#include "yb/rocksdb/util/mutexlock.h"
35
36
// Only generate the field-unused attribute for the padding array, or the build under
37
// GCC 4.8.1 will fail.
38
#ifdef __clang__
39
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
40
#else
41
#define ROCKSDB_FIELD_UNUSED
42
#endif  // __clang__
43
44
namespace yb {
45
46
class MemTracker;
47
48
}
49
50
namespace rocksdb {
51
52
class Logger;
53
54
// ConcurrentArena wraps an Arena.  It makes it thread safe using a fast
55
// inlined spinlock, and adds small per-core allocation caches to avoid
56
// contention for small allocations.  To avoid any memory waste from the
57
// per-core shards, they are kept small, they are lazily instantiated
58
// only if ConcurrentArena actually notices concurrent use, and they
59
// adjust their size so that there is no fragmentation waste when the
60
// shard blocks are allocated from the underlying main arena.
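A minimal usage sketch of the class declared below (not part of the measured source; ExampleUsage is a hypothetical helper), assuming the rocksdb arena headers are on the include path:

#include "yb/rocksdb/util/concurrent_arena.h"

void ExampleUsage() {
  rocksdb::ConcurrentArena arena;                // default block_size = Arena::kMinBlockSize
  char* raw = arena.Allocate(24);                // may be served by a per-core shard
  char* aligned = arena.AllocateAligned(100);    // rounded up to a multiple of sizeof(void*)
  size_t used = arena.ApproximateMemoryUsage();  // takes the spinlock only when more than one shard exists
  (void)raw; (void)aligned; (void)used;
}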
61
class ConcurrentArena : public Allocator {
62
 public:
63
  // block_size and huge_page_size are the same as for Arena (and are
64
  // in fact just passed to the constructor of arena_).  The core-local
65
  // shards compute their shard_block_size as a fraction of block_size
66
  // that varies according to the hardware concurrency level.
67
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
68
                           size_t huge_page_size = 0);
69
70
291k
  char* Allocate(size_t bytes) override {
71
291k
    return AllocateImpl(bytes, false /*force_arena*/,
72
291k
                        [=]() { return arena_.Allocate(bytes); });
73
291k
  }
74
75
  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
76
401M
                        Logger* logger = nullptr) override {
77
401M
    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
78
401M
    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
79
401M
           (rounded_up % sizeof(void*)) == 0);
80
81
400M
    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
82
400M
      return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
83
400M
    });
84
401M
  }
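A worked example of the rounding expression above (a sketch, assuming sizeof(void*) == 8):

  bytes = 13:  rounded_up = ((13 - 1) | 7) + 1 = (12 | 7) + 1 = 15 + 1 = 16
  bytes = 16:  rounded_up = ((16 - 1) | 7) + 1 = (15 | 7) + 1 = 15 + 1 = 16

That is, rounded_up is bytes rounded up to the next multiple of sizeof(void*), which is exactly what the assert on source lines 78-79 verifies.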
85
86
117k
  size_t ApproximateMemoryUsage() const {
87
117k
    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
88
117k
    if (index_mask_ != 0) {
89
117k
      lock.lock();
90
117k
    }
91
117k
    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
92
117k
  }
93
94
181M
  size_t MemoryAllocatedBytes() const {
95
181M
    return memory_allocated_bytes_.load(std::memory_order_relaxed);
96
181M
  }
97
98
861k
  size_t AllocatedAndUnused() const {
99
861k
    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
100
861k
           ShardAllocatedAndUnused();
101
861k
  }
102
103
0
  size_t IrregularBlockNum() const {
104
0
    return irregular_block_num_.load(std::memory_order_relaxed);
105
0
  }
106
107
1.30k
  size_t BlockSize() const override { return arena_.BlockSize(); }
108
109
  void SetMemTracker(std::shared_ptr<yb::MemTracker> mem_tracker);
110
111
 private:
112
  struct Shard {
113
    char padding[40] ROCKSDB_FIELD_UNUSED;
114
    mutable SpinMutex mutex;
115
    char* free_begin_;
116
    std::atomic<size_t> allocated_and_unused_;
117
118
7.63M
    Shard() : allocated_and_unused_(0) {}
119
  };
120
121
#if ROCKSDB_SUPPORT_THREAD_LOCAL
122
  static __thread uint32_t tls_cpuid;
123
#else
124
  enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
125
#endif
126
127
  char padding0[56] ROCKSDB_FIELD_UNUSED;
128
129
  size_t shard_block_size_;
130
131
  // shards_[i & index_mask_] is valid
132
  size_t index_mask_;
133
  std::unique_ptr<Shard[]> shards_;
134
135
  Arena arena_;
136
  mutable SpinMutex arena_mutex_;
137
  std::atomic<size_t> arena_allocated_and_unused_;
138
  std::atomic<size_t> memory_allocated_bytes_;
139
  std::atomic<size_t> irregular_block_num_;
140
141
  char padding1[56] ROCKSDB_FIELD_UNUSED;
142
143
  Shard* Repick();
144
145
978k
  size_t ShardAllocatedAndUnused() const {
146
978k
    size_t total = 0;
147
16.6M
    for (size_t i = 0; i <= index_mask_; ++i) {
148
15.6M
      total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
149
15.6M
    }
150
978k
    return total;
151
978k
  }
152
153
  template <typename Func>
154
401M
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
155
401M
    uint32_t cpu;
156
157
    // Go directly to the arena if the allocation is too large, or if
158
    // we've never needed to Repick() and the arena mutex is available
159
    // with no waiting.  This keeps the fragmentation penalty of
160
    // concurrency zero unless it might actually confer an advantage.
161
401M
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
162
401M
    if (bytes > shard_block_size_ / 4 || force_arena ||
163
401M
        ((cpu = tls_cpuid) == 0 &&
164
400M
         !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
165
400M
         arena_lock.try_lock())) {
166
400M
      if (!arena_lock.owns_lock()) {
167
1.28M
        arena_lock.lock();
168
1.28M
      }
169
400M
      auto rv = func();
170
400M
      Fixup();
171
400M
      return rv;
172
400M
    }
173
174
    // pick a shard from which to allocate
175
781k
    Shard* s = &shards_[cpu & index_mask_];
176
781k
    if (!s->mutex.try_lock()) {
177
7.24k
      s = Repick();
178
7.24k
      s->mutex.lock();
179
7.24k
    }
180
781k
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
181
182
781k
    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
183
781k
    if (avail < bytes) {
184
      // reload
185
74.9k
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
186
187
      // If the arena's current block is within a factor of 2 of the right
188
      // size, we adjust our request to avoid arena waste.
189
74.9k
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
190
74.9k
      assert(exact == arena_.AllocatedAndUnused());
191
74.9k
      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
192
74.9k
                  ? exact
193
74.9k
                  : shard_block_size_;
194
74.9k
      s->free_begin_ = arena_.AllocateAligned(avail);
195
74.9k
      Fixup();
196
74.9k
    }
197
0
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
198
199
781k
    char* rv;
200
781k
    if ((bytes % sizeof(void*)) == 0) {
201
      // aligned allocation from the beginning
202
525k
      rv = s->free_begin_;
203
525k
      s->free_begin_ += bytes;
204
525k
    } else {
205
      // unaligned from the end
206
256k
      rv = s->free_begin_ + avail - bytes;
207
256k
    }
208
781k
    return rv;
209
401M
  }
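A condensed, stand-alone sketch of the shard path above (ShardView and CarveFromShard are hypothetical names, not members of this file); it isolates how a shard block is carved up once the refill under arena_mutex_ has already happened:

struct ShardView {
  char* free_begin;  // start of the shard's unused region
  size_t avail;      // bytes still unused in the shard block
};

char* CarveFromShard(ShardView* s, size_t bytes) {
  char* rv;
  if ((bytes % sizeof(void*)) == 0) {
    // aligned request: hand out the front and advance free_begin
    rv = s->free_begin;
    s->free_begin += bytes;
  } else {
    // unaligned request: take it from the end so the front stays pointer-aligned
    rv = s->free_begin + s->avail - bytes;
  }
  s->avail -= bytes;
  return rv;
}

In the real member function the remaining space is written back into the shard's allocated_and_unused_ atomic rather than a plain field, and the large-allocation / force_arena cases bypass the shard entirely.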
char* rocksdb::ConcurrentArena::AllocateImpl<rocksdb::ConcurrentArena::Allocate(unsigned long)::'lambda'()>(unsigned long, bool, rocksdb::ConcurrentArena::Allocate(unsigned long)::'lambda'() const&)
Line
Count
Source
154
291k
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
155
291k
    uint32_t cpu;
156
157
    // Go directly to the arena if the allocation is too large, or if
158
    // we've never needed to Repick() and the arena mutex is available
159
    // with no waiting.  This keeps the fragmentation penalty of
160
    // concurrency zero unless it might actually confer an advantage.
161
291k
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
162
291k
    if (bytes > shard_block_size_ / 4 || force_arena ||
163
291k
        ((cpu = tls_cpuid) == 0 &&
164
291k
         !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
165
291k
         arena_lock.try_lock())) {
166
291k
      if (!arena_lock.owns_lock()) {
167
364
        arena_lock.lock();
168
364
      }
169
291k
      auto rv = func();
170
291k
      Fixup();
171
291k
      return rv;
172
291k
    }
173
174
    // pick a shard from which to allocate
175
0
    Shard* s = &shards_[cpu & index_mask_];
176
0
    if (!s->mutex.try_lock()) {
177
0
      s = Repick();
178
0
      s->mutex.lock();
179
0
    }
180
0
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
181
182
0
    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
183
0
    if (avail < bytes) {
184
      // reload
185
0
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
186
187
      // If the arena's current block is within a factor of 2 of the right
188
      // size, we adjust our request to avoid arena waste.
189
0
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
190
0
      assert(exact == arena_.AllocatedAndUnused());
191
0
      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
192
0
                  ? exact
193
0
                  : shard_block_size_;
194
0
      s->free_begin_ = arena_.AllocateAligned(avail);
195
0
      Fixup();
196
0
    }
197
0
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
198
199
0
    char* rv;
200
0
    if ((bytes % sizeof(void*)) == 0) {
201
      // aligned allocation from the beginning
202
0
      rv = s->free_begin_;
203
0
      s->free_begin_ += bytes;
204
0
    } else {
205
      // unaligned from the end
206
0
      rv = s->free_begin_ + avail - bytes;
207
0
    }
208
0
    return rv;
209
291k
  }
char* rocksdb::ConcurrentArena::AllocateImpl<rocksdb::ConcurrentArena::AllocateAligned(unsigned long, unsigned long, rocksdb::Logger*)::'lambda'()>(unsigned long, bool, rocksdb::ConcurrentArena::AllocateAligned(unsigned long, unsigned long, rocksdb::Logger*)::'lambda'() const&)
Line
Count
Source
154
401M
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
155
401M
    uint32_t cpu;
156
157
    // Go directly to the arena if the allocation is too large, or if
158
    // we've never needed to Repick() and the arena mutex is available
159
    // with no waiting.  This keeps the fragmentation penalty of
160
    // concurrency zero unless it might actually confer an advantage.
161
401M
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
162
401M
    if (bytes > shard_block_size_ / 4 || force_arena ||
163
401M
        ((cpu = tls_cpuid) == 0 &&
164
399M
         !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
165
400M
         arena_lock.try_lock())) {
166
400M
      if (!arena_lock.owns_lock()) {
167
1.28M
        arena_lock.lock();
168
1.28M
      }
169
400M
      auto rv = func();
170
400M
      Fixup();
171
400M
      return rv;
172
400M
    }
173
174
    // pick a shard from which to allocate
175
781k
    Shard* s = &shards_[cpu & index_mask_];
176
781k
    if (!s->mutex.try_lock()) {
177
7.24k
      s = Repick();
178
7.24k
      s->mutex.lock();
179
7.24k
    }
180
781k
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
181
182
781k
    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
183
781k
    if (avail < bytes) {
184
      // reload
185
74.9k
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
186
187
      // If the arena's current block is within a factor of 2 of the right
188
      // size, we adjust our request to avoid arena waste.
189
74.9k
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
190
74.9k
      assert(exact == arena_.AllocatedAndUnused());
191
74.9k
      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
192
74.9k
                  ? exact
193
74.9k
                  : shard_block_size_;
194
74.9k
      s->free_begin_ = arena_.AllocateAligned(avail);
195
74.9k
      Fixup();
196
74.9k
    }
197
0
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
198
199
781k
    char* rv;
200
781k
    if ((bytes % sizeof(void*)) == 0) {
201
      // aligned allocation from the beginning
202
525k
      rv = s->free_begin_;
203
525k
      s->free_begin_ += bytes;
204
525k
    } else {
205
      // unaligned from the end
206
256k
      rv = s->free_begin_ + avail - bytes;
207
256k
    }
208
781k
    return rv;
209
401M
  }
210
211
401M
  void Fixup() {
212
401M
    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
213
401M
                                      std::memory_order_relaxed);
214
401M
    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
215
401M
                                  std::memory_order_relaxed);
216
401M
    irregular_block_num_.store(arena_.IrregularBlockNum(),
217
401M
                               std::memory_order_relaxed);
218
401M
  }
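Fixup() is invoked with arena_mutex_ held at both of its call sites in this header; it republishes the Arena's counters into relaxed atomics so that readers such as MemoryAllocatedBytes() and AllocatedAndUnused() above can report statistics without taking the lock. A minimal reader sketch (ReportStats is a hypothetical helper):

void ReportStats(const rocksdb::ConcurrentArena& arena, size_t* allocated, size_t* unused) {
  // relaxed loads published by the most recent Fixup(); no lock is taken here
  *allocated = arena.MemoryAllocatedBytes();
  *unused = arena.AllocatedAndUnused();
}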
219
220
  ConcurrentArena(const ConcurrentArena&) = delete;
221
  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
222
};
223
224
}  // namespace rocksdb
225
226
#endif // YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H