Coverage Report

Created: 2026-05-30 06:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/cpython/Python/parking_lot.c
Line
Count
Source
1
#include "Python.h"
2
3
#include "pycore_llist.h"
4
#include "pycore_lock.h"          // _PyRawMutex
5
#include "pycore_parking_lot.h"
6
#include "pycore_pyerrors.h"      // _Py_FatalErrorFormat
7
#include "pycore_pystate.h"       // _PyThreadState_GET
8
#include "pycore_semaphore.h"     // _PySemaphore
9
#include "pycore_time.h"          // _PyTime_Add()
10
11
#include <stdbool.h>
12
13
14
typedef struct {
15
    // The mutex protects the waiter queue and the num_waiters counter.
16
    _PyRawMutex mutex;
17
18
    // Linked list of `struct wait_entry` waiters in this bucket.
19
    struct llist_node root;
20
    size_t num_waiters;
21
} Bucket;
22
23
struct wait_entry {
24
    void *park_arg;
25
    uintptr_t addr;
26
    _PySemaphore sema;
27
    struct llist_node node;
28
    bool is_unparking;
29
};
30
31
// Prime number to avoid correlations with memory addresses.
32
// We want this to be roughly proportional to the number of CPU cores
33
// to minimize contention on the bucket locks, but not too big to avoid
34
// wasting memory. The exact choice does not matter much.
35
0
#define NUM_BUCKETS 257
36
37
#define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) }
38
#define BUCKET_INIT_2(b, i)   BUCKET_INIT(b, i),     BUCKET_INIT(b, i+1)
39
#define BUCKET_INIT_4(b, i)   BUCKET_INIT_2(b, i),   BUCKET_INIT_2(b, i+2)
40
#define BUCKET_INIT_8(b, i)   BUCKET_INIT_4(b, i),   BUCKET_INIT_4(b, i+4)
41
#define BUCKET_INIT_16(b, i)  BUCKET_INIT_8(b, i),   BUCKET_INIT_8(b, i+8)
42
#define BUCKET_INIT_32(b, i)  BUCKET_INIT_16(b, i),  BUCKET_INIT_16(b, i+16)
43
#define BUCKET_INIT_64(b, i)  BUCKET_INIT_32(b, i),  BUCKET_INIT_32(b, i+32)
44
#define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i),  BUCKET_INIT_64(b, i+64)
45
#define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128)
46
47
// Table of waiters (hashed by address)
48
static Bucket buckets[NUM_BUCKETS] = {
49
    BUCKET_INIT_256(buckets, 0),
50
    BUCKET_INIT(buckets, 256),
51
};
52
53
void
54
_PySemaphore_Init(_PySemaphore *sema)
55
0
{
56
#if defined(MS_WINDOWS)
57
    sema->platform_sem = CreateSemaphore(
58
        NULL,   //  attributes
59
        0,      //  initial count
60
        10,     //  maximum count
61
        NULL    //  unnamed
62
    );
63
    if (!sema->platform_sem) {
64
        _Py_FatalErrorFormat(__func__,
65
            "parking_lot: CreateSemaphore failed (error: %u)",
66
            GetLastError());
67
    }
68
#elif defined(_Py_USE_SEMAPHORES)
69
0
    if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) {
70
0
        Py_FatalError("parking_lot: sem_init failed");
71
0
    }
72
#else
73
    if (pthread_mutex_init(&sema->mutex, NULL) != 0) {
74
        Py_FatalError("parking_lot: pthread_mutex_init failed");
75
    }
76
    if (pthread_cond_init(&sema->cond, NULL)) {
77
        Py_FatalError("parking_lot: pthread_cond_init failed");
78
    }
79
    sema->counter = 0;
