/src/rocksdb/memory/concurrent_arena.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #pragma once |
11 | | #include <atomic> |
12 | | #include <memory> |
13 | | #include <utility> |
14 | | |
15 | | #include "memory/allocator.h" |
16 | | #include "memory/arena.h" |
17 | | #include "port/lang.h" |
18 | | #include "port/likely.h" |
19 | | #include "util/core_local.h" |
20 | | #include "util/mutexlock.h" |
21 | | #include "util/thread_local.h" |
22 | | |
23 | | // Suppress the unused-field warning for the padding arrays only under |
24 | | // clang, or the build under GCC 4.8.1 will fail. |
25 | | #ifdef __clang__ |
26 | | #define ROCKSDB_FIELD_UNUSED __attribute__((__unused__)) |
27 | | #else |
28 | | #define ROCKSDB_FIELD_UNUSED |
29 | | #endif // __clang__ |
30 | | |
31 | | namespace ROCKSDB_NAMESPACE { |
32 | | |
33 | | class Logger; |
34 | | |
35 | | // ConcurrentArena wraps an Arena. It makes it thread safe using a fast |
36 | | // inlined spinlock, and adds small per-core allocation caches to avoid |
37 | | // contention for small allocations. To avoid any memory waste from the |
38 | | // per-core shards, they are kept small, they are lazily instantiated |
39 | | // only if ConcurrentArena actually notices concurrent use, and they |
40 | | // adjust their size so that there is no fragmentation waste when the |
41 | | // shard blocks are allocated from the underlying main arena. |
42 | | class ConcurrentArena : public Allocator { |
43 | | public: |
44 | | // block_size and huge_page_size are the same as for Arena (and are |
45 | | // in fact just passed to the constructor of arena_). The core-local |
46 | | // shards compute their shard_block_size as a fraction of block_size |
47 | | // that varies according to the hardware concurrency level. |
48 | | explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize, |
49 | | AllocTracker* tracker = nullptr, |
50 | | size_t huge_page_size = 0); |
51 | | |
52 | 0 | char* Allocate(size_t bytes) override { |
53 | 0 | return AllocateImpl(bytes, false /*force_arena*/, |
54 | 0 | [this, bytes]() { return arena_.Allocate(bytes); }); |
55 | 0 | } |
56 | | |
57 | | char* AllocateAligned(size_t bytes, size_t huge_page_size = 0, |
58 | 2.75M | Logger* logger = nullptr) override { |
59 | 2.75M | size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1; |
60 | 2.75M | assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) && |
61 | 2.75M | (rounded_up % sizeof(void*)) == 0); |
62 | | |
63 | 2.75M | return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, |
64 | 2.75M | [this, rounded_up, huge_page_size, logger]() { |
65 | 2.75M | return arena_.AllocateAligned(rounded_up, |
66 | 2.75M | huge_page_size, logger); |
67 | 2.75M | }); |
68 | 2.75M | } |
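The rounding expression in AllocateAligned relies on sizeof(void*) being a power of two: OR-ing (bytes - 1) with (alignment - 1) sets every low bit, and adding 1 lands on the next multiple. A minimal standalone sketch of the same trick, with an illustrative helper name that is not part of this header:

    #include <cstddef>

    // Round n up to the next multiple of `align`, which must be a power of
    // two. Same bit trick as AllocateAligned: ((n - 1) | (align - 1)) + 1.
    // Assumes n > 0 (n == 0 would wrap around).
    constexpr size_t round_up_pow2(size_t n, size_t align) {
      return ((n - 1) | (align - 1)) + 1;
    }

    static_assert(round_up_pow2(13, 8) == 16, "13 rounds up to 16");
    static_assert(round_up_pow2(16, 8) == 16, "already aligned stays put");
    static_assert(round_up_pow2(1, 8) == 8, "small requests get a full word");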
69 | | |
70 | 4.98k | size_t ApproximateMemoryUsage() const { |
71 | 4.98k | std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock); |
72 | 4.98k | lock.lock(); |
73 | 4.98k | return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused(); |
74 | 4.98k | } |
75 | | |
76 | 2.61M | size_t MemoryAllocatedBytes() const { |
77 | 2.61M | return memory_allocated_bytes_.load(std::memory_order_relaxed); |
78 | 2.61M | } |
79 | | |
80 | 0 | size_t AllocatedAndUnused() const { |
81 | 0 | return arena_allocated_and_unused_.load(std::memory_order_relaxed) + |
82 | 0 | ShardAllocatedAndUnused(); |
83 | 0 | } |
84 | | |
85 | 0 | size_t IrregularBlockNum() const { |
86 | 0 | return irregular_block_num_.load(std::memory_order_relaxed); |
87 | 0 | } |
88 | | |
89 | 0 | size_t BlockSize() const override { return arena_.BlockSize(); } |
90 | | |
91 | | private: |
92 | | struct Shard { |
93 | | char padding[40] ROCKSDB_FIELD_UNUSED; |
94 | | mutable SpinMutex mutex; |
95 | | char* free_begin_; |
96 | | std::atomic<size_t> allocated_and_unused_; |
97 | | |
98 | 1.54M | Shard() : free_begin_(nullptr), allocated_and_unused_(0) {} |
99 | | }; |
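The padding arrays (padding[40] here, padding0/padding1 below) keep hot, independently written fields on separate cache lines so per-core shards do not false-share with each other or with the arena bookkeeping. A generic illustration of the same idea, assuming a 64-byte cache line; the exact byte counts chosen in this header depend on the sizes of SpinMutex and the neighboring members, so this is not the actual layout:

    #include <atomic>
    #include <cstddef>

    // Two counters bumped by different threads. Packed together they could
    // share one cache line, so each increment would invalidate the other
    // thread's cached copy (false sharing).
    struct alignas(64) PaddedCounter {
      std::atomic<size_t> value{0};
      // alignas(64) pads each PaddedCounter out to its own cache line,
      // playing the same role as the explicit padding[] arrays above.
    };

    PaddedCounter per_core_counters[4];  // e.g. one per core, like CoreLocalArray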
100 | | |
101 | | static thread_local size_t tls_cpuid; |
102 | | |
103 | | char padding0[56] ROCKSDB_FIELD_UNUSED; |
104 | | |
105 | | size_t shard_block_size_; |
106 | | |
107 | | CoreLocalArray<Shard> shards_; |
108 | | |
109 | | Arena arena_; |
110 | | mutable SpinMutex arena_mutex_; |
111 | | std::atomic<size_t> arena_allocated_and_unused_; |
112 | | std::atomic<size_t> memory_allocated_bytes_; |
113 | | std::atomic<size_t> irregular_block_num_; |
114 | | |
115 | | char padding1[56] ROCKSDB_FIELD_UNUSED; |
116 | | |
117 | | Shard* Repick(); |
118 | | |
119 | 4.98k | size_t ShardAllocatedAndUnused() const { |
120 | 4.98k | size_t total = 0; |
121 | 164k | for (size_t i = 0; i < shards_.Size(); ++i) { |
122 | 159k | total += shards_.AccessAtCore(i)->allocated_and_unused_.load( |
123 | 159k | std::memory_order_relaxed); |
124 | 159k | } |
125 | 4.98k | return total; |
126 | 4.98k | } |
127 | | |
128 | | template <typename Func> |
129 | 2.75M | char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) { |
130 | 2.75M | size_t cpu; |
131 | | |
132 | | // Go directly to the arena if the allocation is too large, or if |
133 | | // we've never needed to Repick() and the arena mutex is available |
134 | | // with no waiting. This keeps the fragmentation penalty of |
135 | | // concurrency zero unless it might actually confer an advantage. |
136 | 2.75M | std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock); |
137 | 2.75M | if (bytes > shard_block_size_ / 4 || force_arena || |
138 | 2.75M | ((cpu = tls_cpuid) == 0 && |
139 | 2.75M | !shards_.AccessAtCore(0)->allocated_and_unused_.load( |
140 | 2.75M | std::memory_order_relaxed) && |
141 | 2.75M | arena_lock.try_lock())) { |
142 | 2.75M | if (!arena_lock.owns_lock()) { |
143 | 3.21k | arena_lock.lock(); |
144 | 3.21k | } |
145 | 2.75M | auto rv = func(); |
146 | 2.75M | Fixup(); |
147 | 2.75M | return rv; |
148 | 2.75M | } |
149 | | |
150 | | // pick a shard from which to allocate |
151 | 0 | Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1)); |
152 | 0 | if (!s->mutex.try_lock()) { |
153 | 0 | s = Repick(); |
154 | 0 | s->mutex.lock(); |
155 | 0 | } |
156 | 0 | std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock); |
157 | |
158 | 0 | size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed); |
159 | 0 | if (avail < bytes) { |
160 | | // reload |
161 | 0 | std::lock_guard<SpinMutex> reload_lock(arena_mutex_); |
162 | | |
163 | | // If the arena's current block is within a factor of 2 of the right |
164 | | // size, we adjust our request to avoid arena waste. |
165 | 0 | auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed); |
166 | 0 | assert(exact == arena_.AllocatedAndUnused()); |
167 | |
168 | 0 | if (exact >= bytes && arena_.IsInInlineBlock()) { |
169 | | // If we haven't exhausted arena's inline block yet, allocate from arena |
170 | | // directly. This ensures that we'll do the first few small allocations |
171 | | // without allocating any blocks. |
172 | | // In particular this prevents empty memtables from using |
173 | | // disproportionately large amount of memory: a memtable allocates on |
174 | | // the order of 1 KB of memory when created; we wouldn't want to |
175 | | // allocate a full arena block (typically a few megabytes) for that, |
176 | | // especially if there are thousands of empty memtables. |
177 | 0 | auto rv = func(); |
178 | 0 | Fixup(); |
179 | 0 | return rv; |
180 | 0 | } |
181 | | |
182 | 0 | avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2 |
183 | 0 | ? exact |
184 | 0 | : shard_block_size_; |
185 | 0 | s->free_begin_ = arena_.AllocateAligned(avail); |
186 | 0 | Fixup(); |
187 | 0 | } |
188 | 0 | s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed); |
189 | |
190 | 0 | char* rv; |
191 | 0 | if ((bytes % sizeof(void*)) == 0) { |
192 | | // aligned allocation from the beginning |
193 | 0 | rv = s->free_begin_; |
194 | 0 | s->free_begin_ += bytes; |
195 | 0 | } else { |
196 | | // unaligned from the end |
197 | 0 | rv = s->free_begin_ + avail - bytes; |
198 | 0 | } |
199 | 0 | return rv; |
200 | 0 | } |
Unexecuted instantiation: char* rocksdb::ConcurrentArena::AllocateImpl<rocksdb::ConcurrentArena::Allocate(unsigned long)::{lambda()#1}>(unsigned long, bool, rocksdb::ConcurrentArena::Allocate(unsigned long)::{lambda()#1} const&)
Instantiation char* rocksdb::ConcurrentArena::AllocateImpl<rocksdb::ConcurrentArena::AllocateAligned(unsigned long, unsigned long, rocksdb::Logger*)::{lambda()#1}>(unsigned long, bool, rocksdb::ConcurrentArena::AllocateAligned(unsigned long, unsigned long, rocksdb::Logger*)::{lambda()#1} const&) was executed 2.75M times; its per-line counts match the combined listing above.
201 | | |
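The refill branch of AllocateImpl sizes a shard's new block adaptively: if the arena's current free tail (`exact`) lies between shard_block_size_/2 and 2*shard_block_size_, the shard takes exactly that amount so nothing in the arena block is stranded; otherwise it requests a plain shard_block_size_. A small free-standing sketch of that sizing rule, with illustrative names:

    #include <cstddef>

    // Mirror of the `avail = ...` expression in AllocateImpl: swallow the
    // arena's leftover tail exactly when it is close enough to the normal
    // shard block size, so refilling a shard never strands a fragment.
    size_t PickShardRefillSize(size_t arena_unused, size_t shard_block_size) {
      if (arena_unused >= shard_block_size / 2 &&
          arena_unused < shard_block_size * 2) {
        return arena_unused;   // take the tail whole; no fragment left behind
      }
      return shard_block_size; // tail too small or too large; use the default
    }

    // Example: with shard_block_size = 64 KB, a 50 KB arena tail is taken
    // whole, while a 10 KB tail is ignored and 64 KB is requested instead.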
202 | 2.80M | void Fixup() { |
203 | 2.80M | arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(), |
204 | 2.80M | std::memory_order_relaxed); |
205 | 2.80M | memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(), |
206 | 2.80M | std::memory_order_relaxed); |
207 | 2.80M | irregular_block_num_.store(arena_.IrregularBlockNum(), |
208 | 2.80M | std::memory_order_relaxed); |
209 | 2.80M | } |
210 | | |
211 | | ConcurrentArena(const ConcurrentArena&) = delete; |
212 | | ConcurrentArena& operator=(const ConcurrentArena&) = delete; |
213 | | }; |
214 | | |
215 | | } // namespace ROCKSDB_NAMESPACE |
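For context, a hedged usage sketch of the interface covered by this report: one thread allocating through the Allocator interface while another polls the lock-free statistics that Fixup() keeps refreshed. This is an illustrative harness, not code from RocksDB; it only calls members declared above.

    #include <cstring>
    #include <thread>
    #include "memory/concurrent_arena.h"

    using ROCKSDB_NAMESPACE::ConcurrentArena;

    void Demo() {
      ConcurrentArena arena;  // default block size, no tracker, no huge pages

      // Writer: aligned allocations, as a concurrent memtable would issue.
      std::thread writer([&arena] {
        for (int i = 0; i < 1000; ++i) {
          char* p = arena.AllocateAligned(128);
          std::memset(p, 0, 128);  // arena memory is uninitialized
        }
      });

      // Reader: MemoryAllocatedBytes() reads a relaxed atomic that Fixup()
      // refreshes after each arena-backed allocation, so no lock is taken.
      std::thread reader([&arena] {
        size_t last = 0;
        for (int i = 0; i < 100; ++i) {
          last = arena.MemoryAllocatedBytes();
        }
        (void)last;
      });

      writer.join();
      reader.join();
    }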