Coverage Report

Created: 2026-01-10 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/task_group.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#include <sys/types.h>
23
#include <stddef.h>                         // size_t
24
#include <gflags/gflags.h>
25
#include "butil/compat.h"                   // OS_MACOSX
26
#include "butil/macros.h"                   // ARRAY_SIZE
27
#include "butil/scoped_lock.h"              // BAIDU_SCOPED_LOCK
28
#include "butil/fast_rand.h"
29
#include "butil/unique_ptr.h"
30
#include "butil/third_party/murmurhash3/murmurhash3.h" // fmix64
31
#include "butil/reloadable_flags.h"
32
#include "bthread/errno.h"                  // ESTOP
33
#include "bthread/butex.h"                  // butex_*
34
#include "bthread/sys_futex.h"              // futex_wake_private
35
#include "bthread/processor.h"              // cpu_relax
36
#include "bthread/task_control.h"
37
#include "bthread/task_group.h"
38
#include "bthread/timer_thread.h"
39
#include "bthread/bthread.h"
40
41
#ifdef __x86_64__
42
#include <x86intrin.h>
43
#endif // __x86_64__
44
45
#ifdef __ARM_NEON
46
#include <arm_neon.h>
47
#endif // __ARM_NEON
48
49
namespace bthread {
50
51
static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
52
    BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_INVALID, {0} };
53
54
DEFINE_bool(show_bthread_creation_in_vars, false, "When this flags is on, The time "
55
            "from bthread creation to first run will be recorded and shown in /vars");
56
BUTIL_VALIDATE_GFLAG(show_bthread_creation_in_vars, butil::PassValidate);
57
58
DEFINE_bool(show_per_worker_usage_in_vars, false,
59
            "Show per-worker usage in /vars/bthread_per_worker_usage_<tid>");
60
BUTIL_VALIDATE_GFLAG(show_per_worker_usage_in_vars, butil::PassValidate);
61
62
DEFINE_bool(bthread_enable_cpu_clock_stat, false,
63
            "Enable CPU clock statistics for bthread");
64
BUTIL_VALIDATE_GFLAG(bthread_enable_cpu_clock_stat, butil::PassValidate);
65
66
BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group, NULL);
67
// Sync with TaskMeta::local_storage when a bthread is created or destroyed.
68
// During running, the two fields may be inconsistent, use tls_bls as the
69
// groundtruth.
70
BAIDU_VOLATILE_THREAD_LOCAL(LocalStorage, tls_bls, BTHREAD_LOCAL_STORAGE_INITIALIZER);
71
72
// defined in bthread/key.cpp
73
extern void return_keytable(bthread_keytable_pool_t*, KeyTable*);
74
75
// [Hacky] This is a special TLS set by bthread-rpc privately... to save
76
// overhead of creation keytable, may be removed later.
77
BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
78
79
const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
80
81
void* (*g_create_span_func)() = NULL;
82
83
0
void* run_create_span_func() {
84
0
    if (g_create_span_func) {
85
0
        return g_create_span_func();
86
0
    }
87
0
    return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span;
88
0
}
89
90
0
AtomicInteger128::Value AtomicInteger128::load() const {
91
0
#if __x86_64__ || __ARM_NEON
92
    // Supress compiler warning.
93
0
    (void)_mutex;
94
0
#endif // __x86_64__ || __ARM_NEON
95
96
0
#if __x86_64__ || __ARM_NEON
97
0
#ifdef __x86_64__
98
0
    __m128i value = _mm_load_si128(reinterpret_cast<const __m128i*>(&_value));
99
#else // __ARM_NEON
100
    int64x2_t value = vld1q_s64(reinterpret_cast<const int64_t*>(&_value));
101
#endif // __x86_64__
102
0
    return {value[0], value[1]};
103
#else // __x86_64__ || __ARM_NEON
104
    // RISC-V and other architectures use mutex fallback
105
    BAIDU_SCOPED_LOCK(const_cast<FastPthreadMutex&>(_mutex));
106
    return _value;
107
#endif // __x86_64__ || __ARM_NEON
108
0
}
109
110
0
void AtomicInteger128::store(Value value) {
111
0
#if __x86_64__
112
0
    __m128i v = _mm_load_si128(reinterpret_cast<__m128i*>(&value));
113
0
    _mm_store_si128(reinterpret_cast<__m128i*>(&_value), v);
114
#elif __ARM_NEON
115
    int64x2_t v = vld1q_s64(reinterpret_cast<int64_t*>(&value));
116
    vst1q_s64(reinterpret_cast<int64_t*>(&_value), v);
117
#else
118
    // RISC-V and other architectures use mutex fallback
119
    BAIDU_SCOPED_LOCK(const_cast<FastPthreadMutex&>(_mutex));
120
    _value = value;
121
#endif // __x86_64__ || __ARM_NEON
122
0
}
123
124
125
0
int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
126
0
    TaskMeta* const m = address_meta(tid);
127
0
    if (m != NULL) {
128
0
        const uint32_t given_ver = get_version(tid);
129
0
        BAIDU_SCOPED_LOCK(m->version_lock);
130
0
        if (given_ver == *m->version_butex) {
131
0
            *out = m->attr;
132
0
            return 0;
133
0
        }
134
0
    }
135
0
    errno = EINVAL;
136
0
    return -1;
