Coverage Report

Created: 2025-07-04 06:49

/src/cpython/Python/parking_lot.c
Line
Count
Source (jump to first uncovered line)
1
#include "Python.h"
2
3
#include "pycore_llist.h"
4
#include "pycore_lock.h"          // _PyRawMutex
5
#include "pycore_parking_lot.h"
6
#include "pycore_pyerrors.h"      // _Py_FatalErrorFormat
7
#include "pycore_pystate.h"       // _PyThreadState_GET
8
#include "pycore_semaphore.h"     // _PySemaphore
9
#include "pycore_time.h"          // _PyTime_Add()
10
11
#include <stdbool.h>
12
13
14
typedef struct {
15
    // The mutex protects the waiter queue and the num_waiters counter.
16
    _PyRawMutex mutex;
17
18
    // Linked list of `struct wait_entry` waiters in this bucket.
19
    struct llist_node root;
20
    size_t num_waiters;
21
} Bucket;
22
23
struct wait_entry {
24
    void *park_arg;
25
    uintptr_t addr;
26
    _PySemaphore sema;
27
    struct llist_node node;
28
    bool is_unparking;
29
};
30
31
// Prime number to avoid correlations with memory addresses.
32
// We want this to be roughly proportional to the number of CPU cores
33
// to minimize contention on the bucket locks, but not too big to avoid
34
// wasting memory. The exact choice does not matter much.
35
0
#define NUM_BUCKETS 257
36
37
#define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) }
38
#define BUCKET_INIT_2(b, i)   BUCKET_INIT(b, i),     BUCKET_INIT(b, i+1)
39
#define BUCKET_INIT_4(b, i)   BUCKET_INIT_2(b, i),   BUCKET_INIT_2(b, i+2)
40
#define BUCKET_INIT_8(b, i)   BUCKET_INIT_4(b, i),   BUCKET_INIT_4(b, i+4)
41
#define BUCKET_INIT_16(b, i)  BUCKET_INIT_8(b, i),   BUCKET_INIT_8(b, i+8)
42
#define BUCKET_INIT_32(b, i)  BUCKET_INIT_16(b, i),  BUCKET_INIT_16(b, i+16)
43
#define BUCKET_INIT_64(b, i)  BUCKET_INIT_32(b, i),  BUCKET_INIT_32(b, i+32)
44
#define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i),  BUCKET_INIT_64(b, i+64)
45
#define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128)
46
47
// Table of waiters (hashed by address)
48
static Bucket buckets[NUM_BUCKETS] = {
49
    BUCKET_INIT_256(buckets, 0),
50
    BUCKET_INIT(buckets, 256),
51
};
52
53
void
54
_PySemaphore_Init(_PySemaphore *sema)
55
0
{
56
#if defined(MS_WINDOWS)
57
    sema->platform_sem = CreateSemaphore(
58
        NULL,   //  attributes
59
        0,      //  initial count
60
        10,     //  maximum count
61
        NULL    //  unnamed
62
    );
63
    if (!sema->platform_sem) {
64
        Py_FatalError("parking_lot: CreateSemaphore failed");
65
    }
66
#elif defined(_Py_USE_SEMAPHORES)
67
0
    if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) {
68
0
        Py_FatalError("parking_lot: sem_init failed");
69
0
    }
70
#else
71
    if (pthread_mutex_init(&sema->mutex, NULL) != 0) {
72
        Py_FatalError("parking_lot: pthread_mutex_init failed");
73
    }
74
    if (pthread_cond_init(&sema->cond, NULL)) {
75
        Py_FatalError("parking_lot: pthread_cond_init failed");
76
    }
77
    sema->counter = 0;
