/src/duckdb/third_party/concurrentqueue/lightweightsemaphore.h
Line | Count | Source |
1 | | // Provides an efficient implementation of a semaphore (LightweightSemaphore). |
2 | | // This is an extension of Jeff Preshing's sempahore implementation (licensed |
3 | | // under the terms of its separate zlib license) that has been adapted and |
4 | | // extended by Cameron Desrochers. |
5 | | |
6 | | #pragma once |
7 | | |
8 | | #include <cstddef> // For std::size_t |
9 | | #include <atomic> |
10 | | #include <type_traits> // For std::make_signed<T> |
11 | | |
12 | | #if defined(_WIN32) |
13 | | // Avoid including windows.h in a header; we only need a handful of |
14 | | // items, so we'll redeclare them here (this is relatively safe since |
15 | | // the API generally has to remain stable between Windows versions). |
16 | | // I know this is an ugly hack but it still beats polluting the global |
17 | | // namespace with thousands of generic names or adding a .cpp for nothing. |
18 | | extern "C" { |
19 | | struct _SECURITY_ATTRIBUTES; |
20 | | __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName); |
21 | | __declspec(dllimport) int __stdcall CloseHandle(void* hObject); |
22 | | __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds); |
23 | | __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount); |
24 | | } |
25 | | #elif defined(__MACH__) |
26 | | #include <mach/mach.h> |
27 | | #elif defined(__unix__) |
28 | | #include <semaphore.h> |
29 | | #include <chrono> |
30 | | #elif defined(__MVS__) |
31 | | #include <zos-semaphore.h> |
32 | | #include <chrono> |
33 | | #endif |
34 | | |
35 | | namespace duckdb_moodycamel |
36 | | { |
37 | | namespace details |
38 | | { |
39 | | |
40 | | // Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's |
41 | | // portable + lightweight semaphore implementations, originally from |
42 | | // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h |
43 | | // LICENSE: |
44 | | // Copyright (c) 2015 Jeff Preshing |
45 | | // |
46 | | // This software is provided 'as-is', without any express or implied |
47 | | // warranty. In no event will the authors be held liable for any damages |
48 | | // arising from the use of this software. |
49 | | // |
50 | | // Permission is granted to anyone to use this software for any purpose, |
51 | | // including commercial applications, and to alter it and redistribute it |
52 | | // freely, subject to the following restrictions: |
53 | | // |
54 | | // 1. The origin of this software must not be misrepresented; you must not |
55 | | // claim that you wrote the original software. If you use this software |
56 | | // in a product, an acknowledgement in the product documentation would be |
57 | | // appreciated but is not required. |
58 | | // 2. Altered source versions must be plainly marked as such, and must not be |
59 | | // misrepresented as being the original software. |
60 | | // 3. This notice may not be removed or altered from any source distribution. |
61 | | #if defined(_WIN32) |
62 | | class Semaphore |
63 | | { |
64 | | private: |
65 | | void* m_hSema; |
66 | | |
67 | | Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
68 | | Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
69 | | |
70 | | public: |
71 | | Semaphore(int initialCount = 0) |
72 | | { |
73 | | assert(initialCount >= 0); |
74 | | const long maxLong = 0x7fffffff; |
75 | | m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr); |
76 | | assert(m_hSema); |
77 | | } |
78 | | |
79 | | ~Semaphore() |
80 | | { |
81 | | CloseHandle(m_hSema); |
82 | | } |
83 | | |
84 | | bool wait() |
85 | | { |
86 | | const unsigned long infinite = 0xffffffff; |
87 | | return WaitForSingleObject(m_hSema, infinite) == 0; |
88 | | } |
89 | | |
90 | | bool try_wait() |
91 | | { |
92 | | return WaitForSingleObject(m_hSema, 0) == 0; |
93 | | } |
94 | | |
95 | | bool timed_wait(std::uint64_t usecs) |
96 | | { |
97 | | return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0; |
98 | | } |
99 | | |
100 | | void signal(int count = 1) |
101 | | { |
102 | | while (!ReleaseSemaphore(m_hSema, count, nullptr)); |
103 | | } |
104 | | }; |
105 | | #elif defined(__MACH__) |
106 | | //--------------------------------------------------------- |
107 | | // Semaphore (Apple iOS and OSX) |
108 | | // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html |
109 | | //--------------------------------------------------------- |
110 | | class Semaphore |
111 | | { |
112 | | private: |
113 | | semaphore_t m_sema; |
114 | | |
115 | | Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
116 | | Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
117 | | |
118 | | public: |
119 | | Semaphore(int initialCount = 0) |
120 | | { |
121 | | assert(initialCount >= 0); |
122 | | kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount); |
123 | | assert(rc == KERN_SUCCESS); |
124 | | (void)rc; |
125 | | } |
126 | | |
127 | | ~Semaphore() |
128 | | { |
129 | | semaphore_destroy(mach_task_self(), m_sema); |
130 | | } |
131 | | |
132 | | bool wait() |
133 | | { |
134 | | return semaphore_wait(m_sema) == KERN_SUCCESS; |
135 | | } |
136 | | |
137 | | bool try_wait() |
138 | | { |
139 | | return timed_wait(0); |
140 | | } |
141 | | |
142 | | bool timed_wait(std::uint64_t timeout_usecs) |
143 | | { |
144 | | mach_timespec_t ts; |
145 | | ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000); |
146 | | ts.tv_nsec = (timeout_usecs % 1000000) * 1000; |
147 | | |
148 | | // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html |
149 | | kern_return_t rc = semaphore_timedwait(m_sema, ts); |
150 | | return rc == KERN_SUCCESS; |
151 | | } |
152 | | |
153 | | void signal() |
154 | | { |
155 | | while (semaphore_signal(m_sema) != KERN_SUCCESS); |
156 | | } |
157 | | |
158 | | void signal(int count) |
159 | | { |
160 | | while (count-- > 0) |
161 | | { |
162 | | while (semaphore_signal(m_sema) != KERN_SUCCESS); |
163 | | } |
164 | | } |
165 | | }; |
166 | | #elif defined(__unix__) || defined(__MVS__) |
167 | | //--------------------------------------------------------- |
168 | | // Semaphore (POSIX, Linux, zOS aka MVS) |
169 | | //--------------------------------------------------------- |
170 | | class Semaphore |
171 | | { |
172 | | private: |
173 | | sem_t m_sema; |
174 | | |
175 | | Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
176 | | Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
177 | | |
178 | | public: |
179 | | Semaphore(int initialCount = 0) |
180 | 9.43k | { |
181 | 9.43k | assert(initialCount >= 0); |
182 | 9.43k | int rc = sem_init(&m_sema, 0, initialCount); |
183 | 9.43k | assert(rc == 0); |
184 | 9.43k | (void)rc; |
185 | 9.43k | } |
186 | | |
187 | | ~Semaphore() |
188 | 9.43k | { |
189 | 9.43k | sem_destroy(&m_sema); |
190 | 9.43k | } |
191 | | |
192 | | bool wait() |
193 | 0 | { |
194 | | // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error |
195 | 0 | int rc; |
196 | 0 | do { |
197 | 0 | rc = sem_wait(&m_sema); |
198 | 0 | } while (rc == -1 && errno == EINTR); |
199 | 0 | return rc == 0; |
200 | 0 | } |
201 | | |
202 | | bool try_wait() |
203 | 0 | { |
204 | 0 | int rc; |
205 | 0 | do { |
206 | 0 | rc = sem_trywait(&m_sema); |
207 | 0 | } while (rc == -1 && errno == EINTR); |
208 | 0 | return rc == 0; |
209 | 0 | } |
210 | | |
211 | | bool timed_wait(std::uint64_t usecs) |
212 | 1.08M | { |
213 | 1.08M | struct timespec ts; |
214 | 1.08M | const int usecs_in_1_sec = 1000000; |
215 | 1.08M | const int nsecs_in_1_sec = 1000000000; |
216 | | |
217 | | // sem_timedwait needs an absolute time |
218 | | // hence we need to first obtain the current time |
219 | | // and then add the maximum time we want to wait |
220 | | // we want to avoid clock_gettime because of linking issues |
221 | | // chrono -> timespec conversion from here: https://embeddedartistry.com/blog/2019/01/31/converting-between-timespec-stdchrono/ |
222 | 1.08M | auto current_time = std::chrono::system_clock::now(); |
223 | 1.08M | auto secs = std::chrono::time_point_cast<std::chrono::seconds>(current_time); |
224 | 1.08M | auto ns = std::chrono::time_point_cast<std::chrono::nanoseconds>(current_time) - std::chrono::time_point_cast<std::chrono::nanoseconds>(secs); |
225 | | |
226 | 1.08M | ts.tv_sec = secs.time_since_epoch().count(); |
227 | 1.08M | ts.tv_nsec = ns.count(); |
228 | | |
229 | | // now add the time we want to wait |
230 | 1.08M | ts.tv_sec += usecs / usecs_in_1_sec; |
231 | 1.08M | ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000; |
232 | | |
233 | | // sem_timedwait bombs if you have more than 1e9 in tv_nsec |
234 | | // so we have to clean things up before passing it in |
235 | 1.08M | if (ts.tv_nsec >= nsecs_in_1_sec) { |
236 | 545k | ts.tv_nsec -= nsecs_in_1_sec; |
237 | 545k | ++ts.tv_sec; |
238 | 545k | } |
239 | | |
240 | 1.08M | int rc; |
241 | 1.08M | do { |
242 | 1.08M | rc = sem_timedwait(&m_sema, &ts); |
243 | 1.08M | } while (rc == -1 && errno == EINTR); |
244 | 1.08M | return rc == 0; |
245 | 1.08M | } |
246 | | |
247 | | void signal() |
248 | 0 | { |
249 | 0 | while (sem_post(&m_sema) == -1); |
250 | 0 | } |
251 | | |
252 | | void signal(int count) |
253 | 740k | { |
254 | 1.82M | while (count-- > 0) |
255 | 1.08M | { |
256 | 1.08M | while (sem_post(&m_sema) == -1); |
257 | 1.08M | } |
258 | 740k | } |
259 | | }; |
260 | | #else |
261 | | #error Unsupported platform! (No semaphore wrapper available) |
262 | | #endif |
263 | | |
264 | | } // end namespace details |
265 | | |
266 | | |
267 | | //--------------------------------------------------------- |
268 | | // LightweightSemaphore |
269 | | //--------------------------------------------------------- |
270 | | class LightweightSemaphore |
271 | | { |
272 | | public: |
273 | | typedef std::make_signed<std::size_t>::type ssize_t; |
274 | | |
275 | | private: |
276 | | std::atomic<ssize_t> m_count; |
277 | | details::Semaphore m_sema; |
278 | | |
279 | | bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1) |
280 | 1.14M | { |
281 | 1.14M | ssize_t oldCount; |
282 | | // Is there a better way to set the initial spin count? |
283 | | // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC, |
284 | | // as threads start hitting the kernel semaphore. |
285 | 1.14M | int spin = 10000; |
286 | 9.07G | while (--spin >= 0) |
287 | 9.07G | { |
288 | 9.07G | oldCount = m_count.load(std::memory_order_relaxed); |
289 | 9.07G | if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed)) |
290 | 61.0k | return true; |
291 | 9.07G | std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop. |
292 | 9.07G | } |
293 | 1.07M | oldCount = m_count.fetch_sub(1, std::memory_order_acquire); |
294 | 1.07M | if (oldCount > 0) |
295 | 29 | return true; |
296 | 1.07M | if (timeout_usecs < 0) |
297 | 0 | return m_sema.wait(); |
298 | 1.07M | if (m_sema.timed_wait((std::uint64_t)timeout_usecs)) |
299 | 1.08M | return true; |
300 | | // At this point, we've timed out waiting for the semaphore, but the |
301 | | // count is still decremented indicating we may still be waiting on |
302 | | // it. So we have to re-adjust the count, but only if the semaphore |
303 | | // wasn't signaled enough times for us too since then. If it was, we |
304 | | // need to release the semaphore too. |
305 | 18.4E | while (true) |
306 | 186 | { |
307 | 186 | oldCount = m_count.load(std::memory_order_acquire); |
308 | 186 | if (oldCount >= 0 && m_sema.try_wait()) |
309 | 0 | return true; |
310 | 186 | if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed)) |
311 | 184 | return false; |
312 | 186 | } |
313 | 18.4E | } |
314 | | |
315 | | ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1) |
316 | 0 | { |
317 | 0 | assert(max > 0); |
318 | 0 | ssize_t oldCount; |
319 | 0 | int spin = 10000; |
320 | 0 | while (--spin >= 0) |
321 | 0 | { |
322 | 0 | oldCount = m_count.load(std::memory_order_relaxed); |
323 | 0 | if (oldCount > 0) |
324 | 0 | { |
325 | 0 | ssize_t newCount = oldCount > max ? oldCount - max : 0; |
326 | 0 | if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed)) |
327 | 0 | return oldCount - newCount; |
328 | 0 | } |
329 | 0 | std::atomic_signal_fence(std::memory_order_acquire); |
330 | 0 | } |
331 | 0 | oldCount = m_count.fetch_sub(1, std::memory_order_acquire); |
332 | 0 | if (oldCount <= 0) |
333 | 0 | { |
334 | 0 | if (timeout_usecs < 0) |
335 | 0 | { |
336 | 0 | if (!m_sema.wait()) |
337 | 0 | return 0; |
338 | 0 | } |
339 | 0 | else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs)) |
340 | 0 | { |
341 | 0 | while (true) |
342 | 0 | { |
343 | 0 | oldCount = m_count.load(std::memory_order_acquire); |
344 | 0 | if (oldCount >= 0 && m_sema.try_wait()) |
345 | 0 | break; |
346 | 0 | if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed)) |
347 | 0 | return 0; |
348 | 0 | } |
349 | 0 | } |
350 | 0 | } |
351 | 0 | if (max > 1) |
352 | 0 | return 1 + tryWaitMany(max - 1); |
353 | 0 | return 1; |
354 | 0 | } |
355 | | |
356 | | public: |
357 | 9.43k | LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount) |
358 | 9.43k | { |
359 | 9.43k | assert(initialCount >= 0); |
360 | 9.43k | } |
361 | | |
362 | | bool tryWait() |
363 | 1.24M | { |
364 | 1.24M | ssize_t oldCount = m_count.load(std::memory_order_relaxed); |
365 | 1.25M | while (oldCount > 0) |
366 | 113k | { |
367 | 113k | if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed)) |
368 | 108k | return true; |
369 | 113k | } |
370 | 1.14M | return false; |
371 | 1.24M | } |
372 | | |
373 | | bool wait() |
374 | 0 | { |
375 | 0 | return tryWait() || waitWithPartialSpinning(); |
376 | 0 | } |
377 | | |
378 | | bool wait(std::int64_t timeout_usecs) |
379 | 1.25M | { |
380 | 1.25M | return tryWait() || waitWithPartialSpinning(timeout_usecs); |
381 | 1.25M | } |
382 | | |
383 | | // Acquires between 0 and (greedily) max, inclusive |
384 | | ssize_t tryWaitMany(ssize_t max) |
385 | 0 | { |
386 | 0 | assert(max >= 0); |
387 | 0 | ssize_t oldCount = m_count.load(std::memory_order_relaxed); |
388 | 0 | while (oldCount > 0) |
389 | 0 | { |
390 | 0 | ssize_t newCount = oldCount > max ? oldCount - max : 0; |
391 | 0 | if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed)) |
392 | 0 | return oldCount - newCount; |
393 | 0 | } |
394 | 0 | return 0; |
395 | 0 | } |
396 | | |
397 | | // Acquires at least one, and (greedily) at most max |
398 | | ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs) |
399 | 0 | { |
400 | 0 | assert(max >= 0); |
401 | 0 | ssize_t result = tryWaitMany(max); |
402 | 0 | if (result == 0 && max > 0) |
403 | 0 | result = waitManyWithPartialSpinning(max, timeout_usecs); |
404 | 0 | return result; |
405 | 0 | } |
406 | | |
407 | | ssize_t waitMany(ssize_t max) |
408 | 0 | { |
409 | 0 | ssize_t result = waitMany(max, -1); |
410 | 0 | assert(result > 0); |
411 | 0 | return result; |
412 | 0 | } |
413 | | |
414 | | void signal(ssize_t count = 1) |
415 | 909k | { |
416 | 909k | assert(count >= 0); |
417 | 909k | ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release); |
418 | 909k | ssize_t toRelease = -oldCount < count ? -oldCount : count; |
419 | 909k | if (toRelease > 0) |
420 | 740k | { |
421 | 740k | m_sema.signal((int)toRelease); |
422 | 740k | } |
423 | 909k | } |
424 | | |
425 | | ssize_t availableApprox() const |
426 | 0 | { |
427 | 0 | ssize_t count = m_count.load(std::memory_order_relaxed); |
428 | 0 | return count > 0 ? count : 0; |
429 | 0 | } |
430 | | }; |
431 | | |
432 | | } // end namespace duckdb_moodycamel |