137
0
}
138
139
0
void TaskGroup::set_stopped(bthread_t tid) {
140
0
    TaskMeta* const m = address_meta(tid);
141
0
    if (m != NULL) {
142
0
        const uint32_t given_ver = get_version(tid);
143
0
        BAIDU_SCOPED_LOCK(m->version_lock);
144
0
        if (given_ver == *m->version_butex) {
145
0
            m->stop = true;
146
0
        }
147
0
    }
148
0
}
149
150
0
bool TaskGroup::is_stopped(bthread_t tid) {
151
0
    TaskMeta* const m = address_meta(tid);
152
0
    if (m != NULL) {
153
0
        const uint32_t given_ver = get_version(tid);
154
0
        BAIDU_SCOPED_LOCK(m->version_lock);
155
0
        if (given_ver == *m->version_butex) {
156
0
            return m->stop;
157
0
        }
158
0
    }
159
    // If the tid does not exist or version does not match, it's intuitive
160
    // to treat the thread as "stopped".
161
0
    return true;
162
0
}
163
164
0
bool TaskGroup::wait_task(bthread_t* tid) {
165
0
    do {
166
0
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
167
0
        if (_last_pl_state.stopped()) {
168
0
            return false;
169
0
        }
170
0
        _pl->wait(_last_pl_state);
171
0
        if (steal_task(tid)) {
172
0
            return true;
173
0
        }
174
#else
175
        const ParkingLot::State st = _pl->get_state();
176
        if (st.stopped()) {
177
            return false;
178
        }
179
        if (steal_task(tid)) {
180
            return true;
181
        }
182
        _pl->wait(st);
183
#endif
184
0
    } while (true);
185
0
}
186
187
0
static double get_cumulated_cputime_from_this(void* arg) {
188
0
    return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0;
189
0
}
190
191
0
int64_t TaskGroup::cumulated_cputime_ns() const {
192
0
    CPUTimeStat cpu_time_stat = _cpu_time_stat.load();
193
    // Add the elapsed time of running bthread.
194
0
    int64_t cumulated_cputime_ns = cpu_time_stat.cumulated_cputime_ns();
195
0
    if (!cpu_time_stat.is_main_task()) {
196
0
        cumulated_cputime_ns += butil::cpuwide_time_ns() - cpu_time_stat.last_run_ns();
197
0
    }
198
0
    return cumulated_cputime_ns;
199
0
}
200
201
0
void TaskGroup::run_main_task() {
202
0
    bvar::PassiveStatus<double> cumulated_cputime(
203
0
        get_cumulated_cputime_from_this, this);
204
0
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
205
206
0
    TaskGroup* dummy = this;
207
0
    bthread_t tid;
208
0
    while (wait_task(&tid)) {
209
0
        sched_to(&dummy, tid);
210
0
        DCHECK_EQ(this, dummy);
211
0
        DCHECK_EQ(_cur_meta->stack, _main_stack);
212
0
        if (_cur_meta->tid != _main_tid) {
213
0
            task_runner(1/*skip remained*/);
214
0
        }
215
0
        if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
216
0
            char name[32];
217
#if defined(OS_MACOSX)
218
            snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64,
219
                     pthread_numeric_id());
220
#else
221
0
            snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
222
0
                     (long)syscall(SYS_gettid));
223
0
#endif
224
0
            usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> >
225
0
                             (name, &cumulated_cputime, 1));
226
0
        }
227
0
    }
228
    // Don't forget to add elapse of last wait_task.
229
0
    current_task()->stat.cputime_ns +=
230
0
        butil::cpuwide_time_ns() - _cpu_time_stat.load_unsafe().last_run_ns();