80
#endif
81
0
}
82
83
void
84
_PySemaphore_Destroy(_PySemaphore *sema)
85
0
{
86
#if defined(MS_WINDOWS)
87
    CloseHandle(sema->platform_sem);
88
#elif defined(_Py_USE_SEMAPHORES)
89
    sem_destroy(&sema->platform_sem);
90
#else
91
    pthread_mutex_destroy(&sema->mutex);
92
    pthread_cond_destroy(&sema->cond);
93
#endif
94
0
}
95
96
int
97
_PySemaphore_Wait(_PySemaphore *sema, PyTime_t timeout)
98
0
{
99
0
    int res;
100
#if defined(MS_WINDOWS)
101
    DWORD wait;
102
    DWORD millis = 0;
103
    if (timeout < 0) {
104
        millis = INFINITE;
105
    }
106
    else {
107
        PyTime_t div = _PyTime_AsMilliseconds(timeout, _PyTime_ROUND_TIMEOUT);
108
        // Prevent overflow with clamping the result
109
        if ((PyTime_t)PY_DWORD_MAX < div) {
110
            millis = PY_DWORD_MAX;
111
        }
112
        else {
113
            millis = (DWORD) div;
114
        }
115
    }
116
117
    HANDLE handles[2] = { sema->platform_sem, NULL };
118
    HANDLE sigint_event = NULL;
119
    DWORD count = 1;
120
    if (_Py_IsMainThread()) {
121
        // gh-135099: Wait on the SIGINT event only in the main thread. Other
122
        // threads would ignore the result anyways, and accessing
123
        // `_PyOS_SigintEvent()` from non-main threads may race with
124
        // interpreter shutdown, which closes the event handle. Note that
125
        // non-main interpreters will ignore the result.
126
        sigint_event = _PyOS_SigintEvent();
127
        if (sigint_event != NULL) {
128
            handles[1] = sigint_event;
129
            count = 2;
130
        }
131
    }
132
    wait = WaitForMultipleObjects(count, handles, FALSE, millis);
133
    if (wait == WAIT_OBJECT_0) {
134
        res = Py_PARK_OK;
135
    }
136
    else if (wait == WAIT_OBJECT_0 + 1) {
137
        assert(sigint_event != NULL);
138
        ResetEvent(sigint_event);
139
        res = Py_PARK_INTR;
140
    }
141
    else if (wait == WAIT_TIMEOUT) {
142
        res = Py_PARK_TIMEOUT;
143
    }
144
    else {
145
        _Py_FatalErrorFormat(__func__,
146
            "unexpected error from semaphore: %u (error: %u, handle: %p)",
147
            wait, GetLastError(), sema->platform_sem);
148
    }
149
#elif defined(_Py_USE_SEMAPHORES)
150
    int err;
151
0
    if (timeout >= 0) {
152
0
        struct timespec ts;
153
154
0
#if defined(CLOCK_MONOTONIC) && defined(HAVE_SEM_CLOCKWAIT) && !defined(_Py_THREAD_SANITIZER)
155
0
        PyTime_t now;
156
        // silently ignore error: cannot report error to the caller
157
0
        (void)PyTime_MonotonicRaw(&now);
158
0
        PyTime_t deadline = _PyTime_Add(now, timeout);
159
0
        _PyTime_AsTimespec_clamp(deadline, &ts);
160
161
0
        err = sem_clockwait(&sema->platform_sem, CLOCK_MONOTONIC, &ts);
162
#else
163
        PyTime_t now;
164
        // silently ignore error: cannot report error to the caller
165
        (void)PyTime_TimeRaw(&now);
166
        PyTime_t deadline = _PyTime_Add(now, timeout);
167
168
        _PyTime_AsTimespec_clamp(deadline, &ts);
169
170
        err = sem_timedwait(&sema->platform_sem, &ts);
171
#endif
172
0
    }
173
0
    else {
174
0
        err = sem_wait(&sema->platform_sem);
175
0
    }
176
0
    if (err == -1) {
177
0
        err = errno;
178
0
        if (err == EINTR) {
179
0
            res = Py_PARK_INTR;
180
0
        }
181
0
        else if (err == ETIMEDOUT) {
182
0
            res = Py_PARK_TIMEOUT;
183
0
        }
184
0
        else {
185
0
            _Py_FatalErrorFormat(__func__,
186
0
                "unexpected error from semaphore: %d",
187
0
                err);
188
0
        }
189
0
    }
190
0
    else {
191
0
        res = Py_PARK_OK;
192
0
    }
193
#else
194
    pthread_mutex_lock(&sema->mutex);
195
    int err = 0;
196
    if (sema->counter == 0) {
197
        if (timeout >= 0) {
198
            struct timespec ts;
199
#if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
200
            _PyTime_AsTimespec_clamp(timeout, &ts);
201
            err = pthread_cond_timedwait_relative_np(&sema->cond, &sema->mutex, &ts);
202
#else
203
            PyTime_t now;
204
            (void)PyTime_TimeRaw(&now);
205
            PyTime_t deadline = _PyTime_Add(now, timeout);
206
            _PyTime_AsTimespec_clamp(deadline, &ts);
207
208
            err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts);
209
#endif // HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP
210
        }
211
        else {
212
            err = pthread_cond_wait(&sema->cond, &sema->mutex);
213
        }
214
    }
