Coverage Report

Created: 2025-10-10 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/butex.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 22 17:30:12 CST 2014
21
22
#include "butil/atomicops.h"                // butil::atomic
23
#include "butil/scoped_lock.h"              // BAIDU_SCOPED_LOCK
24
#include "butil/macros.h"
25
#include "butil/containers/flat_map.h"
26
#include "butil/containers/linked_list.h"   // LinkNode
27
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
28
#include "butil/memory/singleton_on_pthread_once.h"
29
#endif
30
#include "butil/logging.h"
31
#include "butil/object_pool.h"
32
#include "bthread/errno.h"                 // EWOULDBLOCK
33
#include "bthread/sys_futex.h"             // futex_*
34
#include "bthread/processor.h"             // cpu_relax
35
#include "bthread/task_control.h"          // TaskControl
36
#include "bthread/task_group.h"            // TaskGroup
37
#include "bthread/timer_thread.h"
38
#include "bthread/butex.h"
39
#include "bthread/mutex.h"
40
41
// This file implements butex.h
42
// Provides futex-like semantics which is sequenced wait and wake operations
43
// and guaranteed visibilities.
44
//
45
// If wait is sequenced before wake:
46
//    [thread1]             [thread2]
47
//    wait()                value = new_value
48
//                          wake()
49
// wait() sees unmatched value(fail to wait), or wake() sees the waiter.
50
//
51
// If wait is sequenced after wake:
52
//    [thread1]             [thread2]
53
//                          value = new_value
54
//                          wake()
55
//    wait()
56
// wake() must provide some sort of memory fence to prevent assignment
57
// of value to be reordered after it. Thus the value is visible to wait()
58
// as well.
59
60
namespace bthread {
61
62
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
63
// bvar counter exposed as "bthread_butex_waiter_count"; tracks the number
// of threads currently queued on any butex (only compiled in when
// SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS is defined).
struct ButexWaiterCount : public bvar::Adder<int64_t> {
    ButexWaiterCount() : bvar::Adder<int64_t>("bthread_butex_waiter_count") {}
};
// Returns the process-wide counter. Leaky singleton: never destructed, so
// it is safe to use during shutdown.
inline bvar::Adder<int64_t>& butex_waiter_count() {
    return *butil::get_leaky_singleton<ButexWaiterCount>();
}
69
#endif
70
71
// Final state of a ButexBthreadWaiter, decided by whichever party removes
// the waiter from the butex's queue.
enum WaiterState {
    WAITER_STATE_NONE,            // not decided yet
    WAITER_STATE_READY,           // queued (or woken up normally)
    WAITER_STATE_TIMEDOUT,        // removed by TimerThread on timeout -> ETIMEDOUT
    WAITER_STATE_UNMATCHEDVALUE,  // butex value != expected_value -> EWOULDBLOCK
    WAITER_STATE_INTERRUPTED,     // removed by TaskGroup::interrupt() -> EINTR
};
78
79
struct Butex;
80
81
// Base node queued in Butex::waiters. Lives on the waiter's stack; the
// waiter must not return until the node is unlinked.
struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
    // tids of pthreads are 0
    bthread_t tid;

    // Erasing node from middle of LinkedList is thread-unsafe, we need
    // to hold its container's lock.
    // NULL when the waiter is not queued in any butex.
    butil::atomic<Butex*> container;
};
89
90
// non_pthread_task allocates this structure on stack and queue it in
91
// Butex::waiters.
92
// non_pthread_task allocates this structure on stack and queue it in
// Butex::waiters.
struct ButexBthreadWaiter : public ButexWaiter {
    TaskMeta* task_meta;            // meta of the suspended bthread
    TimerThread::TaskId sleep_id;   // pending timeout task, 0 if none
    WaiterState waiter_state;       // outcome, see WaiterState
    int expected_value;             // value the butex must equal to block
    Butex* initial_butex;           // butex this waiter was created for
    TaskControl* control;           // for choosing a TaskGroup at wakeup
    const timespec* abstime;        // absolute deadline, NULL = no timeout
    bthread_tag_t tag;              // tag of the waiting bthread's group
};
102
103
// pthread_task or main_task allocates this structure on stack and queue it
104
// in Butex::waiters.
105
// pthread_task or main_task allocates this structure on stack and queue it
// in Butex::waiters.
struct ButexPthreadWaiter : public ButexWaiter {
    // Futexed word: PTHREAD_NOT_SIGNALLED until wakeup_pthread() flips it
    // to PTHREAD_SIGNALLED.
    butil::atomic<int> sig;
};
108
109
// Intrusive FIFO of waiters; protected by Butex::waiter_lock.
typedef butil::LinkedList<ButexWaiter> ButexWaiterList;