78
#endif
79
0
}
80
81
void
82
_PySemaphore_Destroy(_PySemaphore *sema)
83
0
{
84
#if defined(MS_WINDOWS)
85
    CloseHandle(sema->platform_sem);
86
#elif defined(_Py_USE_SEMAPHORES)
87
    sem_destroy(&sema->platform_sem);
88
#else
89
    pthread_mutex_destroy(&sema->mutex);
90
    pthread_cond_destroy(&sema->cond);
91
#endif
92
0
}
93
94
static int
95
_PySemaphore_PlatformWait(_PySemaphore *sema, PyTime_t timeout)
96
0
{
97
0
    int res;
98
#if defined(MS_WINDOWS)
99
    DWORD wait;
100
    DWORD millis = 0;
101
    if (timeout < 0) {
102
        millis = INFINITE;
103
    }
104
    else {
105
        PyTime_t div = _PyTime_AsMilliseconds(timeout, _PyTime_ROUND_TIMEOUT);
106
        // Prevent overflow with clamping the result
107
        if ((PyTime_t)PY_DWORD_MAX < div) {
108
            millis = PY_DWORD_MAX;
109
        }
110
        else {
111
            millis = (DWORD) div;
112
        }
113
    }
114
115
    HANDLE handles[2] = { sema->platform_sem, NULL };
116
    HANDLE sigint_event = NULL;
117
    DWORD count = 1;
118
    if (_Py_IsMainThread()) {
119
        // gh-135099: Wait on the SIGINT event only in the main thread. Other
120
        // threads would ignore the result anyways, and accessing
121
        // `_PyOS_SigintEvent()` from non-main threads may race with
122
        // interpreter shutdown, which closes the event handle. Note that
123
        // non-main interpreters will ignore the result.
124
        sigint_event = _PyOS_SigintEvent();
125
        if (sigint_event != NULL) {
126
            handles[1] = sigint_event;
127
            count = 2;
128
        }
129
    }
130
    wait = WaitForMultipleObjects(count, handles, FALSE, millis);
131
    if (wait == WAIT_OBJECT_0) {
132
        res = Py_PARK_OK;
133
    }
134
    else if (wait == WAIT_OBJECT_0 + 1) {
135
        assert(sigint_event != NULL);
136
        ResetEvent(sigint_event);
137
        res = Py_PARK_INTR;
138
    }
139
    else if (wait == WAIT_TIMEOUT) {
140
        res = Py_PARK_TIMEOUT;
141
    }
142
    else {
143
        _Py_FatalErrorFormat(__func__,
144
            "unexpected error from semaphore: %u (error: %u)",
145
            wait, GetLastError());
146
    }
147
#elif defined(_Py_USE_SEMAPHORES)
148
    int err;
149
0
    if (timeout >= 0) {
150
0
        struct timespec ts;
151
152
0
#if defined(CLOCK_MONOTONIC) && defined(HAVE_SEM_CLOCKWAIT) && !defined(_Py_THREAD_SANITIZER)
153
0
        PyTime_t now;
154
        // silently ignore error: cannot report error to the caller
155
0
        (void)PyTime_MonotonicRaw(&now);
156
0
        PyTime_t deadline = _PyTime_Add(now, timeout);
157
0
        _PyTime_AsTimespec_clamp(deadline, &ts);
158
159
0
        err = sem_clockwait(&sema->platform_sem, CLOCK_MONOTONIC, &ts);
160
#else
161
        PyTime_t now;
162
        // silently ignore error: cannot report error to the caller
163
        (void)PyTime_TimeRaw(&now);
164
        PyTime_t deadline = _PyTime_Add(now, timeout);
165
166
        _PyTime_AsTimespec_clamp(deadline, &ts);
167
168
        err = sem_timedwait(&sema->platform_sem, &ts);
169
#endif
170
0
    }
171
0
    else {
172
0
        err = sem_wait(&sema->platform_sem);
173
0
    }
174
0
    if (err == -1) {
175
0
        err = errno;
176
0
        if (err == EINTR) {
177
0
            res = Py_PARK_INTR;
178
0
        }
179
0
        else if (err == ETIMEDOUT) {
180
0
            res = Py_PARK_TIMEOUT;
181
0
        }
182
0
        else {
183
0
            _Py_FatalErrorFormat(__func__,
184
0
                "unexpected error from semaphore: %d",
185
0
                err);
186
0
        }
187
0
    }
188
0
    else {
189
0
        res = Py_PARK_OK;
190
0
    }
191
#else
192
    pthread_mutex_lock(&sema->mutex);
193
    int err = 0;
194
    if (sema->counter == 0) {
195
        if (timeout >= 0) {
196
            struct timespec ts;
197
#if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
198
            _PyTime_AsTimespec_clamp(timeout, &ts);
199
            err = pthread_cond_timedwait_relative_np(&sema->cond, &sema->mutex, &ts);
200
#else
201
            PyTime_t now;
202
            (void)PyTime_TimeRaw(&now);
203
            PyTime_t deadline = _PyTime_Add(now, timeout);
204
            _PyTime_AsTimespec_clamp(deadline, &ts);
205
206
            err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts);
207
#endif // HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP
208
        }
209
        else {
210
            err = pthread_cond_wait(&sema->cond, &sema->mutex);
211
        }
212
    }