231
0
}
232
233
TaskGroup::TaskGroup(TaskControl* c)
234
0
    :  _control(c) {
235
0
    CHECK(c);
236
0
}
237
238
0
TaskGroup::~TaskGroup() {
239
0
    if (_main_tid) {
240
0
        TaskMeta* m = address_meta(_main_tid);
241
0
        CHECK(_main_stack == m->stack);
242
#ifdef BUTIL_USE_ASAN
243
        _main_stack->storage.bottom = NULL;
244
        _main_stack->storage.stacksize = 0;
245
#endif // BUTIL_USE_ASAN
246
0
        return_stack(m->release_stack());
247
0
        return_resource(get_slot(_main_tid));
248
0
        _main_tid = 0;
249
0
    }
250
0
}
251
252
#ifdef BUTIL_USE_ASAN
253
int PthreadAttrGetStack(void*& stack_addr, size_t& stack_size) {
254
#if defined(OS_MACOSX)
255
    stack_addr = pthread_get_stackaddr_np(pthread_self());
256
    stack_size = pthread_get_stacksize_np(pthread_self());
257
    return 0;
258
#else
259
    pthread_attr_t attr;
260
    int rc = pthread_getattr_np(pthread_self(), &attr);
261
    if (0 != rc) {
262
        LOG(ERROR) << "Fail to get pthread attributes: " << berror(rc);
263
        return rc;
264
    }
265
    rc = pthread_attr_getstack(&attr, &stack_addr, &stack_size);
266
    if (0 != rc) {
267
        LOG(ERROR) << "Fail to get pthread stack: " << berror(rc);
268
    }
269
    pthread_attr_destroy(&attr);
270
    return rc;
271
#endif // OS_MACOSX
272
}
273
#endif // BUTIL_USE_ASAN
274
275
0
int TaskGroup::init(size_t runqueue_capacity) {
276
0
    if (_rq.init(runqueue_capacity) != 0) {
277
0
        LOG(FATAL) << "Fail to init _rq";
278
0
        return -1;
279
0
    }
280
0
    if (_remote_rq.init(runqueue_capacity / 2) != 0) {
281
0
        LOG(FATAL) << "Fail to init _remote_rq";
282
0
        return -1;
283
0
    }
284
285
#ifdef BUTIL_USE_ASAN
286
    void* stack_addr = NULL;
287
    size_t stack_size = 0;
288
    if (0 != PthreadAttrGetStack(stack_addr, stack_size)) {
289
        return -1;
290
    }
291
#endif // BUTIL_USE_ASAN
292
293
0
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
294
0
    if (NULL == stk) {
295
0
        LOG(FATAL) << "Fail to get main stack container";
296
0
        return -1;
297
0
    }
298
0
    butil::ResourceId<TaskMeta> slot;
299
0
    TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
300
0
    if (NULL == m) {
301
0
        LOG(FATAL) << "Fail to get TaskMeta";
302
0
        return -1;
303
0
    }
304
0
    m->sleep_failed = false;
305
0
    m->stop = false;
306
0
    m->interrupted = false;
307
0
    m->about_to_quit = false;
308
0
    m->fn = NULL;
309
0
    m->arg = NULL;
310
0
    m->local_storage = LOCAL_STORAGE_INIT;
311
0
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
312
0
    m->stat = EMPTY_STAT;
313
0
    m->attr = BTHREAD_ATTR_TASKGROUP;
314
0
    m->tid = make_tid(*m->version_butex, slot);
315
0
    m->set_stack(stk);
316
317
#ifdef BUTIL_USE_ASAN
318
    stk->storage.bottom = stack_addr;
319
    stk->storage.stacksize = stack_size;
320
    // No guard size required for ASan.
321
#endif // BUTIL_USE_ASAN
322
323
0
    _cur_meta = m;
324
0
    _main_tid = m->tid;
325
0
    _main_stack = stk;
326
327
0
    CPUTimeStat cpu_time_stat;
328
0
    cpu_time_stat.set_last_run_ns(m->cpuwide_start_ns, true);
329
0
    _cpu_time_stat.store(cpu_time_stat);
330
0
    _last_cpu_clock_ns = 0;
331
332
0
    return 0;
333
0
}
334
335
#ifdef BUTIL_USE_ASAN
336
void TaskGroup::asan_task_runner(intptr_t) {
337
    // This is a new thread, and it doesn't have the fake stack yet. ASan will
338
    // create it lazily, for now just pass NULL.
339
    internal::FinishSwitchFiber(NULL);
340
    task_runner(0);
341
}
342
#endif // BUTIL_USE_ASAN
343
344
0
void TaskGroup::task_runner(intptr_t skip_remained) {
345
    // NOTE: tls_task_group is volatile since tasks are moved around
346
    //       different groups.
347
0
    TaskGroup* g = tls_task_group;
348
#ifdef BRPC_BTHREAD_TRACER
349
    TaskTracer::set_running_status(g->tid(), g->_cur_meta);
350
#endif // BRPC_BTHREAD_TRACER
351
352
0
    if (!skip_remained) {
353
0
        while (g->_last_context_remained) {
354
0
            RemainedFn fn = g->_last_context_remained;
355
0
            g->_last_context_remained = NULL;
356
0
            fn(g->_last_context_remained_arg);
357
0
            g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
358
0
        }
359
360
0
#ifndef NDEBUG
361
0
        --g->_sched_recursive_guard;
362
0
#endif
363
0
    }
364
365
0
    do {
366
        // A task can be stopped before it gets running, in which case
367
        // we may skip user function, but that may confuse user:
368
        // Most tasks have variables to remember running result of the task,
369
        // which is often initialized to values indicating success. If an
370
        // user function is never called, the variables will be unchanged
371
        // however they'd better reflect failures because the task is stopped
372
        // abnormally.
373
374
        // Meta and identifier of the task is persistent in this run.
375
0
        TaskMeta* const m = g->_cur_meta;
376
377
0
        if (FLAGS_show_bthread_creation_in_vars) {
378
            // NOTE: the thread triggering exposure of pending time may spend
379
            // considerable time because a single bvar::LatencyRecorder
380
            // contains many bvar.
381
0
            g->_control->exposed_pending_time() <<
382
0
                (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
383
0
        }
384
385
        // Not catch exceptions except ExitException which is for implementing
386
        // bthread_exit(). User code is intended to crash when an exception is
387
        // not caught explicitly. This is consistent with other threading
388
        // libraries.
389
0
        void* thread_return;
390
0
        try {
391
0
            thread_return = m->fn(m->arg);
392
0
        } catch (ExitException& e) {
393
0
            thread_return = e.value();
394
0
        }
395
396
        // TODO: Save thread_return
397
0
        (void)thread_return;
398
399
        // Logging must be done before returning the keytable, since the logging lib
400
        // use bthread local storage internally, or will cause memory leak.
401
        // FIXME: the time from quiting fn to here is not counted into cputime
402
0
        if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
403
0
            LOG(INFO) << "Finished bthread " << m->tid << ", cputime="
404
0
                      << m->stat.cputime_ns / 1000000.0 << "ms";
405
0
        }
406
407
        // Clean tls variables, must be done before changing version_butex
408
        // otherwise another thread just joined this thread may not see side
409
        // effects of destructing tls variables.
410
0
        LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
411
0
        KeyTable* kt = tls_bls_ptr->keytable;
412
0
        if (kt != NULL) {
413
0
            return_keytable(m->attr.keytable_pool, kt);
414
            // After deletion: tls may be set during deletion.
415
0
            tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
416
0
            tls_bls_ptr->keytable = NULL;
417
0
            m->local_storage.keytable = NULL; // optional
418
0
        }
419
420
        // During running the function in TaskMeta and deleting the KeyTable in
421
        // return_KeyTable, the group is probably changed.
422
0
        g =  BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
423
424
        // Increase the version and wake up all joiners, if resulting version
425
        // is 0, change it to 1 to make bthread_t never be 0. Any access
426
        // or join to the bthread after changing version will be rejected.
427
        // The spinlock is for visibility of TaskGroup::get_attr.
428
#ifdef BRPC_BTHREAD_TRACER
429
        bool tracing = false;
430
#endif // BRPC_BTHREAD_TRACER
431
0
        {
432
0
            BAIDU_SCOPED_LOCK(m->version_lock);
433
#ifdef BRPC_BTHREAD_TRACER
434
            tracing = TaskTracer::set_end_status_unsafe(m);
435
#endif // BRPC_BTHREAD_TRACER
436
0
            if (0 == ++*m->version_butex) {
437
0
                ++*m->version_butex;
438
0
            }
439
0
        }
440
0
        butex_wake_except(m->version_butex, 0);
441
442
#ifdef BRPC_BTHREAD_TRACER
443
        if (tracing) {
444
            // Wait for tracing completion.
445
            g->_control->_task_tracer.WaitForTracing(m);
446
        }
447
        g->_control->_task_tracer.set_status(TASK_STATUS_UNKNOWN, m);
448
#endif // BRPC_BTHREAD_TRACER
449
450
0
        g->_control->_nbthreads << -1;
451
0
        g->_control->tag_nbthreads(g->tag()) << -1;
452
0
        g->set_remained(_release_last_context, m);
453
0
        ending_sched(&g);
454
455
0
    } while (g->_cur_meta->tid != g->_main_tid);
456
457
    // Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
458
    // tasks to run, quit for more tasks.
459
0
}
460
461
0
void TaskGroup::_release_last_context(void* arg) {
462
0
    TaskMeta* m = static_cast<TaskMeta*>(arg);
463
0
    if (m->stack_type() != STACK_TYPE_PTHREAD) {
464
0
        return_stack(m->release_stack()/*may be NULL*/);
465
0
    } else {
466
        // it's _main_stack, don't return.
467
0
        m->set_stack(NULL);
468
0
    }
469
0
    return_resource(get_slot(m->tid));
470
0
}
471
472
int TaskGroup::start_foreground(TaskGroup** pg,
473
                                bthread_t* __restrict th,
474
                                const bthread_attr_t* __restrict attr,
475
                                void * (*fn)(void*),
476
0
                                void* __restrict arg) {
477
0
    if (__builtin_expect(!fn, 0)) {
478
0
        return EINVAL;
479
0
    }
480
0
    const int64_t start_ns = butil::cpuwide_time_ns();
481
0
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
482
0
    butil::ResourceId<TaskMeta> slot;
483
0
    TaskMeta* m = butil::get_resource(&slot);
484
0
    if (BAIDU_UNLIKELY(NULL == m)) {
485
0
        return ENOMEM;
486
0
    }
487
0
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
488
0
    m->sleep_failed = false;
489
0
    m->stop = false;
490
0
    m->interrupted = false;
491
0
    m->about_to_quit = false;
492
0
    m->fn = fn;
493
0
    m->arg = arg;
494
0
    CHECK(m->stack == NULL);
495
0
    m->attr = using_attr;
496
0
    m->local_storage = LOCAL_STORAGE_INIT;
497
0
    if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
498
0
        m->local_storage.rpcz_parent_span = run_create_span_func();
499
0
    }