// Values of ButexPthreadWaiter::sig.
enum ButexPthreadSignal { PTHREAD_NOT_SIGNALLED, PTHREAD_SIGNALLED };
112
113
// The futex-like object. Users only see the address of `value'; the rest
// is recovered via container_of(). Cacheline-aligned to avoid false sharing.
struct BAIDU_CACHELINE_ALIGNMENT Butex {
    Butex() {}
    ~Butex() {}

    butil::atomic<int> value;       // user-visible word, must stay at offset 0
    ButexWaiterList waiters;        // queued waiters, guarded by waiter_lock
    FastPthreadMutex waiter_lock;   // protects waiters & waiter bookkeeping
};

// butex_* APIs cast the user pointer back with container_of(..., value),
// which only works if value sits at offset 0.
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE, butex_fits_in_one_cacheline);
124
125
} // namespace bthread
126
127
namespace butil {
// Butex object returned to the ObjectPool<Butex> may be accessed,
// so ObjectPool<Butex> can not poison the memory region of Butex.
// (See the long comment above butex_create(): freed butexes may still be
// touched by racing butex_wake().)
template <>
struct ObjectPoolWithASanPoison<bthread::Butex> : false_type {};
} // namespace butil
133
134
namespace bthread {
135
136
0
// Wake one pthread waiter that blocks in wait_pthread(). The caller must
// have already removed `pw' from the butex's waiter list.
static void wakeup_pthread(ButexPthreadWaiter* pw) {
    // release fence makes wait_pthread see changes before wakeup.
    pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release);
    // At this point, wait_pthread() possibly has woken up and destroyed `pw'.
    // In which case, futex_wake_private() should return EFAULT.
    // If crash happens in future, `pw' can be made TLS and never destroyed
    // to solve the issue.
    futex_wake_private(&pw->sig, 1);
}
145
146
bool erase_from_butex(ButexWaiter*, bool, WaiterState);
147
148
0
// Block the calling pthread on `pw.sig' until wakeup_pthread() signals it
// or `abstime' (absolute deadline) expires.
// Returns the result of the last futex_wait_private(); on timeout errno is
// ETIMEDOUT. On return `pw' is guaranteed to be unlinked from the butex.
int wait_pthread(ButexPthreadWaiter& pw, const timespec* abstime) {
    timespec* ptimeout = NULL;
    timespec timeout;
    int64_t timeout_us = 0;
    int rc;

    while (true) {
        if (abstime != NULL) {
            // futex takes a relative timeout; recompute it each iteration.
            timeout_us = butil::timespec_to_microseconds(*abstime) - butil::gettimeofday_us();
            timeout = butil::microseconds_to_timespec(timeout_us);
            ptimeout = &timeout;
        }
        if (timeout_us > MIN_SLEEP_US || abstime == NULL) {
            rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
            if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
                // If `sig' is changed, wakeup_pthread() must be called and `pw'
                // is already removed from the butex.
                // Acquire fence makes this thread sees changes before wakeup.
                return rc;
            }
        } else {
            // Remaining time too short to bother sleeping: treat as timeout.
            errno = ETIMEDOUT;
            rc = -1;
        }
        // Handle ETIMEDOUT when abstime is valid.
        // If futex_wait_private return EINTR, just continue the loop.
        if (rc != 0 && errno == ETIMEDOUT) {
            // wait futex timeout, `pw' is still in the queue, remove it.
            if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
                // Another thread is erasing `pw' as well, wait for the signal.
                // Acquire fence makes this thread sees changes before wakeup.
                if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
                    // already timedout, abstime and ptimeout are expired.
                    // Wait indefinitely for the racing waker's signal.
                    abstime = NULL;
                    ptimeout = NULL;
                    continue;
                }
            }
            return rc;
        }
    }
}
190
191
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
192
193
// Returns 0 when no need to unschedule or successfully unscheduled,
194
// -1 otherwise.
195
inline int unsleep_if_necessary(ButexBthreadWaiter* w,
196
0
                                TimerThread* timer_thread) {
197
0
    if (!w->sleep_id) {
198
0
        return 0;
199
0
    }
200
0
    if (timer_thread->unschedule(w->sleep_id) > 0) {
201
        // the callback is running.
202
0
        return -1;
203
0
    }
204
0
    w->sleep_id = 0;
205
0
    return 0;
206
0
}
207
208
// Use ObjectPool(which never frees memory) to solve the race between
209
// butex_wake() and butex_destroy(). The race is as follows:
210
//
211
//   class Event {
212
//   public:
213
//     void wait() {
214
//       _mutex.lock();
215
//       if (!_done) {
216
//         _cond.wait(&_mutex);
217
//       }
218
//       _mutex.unlock();
219
//     }
220
//     void signal() {
221
//       _mutex.lock();
222
//       if (!_done) {
223
//         _done = true;
224
//         _cond.signal();
225
//       }
226
//       _mutex.unlock();  /*1*/
227
//     }
228
//   private:
229
//     bool _done = false;
230
//     Mutex _mutex;
231
//     Condition _cond;
232
//   };
233
//
234
//   [Thread1]                         [Thread2]
235
//   foo() {
236
//     Event event;
237
//     pass_to_thread2(&event);  --->  event.signal();
238
//     event.wait();
239
//   } <-- event destroyed
240
//   
241
// Summary: Thread1 passes a stateful condition to Thread2 and waits until
242
// the condition being signalled, which basically means the associated
243
// job is done and Thread1 can release related resources including the mutex
244
// and condition. The scenario is fine and the code is correct.
245
// The race needs a closer look. The unlock at /*1*/ may have different 
246
// implementations, but in which the last step is probably an atomic store
247
// and butex_wake(), like this:
248
//
249
//   locked->store(0);
250
//   butex_wake(locked);
251
//
252
// The `locked' represents the locking status of the mutex. The issue is that
253
// just after the store(), the mutex is already unlocked and the code in
254
// Event.wait() may successfully grab the lock and go through everything
255
// left and leave foo() function, destroying the mutex and butex, making
256
// the butex_wake(locked) crash.
257
// To solve this issue, one method is to add reference before store and
258
// release the reference after butex_wake. However reference countings need
259
// to be added in nearly every user scenario of butex_wake(), which is very
260
// error-prone. Another method is never freeing butex, with the side effect 
261
// that butex_wake() may wake up an unrelated butex(the one reuses the memory)
262
// and cause spurious wakeups. According to our observations, the race is 
263
// infrequent, even rare. The extra spurious wakeups should be acceptable.
264
265
0
void* butex_create() {
266
0
    Butex* b = butil::get_object<Butex>();
267
0
    if (b) {
268
0
        return &b->value;
269
0
    }
270
0
    return NULL;
271
0
}
272
273
0
void butex_destroy(void* butex) {
274
0
    if (!butex) {
275
0
        return;
276
0
    }
277
0
    Butex* b = static_cast<Butex*>(
278
0
        container_of(static_cast<butil::atomic<int>*>(butex), Butex, value));
279
0
    butil::return_object(b);
280
0
}
281
282
// if TaskGroup tls_task_group is belong to tag
283
0
// True iff the calling thread is a worker whose TaskGroup carries `tag'.
inline bool is_same_tag(bthread_tag_t tag) {
    if (tls_task_group == NULL) {
        // Not running inside a bthread worker.
        return false;
    }
    return tls_task_group->tag() == tag;
}
286
287
//  nosignal is true & tag is same can return true
288
0
inline bool check_nosignal(bool nosignal, bthread_tag_t tag) {
289
0
    return nosignal && is_same_tag(tag);
290
0
}
291
292
// if tag is same return tls_task_group else choose one group with tag
293
0
// Pick the TaskGroup that should run a woken bthread tagged `tag':
// the local worker's group when tags match, otherwise any group of `tag'
// chosen by TaskControl.
inline TaskGroup* get_task_group(TaskControl* c, bthread_tag_t tag) {
    if (is_same_tag(tag)) {
        return tls_task_group;
    }
    return c->choose_one_group(tag);
}
296
297
0
// Hand `next_meta' to the local worker group `g'. Without nosignal the
// current bthread yields to it immediately via exchange(); with nosignal it
// is only queued (no parking-lot signal).
inline void run_in_local_task_group(TaskGroup* g, TaskMeta* next_meta, bool nosignal) {
    if (nosignal) {
        g->ready_to_run(next_meta, nosignal);
        return;
    }
    TaskGroup::exchange(&g, next_meta);
}
304
305
0
// Wake up at most one waiter of the butex whose user word is at `arg'.
// Returns the number of waiters woken (0 or 1). `nosignal' suppresses
// signalling the parking lot when the target group is the local one.
int butex_wake(void* arg, bool nosignal) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    ButexWaiter* front = NULL;
    {
        // Pop the oldest waiter under the lock; everything after runs
        // lock-free on the popped node.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->waiters.empty()) {
            return 0;
        }
        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);
    }
    if (front->tid == 0) {
        // tid == 0 marks a pthread waiter.
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    // Cancel the pending timeout before rescheduling the bthread.
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = get_task_group(bbw->control, bbw->tag);
    if (g == tls_task_group) {
        run_in_local_task_group(g, bbw->task_meta, nosignal);
    } else {
        g->ready_to_run_remote(bbw->task_meta, check_nosignal(nosignal, g->tag()));
    }
    return 1;
}
331
332
0
// Wake up at most `n' waiters of the butex (n == 0 means all waiters).
// Returns the number of waiters woken. Pthread waiters are signalled
// directly; bthread waiters are rescheduled, and the first bthread popped
// may be switched to directly when it belongs to the local group.
int butex_wake_n(void* arg, size_t n, bool nosignal) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        // Detach up to n waiters under the lock, splitting them by kind so
        // the (possibly slow) wakeups below run without the lock.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        for (size_t i = 0; (n == 0 || i < n) && !b->waiters.empty(); ++i) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                bthread_waiters.Append(bw);
            } else {
                pthread_waiters.Append(bw);
            }
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }
    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    // Track each distinct tag's group so nosignal batches can be flushed once.
    butil::FlatMap<bthread_tag_t, TaskGroup*> nwakeups;
    nwakeups.init(FLAGS_task_group_ntags);
    // We will exchange with first waiter in the end.
    ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>(
        bthread_waiters.head()->value());
    next->RemoveFromList();
    unsleep_if_necessary(next, get_global_timer_thread());
    ++nwakeup;
    while (!bthread_waiters.empty()) {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        auto g = get_task_group(w->control, w->tag);
        // Queue without signalling; flushed per-tag below.
        g->ready_to_run_general(w->task_meta, true);
        nwakeups[g->tag()] = g;
        ++nwakeup;
    }
    for (auto it = nwakeups.begin(); it != nwakeups.end(); ++it) {
        auto g = it->second;
        if (!check_nosignal(nosignal, g->tag())) {
            g->flush_nosignal_tasks_general();
        }
    }
    auto g = get_task_group(next->control, next->tag);
    if (g == tls_task_group) {
        run_in_local_task_group(g, next->task_meta, nosignal);
    } else {
        g->ready_to_run_remote(next->task_meta, check_nosignal(nosignal, g->tag()));
    }
    return nwakeup;
}
395
396
0
// Wake up all waiters of the butex; returns the number woken.
int butex_wake_all(void* arg, bool nosignal) {
    // n == 0 asks butex_wake_n() to pop every queued waiter.
    return butex_wake_n(arg, 0, nosignal);
}
399
400
0
// Wake up all waiters of the butex except the one whose bthread id equals
// `excluded_bthread', which is re-queued and stays waiting.
// Returns the number of waiters woken.
int butex_wake_except(void* arg, bthread_t excluded_bthread) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        ButexWaiter* excluded_waiter = NULL;
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();

            if (bw->tid) {
                if (bw->tid != excluded_bthread) {
                    bthread_waiters.Append(bw);
                    bw->container.store(NULL, butil::memory_order_relaxed);
                } else {
                    // Keep container intact: this waiter stays queued.
                    excluded_waiter = bw;
                }
            } else {
                bw->container.store(NULL, butil::memory_order_relaxed);
                pthread_waiters.Append(bw);
            }
        }

        // Put the excluded waiter back so it keeps waiting.
        if (excluded_waiter) {
            b->waiters.Append(excluded_waiter);
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }

    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    // One flush per distinct tag after all bthreads are queued.
    butil::FlatMap<bthread_tag_t, TaskGroup*> nwakeups;
    nwakeups.init(FLAGS_task_group_ntags);
    do {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        auto g = get_task_group(w->control, w->tag);
        g->ready_to_run_general(w->task_meta, true);
        nwakeups[g->tag()] = g;
        ++nwakeup;
    } while (!bthread_waiters.empty());
    for (auto it = nwakeups.begin(); it != nwakeups.end(); ++it) {
        auto g = it->second;
        g->flush_nosignal_tasks_general();
    }
    return nwakeup;
}
460
461
0
// Wake up the front waiter of butex `arg' and move all remaining waiters
// onto butex `arg2' (classic futex-requeue, used by condition variables to
// avoid thundering herds). Returns the number of waiters woken (0 or 1).
int butex_requeue(void* arg, void* arg2) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);

    ButexWaiter* front = NULL;
    {
        // Lock both butexes in a deadlock-free global order.
        std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
        std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
        butil::double_lock(lck1, lck2);
        if (b->waiters.empty()) {
            return 0;
        }

        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);

        // Transfer the rest of b's queue to m without waking them.
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            m->waiters.Append(bw);
            bw->container.store(m, butil::memory_order_relaxed);
        }
    }

    if (front->tid == 0) {  // which is a pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    // Switch to the woken bthread directly when it belongs to the local
    // worker's group; otherwise queue it remotely.
    auto g = is_same_tag(bbw->tag) ? tls_task_group : NULL;
    if (g) {
        TaskGroup::exchange(&g, bbw->task_meta);
    } else {
        bbw->control->choose_one_group(bbw->tag)->ready_to_run_remote(bbw->task_meta);
    }
    return 1;
}
500
501
// Callable from multiple threads, at most one thread may wake up the waiter.
502
0
// Callable from multiple threads, at most one thread may wake up the waiter.
// TimerThread callback scheduled in wait_for_butex(): marks the waiter
// timed out, removes it from its butex and reschedules its bthread.
static void erase_from_butex_and_wakeup(void* arg) {
    erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT);
}
505
506
// Used in task_group.cpp
507
0
// Used in task_group.cpp
// Remove an interrupted waiter from its butex and wake it with
// WAITER_STATE_INTERRUPTED. Returns true iff this caller did the erase.
bool erase_from_butex_because_of_interruption(ButexWaiter* bw) {
    return erase_from_butex(bw, true, WAITER_STATE_INTERRUPTED);
}
510
511
0
// Remove `bw' from the butex it is queued on (if any), record `state' for
// bthread waiters, and optionally wake the waiter. Returns true iff this
// caller won the race and erased the node. Preserves errno.
inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
    // `bw' is guaranteed to be valid inside this function because waiter
    // will wait until this function being cancelled or finished.
    // NOTE: This function must be no-op when bw->container is NULL.
    bool erased = false;
    Butex* b;
    int saved_errno = errno;
    while ((b = bw->container.load(butil::memory_order_acquire))) {
        // b can be NULL when the waiter is scheduled but queued.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        // Re-check under the lock: container may have been cleared (or the
        // waiter requeued onto another butex) before we acquired it.
        if (b == bw->container.load(butil::memory_order_relaxed)) {
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                // Only bthread waiters carry a waiter_state.
                static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state;
            }
            erased = true;
            break;
        }
    }
    if (erased && wakeup) {
        if (bw->tid) {
            ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
            get_task_group(bbw->control, bbw->tag)->ready_to_run_general(bbw->task_meta);
        } else {
            ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
            wakeup_pthread(pw);
        }
    }
    errno = saved_errno;
    return erased;
}
543
544
// Arguments passed (on the waiter's stack) from butex_wait() to the
// remained function wait_for_butex().
struct WaitForButexArgs {
    ButexBthreadWaiter* bw;   // the stack-allocated waiter to queue
    bool prepend;             // queue at the head instead of the tail
};
548
549
0
// Remained function run right after the waiting bthread is switched out:
// queues the waiter onto its butex (and schedules the timeout) while no
// longer on the waiter's stack. If the value no longer matches or the
// bthread was interrupted/timed out meanwhile, re-runs the waiter instead.
void wait_for_butex(void* arg) {
    auto args = static_cast<WaitForButexArgs*>(arg);
    ButexBthreadWaiter* const bw = args->bw;
    Butex* const b = bw->initial_butex;
    // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
    //    before they're queued, otherwise the waiter is already timedout
    //    and removed by TimerThread, in which case we should stop queueing.
    //
    // Visibility of waiter_state:
    //    [bthread]                         [TimerThread]
    //    waiter_state = TIMED
    //    tt_lock { add task }
    //                                      tt_lock { get task }
    //                                      waiter_lock { waiter_state=TIMEDOUT }
    //    waiter_lock { use waiter_state }
    // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
    // sequenced by two locks, both threads are guaranteed to see the correct
    // value.
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
            bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
        } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                   !bw->task_meta->interrupted) {
            if (args->prepend) {
                b->waiters.Prepend(bw);
            } else {
                b->waiters.Append(bw);
            }
            bw->container.store(b, butil::memory_order_relaxed);
#ifdef BRPC_BTHREAD_TRACER
            bw->control->_task_tracer.set_status(TASK_STATUS_SUSPENDED, bw->task_meta);
#endif // BRPC_BTHREAD_TRACER
            if (bw->abstime != NULL) {
                // Arm the timeout only after the waiter is queued.
                bw->sleep_id = get_global_timer_thread()->schedule(
                    erase_from_butex_and_wakeup, bw, *bw->abstime);
                if (!bw->sleep_id) {  // TimerThread stopped.
                    errno = ESTOP;
                    erase_from_butex_and_wakeup(bw);
                }
            }
            return;
        }
    }
    
    // b->container is NULL which makes erase_from_butex_and_wakeup() and
    // TaskGroup::interrupt() no-op, there's no race between following code and
    // the two functions. The on-stack ButexBthreadWaiter is safe to use and
    // bw->waiter_state will not change again.
    // unsleep_if_necessary(bw, get_global_timer_thread());
    tls_task_group->ready_to_run(bw->task_meta);
    // FIXME: jump back to original thread is buggy.
    
    // // Value unmatched or waiter is already woken up by TimerThread, jump
    // // back to original bthread.
    // TaskGroup* g = tls_task_group;
    // ReadyToRunArgs args = { g->current_tid(), false };
    // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
    // // 2: Don't run remained because we're already in a remained function
    // //    otherwise stack may overflow.
    // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}