213
    if (sema->counter > 0) {
214
        sema->counter--;
215
        res = Py_PARK_OK;
216
    }
217
    else if (err) {
218
        res = Py_PARK_TIMEOUT;
219
    }
220
    else {
221
        res = Py_PARK_INTR;
222
    }
223
    pthread_mutex_unlock(&sema->mutex);
224
#endif
225
0
    return res;
226
0
}
227
228
int
229
_PySemaphore_Wait(_PySemaphore *sema, PyTime_t timeout, int detach)
230
0
{
231
0
    PyThreadState *tstate = NULL;
232
0
    if (detach) {
233
0
        tstate = _PyThreadState_GET();
234
0
        if (tstate && _PyThreadState_IsAttached(tstate)) {
235
            // Only detach if we are attached
236
0
            PyEval_ReleaseThread(tstate);
237
0
        }
238
0
        else {
239
0
            tstate = NULL;
240
0
        }
241
0
    }
242
0
    int res = _PySemaphore_PlatformWait(sema, timeout);
243
0
    if (tstate) {
244
0
        PyEval_AcquireThread(tstate);
245
0
    }
246
0
    return res;
247
0
}
248
249
void
250
_PySemaphore_Wakeup(_PySemaphore *sema)
251
0
{
252
#if defined(MS_WINDOWS)
253
    if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) {
254
        Py_FatalError("parking_lot: ReleaseSemaphore failed");
255
    }
256
#elif defined(_Py_USE_SEMAPHORES)
257
    int err = sem_post(&sema->platform_sem);
258
0
    if (err != 0) {
259
0
        Py_FatalError("parking_lot: sem_post failed");
260
0
    }
261
#else
262
    pthread_mutex_lock(&sema->mutex);
263
    sema->counter++;
264
    pthread_cond_signal(&sema->cond);
265
    pthread_mutex_unlock(&sema->mutex);
266
#endif
267
0
}
268
269
static void
270
enqueue(Bucket *bucket, const void *address, struct wait_entry *wait)
271
0
{
272
0
    llist_insert_tail(&bucket->root, &wait->node);
273
0
    ++bucket->num_waiters;
274
0
}
275
276
static struct wait_entry *
277
dequeue(Bucket *bucket, const void *address)
278
0
{
279
    // find the first waiter that is waiting on `address`
280
0
    struct llist_node *root = &bucket->root;
281
0
    struct llist_node *node;
282
0
    llist_for_each(node, root) {
283
0
        struct wait_entry *wait = llist_data(node, struct wait_entry, node);
284
0
        if (wait->addr == (uintptr_t)address) {
285
0
            llist_remove(node);
286
0
            --bucket->num_waiters;
287
0
            wait->is_unparking = true;
288
0
            return wait;
289
0
        }
290
0
    }
291
0
    return NULL;
292
0
}
293
294
static void
295
dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst)
296
0
{
297
    // remove and append all matching waiters to dst
298
0
    struct llist_node *root = &bucket->root;
299
0
    struct llist_node *node;
300
0
    llist_for_each_safe(node, root) {
301
0
        struct wait_entry *wait = llist_data(node, struct wait_entry, node);
302
0
        if (wait->addr == (uintptr_t)address) {
303
0
            llist_remove(node);
304
0
            llist_insert_tail(dst, node);
305
0
            --bucket->num_waiters;
306
0
            wait->is_unparking = true;
307
0
        }
308
0
    }
309
0
}
310
311
// Atomically load the `addr_size`-byte value at `addr` and compare it with
// the value stored at `expected`. Returns nonzero when they are equal and
// zero otherwise. Only sizes of 1, 2, 4 and 8 bytes are supported.
static int
atomic_memcmp(const void *addr, const void *expected, size_t addr_size)
{
    if (addr_size == 1) {
        return _Py_atomic_load_uint8(addr) == *(const uint8_t *)expected;
    }
    if (addr_size == 2) {
        return _Py_atomic_load_uint16(addr) == *(const uint16_t *)expected;
    }
    if (addr_size == 4) {
        return _Py_atomic_load_uint32(addr) == *(const uint32_t *)expected;
    }
    if (addr_size == 8) {
        return _Py_atomic_load_uint64(addr) == *(const uint64_t *)expected;
    }
    Py_UNREACHABLE();
}
323
324
int
325
_PyParkingLot_Park(const void *addr, const void *expected, size_t size,
326
                   PyTime_t timeout_ns, void *park_arg, int detach)
