Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/memory/concurrent_arena.h
 Line|     Count|Source
    1|          |//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
    2|          |//  This source code is licensed under both the GPLv2 (found in the
    3|          |//  COPYING file in the root directory) and Apache 2.0 License
    4|          |//  (found in the LICENSE.Apache file in the root directory).
    5|          |//
    6|          |// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
    7|          |// Use of this source code is governed by a BSD-style license that can be
    8|          |// found in the LICENSE file. See the AUTHORS file for names of contributors.
    9|          |
   10|          |#pragma once
   11|          |#include <atomic>
   12|          |#include <memory>
   13|          |#include <utility>
   14|          |
   15|          |#include "memory/allocator.h"
   16|          |#include "memory/arena.h"
   17|          |#include "port/lang.h"
   18|          |#include "port/likely.h"
   19|          |#include "util/core_local.h"
   20|          |#include "util/mutexlock.h"
   21|          |#include "util/thread_local.h"
   22|          |
   23|          |// Only generate field unused warning for padding array, or build under
   24|          |// GCC 4.8.1 will fail.
   25|          |#ifdef __clang__
   26|          |#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
   27|          |#else
   28|          |#define ROCKSDB_FIELD_UNUSED
   29|          |#endif  // __clang__
   30|          |
   31|          |namespace ROCKSDB_NAMESPACE {
   32|          |
   33|          |class Logger;
   34|          |
   35|          |// ConcurrentArena wraps an Arena.  It makes it thread safe using a fast
   36|          |// inlined spinlock, and adds small per-core allocation caches to avoid
   37|          |// contention for small allocations.  To avoid any memory waste from the
   38|          |// per-core shards, they are kept small, they are lazily instantiated
   39|          |// only if ConcurrentArena actually notices concurrent use, and they
   40|          |// adjust their size so that there is no fragmentation waste when the
   41|          |// shard blocks are allocated from the underlying main arena.
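The class comment at lines 35-41 describes the design rather than the calling convention, so a small usage sketch may help. This is a hypothetical example, not part of the header: it assumes it is compiled inside a RocksDB source tree (so the internal include path and the ROCKSDB_NAMESPACE macro resolve) and it only exercises members visible in this listing.

// Minimal usage sketch (not part of concurrent_arena.h). Assumes a RocksDB
// source build; only members shown in the listing above are used.
#include <cstdio>
#include <thread>
#include <vector>

#include "memory/concurrent_arena.h"

int main() {
  ROCKSDB_NAMESPACE::ConcurrentArena arena;  // default block size, no tracker

  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&arena] {
      for (int i = 0; i < 1000; ++i) {
        // Small aligned allocations; under concurrent use these are the
        // requests the lazily created per-core shards are meant to absorb.
        char* p = arena.AllocateAligned(64);
        p[0] = 'x';  // memory stays valid until the arena is destroyed
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }

  std::printf("approx used: %zu, allocated: %zu\n",
              arena.ApproximateMemoryUsage(), arena.MemoryAllocatedBytes());
  return 0;
}

Allocations are never freed individually; everything is released when the arena is destroyed, which is why the sketch only allocates and reads back statistics.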
   42|          |class ConcurrentArena : public Allocator {
   43|          | public:
   44|          |  // block_size and huge_page_size are the same as for Arena (and are
   45|          |  // in fact just passed to the constructor of arena_).  The core-local
   46|          |  // shards compute their shard_block_size as a fraction of block_size
   47|          |  // that varies according to the hardware concurrency level.
   48|          |  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
   49|          |                           AllocTracker* tracker = nullptr,
   50|          |                           size_t huge_page_size = 0);
   51|          |
   52|         0|  char* Allocate(size_t bytes) override {
   53|         0|    return AllocateImpl(bytes, false /*force_arena*/,
   54|         0|                        [this, bytes]() { return arena_.Allocate(bytes); });
   55|         0|  }
   56|          |
   57|          |  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
   58|     2.75M|                        Logger* logger = nullptr) override {
   59|     2.75M|    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
   60|     2.75M|    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
   61|     2.75M|           (rounded_up % sizeof(void*)) == 0);
   62|          |
   63|     2.75M|    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/,
   64|     2.75M|                        [this, rounded_up, huge_page_size, logger]() {
   65|     2.75M|                          return arena_.AllocateAligned(rounded_up,
   66|     2.75M|                                                        huge_page_size, logger);
   67|     2.75M|                        });
   68|     2.75M|  }
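The expression on line 59 rounds the request up to a multiple of the pointer size. A standalone illustration of the arithmetic (hypothetical code, not part of the header; the expected values assume sizeof(void*) == 8, as on a typical 64-bit build):

// Illustration of the rounding on line 59. RoundUpToPointerSize is a
// hypothetical helper, not a RocksDB function.
#include <cassert>
#include <cstddef>

static std::size_t RoundUpToPointerSize(std::size_t bytes) {
  return ((bytes - 1) | (sizeof(void*) - 1)) + 1;
}

int main() {
  // (13 - 1) | 7 == 15, plus 1 == 16: rounded up to the next multiple of 8.
  assert(RoundUpToPointerSize(13) == 16);
  // Already a multiple of 8: (16 - 1) | 7 == 15, plus 1 == 16, unchanged.
  assert(RoundUpToPointerSize(16) == 16);
  // The smallest request still occupies a full pointer-sized slot.
  assert(RoundUpToPointerSize(1) == 8);
  return 0;
}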
   69|          |
   70|     4.98k|  size_t ApproximateMemoryUsage() const {
   71|     4.98k|    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
   72|     4.98k|    lock.lock();
   73|     4.98k|    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
   74|     4.98k|  }
   75|          |
   76|     2.61M|  size_t MemoryAllocatedBytes() const {
   77|     2.61M|    return memory_allocated_bytes_.load(std::memory_order_relaxed);
   78|     2.61M|  }
   79|          |
   80|         0|  size_t AllocatedAndUnused() const {
   81|         0|    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
   82|         0|           ShardAllocatedAndUnused();
   83|         0|  }
   84|          |
   85|         0|  size_t IrregularBlockNum() const {
   86|         0|    return irregular_block_num_.load(std::memory_order_relaxed);
   87|         0|  }
   88|          |
   89|         0|  size_t BlockSize() const override { return arena_.BlockSize(); }
   90|          |
   91|          | private:
   92|          |  struct Shard {
   93|          |    char padding[40] ROCKSDB_FIELD_UNUSED;
   94|          |    mutable SpinMutex mutex;
   95|          |    char* free_begin_;
   96|          |    std::atomic<size_t> allocated_and_unused_;
   97|          |
   98|     1.54M|    Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
   99|          |  };
  100|          |
  101|          |  static thread_local size_t tls_cpuid;
  102|          |
  103|          |  char padding0[56] ROCKSDB_FIELD_UNUSED;
  104|          |
  105|          |  size_t shard_block_size_;
  106|          |
  107|          |  CoreLocalArray<Shard> shards_;
  108|          |
  109|          |  Arena arena_;
  110|          |  mutable SpinMutex arena_mutex_;
  111|          |  std::atomic<size_t> arena_allocated_and_unused_;
  112|          |  std::atomic<size_t> memory_allocated_bytes_;
  113|          |  std::atomic<size_t> irregular_block_num_;
  114|          |
  115|          |  char padding1[56] ROCKSDB_FIELD_UNUSED;
  116|          |
  117|          |  Shard* Repick();
  118|          |
  119|     4.98k|  size_t ShardAllocatedAndUnused() const {
  120|     4.98k|    size_t total = 0;
  121|      164k|    for (size_t i = 0; i < shards_.Size(); ++i) {
  122|      159k|      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
  123|      159k|          std::memory_order_relaxed);
  124|      159k|    }
  125|     4.98k|    return total;
  126|     4.98k|  }
  127|          |
  128|          |  template <typename Func>
  129|     2.75M|  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
  130|     2.75M|    size_t cpu;
  131|          |
  132|          |    // Go directly to the arena if the allocation is too large, or if
  133|          |    // we've never needed to Repick() and the arena mutex is available
  134|          |    // with no waiting.  This keeps the fragmentation penalty of
  135|          |    // concurrency zero unless it might actually confer an advantage.
  136|     2.75M|    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
  137|     2.75M|    if (bytes > shard_block_size_ / 4 || force_arena ||
  138|     2.75M|        ((cpu = tls_cpuid) == 0 &&
  139|     2.75M|         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
  140|     2.75M|             std::memory_order_relaxed) &&
  141|     2.75M|         arena_lock.try_lock())) {
  142|     2.75M|      if (!arena_lock.owns_lock()) {
  143|     3.21k|        arena_lock.lock();
  144|     3.21k|      }
  145|     2.75M|      auto rv = func();
  146|     2.75M|      Fixup();
  147|     2.75M|      return rv;
  148|     2.75M|    }
  149|          |
  150|          |    // pick a shard from which to allocate
  151|         0|    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
  152|         0|    if (!s->mutex.try_lock()) {
  153|         0|      s = Repick();
  154|         0|      s->mutex.lock();
  155|         0|    }
  156|         0|    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
  157|          |
  158|         0|    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
  159|         0|    if (avail < bytes) {
  160|          |      // reload
  161|         0|      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
  162|          |
  163|          |      // If the arena's current block is within a factor of 2 of the right
  164|          |      // size, we adjust our request to avoid arena waste.
  165|         0|      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
  166|         0|      assert(exact == arena_.AllocatedAndUnused());
  167|          |
  168|         0|      if (exact >= bytes && arena_.IsInInlineBlock()) {
  169|          |        // If we haven't exhausted arena's inline block yet, allocate from arena
  170|          |        // directly. This ensures that we'll do the first few small allocations
  171|          |        // without allocating any blocks.
  172|          |        // In particular this prevents empty memtables from using
  173|          |        // disproportionately large amount of memory: a memtable allocates on
  174|          |        // the order of 1 KB of memory when created; we wouldn't want to
  175|          |        // allocate a full arena block (typically a few megabytes) for that,
  176|          |        // especially if there are thousands of empty memtables.
  177|         0|        auto rv = func();
  178|         0|        Fixup();
  179|         0|        return rv;
  180|         0|      }
  181|          |
  182|         0|      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
  183|         0|                  ? exact
  184|         0|                  : shard_block_size_;
  185|         0|      s->free_begin_ = arena_.AllocateAligned(avail);
  186|         0|      Fixup();
  187|         0|    }
  188|         0|    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
  189|          |
  190|         0|    char* rv;
  191|         0|    if ((bytes % sizeof(void*)) == 0) {
  192|          |      // aligned allocation from the beginning
  193|         0|      rv = s->free_begin_;
  194|         0|      s->free_begin_ += bytes;
  195|         0|    } else {
  196|          |      // unaligned from the end
  197|         0|      rv = s->free_begin_ + avail - bytes;
  198|         0|    }
  199|         0|    return rv;
  200|         0|  }
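The shard path (lines 188-199, all unexecuted in this run) carves each request out of a block the shard previously obtained from the main arena: pointer-aligned requests come from the front of the block and unaligned requests from the back, so the front pointer stays aligned. Below is a simplified, single-threaded model of that bookkeeping; it is hypothetical code, not the RocksDB implementation, and it omits the shard mutex, Repick(), and the arena refill policy.

// Simplified model of the carve-out in AllocateImpl (lines 188-199).
// Hypothetical sketch; real shards refill from the shared Arena under a lock.
#include <cassert>
#include <cstddef>

struct ShardModel {
  char* free_begin = nullptr;  // front of the unused region
  std::size_t unused = 0;      // bytes left in the current block

  // Caller guarantees bytes <= unused (the real code refills the block first).
  char* Carve(std::size_t bytes) {
    char* rv;
    if (bytes % sizeof(void*) == 0) {
      rv = free_begin;                   // aligned: take from the beginning
      free_begin += bytes;
    } else {
      rv = free_begin + unused - bytes;  // unaligned: take from the end
    }
    unused -= bytes;
    return rv;
  }
};

int main() {
  static char block[256];
  ShardModel s{block, sizeof(block)};

  char* a = s.Carve(16);  // aligned, from the front
  char* b = s.Carve(5);   // unaligned, from the back
  char* c = s.Carve(8);   // aligned, continues at the front

  assert(a == block);
  assert(b == block + sizeof(block) - 5);
  assert(c == block + 16);
  return 0;
}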
Template instantiations of AllocateImpl:

Unexecuted instantiation:
  char* rocksdb::ConcurrentArena::AllocateImpl<rocksdb::ConcurrentArena::Allocate(unsigned long)::{lambda()#1}>(unsigned long, bool, rocksdb::ConcurrentArena::Allocate(unsigned long)::{lambda()#1} const&)

Executed instantiation (its per-line counts are identical to the combined listing above):
  char* rocksdb::ConcurrentArena::AllocateImpl<rocksdb::ConcurrentArena::AllocateAligned(unsigned long, unsigned long, rocksdb::Logger*)::{lambda()#1}>(unsigned long, bool, rocksdb::ConcurrentArena::AllocateAligned(unsigned long, unsigned long, rocksdb::Logger*)::{lambda()#1} const&)
  201|          |
  202|     2.80M|  void Fixup() {
  203|     2.80M|    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
  204|     2.80M|                                      std::memory_order_relaxed);
  205|     2.80M|    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
  206|     2.80M|                                  std::memory_order_relaxed);
  207|     2.80M|    irregular_block_num_.store(arena_.IrregularBlockNum(),
  208|     2.80M|                               std::memory_order_relaxed);
  209|     2.80M|  }
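Fixup() refreshes cached copies of the arena's statistics while the arena mutex is still held, so readers such as MemoryAllocatedBytes() (line 76) can use a relaxed atomic load instead of taking the mutex. A generic, hypothetical sketch of that pattern, not RocksDB code:

// "Update the cached value under the lock, read it lock-free" pattern,
// analogous to Fixup() and MemoryAllocatedBytes(). Hypothetical sketch.
#include <atomic>
#include <cstddef>
#include <mutex>

class CachedStats {
 public:
  void Add(std::size_t n) {
    std::lock_guard<std::mutex> guard(mu_);
    total_ += n;
    // Refresh the cache while the lock is held (the analogue of Fixup()).
    cached_total_.store(total_, std::memory_order_relaxed);
  }

  // Readers skip the mutex; the value may be slightly stale, which is
  // acceptable for approximate statistics.
  std::size_t Total() const {
    return cached_total_.load(std::memory_order_relaxed);
  }

 private:
  std::mutex mu_;
  std::size_t total_ = 0;                     // authoritative value, under mu_
  std::atomic<std::size_t> cached_total_{0};  // lock-free snapshot for readers
};

int main() {
  CachedStats stats;
  stats.Add(128);
  stats.Add(64);
  return stats.Total() == 192 ? 0 : 1;
}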
  210|          |
  211|          |  ConcurrentArena(const ConcurrentArena&) = delete;
  212|          |  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
  213|          |};
  214|          |
  215|          |}  // namespace ROCKSDB_NAMESPACE