611
612
// butex_wait() path for callers that are not running as a bthread worker
// task (g may be NULL for plain pthreads). Blocks on a stack-allocated
// ButexPthreadWaiter via the real futex.
// Returns 0 on wakeup; -1 with errno EWOULDBLOCK (value mismatch), EINTR
// (interrupted) or ETIMEDOUT (deadline passed).
static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
                                   const timespec* abstime, bool prepend) {
    TaskMeta* task = NULL;
    ButexPthreadWaiter pw;
    pw.tid = 0;
    pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
    int rc = 0;
    
    if (g) {
        // Publish the waiter so TaskGroup::interrupt() can find it.
        task = g->current_task();
        task->current_waiter.store(&pw, butil::memory_order_release);
    }
    b->waiter_lock.lock();
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        b->waiter_lock.unlock();
        errno = EWOULDBLOCK;
        rc = -1;
    } else if (task != NULL && task->interrupted) {
        b->waiter_lock.unlock();
        // Race with set and may consume multiple interruptions, which are OK.
        task->interrupted = false;
        errno = EINTR;
        rc = -1;
    } else {
        if (prepend) {
            b->waiters.Prepend(&pw);
        } else {
            b->waiters.Append(&pw);
        }
        pw.container.store(b, butil::memory_order_relaxed);
        b->waiter_lock.unlock();

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
        num_waiters << 1;
#endif
        rc = wait_pthread(pw, abstime);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        num_waiters << -1;
#endif
    }
    if (task) {
        // If current_waiter is NULL, TaskGroup::interrupt() is running and
        // using pw, spin until current_waiter != NULL.
        BT_LOOP_WHEN(task->current_waiter.exchange(
                         NULL, butil::memory_order_acquire) == NULL,
                     30/*nops before sched_yield*/);
        if (task->interrupted) {
            task->interrupted = false;
            if (rc == 0) {
                errno = EINTR;
                return -1;
            }
        }
    }
    return rc;
}
669
670
0
// Atomically check *arg == expected_value and block the caller until woken
// by butex_wake*(), interrupted, or `abstime' (absolute deadline) expires.
// `prepend' queues the waiter at the head (LIFO) instead of the tail.
// Returns 0 on normal wakeup; -1 with errno EWOULDBLOCK (value mismatch),
// ETIMEDOUT, or EINTR.
int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        errno = EWOULDBLOCK;
        // Sometimes we may take actions immediately after unmatched butex,
        // this fence makes sure that we see changes before changing butex.
        butil::atomic_thread_fence(butil::memory_order_acquire);
        return -1;
    }
    TaskGroup* g = tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        // Not a bthread: fall back to the futex-based pthread path.
        return butex_wait_from_pthread(g, b, expected_value, abstime, prepend);
    }
    // The waiter lives on this bthread's stack; it stays valid because we
    // don't return until it is unlinked and unreferenced (spins below).
    ButexBthreadWaiter bbw;
    // tid is 0 iff the thread is non-bthread
    bbw.tid = g->current_tid();
    bbw.container.store(NULL, butil::memory_order_relaxed);
    bbw.task_meta = g->current_task();
    bbw.sleep_id = 0;
    bbw.waiter_state = WAITER_STATE_READY;
    bbw.expected_value = expected_value;
    bbw.initial_butex = b;
    bbw.control = g->control();
    bbw.abstime = abstime;
    bbw.tag = g->tag();

    if (abstime != NULL) {
        // Schedule timer before queueing. If the timer is triggered before
        // queueing, cancel queueing. This is a kind of optimistic locking.
        if (butil::timespec_to_microseconds(*abstime) <
            (butil::gettimeofday_us() + MIN_SLEEP_US)) {
            // Already timed out.
            errno = ETIMEDOUT;
            return -1;
        }
    }
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
    bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
    num_waiters << 1;
