/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/concurrent_arena.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #ifndef YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H |
25 | | #define YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H |
26 | | |
27 | | #pragma once |
28 | | |
29 | | #include <atomic> |
30 | | #include <mutex> |
31 | | |
32 | | #include "yb/rocksdb/util/allocator.h" |
33 | | #include "yb/rocksdb/util/arena.h" |
34 | | #include "yb/rocksdb/util/mutexlock.h" |
35 | | |
36 | | // Only generate the unused-field attribute for the padding arrays under |
37 | | // Clang, or the build under GCC 4.8.1 will fail. |
38 | | #ifdef __clang__ |
39 | | #define ROCKSDB_FIELD_UNUSED __attribute__((__unused__)) |
40 | | #else |
41 | | #define ROCKSDB_FIELD_UNUSED |
42 | | #endif // __clang__ |
43 | | |
44 | | namespace yb { |
45 | | |
46 | | class MemTracker; |
47 | | |
48 | | } |
49 | | |
50 | | namespace rocksdb { |
51 | | |
52 | | class Logger; |
53 | | |
54 | | // ConcurrentArena wraps an Arena. It makes it thread safe using a fast |
55 | | // inlined spinlock, and adds small per-core allocation caches to avoid |
56 | | // contention for small allocations. To avoid any memory waste from the |
57 | | // per-core shards, they are kept small, they are lazily instantiated |
58 | | // only if ConcurrentArena actually notices concurrent use, and they |
59 | | // adjust their size so that there is no fragmentation waste when the |
60 | | // shard blocks are allocated from the underlying main arena. |
61 | | class ConcurrentArena : public Allocator { |
62 | | public: |
63 | | // block_size and huge_page_size are the same as for Arena (and are |
64 | | // in fact just passed to the constructor of arena_). The core-local |
65 | | // shards compute their shard_block_size as a fraction of block_size |
66 | | // that varies according to the hardware concurrency level. |
67 | | explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize, |
68 | | size_t huge_page_size = 0); |
69 | | |
70 | 291k | char* Allocate(size_t bytes) override { |
71 | 291k | return AllocateImpl(bytes, false /*force_arena*/, |
72 | 291k | [=]() { return arena_.Allocate(bytes); }); |
73 | 291k | } |
74 | | |
75 | | char* AllocateAligned(size_t bytes, size_t huge_page_size = 0, |
76 | 401M | Logger* logger = nullptr) override { |
77 | 401M | size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1; |
78 | 401M | assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) && |
79 | 401M | (rounded_up % sizeof(void*)) == 0); |
80 | | |
81 | 400M | return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() { |
82 | 400M | return arena_.AllocateAligned(rounded_up, huge_page_size, logger); |
83 | 400M | }); |
84 | 401M | } |
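The expression `((bytes - 1) | (sizeof(void*) - 1)) + 1` in AllocateAligned rounds `bytes` up to the next multiple of `sizeof(void*)` (a power of two) without a division or branch. A standalone sketch, not part of this header, that checks the same invariants the assert above states:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>

// Same bit trick as AllocateAligned: round n up to the next multiple of
// sizeof(void*), which is a power of two.
static size_t RoundUpToPointerSize(size_t n) {
  return ((n - 1) | (sizeof(void*) - 1)) + 1;
}

int main() {
  // On a 64-bit platform sizeof(void*) == 8, so 1..8 -> 8, 9..16 -> 16, ...
  for (size_t n = 1; n <= 24; ++n) {
    size_t r = RoundUpToPointerSize(n);
    assert(r >= n && r < n + sizeof(void*) && r % sizeof(void*) == 0);
    std::printf("%zu -> %zu\n", n, r);
  }
  return 0;
}
```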
85 | | |
86 | 117k | size_t ApproximateMemoryUsage() const { |
87 | 117k | std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock); |
88 | 117k | if (index_mask_ != 0) { |
89 | 117k | lock.lock(); |
90 | 117k | } |
91 | 117k | return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused(); |
92 | 117k | } |
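ApproximateMemoryUsage() constructs the lock with std::defer_lock and only acquires arena_mutex_ when per-core shards exist (index_mask_ != 0). A minimal sketch of that conditional-locking idiom, with stand-in names rather than the real members:

```cpp
#include <cstddef>
#include <mutex>

// Stand-ins: mu plays the role of arena_mutex_, sharded the role of
// index_mask_ != 0. Purely illustrative.
struct StatsHolder {
  std::mutex mu;
  bool sharded = false;
  size_t value = 0;

  size_t Read() {
    // Wrap the mutex without locking it, then lock only if needed.
    std::unique_lock<std::mutex> lock(mu, std::defer_lock);
    if (sharded) {
      lock.lock();
    }
    return value;  // released automatically, but only if we locked above
  }
};
```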
93 | | |
94 | 181M | size_t MemoryAllocatedBytes() const { |
95 | 181M | return memory_allocated_bytes_.load(std::memory_order_relaxed); |
96 | 181M | } |
97 | | |
98 | 861k | size_t AllocatedAndUnused() const { |
99 | 861k | return arena_allocated_and_unused_.load(std::memory_order_relaxed) + |
100 | 861k | ShardAllocatedAndUnused(); |
101 | 861k | } |
102 | | |
103 | 0 | size_t IrregularBlockNum() const { |
104 | 0 | return irregular_block_num_.load(std::memory_order_relaxed); |
105 | 0 | } |
106 | | |
107 | 1.30k | size_t BlockSize() const override { return arena_.BlockSize(); } |
108 | | |
109 | | void SetMemTracker(std::shared_ptr<yb::MemTracker> mem_tracker); |
110 | | |
111 | | private: |
112 | | struct Shard { |
113 | | char padding[40] ROCKSDB_FIELD_UNUSED; |
114 | | mutable SpinMutex mutex; |
115 | | char* free_begin_; |
116 | | std::atomic<size_t> allocated_and_unused_; |
117 | | |
118 | 7.63M | Shard() : allocated_and_unused_(0) {} |
119 | | }; |
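The 40-byte padding keeps each Shard (padding plus mutex, pointer, and counter) at roughly one 64-byte cache line, so spinning on one shard's mutex does not invalidate its neighbors. An equivalent way to express that intent, shown only as an illustration and not how this header does it:

```cpp
#include <atomic>
#include <cstddef>

// Illustrative only: give each per-core slot its own 64-byte cache line so
// updates to one slot do not cause false sharing with adjacent slots.
struct alignas(64) PaddedCounter {
  std::atomic<size_t> value{0};
  // The remaining bytes up to 64 are implicit padding.
};
static_assert(sizeof(PaddedCounter) == 64, "one slot per cache line");
```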
120 | | |
121 | | #if ROCKSDB_SUPPORT_THREAD_LOCAL |
122 | | static __thread uint32_t tls_cpuid; |
123 | | #else |
124 | | enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 }; |
125 | | #endif |
126 | | |
127 | | char padding0[56] ROCKSDB_FIELD_UNUSED; |
128 | | |
129 | | size_t shard_block_size_; |
130 | | |
131 | | // shards_[i & index_mask_] is valid |
132 | | size_t index_mask_; |
133 | | std::unique_ptr<Shard[]> shards_; |
134 | | |
135 | | Arena arena_; |
136 | | mutable SpinMutex arena_mutex_; |
137 | | std::atomic<size_t> arena_allocated_and_unused_; |
138 | | std::atomic<size_t> memory_allocated_bytes_; |
139 | | std::atomic<size_t> irregular_block_num_; |
140 | | |
141 | | char padding1[56] ROCKSDB_FIELD_UNUSED; |
142 | | |
143 | | Shard* Repick(); |
144 | | |
145 | 978k | size_t ShardAllocatedAndUnused() const { |
146 | 978k | size_t total = 0; |
147 | 16.6M | for (size_t i = 0; i <= index_mask_; ++i) { |
148 | 15.6M | total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed); |
149 | 15.6M | } |
150 | 978k | return total; |
151 | 978k | } |
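The loop runs over index_mask_ + 1 shards; the shard count is a power of two, so elsewhere `cpu & index_mask_` is a cheap substitute for `cpu % num_shards`. A small standalone check of that masking (illustrative names only):

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>

int main() {
  // With 8 shards, the mask is 7 (0b111); any cpu id maps into [0, 8).
  const uint32_t num_shards = 8;  // must be a power of two
  const uint32_t index_mask = num_shards - 1;

  const uint32_t cpus[] = {0, 3, 8, 13, 255};
  for (uint32_t cpu : cpus) {
    uint32_t shard = cpu & index_mask;  // same result as cpu % num_shards
    assert(shard == cpu % num_shards);
    std::printf("cpu %u -> shard %u\n", cpu, shard);
  }
  return 0;
}
```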
152 | | |
153 | | template <typename Func> |
154 | 401M | char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) { |
155 | 401M | uint32_t cpu; |
156 | | |
157 | | // Go directly to the arena if the allocation is too large, or if |
158 | | // we've never needed to Repick() and the arena mutex is available |
159 | | // with no waiting. This keeps the fragmentation penalty of |
160 | | // concurrency zero unless it might actually confer an advantage. |
161 | 401M | std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock); |
162 | 401M | if (bytes > shard_block_size_ / 4 || force_arena || |
163 | 401M | ((cpu = tls_cpuid) == 0 && |
164 | 400M | !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) && |
165 | 400M | arena_lock.try_lock())) { |
166 | 400M | if (!arena_lock.owns_lock()) { |
167 | 1.28M | arena_lock.lock(); |
168 | 1.28M | } |
169 | 400M | auto rv = func(); |
170 | 400M | Fixup(); |
171 | 400M | return rv; |
172 | 400M | } |
173 | | |
174 | | // pick a shard from which to allocate |
175 | 781k | Shard* s = &shards_[cpu & index_mask_]; |
176 | 781k | if (!s->mutex.try_lock()) { |
177 | 7.24k | s = Repick(); |
178 | 7.24k | s->mutex.lock(); |
179 | 7.24k | } |
180 | 781k | std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock); |
181 | | |
182 | 781k | size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed); |
183 | 781k | if (avail < bytes) { |
184 | | // reload |
185 | 74.9k | std::lock_guard<SpinMutex> reload_lock(arena_mutex_); |
186 | | |
187 | | // If the arena's current block is within a factor of 2 of the right |
188 | | // size, we adjust our request to avoid arena waste. |
189 | 74.9k | auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed); |
190 | 74.9k | assert(exact == arena_.AllocatedAndUnused()); |
191 | 74.9k | avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2 |
192 | 74.9k | ? exact |
193 | 74.9k | : shard_block_size_; |
194 | 74.9k | s->free_begin_ = arena_.AllocateAligned(avail); |
195 | 74.9k | Fixup(); |
196 | 74.9k | } |
197 | 0 | s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed); |
198 | | |
199 | 781k | char* rv; |
200 | 781k | if ((bytes % sizeof(void*)) == 0) { |
201 | | // aligned allocation from the beginning |
202 | 525k | rv = s->free_begin_; |
203 | 525k | s->free_begin_ += bytes; |
204 | 525k | } else { |
205 | | // unaligned from the end |
206 | 256k | rv = s->free_begin_ + avail - bytes; |
207 | 256k | } |
208 | 781k | return rv; |
209 | 401M | } |
210 | | |
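Inside a shard block, pointer-aligned requests are carved from the front (advancing free_begin_) and odd-sized requests from the back of the unused region, so the front pointer always stays pointer-aligned. A freestanding sketch of that carving scheme, with hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical mini-shard mirroring free_begin_ / allocated_and_unused_.
struct MiniShard {
  char* free_begin;  // start of the unused region
  size_t unused;     // bytes remaining in the block

  char* Carve(size_t bytes) {
    assert(bytes <= unused);
    char* rv;
    if (bytes % sizeof(void*) == 0) {
      rv = free_begin;                   // aligned: take from the front
      free_begin += bytes;
    } else {
      rv = free_begin + unused - bytes;  // unaligned: take from the back
    }
    unused -= bytes;
    return rv;
  }
};

int main() {
  alignas(void*) char block[256];
  MiniShard s{block, sizeof(block)};
  char* a = s.Carve(16);  // front, pointer-aligned
  char* b = s.Carve(5);   // back, leaves the front untouched
  char* c = s.Carve(8);   // front again, still pointer-aligned
  assert(a == block && b == block + 256 - 5 && c == block + 16);
  assert(reinterpret_cast<std::uintptr_t>(c) % sizeof(void*) == 0);
  return 0;
}
```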
211 | 401M | void Fixup() { |
212 | 401M | arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(), |
213 | 401M | std::memory_order_relaxed); |
214 | 401M | memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(), |
215 | 401M | std::memory_order_relaxed); |
216 | 401M | irregular_block_num_.store(arena_.IrregularBlockNum(), |
217 | 401M | std::memory_order_relaxed); |
218 | 401M | } |
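Fixup() runs while arena_mutex_ is held and republishes the arena's counters into relaxed atomics, which is what lets MemoryAllocatedBytes(), AllocatedAndUnused(), and IrregularBlockNum() read without taking the mutex. A compact sketch of that publish/read split (illustrative class, not the real one):

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>

// Writers update the exact value under a mutex and publish a snapshot into
// an atomic; readers only load the atomic, so they never block a writer.
class CachedStat {
 public:
  void Add(size_t n) {
    std::lock_guard<std::mutex> guard(mu_);
    exact_ += n;
    published_.store(exact_, std::memory_order_relaxed);  // the "Fixup" step
  }
  size_t Approximate() const {
    return published_.load(std::memory_order_relaxed);    // lock-free read
  }

 private:
  std::mutex mu_;
  size_t exact_ = 0;
  std::atomic<size_t> published_{0};
};
```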
219 | | |
220 | | ConcurrentArena(const ConcurrentArena&) = delete; |
221 | | ConcurrentArena& operator=(const ConcurrentArena&) = delete; |
222 | | }; |
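For reference, a minimal usage sketch of the public interface above, assuming the surrounding YugabyteDB build and include paths; allocations are owned by the arena and released when it is destroyed:

```cpp
#include <cstddef>
#include <thread>
#include <vector>

#include "yb/rocksdb/util/concurrent_arena.h"

void Example() {
  rocksdb::ConcurrentArena arena;  // default block_size = Arena::kMinBlockSize

  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&arena] {
      for (int i = 0; i < 1000; ++i) {
        // Small requests may be served from a per-core shard once concurrent
        // use is detected; large requests go straight to the wrapped Arena.
        char* p = arena.Allocate(24);
        char* q = arena.AllocateAligned(64);
        (void)p;
        (void)q;
      }
    });
  }
  for (auto& w : workers) w.join();

  size_t approx = arena.ApproximateMemoryUsage();
  (void)approx;
}
```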
223 | | |
224 | | } // namespace rocksdb |
225 | | |
226 | | #endif // YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H |