500
0
    m->cpuwide_start_ns = start_ns;
501
0
    m->stat = EMPTY_STAT;
502
0
    m->tid = make_tid(*m->version_butex, slot);
503
0
    *th = m->tid;
504
0
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
505
0
        LOG(INFO) << "Started bthread " << m->tid;
506
0
    }
507
508
0
    TaskGroup* g = *pg;
509
0
    g->_control->_nbthreads << 1;
510
0
    g->_control->tag_nbthreads(g->tag()) << 1;
511
#ifdef BRPC_BTHREAD_TRACER
512
    g->_control->_task_tracer.set_status(TASK_STATUS_CREATED, m);
513
#endif // BRPC_BTHREAD_TRACER
514
0
    if (g->is_current_pthread_task()) {
515
        // never create foreground task in pthread.
516
0
        g->ready_to_run(m, using_attr.flags & BTHREAD_NOSIGNAL);
517
0
    } else {
518
        // NOSIGNAL affects current task, not the new task.
519
0
        RemainedFn fn = NULL;
520
0
        auto& cur_attr = g->_cur_meta->attr;
521
0
        if (g->_control->_enable_priority_queue && cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) {
522
0
            fn = priority_to_run;
523
0
        } else if (g->current_task()->about_to_quit) {
524
0
            fn = ready_to_run_in_worker_ignoresignal;
525
0
        } else {
526
0
            fn = ready_to_run_in_worker;
527
0
        }
528
0
        ReadyToRunArgs args = {
529
0
            g->tag(), g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
530
0
        };
531
0
        g->set_remained(fn, &args);
532
0
        sched_to(pg, m->tid);
533
0
    }
534
0
    return 0;
535
0
}
536
537
template <bool REMOTE>
538
int TaskGroup::start_background(bthread_t* __restrict th,
539
                                const bthread_attr_t* __restrict attr,
540
                                void * (*fn)(void*),
541
0
                                void* __restrict arg) {
542
0
    if (__builtin_expect(!fn, 0)) {
543
0
        return EINVAL;
544
0
    }
545
0
    const int64_t start_ns = butil::cpuwide_time_ns();
546
0
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
547
0
    butil::ResourceId<TaskMeta> slot;
548
0
    TaskMeta* m = butil::get_resource(&slot);
549
0
    if (BAIDU_UNLIKELY(NULL == m)) {
550
0
        return ENOMEM;
551
0
    }
552
0
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
553
0
    m->sleep_failed = false;
554
0
    m->stop = false;
555
0
    m->interrupted = false;
556
0
    m->about_to_quit = false;
557
0
    m->fn = fn;
558
0
    m->arg = arg;
559
0
    CHECK(m->stack == NULL);
560
0
    m->attr = using_attr;
561
0
    m->local_storage = LOCAL_STORAGE_INIT;
562
0
    if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
563
0
        m->local_storage.rpcz_parent_span = run_create_span_func();
564
0
    }
565
0
    m->cpuwide_start_ns = start_ns;
566
0
    m->stat = EMPTY_STAT;
567
0
    m->tid = make_tid(*m->version_butex, slot);
568
0
    *th = m->tid;
569
0
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
570
0
        LOG(INFO) << "Started bthread " << m->tid;
571
0
    }
572
0
    _control->_nbthreads << 1;
573
0
    _control->tag_nbthreads(tag()) << 1;
574
#ifdef BRPC_BTHREAD_TRACER
575
    _control->_task_tracer.set_status(TASK_STATUS_CREATED, m);
576
#endif // BRPC_BTHREAD_TRACER
577
0
    if (REMOTE) {
578
0
        ready_to_run_remote(m, (using_attr.flags & BTHREAD_NOSIGNAL));
579
0
    } else {
580
0
        ready_to_run(m, (using_attr.flags & BTHREAD_NOSIGNAL));
581
0
    }
582
0
    return 0;
583
0
}
Unexecuted instantiation: int bthread::TaskGroup::start_background<true>(unsigned long*, bthread_attr_t const*, void* (*)(void*), void*)
Unexecuted instantiation: int bthread::TaskGroup::start_background<false>(unsigned long*, bthread_attr_t const*, void* (*)(void*), void*)
584
585
// Explicit instantiations.
586
template int
587
TaskGroup::start_background<true>(bthread_t* __restrict th,
588
                                  const bthread_attr_t* __restrict attr,
589
                                  void * (*fn)(void*),
590
                                  void* __restrict arg);
591
template int
592
TaskGroup::start_background<false>(bthread_t* __restrict th,
593
                                   const bthread_attr_t* __restrict attr,
594
                                   void * (*fn)(void*),
595
                                   void* __restrict arg);
