Coverage Report

Created: 2025-11-01 07:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/third_party/concurrentqueue/lightweightsemaphore.h
Line
Count
Source
1
// Provides an efficient implementation of a semaphore (LightweightSemaphore).
2
// This is an extension of Jeff Preshing's semaphore implementation (licensed
3
// under the terms of its separate zlib license) that has been adapted and
4
// extended by Cameron Desrochers.
5
6
#pragma once
7
8
#include <atomic>
#include <cassert>     // For assert
#include <cerrno>      // For errno / EINTR
#include <cstddef>     // For std::size_t
#include <cstdint>     // For std::uint64_t / std::int64_t
#include <type_traits> // For std::make_signed<T>
12
#if defined(_WIN32)
13
// Avoid including windows.h in a header; we only need a handful of
14
// items, so we'll redeclare them here (this is relatively safe since
15
// the API generally has to remain stable between Windows versions).
16
// I know this is an ugly hack but it still beats polluting the global
17
// namespace with thousands of generic names or adding a .cpp for nothing.
18
extern "C" {
19
  struct _SECURITY_ATTRIBUTES;
20
  __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
21
  __declspec(dllimport) int __stdcall CloseHandle(void* hObject);
22
  __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
23
  __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
24
}
25
#elif defined(__MACH__)
26
#include <mach/mach.h>
27
#elif defined(__unix__)
28
#include <semaphore.h>
29
#include <chrono>
30
#elif defined(__MVS__)
31
#include <zos-semaphore.h>
32
#include <chrono>
33
#endif
34
35
namespace duckdb_moodycamel
36
{
37
namespace details
38
{
39
40
// Code in the details namespace below is an adaptation of Jeff Preshing's
41
// portable + lightweight semaphore implementations, originally from
42
// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
43
// LICENSE:
44
// Copyright (c) 2015 Jeff Preshing
45
//
46
// This software is provided 'as-is', without any express or implied
47
// warranty. In no event will the authors be held liable for any damages
48
// arising from the use of this software.
49
//
50
// Permission is granted to anyone to use this software for any purpose,
51
// including commercial applications, and to alter it and redistribute it
52
// freely, subject to the following restrictions:
53
//
54
// 1. The origin of this software must not be misrepresented; you must not
55
//  claim that you wrote the original software. If you use this software
56
//  in a product, an acknowledgement in the product documentation would be
57
//  appreciated but is not required.
58
// 2. Altered source versions must be plainly marked as such, and must not be
59
//  misrepresented as being the original software.
60
// 3. This notice may not be removed or altered from any source distribution.
61
#if defined(_WIN32)
62
class Semaphore
63
{
64
private:
65
  void* m_hSema;
66
67
  Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
68
  Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
69
70
public:
71
  Semaphore(int initialCount = 0)
72
  {
73
    assert(initialCount >= 0);
74
    const long maxLong = 0x7fffffff;
75
    m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
76
    assert(m_hSema);
77
  }
78
79
  ~Semaphore()
80
  {
81
    CloseHandle(m_hSema);
82
  }
83
84
  bool wait()
85
  {
86
    const unsigned long infinite = 0xffffffff;
87
    return WaitForSingleObject(m_hSema, infinite) == 0;
88
  }
89
90
  bool try_wait()
91
  {
92
    return WaitForSingleObject(m_hSema, 0) == 0;
93
  }
94
95
  bool timed_wait(std::uint64_t usecs)
96
  {
97
    return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0;
98
  }
99
100
  void signal(int count = 1)
101
  {
102
    while (!ReleaseSemaphore(m_hSema, count, nullptr));
103
  }
104
};
105
#elif defined(__MACH__)
106
//---------------------------------------------------------
107
// Semaphore (Apple iOS and OSX)
108
// Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
109
//---------------------------------------------------------
110
class Semaphore
111
{
112
private:
113
  semaphore_t m_sema;
114
115
  Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
116
  Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
117
118
public:
119
  Semaphore(int initialCount = 0)
120
  {
121
    assert(initialCount >= 0);
122
    kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
123
    assert(rc == KERN_SUCCESS);
124
    (void)rc;
125
  }
126
127
  ~Semaphore()
128
  {
129
    semaphore_destroy(mach_task_self(), m_sema);
130
  }
131
132
  bool wait()
133
  {
134
    return semaphore_wait(m_sema) == KERN_SUCCESS;
135
  }
136
137
  bool try_wait()
138
  {
139
    return timed_wait(0);
140
  }
141
142
  bool timed_wait(std::uint64_t timeout_usecs)
143
  {
144
    mach_timespec_t ts;
145
    ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
146
    ts.tv_nsec = (timeout_usecs % 1000000) * 1000;
147
148
    // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
149
    kern_return_t rc = semaphore_timedwait(m_sema, ts);
150
    return rc == KERN_SUCCESS;
151
  }
152
153
  void signal()
154
  {
155
    while (semaphore_signal(m_sema) != KERN_SUCCESS);
156
  }
157
158
  void signal(int count)
159
  {
160
    while (count-- > 0)
161
    {
162
      while (semaphore_signal(m_sema) != KERN_SUCCESS);
163
    }
164
  }
165
};
166
#elif defined(__unix__) || defined(__MVS__)
167
//---------------------------------------------------------
168
// Semaphore (POSIX, Linux, zOS aka MVS)
169
//---------------------------------------------------------
170
class Semaphore
171
{
172
private:
173
  sem_t m_sema;
174
175
  Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
176
  Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
177
178
public:
179
  Semaphore(int initialCount = 0)
180
9.43k
  {
181
9.43k
    assert(initialCount >= 0);
182
9.43k
    int rc = sem_init(&m_sema, 0, initialCount);
183
9.43k
    assert(rc == 0);
184
9.43k
    (void)rc;
185
9.43k
  }
186
187
  ~Semaphore()
188
9.43k
  {
189
9.43k
    sem_destroy(&m_sema);
190
9.43k
  }
191
192
  bool wait()
193
0
  {
194
    // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
195
0
    int rc;
196
0
    do {
197
0
      rc = sem_wait(&m_sema);
198
0
    } while (rc == -1 && errno == EINTR);
199
0
    return rc == 0;
200
0
  }
201
202
  bool try_wait()
203
0
  {
204
0
    int rc;
205
0
    do {
206
0
      rc = sem_trywait(&m_sema);
207
0
    } while (rc == -1 && errno == EINTR);
208
0
    return rc == 0;
209
0
  }
210
211
  bool timed_wait(std::uint64_t usecs)
212
1.08M
  {
213
1.08M
    struct timespec ts;
214
1.08M
    const int usecs_in_1_sec = 1000000;
215
1.08M
    const int nsecs_in_1_sec = 1000000000;
216
217
    // sem_timedwait needs an absolute time
218
    // hence we need to first obtain the current time
219
    // and then add the maximum time we want to wait
220
    // we want to avoid clock_gettime because of linking issues
221
    // chrono -> timespec conversion from here: https://embeddedartistry.com/blog/2019/01/31/converting-between-timespec-stdchrono/
222
1.08M
    auto current_time = std::chrono::system_clock::now();
223
1.08M
    auto secs =  std::chrono::time_point_cast<std::chrono::seconds>(current_time);
224
1.08M
    auto ns = std::chrono::time_point_cast<std::chrono::nanoseconds>(current_time) - std::chrono::time_point_cast<std::chrono::nanoseconds>(secs);
225
226
1.08M
    ts.tv_sec = secs.time_since_epoch().count();
227
1.08M
    ts.tv_nsec = ns.count();
228
229
    // now add the time we want to wait
230
1.08M
    ts.tv_sec += usecs / usecs_in_1_sec;
231
1.08M
    ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
232
233
    // sem_timedwait bombs if you have more than 1e9 in tv_nsec
234
    // so we have to clean things up before passing it in
235
1.08M
    if (ts.tv_nsec >= nsecs_in_1_sec) {
236
545k
      ts.tv_nsec -= nsecs_in_1_sec;
237
545k
      ++ts.tv_sec;
238
545k
    }
239
240
1.08M
    int rc;
241
1.08M
    do {
242
1.08M
      rc = sem_timedwait(&m_sema, &ts);
243
1.08M
    } while (rc == -1 && errno == EINTR);
244
1.08M
    return rc == 0;
245
1.08M
  }
246
247
  void signal()
248
0
  {
249
0
    while (sem_post(&m_sema) == -1);
250
0
  }
251
252
  void signal(int count)
253
740k
  {
254
1.82M
    while (count-- > 0)
255
1.08M
    {
256
1.08M
      while (sem_post(&m_sema) == -1);
257
1.08M
    }
258
740k
  }
259
};
260
#else
261
#error Unsupported platform! (No semaphore wrapper available)
262
#endif
263
264
} // end namespace details
265
266
267
//---------------------------------------------------------
268
// LightweightSemaphore
269
//---------------------------------------------------------
270
class LightweightSemaphore
271
{
272
public:
273
  typedef std::make_signed<std::size_t>::type ssize_t;
274
275
private:
276
  std::atomic<ssize_t> m_count;
277
  details::Semaphore m_sema;
278
279
  bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
280
1.14M
  {
281
1.14M
    ssize_t oldCount;
282
    // Is there a better way to set the initial spin count?
283
    // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
284
    // as threads start hitting the kernel semaphore.
285
1.14M
    int spin = 10000;
286
9.07G
    while (--spin >= 0)
287
9.07G
    {
288
9.07G
      oldCount = m_count.load(std::memory_order_relaxed);
289
9.07G
      if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
290
61.0k
        return true;
291
9.07G
      std::atomic_signal_fence(std::memory_order_acquire);   // Prevent the compiler from collapsing the loop.
292
9.07G
    }
293
1.07M
    oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
294
1.07M
    if (oldCount > 0)
295
29
      return true;
296
1.07M
    if (timeout_usecs < 0)
297
0
      return m_sema.wait();
298
1.07M
    if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
299
1.08M
      return true;
300
    // At this point, we've timed out waiting for the semaphore, but the
301
    // count is still decremented indicating we may still be waiting on
302
    // it. So we have to re-adjust the count, but only if the semaphore
303
    // wasn't signaled enough times for us too since then. If it was, we
304
    // need to release the semaphore too.
305
18.4E
    while (true)
306
186
    {
307
186
      oldCount = m_count.load(std::memory_order_acquire);
308
186
      if (oldCount >= 0 && m_sema.try_wait())
309
0
        return true;
310
186
      if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
311
184
        return false;
312
186
    }
313
18.4E
  }
314
315
  ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
316
0
  {
317
0
    assert(max > 0);
318
0
    ssize_t oldCount;
319
0
    int spin = 10000;
320
0
    while (--spin >= 0)
321
0
    {
322
0
      oldCount = m_count.load(std::memory_order_relaxed);
323
0
      if (oldCount > 0)
324
0
      {
325
0
        ssize_t newCount = oldCount > max ? oldCount - max : 0;
326
0
        if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
327
0
          return oldCount - newCount;
328
0
      }
329
0
      std::atomic_signal_fence(std::memory_order_acquire);
330
0
    }
331
0
    oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
332
0
    if (oldCount <= 0)
333
0
    {
334
0
      if (timeout_usecs < 0)
335
0
      {
336
0
        if (!m_sema.wait())
337
0
          return 0;
338
0
      }
339
0
      else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs))
340
0
      {
341
0
        while (true)
342
0
        {
343
0
          oldCount = m_count.load(std::memory_order_acquire);
344
0
          if (oldCount >= 0 && m_sema.try_wait())
345
0
            break;
346
0
          if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
347
0
            return 0;
348
0
        }
349
0
      }
350
0
    }
351
0
    if (max > 1)
352
0
      return 1 + tryWaitMany(max - 1);
353
0
    return 1;
354
0
  }