215
    if (sema->counter > 0) {
216
        sema->counter--;
217
        res = Py_PARK_OK;
218
    }
219
    else if (err) {
220
        res = Py_PARK_TIMEOUT;
221
    }
222
    else {
223
        res = Py_PARK_INTR;
224
    }
225
    pthread_mutex_unlock(&sema->mutex);
226
#endif
227
0
    return res;
228
0
}
229
230
void
231
_PySemaphore_Wakeup(_PySemaphore *sema)
232
0
{
233
#if defined(MS_WINDOWS)
234
    if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) {
235
        _Py_FatalErrorFormat(__func__,
236
            "parking_lot: ReleaseSemaphore failed (error: %u, handle: %p)",
237
            GetLastError(), sema->platform_sem);
238
    }
239
#elif defined(_Py_USE_SEMAPHORES)
240
    int err = sem_post(&sema->platform_sem);
241
0
    if (err != 0) {
242
0
        Py_FatalError("parking_lot: sem_post failed");
243
0
    }
244
#else
245
    pthread_mutex_lock(&sema->mutex);
246
    sema->counter++;
247
    pthread_cond_signal(&sema->cond);
248
    pthread_mutex_unlock(&sema->mutex);
249
#endif
250
0
}
251
252
static void
253
enqueue(Bucket *bucket, const void *address, struct wait_entry *wait)
254
0
{
255
0
    llist_insert_tail(&bucket->root, &wait->node);
256
0
    ++bucket->num_waiters;
257
0
}
258
259
static struct wait_entry *
260
dequeue(Bucket *bucket, const void *address)
261
0
{
262
    // find the first waiter that is waiting on `address`
263
0
    struct llist_node *root = &bucket->root;
264
0
    struct llist_node *node;
265
0
    llist_for_each(node, root) {
266
0
        struct wait_entry *wait = llist_data(node, struct wait_entry, node);
267
0
        if (wait->addr == (uintptr_t)address) {
268
0
            llist_remove(node);
269
0
            --bucket->num_waiters;
270
0
            wait->is_unparking = true;
271
0
            return wait;
272
0
        }
273
0
    }
274
0
    return NULL;
275
0
}
276
277
static void
278
dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst)
279
0
{
280
    // remove and append all matching waiters to dst
281
0
    struct llist_node *root = &bucket->root;
282
0
    struct llist_node *node;
283
0
    llist_for_each_safe(node, root) {
284
0
        struct wait_entry *wait = llist_data(node, struct wait_entry, node);
285
0
        if (wait->addr == (uintptr_t)address) {
286
0
            llist_remove(node);
287
0
            llist_insert_tail(dst, node);
288
0
            --bucket->num_waiters;
289
0
            wait->is_unparking = true;
290
0
        }
291
0
    }
292
0
}
293
294
// Atomically load the `addr_size`-byte value at `addr` and compare it with
// the value stored at `expected`. Returns nonzero when they are equal and
// zero otherwise. Only sizes 1, 2, 4 and 8 are supported; any other size
// reaches Py_UNREACHABLE().
static int
atomic_memcmp(const void *addr, const void *expected, size_t addr_size)
{
    switch (addr_size) {
    case 1: {
        uint8_t want = *(const uint8_t *)expected;
        return _Py_atomic_load_uint8(addr) == want;
    }
    case 2: {
        uint16_t want = *(const uint16_t *)expected;
        return _Py_atomic_load_uint16(addr) == want;
    }
    case 4: {
        uint32_t want = *(const uint32_t *)expected;
        return _Py_atomic_load_uint32(addr) == want;
    }
    case 8: {
        uint64_t want = *(const uint64_t *)expected;
        return _Py_atomic_load_uint64(addr) == want;
    }
    default:
        Py_UNREACHABLE();
    }
}
306
307
int
308
_PyParkingLot_Park(const void *addr, const void *expected, size_t size,
309
                   PyTime_t timeout_ns, void *park_arg, int detach)