596
597
0
int TaskGroup::join(bthread_t tid, void** return_value) {
598
0
    if (__builtin_expect(!tid, 0)) {  // tid of bthread is never 0.
599
0
        return EINVAL;
600
0
    }
601
0
    TaskMeta* m = address_meta(tid);
602
0
    if (BAIDU_UNLIKELY(NULL == m)) {
603
        // The bthread is not created yet, this join is definitely wrong.
604
0
        return EINVAL;
605
0
    }
606
0
    TaskGroup* g = tls_task_group;
607
0
    if (g != NULL && g->current_tid() == tid) {
608
        // joining self causes indefinite waiting.
609
0
        return EINVAL;
610
0
    }
611
0
    const uint32_t expected_version = get_version(tid);
612
0
    while (*m->version_butex == expected_version) {
613
0
        if (butex_wait(m->version_butex, expected_version, NULL) < 0 &&
614
0
            errno != EWOULDBLOCK && errno != EINTR) {
615
0
            return errno;
616
0
        }
617
0
    }
618
0
    if (return_value) {
619
0
        *return_value = NULL;
620
0
    }
621
0
    return 0;
622
0
}
623
624
0
bool TaskGroup::exists(bthread_t tid) {
625
0
    if (tid != 0) {  // tid of bthread is never 0.
626
0
        TaskMeta* m = address_meta(tid);
627
0
        if (m != NULL) {
628
0
            return (*m->version_butex == get_version(tid));
629
0
        }
630
0
    }
631
0
    return false;
632
0
}
633
634
0
TaskStatistics TaskGroup::main_stat() const {
635
0
    TaskMeta* m = address_meta(_main_tid);
636
0
    return m ? m->stat : EMPTY_STAT;
637
0
}
638
639
0
void TaskGroup::ending_sched(TaskGroup** pg) {
640
0
    TaskGroup* g = *pg;
641
0
    bthread_t next_tid = 0;
642
    // Find next task to run, if none, switch to idle thread of the group.
643
0
#ifndef BTHREAD_FAIR_WSQ
644
    // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
645
    // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
646
    // to 2.9%
647
0
    const bool popped = g->_rq.pop(&next_tid);
648
#else
649
    const bool popped = g->_rq.steal(&next_tid);
650
#endif
651
0
    if (!popped && !g->steal_task(&next_tid)) {
652
        // Jump to main task if there's no task to run.
653
0
        next_tid = g->_main_tid;
654
0
    }
655
656
0
    TaskMeta* const cur_meta = g->_cur_meta;
657
0
    TaskMeta* next_meta = address_meta(next_tid);
658
0
    if (next_meta->stack == NULL) {
659
0
        if (next_meta->stack_type() == cur_meta->stack_type()) {
660
            // Reuse the stack of the current ending task.
661
            //
662
            // also works with pthread_task scheduling to pthread_task, the
663
            // transfered stack is just _main_stack.
664
0
            next_meta->set_stack(cur_meta->release_stack());
665
0
        } else {
666
#ifdef BUTIL_USE_ASAN
667
            ContextualStack* stk = get_stack(
668
                next_meta->stack_type(), asan_task_runner);
669
#else
670
0
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
671
0
#endif // BUTIL_USE_ASAN
672
0
            if (stk) {
673
0
                next_meta->set_stack(stk);
674
0
            } else {
675
                // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
676
                // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
677
                // This basically means that if we can't allocate stack, run
678
                // the task in pthread directly.
679
0
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
680
0
                next_meta->set_stack(g->_main_stack);
681
0
            }
682
0
        }
683
0
    }
684
0
    sched_to(pg, next_meta);
685
0
}
686
687
0
void TaskGroup::sched(TaskGroup** pg) {
688
0
    TaskGroup* g = *pg;
689
0
    bthread_t next_tid = 0;
690
    // Find next task to run, if none, switch to idle thread of the group.
691
0
#ifndef BTHREAD_FAIR_WSQ
692
0
    const bool popped = g->_rq.pop(&next_tid);
693
#else
694
    const bool popped = g->_rq.steal(&next_tid);
695
#endif
696
0
    if (!popped && !g->steal_task(&next_tid)) {
697
        // Jump to main task if there's no task to run.
698
0
        next_tid = g->_main_tid;
699
0
    }
700
0
    sched_to(pg, next_tid);
701
0
}
702
703
extern void CheckBthreadScheSafety();
704
705
0
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
706
0
    TaskGroup* g = *pg;
707
0
#ifndef NDEBUG
708
0
    if ((++g->_sched_recursive_guard) > 1) {
709
0
        LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1
710
0
                   << ") call sched_to(" << g << ")";
711
0
    }
712
0
#endif
713
    // Save errno so that errno is bthread-specific.
714
0
    int saved_errno = errno;
715
0
    void* saved_unique_user_ptr = tls_unique_user_ptr;
716
717
0
    TaskMeta* const cur_meta = g->_cur_meta;
718
0
    int64_t now = butil::cpuwide_time_ns();
719
0
    CPUTimeStat cpu_time_stat = g->_cpu_time_stat.load_unsafe();
720
0
    int64_t elp_ns = now - cpu_time_stat.last_run_ns();
721
0
    cur_meta->stat.cputime_ns += elp_ns;
722
    // Update cpu_time_stat.
723
0
    cpu_time_stat.set_last_run_ns(now, is_main_task(g, next_meta->tid));
724
0
    cpu_time_stat.add_cumulated_cputime_ns(elp_ns, is_main_task(g, cur_meta->tid));
725
0
    g->_cpu_time_stat.store(cpu_time_stat);
726
727
0
    if (FLAGS_bthread_enable_cpu_clock_stat) {
728
0
        const int64_t cpu_thread_time = butil::cputhread_time_ns();
729
0
        if (g->_last_cpu_clock_ns != 0) {
730
0
            cur_meta->stat.cpu_usage_ns += cpu_thread_time - g->_last_cpu_clock_ns;
731
0
        }
732
0
        g->_last_cpu_clock_ns = cpu_thread_time;
733
0
    } else {
734
0
        g->_last_cpu_clock_ns = 0;
735
0
    }
736
737
0
    ++cur_meta->stat.nswitch;
738
0
    ++ g->_nswitch;
739
    // Switch to the task