355
356
public:
357
9.43k
  LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
358
9.43k
  {
359
9.43k
    assert(initialCount >= 0);
360
9.43k
  }
361
362
  bool tryWait()
363
1.24M
  {
364
1.24M
    ssize_t oldCount = m_count.load(std::memory_order_relaxed);
365
1.25M
    while (oldCount > 0)
366
113k
    {
367
113k
      if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
368
108k
        return true;
369
113k
    }
370
1.14M
    return false;
371
1.24M
  }
372
373
  bool wait()
374
0
  {
375
0
    return tryWait() || waitWithPartialSpinning();
376
0
  }
377
378
  bool wait(std::int64_t timeout_usecs)
379
1.25M
  {
380
1.25M
    return tryWait() || waitWithPartialSpinning(timeout_usecs);
381
1.25M
  }
382
383
  // Acquires between 0 and (greedily) max, inclusive
384
  ssize_t tryWaitMany(ssize_t max)
385
0
  {
386
0
    assert(max >= 0);
387
0
    ssize_t oldCount = m_count.load(std::memory_order_relaxed);
388
0
    while (oldCount > 0)
389
0
    {
390
0
      ssize_t newCount = oldCount > max ? oldCount - max : 0;
391
0
      if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
392
0
        return oldCount - newCount;
393
0
    }
394
0
    return 0;
395
0
  }
396
397
  // Acquires at least one, and (greedily) at most max
398
  ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
399
0
  {
400
0
    assert(max >= 0);
401
0
    ssize_t result = tryWaitMany(max);
402
0
    if (result == 0 && max > 0)
403
0
      result = waitManyWithPartialSpinning(max, timeout_usecs);
404
0
    return result;
405
0
  }
406
407
  ssize_t waitMany(ssize_t max)
408
0
  {
409
0
    ssize_t result = waitMany(max, -1);
410
0
    assert(result > 0);
411
0
    return result;
412
0
  }
413
414
  void signal(ssize_t count = 1)
415
909k
  {
416
909k
    assert(count >= 0);
417
909k
    ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
418
909k
    ssize_t toRelease = -oldCount < count ? -oldCount : count;
419
909k
    if (toRelease > 0)
420
740k
    {
421
740k
      m_sema.signal((int)toRelease);
422
740k
    }
423
909k
  }
424
425
  ssize_t availableApprox() const
426
0
  {
427
0
    ssize_t count = m_count.load(std::memory_order_relaxed);
428
0
    return count > 0 ? count : 0;
429
0
  }
430
};
431
432
}   // end namespace duckdb_moodycamel