YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/concurrent_arena.h
Line |  Count | Source
-----+--------+----------------------------------------------------------------------
   1 |        | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
   2 |        | //  This source code is licensed under the BSD-style license found in the
   3 |        | //  LICENSE file in the root directory of this source tree. An additional grant
   4 |        | //  of patent rights can be found in the PATENTS file in the same directory.
   5 |        | //
   6 |        | // The following only applies to changes made to this file as part of YugaByte development.
   7 |        | //
   8 |        | // Portions Copyright (c) YugaByte, Inc.
   9 |        | //
  10 |        | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  11 |        | // in compliance with the License.  You may obtain a copy of the License at
  12 |        | //
  13 |        | // http://www.apache.org/licenses/LICENSE-2.0
  14 |        | //
  15 |        | // Unless required by applicable law or agreed to in writing, software distributed under the License
  16 |        | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  17 |        | // or implied.  See the License for the specific language governing permissions and limitations
  18 |        | // under the License.
  19 |        | //
  20 |        | // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  21 |        | // Use of this source code is governed by a BSD-style license that can be
  22 |        | // found in the LICENSE file. See the AUTHORS file for names of contributors.
  23 |        |
  24 |        | #ifndef YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H
  25 |        | #define YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H
  26 |        |
  27 |        | #pragma once
  28 |        |
  29 |        | #include <atomic>
  30 |        | #include <mutex>
  31 |        |
  32 |        | #include "yb/rocksdb/util/allocator.h"
  33 |        | #include "yb/rocksdb/util/arena.h"
  34 |        | #include "yb/rocksdb/util/mutexlock.h"
  35 |        |
  36 |        | // Only generate field unused warning for padding array, or build under
  37 |        | // GCC 4.8.1 will fail.
  38 |        | #ifdef __clang__
  39 |        | #define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
  40 |        | #else
  41 |        | #define ROCKSDB_FIELD_UNUSED
  42 |        | #endif  // __clang__
  43 |        |
  44 |        | namespace yb {
  45 |        |
  46 |        | class MemTracker;
  47 |        |
  48 |        | }
  49 |        |
  50 |        | namespace rocksdb {
  51 |        |
  52 |        | class Logger;
  53 |        |
  54 |        | // ConcurrentArena wraps an Arena.  It makes it thread safe using a fast
  55 |        | // inlined spinlock, and adds small per-core allocation caches to avoid
  56 |        | // contention for small allocations.  To avoid any memory waste from the
  57 |        | // per-core shards, they are kept small, they are lazily instantiated
  58 |        | // only if ConcurrentArena actually notices concurrent use, and they
  59 |        | // adjust their size so that there is no fragmentation waste when the
  60 |        | // shard blocks are allocated from the underlying main arena.
  61 |        | class ConcurrentArena : public Allocator {
  62 |        |  public:
  63 |        |   // block_size and huge_page_size are the same as for Arena (and are
  64 |        |   // in fact just passed to the constructor of arena_.  The core-local
  65 |        |   // shards compute their shard_block_size as a fraction of block_size
  66 |        |   // that varies according to the hardware concurrency level.
  67 |        |   explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
  68 |        |                            size_t huge_page_size = 0);
  69 |        |
  70 |   285k |   char* Allocate(size_t bytes) override {
  71 |   285k |     return AllocateImpl(bytes, false /*force_arena*/,
  72 |   285k |                         [=]() { return arena_.Allocate(bytes); });
  73 |   285k |   }
  74 |        |
  75 |        |   char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
  76 |   152M |                         Logger* logger = nullptr) override {
  77 |   152M |     size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
  78 |   152M |     assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
  79 |   152M |            (rounded_up % sizeof(void*)) == 0);
  80 |        |
  81 |   151M |     return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
  82 |   151M |       return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
  83 |   151M |     });
  84 |   152M |   }
  85 |        |
  86 |   128k |   size_t ApproximateMemoryUsage() const {
  87 |   128k |     std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
  88 |   128k |     if (index_mask_ != 0) {
  89 |   128k |       lock.lock();
  90 |   128k |     }
  91 |   128k |     return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
  92 |   128k |   }
  93 |        |
  94 |  94.7M |   size_t MemoryAllocatedBytes() const {
  95 |  94.7M |     return memory_allocated_bytes_.load(std::memory_order_relaxed);
  96 |  94.7M |   }
  97 |        |
  98 |   856k |   size_t AllocatedAndUnused() const {
  99 |   856k |     return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
 100 |   856k |            ShardAllocatedAndUnused();
 101 |   856k |   }
 102 |        |
 103 |      0 |   size_t IrregularBlockNum() const {
 104 |      0 |     return irregular_block_num_.load(std::memory_order_relaxed);
 105 |      0 |   }
 106 |        |
 107 |  1.30k |   size_t BlockSize() const override { return arena_.BlockSize(); }
 108 |        |
 109 |        |   void SetMemTracker(std::shared_ptr<yb::MemTracker> mem_tracker);
 110 |        |
 111 |        |  private:
 112 |        |   struct Shard {
 113 |        |     char padding[40] ROCKSDB_FIELD_UNUSED;
 114 |        |     mutable SpinMutex mutex;
 115 |        |     char* free_begin_;
 116 |        |     std::atomic<size_t> allocated_and_unused_;
 117 |        |
 118 |  6.07M |     Shard() : allocated_and_unused_(0) {}
 119 |        |   };
 120 |        |
 121 |        | #if ROCKSDB_SUPPORT_THREAD_LOCAL
 122 |        |   static __thread uint32_t tls_cpuid;
 123 |        | #else
 124 |        |   enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
 125 |        | #endif
 126 |        |
 127 |        |   char padding0[56] ROCKSDB_FIELD_UNUSED;
 128 |        |
 129 |        |   size_t shard_block_size_;
 130 |        |
 131 |        |   // shards_[i & index_mask_] is valid
 132 |        |   size_t index_mask_;
 133 |        |   std::unique_ptr<Shard[]> shards_;
 134 |        |
 135 |        |   Arena arena_;
 136 |        |   mutable SpinMutex arena_mutex_;
 137 |        |   std::atomic<size_t> arena_allocated_and_unused_;
 138 |        |   std::atomic<size_t> memory_allocated_bytes_;
 139 |        |   std::atomic<size_t> irregular_block_num_;
 140 |        |
 141 |        |   char padding1[56] ROCKSDB_FIELD_UNUSED;
 142 |        |
 143 |        |   Shard* Repick();
 144 |        |
 145 |   984k |   size_t ShardAllocatedAndUnused() const {
 146 |   984k |     size_t total = 0;
 147 |  16.7M |     for (size_t i = 0; i <= index_mask_; ++i) {
 148 |  15.7M |       total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
 149 |  15.7M |     }
 150 |   984k |     return total;
 151 |   984k |   }
 152 |        |
 153 |        |   template <typename Func>
 154 |   152M |   char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
 155 |   152M |     uint32_t cpu;
 156 |        |
 157 |        |     // Go directly to the arena if the allocation is too large, or if
 158 |        |     // we've never needed to Repick() and the arena mutex is available
 159 |        |     // with no waiting.  This keeps the fragmentation penalty of
 160 |        |     // concurrency zero unless it might actually confer an advantage.
 161 |   152M |     std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
 162 |   152M |     if (bytes > shard_block_size_ / 4 || force_arena ||
 163 |   151M |         ((cpu = tls_cpuid) == 0 &&
 164 |   151M |          !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
 165 |   152M |          arena_lock.try_lock())) {
 166 |   152M |       if (!arena_lock.owns_lock()) {
 167 |   957k |         arena_lock.lock();
 168 |   957k |       }
 169 |   152M |       auto rv = func();
 170 |   152M |       Fixup();
 171 |   152M |       return rv;
 172 |   152M |     }
 173 |        |
 174 |        |     // pick a shard from which to allocate
 175 |   616k |     Shard* s = &shards_[cpu & index_mask_];
 176 |   616k |     if (!s->mutex.try_lock()) {
 177 |  8.08k |       s = Repick();
 178 |  8.08k |       s->mutex.lock();
 179 |  8.08k |     }
 180 |   616k |     std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
 181 |        |
 182 |   616k |     size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
 183 |   616k |     if (avail < bytes) {
 184 |        |       // reload
 185 |  73.8k |       std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
 186 |        |
 187 |        |       // If the arena's current block is within a factor of 2 of the right
 188 |        |       // size, we adjust our request to avoid arena waste.
 189 |  73.8k |       auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
 190 |  73.8k |       assert(exact == arena_.AllocatedAndUnused());
 191 |  73.8k |       avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
 192 |  9.23k |                   ? exact
 193 |  64.5k |                   : shard_block_size_;
 194 |  73.8k |       s->free_begin_ = arena_.AllocateAligned(avail);
 195 |  73.8k |       Fixup();
 196 |  73.8k |     }
 197 |   616k |     s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
 198 |        |
 199 |   616k |     char* rv;
 200 |   616k |     if ((bytes % sizeof(void*)) == 0) {
 201 |        |       // aligned allocation from the beginning
 202 |   517k |       rv = s->free_begin_;
 203 |   517k |       s->free_begin_ += bytes;
 204 |  99.9k |     } else {
 205 |        |       // unaligned from the end
 206 |  99.9k |       rv = s->free_begin_ + avail - bytes;
 207 |  99.9k |     }
 208 |   616k |     return rv;
 209 |   616k |   }
Template instantiation: _ZN7rocksdb15ConcurrentArena12AllocateImplIZNS0_8AllocateEmEUlvE_EEPcmbRKT_
(AllocateImpl instantiated for the lambda passed from Allocate(size_t))

Line |  Count | Source
-----+--------+----------------------------------------------------------------------
 154 |   285k |   char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
 155 |   285k |     uint32_t cpu;
 156 |        |
 157 |        |     // Go directly to the arena if the allocation is too large, or if
 158 |        |     // we've never needed to Repick() and the arena mutex is available
 159 |        |     // with no waiting.  This keeps the fragmentation penalty of
 160 |        |     // concurrency zero unless it might actually confer an advantage.
 161 |   285k |     std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
 162 |   285k |     if (bytes > shard_block_size_ / 4 || force_arena ||
 163 |   285k |         ((cpu = tls_cpuid) == 0 &&
 164 |   285k |          !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
 165 |   285k |          arena_lock.try_lock())) {
 166 |   285k |       if (!arena_lock.owns_lock()) {
 167 |    365 |         arena_lock.lock();
 168 |    365 |       }
 169 |   285k |       auto rv = func();
 170 |   285k |       Fixup();
 171 |   285k |       return rv;
 172 |   285k |     }
 173 |        |
 174 |        |     // pick a shard from which to allocate
 175 |      0 |     Shard* s = &shards_[cpu & index_mask_];
 176 |      0 |     if (!s->mutex.try_lock()) {
 177 |      0 |       s = Repick();
 178 |      0 |       s->mutex.lock();
 179 |      0 |     }
 180 |      0 |     std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
 181 |        |
 182 |      0 |     size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
 183 |      0 |     if (avail < bytes) {
 184 |        |       // reload
 185 |      0 |       std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
 186 |        |
 187 |        |       // If the arena's current block is within a factor of 2 of the right
 188 |        |       // size, we adjust our request to avoid arena waste.
 189 |      0 |       auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
 190 |      0 |       assert(exact == arena_.AllocatedAndUnused());
 191 |      0 |       avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
 192 |      0 |                   ? exact
 193 |      0 |                   : shard_block_size_;
 194 |      0 |       s->free_begin_ = arena_.AllocateAligned(avail);
 195 |      0 |       Fixup();
 196 |      0 |     }
 197 |      0 |     s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
 198 |        |
 199 |      0 |     char* rv;
 200 |      0 |     if ((bytes % sizeof(void*)) == 0) {
 201 |        |       // aligned allocation from the beginning
 202 |      0 |       rv = s->free_begin_;
 203 |      0 |       s->free_begin_ += bytes;
 204 |      0 |     } else {
 205 |        |       // unaligned from the end
 206 |      0 |       rv = s->free_begin_ + avail - bytes;
 207 |      0 |     }
 208 |      0 |     return rv;
 209 |      0 |   }
Template instantiation: _ZN7rocksdb15ConcurrentArena12AllocateImplIZNS0_15AllocateAlignedEmmPNS_6LoggerEEUlvE_EEPcmbRKT_
(AllocateImpl instantiated for the lambda passed from AllocateAligned(size_t, size_t, Logger*))

Line |  Count | Source
-----+--------+----------------------------------------------------------------------
 154 |   152M |   char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
 155 |   152M |     uint32_t cpu;
 156 |        |
 157 |        |     // Go directly to the arena if the allocation is too large, or if
 158 |        |     // we've never needed to Repick() and the arena mutex is available
 159 |        |     // with no waiting.  This keeps the fragmentation penalty of
 160 |        |     // concurrency zero unless it might actually confer an advantage.
 161 |   152M |     std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
 162 |   152M |     if (bytes > shard_block_size_ / 4 || force_arena ||
 163 |   151M |         ((cpu = tls_cpuid) == 0 &&
 164 |   151M |          !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
 165 |   151M |          arena_lock.try_lock())) {
 166 |   151M |       if (!arena_lock.owns_lock()) {
 167 |   957k |         arena_lock.lock();
 168 |   957k |       }
 169 |   151M |       auto rv = func();
 170 |   151M |       Fixup();
 171 |   151M |       return rv;
 172 |   151M |     }
 173 |        |
 174 |        |     // pick a shard from which to allocate
 175 |   616k |     Shard* s = &shards_[cpu & index_mask_];
 176 |   616k |     if (!s->mutex.try_lock()) {
 177 |  8.08k |       s = Repick();
 178 |  8.08k |       s->mutex.lock();
 179 |  8.08k |     }
 180 |   616k |     std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
 181 |        |
 182 |   616k |     size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
 183 |   616k |     if (avail < bytes) {
 184 |        |       // reload
 185 |  73.8k |       std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
 186 |        |
 187 |        |       // If the arena's current block is within a factor of 2 of the right
 188 |        |       // size, we adjust our request to avoid arena waste.
 189 |  73.8k |       auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
 190 |  73.8k |       assert(exact == arena_.AllocatedAndUnused());
 191 |  73.8k |       avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
 192 |  9.23k |                   ? exact
 193 |  64.5k |                   : shard_block_size_;
 194 |  73.8k |       s->free_begin_ = arena_.AllocateAligned(avail);
 195 |  73.8k |       Fixup();
 196 |  73.8k |     }
 197 |   616k |     s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
 198 |        |
 199 |   616k |     char* rv;
 200 |   616k |     if ((bytes % sizeof(void*)) == 0) {
 201 |        |       // aligned allocation from the beginning
 202 |   517k |       rv = s->free_begin_;
 203 |   517k |       s->free_begin_ += bytes;
 204 |  99.9k |     } else {
 205 |        |       // unaligned from the end
 206 |  99.9k |       rv = s->free_begin_ + avail - bytes;
 207 |  99.9k |     }
 208 |   616k |     return rv;
 209 |   616k |   }
 210 |        |
 211 |   152M |   void Fixup() {
 212 |   152M |     arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
 213 |   152M |                                       std::memory_order_relaxed);
 214 |   152M |     memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
 215 |   152M |                                   std::memory_order_relaxed);
 216 |   152M |     irregular_block_num_.store(arena_.IrregularBlockNum(),
 217 |   152M |                                std::memory_order_relaxed);
 218 |   152M |   }
 219 |        |
 220 |        |   ConcurrentArena(const ConcurrentArena&) = delete;
 221 |        |   ConcurrentArena& operator=(const ConcurrentArena&) = delete;
 222 |        | };
 223 |        |
 224 |        | }  // namespace rocksdb
 225 |        |
 226 |        | #endif // YB_ROCKSDB_UTIL_CONCURRENT_ARENA_H
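
Usage note (not part of the covered source): the listing shows that ConcurrentArena implements the Allocator interface on top of a shared Arena plus lazily used per-core shards. The sketch below is a minimal, hypothetical example of driving that interface; it assumes the yb/rocksdb build environment (Arena, SpinMutex, the header path above), and the main function, thread count, and allocation sizes are invented for illustration.

    // Hypothetical driver, for illustration only; not part of the covered file.
    #include <cstdio>
    #include <cstring>
    #include <thread>
    #include <vector>

    #include "yb/rocksdb/util/concurrent_arena.h"

    int main() {
      // block_size and huge_page_size default as declared in the header and are
      // forwarded to the wrapped Arena.
      rocksdb::ConcurrentArena arena;

      // Concurrent small allocations: per the header, requests of at most
      // shard_block_size_ / 4 may be served from a per-core shard, while larger
      // or forced requests go through the main arena under arena_mutex_.
      std::vector<std::thread> workers;
      for (int t = 0; t < 4; ++t) {
        workers.emplace_back([&arena] {
          for (int i = 0; i < 1000; ++i) {
            char* p = arena.AllocateAligned(64);
            std::memset(p, 0, 64);  // memory stays valid until the arena is destroyed
          }
        });
      }
      for (auto& w : workers) {
        w.join();
      }

      // Accounting methods exposed by the header above.
      std::printf("approx used: %zu, allocated from system: %zu\n",
                  arena.ApproximateMemoryUsage(), arena.MemoryAllocatedBytes());
      return 0;
    }

The rounding expression at source line 77, ((bytes - 1) | (sizeof(void*) - 1)) + 1, rounds a request up to the next multiple of sizeof(void*): with 8-byte pointers, a 13-byte request becomes ((13 - 1) | 7) + 1 = 16, while a request that is already a multiple of 8, such as 16, stays 16.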