740
0
    if (__builtin_expect(next_meta != cur_meta, 1)) {
741
0
        g->_cur_meta = next_meta;
742
        // Switch tls_bls
743
0
        cur_meta->local_storage = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
744
0
        BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, next_meta->local_storage);
745
746
        // Logging must be done after switching the local storage, since the logging lib
747
        // use bthread local storage internally, or will cause memory leak.
748
0
        if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
749
0
            (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
750
0
            LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
751
0
                      << next_meta->tid;
752
0
        }
753
754
0
        if (cur_meta->stack != NULL) {
755
0
            if (next_meta->stack != cur_meta->stack) {
756
0
                CheckBthreadScheSafety();
757
#ifdef BRPC_BTHREAD_TRACER
758
                g->_control->_task_tracer.set_status(TASK_STATUS_JUMPING, cur_meta);
759
                g->_control->_task_tracer.set_status(TASK_STATUS_JUMPING, next_meta);
760
#endif // BRPC_BTHREAD_TRACER
761
0
                {
762
0
                    BTHREAD_SCOPED_ASAN_FIBER_SWITCHER(next_meta->stack->storage);
763
0
                    jump_stack(cur_meta->stack, next_meta->stack);
764
0
                }
765
                // probably went to another group, need to assign g again.
766
0
                g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
767
#ifdef BRPC_BTHREAD_TRACER
768
                TaskTracer::set_running_status(g->tid(), g->_cur_meta);
769
#endif // BRPC_BTHREAD_TRACER
770
0
            }
771
0
#ifndef NDEBUG
772
0
            else {
773
                // else pthread_task is switching to another pthread_task, sc
774
                // can only equal when they're both _main_stack
775
0
                CHECK(cur_meta->stack == g->_main_stack);
776
0
            }
777
0
#endif
778
0
        } /* else because of ending_sched(including pthread_task->pthread_task). */
779
#ifdef BRPC_BTHREAD_TRACER
780
        else {
781
            // _cur_meta: TASK_STATUS_FIRST_READY -> TASK_STATUS_RUNNING.
782
            TaskTracer::set_running_status(g->tid(), g->_cur_meta);
783
        }
784
#endif // BRPC_BTHREAD_TRACER
785
0
    } else {
786
0
        LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
787
0
    }
788
789
0
    while (g->_last_context_remained) {
790
0
        RemainedFn fn = g->_last_context_remained;
791
0
        g->_last_context_remained = NULL;
792
0
        fn(g->_last_context_remained_arg);
793
0
        g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
794
0
    }
795
796
    // Restore errno
797
0
    errno = saved_errno;
798
    // tls_unique_user_ptr probably changed.
799
0
    BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_unique_user_ptr, saved_unique_user_ptr);
800
801
0
#ifndef NDEBUG
802
0
    --g->_sched_recursive_guard;
803
0
#endif
804
0
    *pg = g;
805
0
}
806
807
0
void TaskGroup::destroy_self() {
808
0
    if (_control) {
809
0
        _control->_destroy_group(this);
810
0
        _control = NULL;
811
0
    } else {
812
0
        CHECK(false);
813
0
    }
814
0
}
815
816
817
0
void TaskGroup::ready_to_run(TaskMeta* meta, bool nosignal) {
818
#ifdef BRPC_BTHREAD_TRACER
819
    _control->_task_tracer.set_status(TASK_STATUS_READY, meta);
820
#endif // BRPC_BTHREAD_TRACER
821
0
    push_rq(meta->tid);
822
0
    if (nosignal) {
823
0
        ++_num_nosignal;
824
0
    } else {
825
0
        const int additional_signal = _num_nosignal;
826
0
        _num_nosignal = 0;
827
0
        _nsignaled += 1 + additional_signal;
828
0
        _control->signal_task(1 + additional_signal, _tag);
829
0
    }
830
0
}
831
832
0
void TaskGroup::flush_nosignal_tasks() {
833
0
    const int val = _num_nosignal;
834
0
    if (val) {
835
0
        _num_nosignal = 0;
836
0
        _nsignaled += val;
837
0
        _control->signal_task(val, _tag);
838
0
    }
839
0
}
840
841
0
void TaskGroup::ready_to_run_remote(TaskMeta* meta, bool nosignal) {
842
#ifdef BRPC_BTHREAD_TRACER
843
    _control->_task_tracer.set_status(TASK_STATUS_READY, meta);
844
#endif // BRPC_BTHREAD_TRACER
845
0
    _remote_rq._mutex.lock();
846
0
    while (!_remote_rq.push_locked(meta->tid)) {
847
0
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
848
0
        LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
849
0
                                << _remote_rq.capacity();
850
0
        ::usleep(1000);
851
0
        _remote_rq._mutex.lock();
852
0
    }
853
0
    if (nosignal) {
854
0
        ++_remote_num_nosignal;
855
0
        _remote_rq._mutex.unlock();
856
0
    } else {
857
0
        const int additional_signal = _remote_num_nosignal;
858
0
        _remote_num_nosignal = 0;
859
0
        _remote_nsignaled += 1 + additional_signal;
860
0
        _remote_rq._mutex.unlock();
861
0
        _control->signal_task(1 + additional_signal, _tag);
862
0
    }
863
0
}
864
865
0
void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
866
0
    const int val = _remote_num_nosignal;
867
0
    if (!val) {
868
0
        locked_mutex.unlock();
869
0
        return;
870
0
    }
871
0
    _remote_num_nosignal = 0;
872
0
    _remote_nsignaled += val;
873
0
    locked_mutex.unlock();
874
0
    _control->signal_task(val, _tag);
875
0
}
876
877
0
void TaskGroup::ready_to_run_general(TaskMeta* meta, bool nosignal) {
878
0
    if (tls_task_group == this) {
879
0
        return ready_to_run(meta, nosignal);
880
0
    }
881
0
    return ready_to_run_remote(meta, nosignal);
882
0
}
883
884
0
void TaskGroup::flush_nosignal_tasks_general() {
885
0
    if (tls_task_group == this) {
886
0
        return flush_nosignal_tasks();
887
0
    }
888
0
    return flush_nosignal_tasks_remote();
889
0
}
890
891
0
void TaskGroup::ready_to_run_in_worker(void* args_in) {
892
0
    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
893
0
    return tls_task_group->ready_to_run(args->meta, args->nosignal);
894
0
}
895
896
0
void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
897
0
    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