#endif

    // release fence matches with acquire fence in interrupt_and_consume_waiters
    // in task_group.cpp to guarantee visibility of `interrupted'.
    bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
    WaitForButexArgs args{ &bbw, prepend };
    // Queueing happens in wait_for_butex() after this bthread is switched out.
    g->set_remained(wait_for_butex, &args);
    TaskGroup::sched(&g);

    // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
    // running and using bbw. The chance is small, just spin until it's done.
    BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
                 30/*nops before sched_yield*/);
    
    // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
    // Spin until current_waiter != NULL.
    BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
                     NULL, butil::memory_order_acquire) == NULL,
                 30/*nops before sched_yield*/);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
    num_waiters << -1;
#endif

    bool is_interrupted = false;
    if (bbw.task_meta->interrupted) {
        // Race with set and may consume multiple interruptions, which are OK.
        bbw.task_meta->interrupted = false;
        is_interrupted = true;
    }
    // If timed out as well as value unmatched, return ETIMEDOUT.
    if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
        errno = ETIMEDOUT;
        return -1;
    } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
        errno = EWOULDBLOCK;
        return -1;
    } else if (is_interrupted) {
        errno = EINTR;
        return -1;
    }
    return 0;
}
751
752
}  // namespace bthread
753
754
namespace butil {
// Cap each ObjectPool block at 128 Butex objects (each one cacheline, see
// the BAIDU_CASSERT above) to keep per-block memory bounded.
template <> struct ObjectPoolBlockMaxItem<bthread::Butex> {
    static const size_t value = 128;
};
}