327
0
{
328
0
    struct wait_entry wait = {
329
0
        .park_arg = park_arg,
330
0
        .addr = (uintptr_t)addr,
331
0
        .is_unparking = false,
332
0
    };
333
334
0
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
335
336
0
    _PyRawMutex_Lock(&bucket->mutex);
337
0
    if (!atomic_memcmp(addr, expected, size)) {
338
0
        _PyRawMutex_Unlock(&bucket->mutex);
339
0
        return Py_PARK_AGAIN;
340
0
    }
341
0
    _PySemaphore_Init(&wait.sema);
342
0
    enqueue(bucket, addr, &wait);
343
0
    _PyRawMutex_Unlock(&bucket->mutex);
344
345
0
    int res = _PySemaphore_Wait(&wait.sema, timeout_ns, detach);
346
0
    if (res == Py_PARK_OK) {
347
0
        goto done;
348
0
    }
349
350
    // timeout or interrupt
351
0
    _PyRawMutex_Lock(&bucket->mutex);
352
0
    if (wait.is_unparking) {
353
0
        _PyRawMutex_Unlock(&bucket->mutex);
354
        // Another thread has started to unpark us. Wait until we process the
355
        // wakeup signal.
356
0
        do {
357
0
            res = _PySemaphore_Wait(&wait.sema, -1, detach);
358
0
        } while (res != Py_PARK_OK);
359
0
        goto done;
360
0
    }
361
0
    else {
362
0
        llist_remove(&wait.node);
363
0
        --bucket->num_waiters;
364
0
    }
365
0
    _PyRawMutex_Unlock(&bucket->mutex);
366
367
0
done:
368
0
    _PySemaphore_Destroy(&wait.sema);
369
0
    return res;
370
371
0
}
372
373
void
374
_PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg)
375
0
{
376
0
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
377
378
    // Find the first waiter that is waiting on `addr`
379
0
    _PyRawMutex_Lock(&bucket->mutex);
380
0
    struct wait_entry *waiter = dequeue(bucket, addr);
381
0
    if (waiter) {
382
0
        int has_more_waiters = (bucket->num_waiters > 0);
383
0
        fn(arg, waiter->park_arg, has_more_waiters);
384
0
    }
385
0
    else {
386
0
        fn(arg, NULL, 0);
387
0
    }
388
0
    _PyRawMutex_Unlock(&bucket->mutex);
389
390
0
    if (waiter) {
391
        // Wakeup the waiter outside of the bucket lock
392
0
        _PySemaphore_Wakeup(&waiter->sema);
393
0
    }
394
0
}
395
396
void
397
_PyParkingLot_UnparkAll(const void *addr)
398
0
{
399
0
    struct llist_node head = LLIST_INIT(head);
400
0
    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
401
402
0
    _PyRawMutex_Lock(&bucket->mutex);
403
0
    dequeue_all(bucket, addr, &head);
404
0
    _PyRawMutex_Unlock(&bucket->mutex);
405
406
0
    struct llist_node *node;
407
0
    llist_for_each_safe(node, &head) {
408
0
        struct wait_entry *waiter = llist_data(node, struct wait_entry, node);
409
0
        llist_remove(node);
410
0
        _PySemaphore_Wakeup(&waiter->sema);
411
0
    }
412
0
}
413
414
void
415
_PyParkingLot_AfterFork(void)
416
0
{
417
    // After a fork only one thread remains. That thread cannot be blocked
418
    // so all entries in the parking lot are for dead threads.
419
0
    memset(buckets, 0, sizeof(buckets));
420
0
    for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) {
421
0
        llist_init(&buckets[i].root);
422
0
    }
423
0
}