898
#ifdef BRPC_BTHREAD_TRACER
899
    tls_task_group->_control->_task_tracer.set_status(
900
        TASK_STATUS_READY, args->meta);
901
#endif // BRPC_BTHREAD_TRACER
902
0
    return tls_task_group->push_rq(args->meta->tid);
903
0
}
904
905
0
void TaskGroup::priority_to_run(void* args_in) {
906
0
    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
907
#ifdef BRPC_BTHREAD_TRACER
908
    tls_task_group->_control->_task_tracer.set_status(
909
        TASK_STATUS_READY, args->meta);
910
#endif // BRPC_BTHREAD_TRACER
911
0
    return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
912
0
}
913
914
struct SleepArgs {
915
    uint64_t timeout_us;
916
    bthread_t tid;
917
    TaskMeta* meta;
918
    TaskGroup* group;
919
};
920
921
0
static void ready_to_run_from_timer_thread(void* arg) {
922
0
    CHECK(tls_task_group == NULL);
923
0
    const SleepArgs* e = static_cast<const SleepArgs*>(arg);
924
0
    auto g = e->group;
925
0
    auto tag = g->tag();
926
0
    g->control()->choose_one_group(tag)->ready_to_run_remote(e->meta);
927
0
}
928
929
0
void TaskGroup::_add_sleep_event(void* void_args) {
930
    // Must copy SleepArgs. After calling TimerThread::schedule(), previous
931
    // thread may be stolen by a worker immediately and the on-stack SleepArgs
932
    // will be gone.
933
0
    SleepArgs e = *static_cast<SleepArgs*>(void_args);
934
0
    TaskGroup* g = e.group;
935
#ifdef BRPC_BTHREAD_TRACER
936
    g->_control->_task_tracer.set_status(TASK_STATUS_SUSPENDED, e.meta);
937
#endif // BRPC_BTHREAD_TRACER
938
939
0
    TimerThread::TaskId sleep_id;
940
0
    sleep_id = get_global_timer_thread()->schedule(
941
0
        ready_to_run_from_timer_thread, void_args,
942
0
        butil::microseconds_from_now(e.timeout_us));
943
944
0
    if (!sleep_id) {
945
0
        e.meta->sleep_failed = true;
946
        // Fail to schedule timer, go back to previous thread.
947
0
        g->ready_to_run(e.meta);
948
0
        return;
949
0
    }
950
951
    // Set TaskMeta::current_sleep which is for interruption.
952
0
    const uint32_t given_ver = get_version(e.tid);
953
0
    {
954
0
        BAIDU_SCOPED_LOCK(e.meta->version_lock);
955
0
        if (given_ver == *e.meta->version_butex && !e.meta->interrupted) {
956
0
            e.meta->current_sleep = sleep_id;
957
0
            return;
958
0
        }
959
0
    }
960
    // The thread is stopped or interrupted.
961
    // interrupt() always sees that current_sleep == 0. It will not schedule
962
    // the calling thread. The race is between current thread and timer thread.
963
0
    if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
964
        // added to timer, previous thread may be already woken up by timer and
965
        // even stopped. It's safe to schedule previous thread when unschedule()
966
        // returns 0 which means "the not-run-yet sleep_id is removed". If the
967
        // sleep_id is running(returns 1), ready_to_run_in_worker() will
968
        // schedule previous thread as well. If sleep_id does not exist,
969
        // previous thread is scheduled by timer thread before and we don't
970
        // have to do it again.
971
0
        g->ready_to_run(e.meta);
972
0
    }
973
0
}
974
975
// To be consistent with sys_usleep, set errno and return -1 on error.
976
0
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
977
0
    if (0 == timeout_us) {
978
0
        yield(pg);
979
0
        return 0;
980
0
    }
981
0
    TaskGroup* g = *pg;
982
    // We have to schedule timer after we switched to next bthread otherwise
983
    // the timer may wake up(jump to) current still-running context.
984
0
    SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
985
0
    g->set_remained(_add_sleep_event, &e);
986
0
    sched(pg);
987
0
    g = *pg;
988
0
    if (e.meta->sleep_failed) {
989
        // Fail to schedule timer, return error.
990
0
        e.meta->sleep_failed = false;
991
0
        errno = ESTOP;
992
0
        return -1;
993
0
    }
994
0
    e.meta->current_sleep = 0;
995
0
    if (e.meta->interrupted) {
996
        // Race with set and may consume multiple interruptions, which are OK.
997
0
        e.meta->interrupted = false;
998
        // NOTE: setting errno to ESTOP is not necessary from bthread's
999
        // pespective, however many RPC code expects bthread_usleep to set
1000
        // errno to ESTOP when the thread is stopping, and print FATAL
1001
        // otherwise. To make smooth transitions, ESTOP is still set instead
1002
        // of EINTR when the thread is stopping.
1003
0
        errno = (e.meta->stop ? ESTOP : EINTR);
1004
0
        return -1;
1005
0
    }
1006
0
    return 0;