310
0
{
311
0
    struct wait_entry wait = {
312
0
        .park_arg = park_arg,
313
0
        .addr = (uintptr_t)addr,
314
0
        .is_unparking = false,
315
0
    };
316
317
0
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
318
319
0
    _PyRawMutex_Lock(&bucket->mutex);
320
0
    if (!atomic_memcmp(addr, expected, size)) {
321
0
        _PyRawMutex_Unlock(&bucket->mutex);
322
0
        return Py_PARK_AGAIN;
323
0
    }
324
0
    _PySemaphore_Init(&wait.sema);
325
0
    enqueue(bucket, addr, &wait);
326
0
    _PyRawMutex_Unlock(&bucket->mutex);
327
328
0
    PyThreadState *tstate = NULL;
329
0
    if (detach) {
330
0
        tstate = _PyThreadState_GET();
331
0
        if (tstate && _PyThreadState_IsAttached(tstate)) {
332
            // Only detach if we are attached
333
0
            PyEval_ReleaseThread(tstate);
334
0
        }
335
0
        else {
336
0
            tstate = NULL;
337
0
        }
338
0
    }
339
340
0
    int res = _PySemaphore_Wait(&wait.sema, timeout_ns);
341
0
    if (res == Py_PARK_OK) {
342
0
        goto done;
343
0
    }
344
345
    // timeout or interrupt
346
0
    _PyRawMutex_Lock(&bucket->mutex);
347
0
    if (wait.is_unparking) {
348
0
        _PyRawMutex_Unlock(&bucket->mutex);
349
        // Another thread has started to unpark us. Wait until we process the
350
        // wakeup signal.
351
0
        do {
352
0
            res = _PySemaphore_Wait(&wait.sema, -1);
353
0
        } while (res != Py_PARK_OK);
354
0
        goto done;
355
0
    }
356
0
    else {
357
0
        llist_remove(&wait.node);
358
0
        --bucket->num_waiters;
359
0
    }
360
0
    _PyRawMutex_Unlock(&bucket->mutex);
361
362
0
done:
363
0
    _PySemaphore_Destroy(&wait.sema);
364
0
    if (tstate) {
365
0
        PyEval_AcquireThread(tstate);
366
0
    }
367
0
    return res;
368
369
0
}
370
371
void
372
_PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg)
373
0
{
374
0
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
375
376
    // Find the first waiter that is waiting on `addr`
377
0
    _PyRawMutex_Lock(&bucket->mutex);
378
0
    struct wait_entry *waiter = dequeue(bucket, addr);
379
0
    if (waiter) {
380
0
        int has_more_waiters = (bucket->num_waiters > 0);
381
0
        fn(arg, waiter->park_arg, has_more_waiters);
382
0
    }
383
0
    else {
384
0
        fn(arg, NULL, 0);
385
0
    }
386
0
    _PyRawMutex_Unlock(&bucket->mutex);
387
388
0
    if (waiter) {
389
        // Wakeup the waiter outside of the bucket lock
390
0
        _PySemaphore_Wakeup(&waiter->sema);
391
0
    }
392
0
}
393
394
void
395
_PyParkingLot_UnparkAll(const void *addr)
396
0
{
397
0
    struct llist_node head = LLIST_INIT(head);
398
0
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
399
400
0
    _PyRawMutex_Lock(&bucket->mutex);
401
0
    dequeue_all(bucket, addr, &head);
402
0
    _PyRawMutex_Unlock(&bucket->mutex);
403
404
0
    struct llist_node *node;
405
0
    llist_for_each_safe(node, &head) {
406
0
        struct wait_entry *waiter = llist_data(node, struct wait_entry, node);
407
0
        llist_remove(node);
408
0
        _PySemaphore_Wakeup(&waiter->sema);
409
0
    }
410
0
}
411
412
void
413
_PyParkingLot_AfterFork(void)
414
0
{
415
    // After a fork only one thread remains. That thread cannot be blocked
416
    // so all entries in the parking lot are for dead threads.
417
0
    memset(buckets, 0, sizeof(buckets));
418
0
    for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) {
419
0
        llist_init(&buckets[i].root);
420
0
    }
421
0
}