1007
0
}
1008
1009
// Defined in butex.cpp
1010
bool erase_from_butex_because_of_interruption(ButexWaiter* bw);
1011
1012
static int interrupt_and_consume_waiters(
1013
0
    bthread_t tid, ButexWaiter** pw, uint64_t* sleep_id) {
1014
0
    TaskMeta* const m = TaskGroup::address_meta(tid);
1015
0
    if (m == NULL) {
1016
0
        return EINVAL;
1017
0
    }
1018
0
    const uint32_t given_ver = get_version(tid);
1019
0
    BAIDU_SCOPED_LOCK(m->version_lock);
1020
0
    if (given_ver == *m->version_butex) {
1021
0
        *pw = m->current_waiter.exchange(NULL, butil::memory_order_acquire);
1022
0
        *sleep_id = m->current_sleep;
1023
0
        m->current_sleep = 0;  // only one stopper gets the sleep_id
1024
0
        m->interrupted = true;
1025
0
        return 0;
1026
0
    }
1027
0
    return EINVAL;
1028
0
}
1029
1030
0
static int set_butex_waiter(bthread_t tid, ButexWaiter* w) {
1031
0
    TaskMeta* const m = TaskGroup::address_meta(tid);
1032
0
    if (m != NULL) {
1033
0
        const uint32_t given_ver = get_version(tid);
1034
0
        BAIDU_SCOPED_LOCK(m->version_lock);
1035
0
        if (given_ver == *m->version_butex) {
1036
            // Release fence makes m->interrupted visible to butex_wait
1037
0
            m->current_waiter.store(w, butil::memory_order_release);
1038
0
            return 0;
1039
0
        }
1040
0
    }
1041
0
    return EINVAL;
1042
0
}
1043
1044
// The interruption is "persistent" compared to the ones caused by signals,
1045
// namely if a bthread is interrupted when it's not blocked, the interruption
1046
// is still remembered and will be checked at next blocking. This designing
1047
// choice simplifies the implementation and reduces notification loss caused
1048
// by race conditions.
1049
// TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep()
1050
// can't be interrupted.
1051
0
int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {
1052
    // Consume current_waiter in the TaskMeta, wake it up then set it back.
1053
0
    ButexWaiter* w = NULL;
1054
0
    uint64_t sleep_id = 0;
1055
0
    int rc = interrupt_and_consume_waiters(tid, &w, &sleep_id);
1056
0
    if (rc) {
1057
0
        return rc;
1058
0
    }
1059
    // a bthread cannot wait on a butex and be sleepy at the same time.
1060
0
    CHECK(!sleep_id || !w);
1061
0
    if (w != NULL) {
1062
0
        erase_from_butex_because_of_interruption(w);
1063
        // If butex_wait() already wakes up before we set current_waiter back,
1064
        // the function will spin until current_waiter becomes non-NULL.
1065
0
        rc = set_butex_waiter(tid, w);
1066
0
        if (rc) {
1067
0
            LOG(FATAL) << "butex_wait should spin until setting back waiter";
1068
0
            return rc;
1069
0
        }
1070
0
    } else if (sleep_id != 0) {
1071
0
        if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
1072
0
            TaskGroup* g = tls_task_group;
1073
0
            TaskMeta* m = address_meta(tid);
1074
0
            if (g) {
1075
0
                g->ready_to_run(m);
1076
0
            } else {
1077
0
                if (!c) {
1078
0
                    return EINVAL;
1079
0
                }
1080
0
                c->choose_one_group(tag)->ready_to_run_remote(m);
1081
0
            }
1082
0
        }
1083
0
    }
1084
0
    return 0;
1085
0
}
1086
1087
0
void TaskGroup::yield(TaskGroup** pg) {
1088
0
    TaskGroup* g = *pg;
1089
0
    ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
1090
0
    g->set_remained(ready_to_run_in_worker, &args);
1091
0
    sched(pg);
1092
0
}
1093
1094
void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
1095
0
                bool ignore_not_matched = false) {
1096
0
    TaskMeta* const m = TaskGroup::address_meta(tid);
1097
0
    if (m == NULL) {
1098
0
        os << "bthread=" << tid << " : never existed\n";
1099
0
        return;
1100
0
    }
1101
0
    const uint32_t given_ver = get_version(tid);
1102
0
    bool matched = false;
1103
0
    bool stop = false;
1104
0
    bool interrupted = false;
1105
0
    bool about_to_quit = false;
1106
0
    void* (*fn)(void*) = NULL;
1107
0
    void* arg = NULL;
1108
0
    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
1109
0
    bool has_tls = false;
1110
0
    int64_t cpuwide_start_ns = 0;
1111
0
    TaskStatistics stat = {0, 0, 0};
1112
0
    TaskStatus status = TASK_STATUS_UNKNOWN;
1113
0
    bool traced = false;
1114
0
    pthread_t worker_tid{};
1115
0
    {
1116
0
        BAIDU_SCOPED_LOCK(m->version_lock);
1117
0
        if (given_ver == *m->version_butex) {
1118
0
            matched = true;
1119
0
            stop = m->stop;
1120
0
            interrupted = m->interrupted;
1121
0
            about_to_quit = m->about_to_quit;
1122
0
            fn = m->fn;
1123
0
            arg = m->arg;
1124
0
            attr = m->attr;
1125
0
            has_tls = m->local_storage.keytable;
1126
0
            cpuwide_start_ns = m->cpuwide_start_ns;
1127
0
            stat = m->stat;
1128
0
            status = m->status;
1129
0
            traced = m->traced;
1130
0
            worker_tid = m->worker_tid;
1131
0
        }
1132
0
    }
1133
0
    if (!matched) {
1134
0
        if (!ignore_not_matched) {
1135
0
            os << "bthread=" << tid << " : not exist now\n";
1136
0
        }
1137
0
    } else {
1138
0
        os << "bthread=" << tid << " :\nstop=" << stop
1139
0
           << "\ninterrupted=" << interrupted
1140
0
           << "\nabout_to_quit=" << about_to_quit
1141
0
           << "\nfn=" << (void*)fn
1142
0
           << "\narg=" << (void*)arg
1143
0
           << "\nattr={stack_type=" << attr.stack_type
1144
0
           << " flags=" << attr.flags
1145
0
           << " specified_tag=" << attr.tag
1146
0
           << " name=" << attr.name
1147
0
           << " keytable_pool=" << attr.keytable_pool
1148
0
           << "}\nhas_tls=" << has_tls
1149
0
           << "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
1150
0
           << "\ncputime_ns=" << stat.cputime_ns
1151
0
           << "\nnswitch=" << stat.nswitch
1152
#ifdef BRPC_BTHREAD_TRACER
1153
           << "\nstatus=" << status
1154
           << "\ntraced=" << traced
1155
           << "\nworker_tid=" << worker_tid;
1156
        if (enable_trace) {
1157
            os << "\nbthread call stack:\n";
1158
            stack_trace(os, tid);
1159
        }
1160
        os << "\n\n";
1161
 #else
1162
0
           << "\n\n";
1163
0
           (void)status;(void)traced;(void)worker_tid;
1164
0
#endif // BRPC_BTHREAD_TRACER
1165
0
    }
1166
0
}
1167
1168
}  // namespace bthread