Coverage Report

Created: 2025-11-01 07:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/bthread.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#include <sys/syscall.h>
23
#include <gflags/gflags.h>
24
#include "butil/macros.h"                       // BAIDU_CASSERT
25
#include "butil/logging.h"
26
#include "butil/thread_local.h"
27
#include "butil/reloadable_flags.h"
28
#include "bthread/task_group.h"                // TaskGroup
29
#include "bthread/task_control.h"              // TaskControl
30
#include "bthread/timer_thread.h"
31
#include "bthread/list_of_abafree_id.h"
32
#include "bthread/bthread.h"
33
34
namespace bthread {
35
extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
36
                       bool ignore_not_matched = false);
37
38
0
static bool validate_bthread_concurrency(const char*, int32_t val) {
39
    // bthread_setconcurrency sets the flag on success path which should
40
    // not be strictly in a validator. But it's OK for a int flag.
41
0
    return bthread_setconcurrency(val) == 0;
42
0
}
43
static bool validate_bthread_min_concurrency(const char*, int32_t val);
44
static bool validate_bthread_current_tag(const char*, int32_t val);
45
static bool validate_bthread_concurrency_by_tag(const char*, int32_t val);
46
47
DEFINE_int32(bthread_concurrency, 8 + BTHREAD_EPOLL_THREAD_NUM,
48
             "Number of pthread workers");
49
BUTIL_VALIDATE_GFLAG(bthread_concurrency, validate_bthread_concurrency);
50
51
DEFINE_int32(bthread_min_concurrency, 0,
52
            "Initial number of pthread workers which will be added on-demand."
53
            " The laziness is disabled when this value is non-positive,"
54
            " and workers will be created eagerly according to -bthread_concurrency and bthread_setconcurrency(). ");
55
BUTIL_VALIDATE_GFLAG(bthread_min_concurrency, validate_bthread_min_concurrency);
56
57
DEFINE_int32(bthread_current_tag, BTHREAD_TAG_INVALID, "Set bthread concurrency for this tag");
58
BUTIL_VALIDATE_GFLAG(bthread_current_tag, validate_bthread_current_tag);
59
60
DEFINE_int32(bthread_concurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM,
61
             "Number of pthread workers of FLAGS_bthread_current_tag");
62
BUTIL_VALIDATE_GFLAG(bthread_concurrency_by_tag, validate_bthread_concurrency_by_tag);
63
64
DEFINE_int32(bthread_parking_lot_of_each_tag, 4, "Number of parking lots of each tag");
65
BUTIL_VALIDATE_GFLAG(bthread_parking_lot_of_each_tag, [](const char*, int32_t val) {
66
    if (val < BTHREAD_MIN_PARKINGLOT) {
67
        LOG(ERROR) << "bthread_parking_lot_of_each_tag must be greater than or equal to "
68
                   << BTHREAD_MIN_PARKINGLOT;
69
        return false;
70
    }
71
    if (val > BTHREAD_MAX_PARKINGLOT) {
72
        LOG(ERROR) << "bthread_parking_lot_of_each_tag must be less than or equal to "
73
                   << BTHREAD_MAX_PARKINGLOT;
74
        return false;
75
    }
76
    return true;
77
});
78
79
static bool never_set_bthread_concurrency = true;
80
81
BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);
82
83
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
84
// Referenced in rpc, needs to be extern.
85
// Notice that we can't declare the variable as atomic<TaskControl*> which
86
// are not constructed before main().
87
TaskControl* g_task_control = NULL;
88
89
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
90
EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
91
extern void (*g_worker_startfn)();
92
extern void (*g_tagged_worker_startfn)(bthread_tag_t);
93
extern void* (*g_create_span_func)();
94
95
0
inline TaskControl* get_task_control() {
96
0
    return g_task_control;
97
0
}
98
99
0
inline TaskControl* get_or_new_task_control() {
100
0
    butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
101
0
    TaskControl* c = p->load(butil::memory_order_consume);
102
0
    if (c != NULL) {
103
0
        return c;
104
0
    }
105
0
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
106
0
    c = p->load(butil::memory_order_consume);
107
0
    if (c != NULL) {
108
0
        return c;
109
0
    }
110
0
    c = new (std::nothrow) TaskControl;
111
0
    if (NULL == c) {
112
0
        return NULL;
113
0
    }
114
0
    int concurrency = FLAGS_bthread_min_concurrency > 0 ?
115
0
        FLAGS_bthread_min_concurrency :
116
0
        FLAGS_bthread_concurrency;
117
0
    if (c->init(concurrency) != 0) {
118
0
        LOG(ERROR) << "Fail to init g_task_control";
119
0
        delete c;
120
0
        return NULL;
121
0
    }
122
0
    p->store(c, butil::memory_order_release);
123
0
    return c;
124
0
}
125
126
#ifdef BRPC_BTHREAD_TRACER
127
BAIDU_THREAD_LOCAL TaskMeta* pthread_fake_meta = NULL;
128
129
bthread_t init_for_pthread_stack_trace() {
130
    if (NULL != pthread_fake_meta) {
131
        return pthread_fake_meta->tid;
132
    }
133
134
    TaskControl* c = get_task_control();
135
    if (NULL == c) {
136
        LOG(ERROR) << "TaskControl has not been created, "
137
                      "please use bthread_start_xxx before call this function";
138
        return INVALID_BTHREAD;
139
    }
140
141
    butil::ResourceId<TaskMeta> slot;
142
    pthread_fake_meta = butil::get_resource(&slot);
143
    if (BAIDU_UNLIKELY(NULL == pthread_fake_meta)) {
144
        LOG(ERROR) << "Fail to get TaskMeta";
145
        return INVALID_BTHREAD;
146
    }
147
148
    pthread_fake_meta->attr = BTHREAD_ATTR_PTHREAD;
149
    pthread_fake_meta->tid = make_tid(*pthread_fake_meta->version_butex, slot);
150
    // Make TaskTracer use signal trace mode for pthread.
151
    c->_task_tracer.set_running_status(syscall(SYS_gettid), pthread_fake_meta);
152
153
    // Release the TaskMeta at exit of pthread.
154
    butil::thread_atexit([]() {
155
        // Similar to TaskGroup::task_runner.
156
        bool tracing;
157
        {
158
            BAIDU_SCOPED_LOCK(pthread_fake_meta->version_lock);
159
            tracing = TaskTracer::set_end_status_unsafe(pthread_fake_meta);
160
            // If resulting version is 0,
161
            // change it to 1 to make bthread_t never be 0.
162
            if (0 == ++*pthread_fake_meta->version_butex) {
163
                ++*pthread_fake_meta->version_butex;
164
            }
165
        }
166
167
        if (tracing) {
168
            // Wait for tracing completion.
169
            get_task_control()->_task_tracer.WaitForTracing(pthread_fake_meta);
170
        }
171
        get_task_control()->_task_tracer.set_status(
172
            TASK_STATUS_UNKNOWN, pthread_fake_meta);
173
174
        butil::return_resource(get_slot(pthread_fake_meta->tid));
175
        pthread_fake_meta = NULL;
176
    });
177
178
    return pthread_fake_meta->tid;
179
}
180
181
void stack_trace(std::ostream& os, bthread_t tid) {
182
    TaskControl* c = get_task_control();
183
    if (NULL == c) {
184
        os << "TaskControl has not been created";
185
        return;
186
    }
187
    c->stack_trace(os, tid);
188
}
189
190
std::string stack_trace(bthread_t tid) {
191
    TaskControl* c = get_task_control();
192
    if (NULL == c) {
193
        return "TaskControl has not been created";
194
    }
195
    return c->stack_trace(tid);
196
}
197
198
#endif // BRPC_BTHREAD_TRACER
199
200
// Print all living (started and not finished) bthreads
201
0
void print_living_tasks(std::ostream& os, bool enable_trace) {
202
0
    TaskControl* c = get_task_control();
203
0
    if (NULL == c) {
204
0
        os << "TaskControl has not been created";
205
0
        return;
206
0
    }
207
0
    auto tids = c->get_living_bthreads();
208
0
    if (tids.empty()) {
209
0
        os << "No living bthreads\n";
210
0
        return;
211
0
    }
212
0
    for (auto tid : tids) {
213
0
        print_task(os, tid, enable_trace, true);
214
0
    }
215
0
}
216
217
0
static int add_workers_for_each_tag(int num) {
218
0
    int added = 0;
219
0
    auto c = get_task_control();
220
0
    for (auto i = 0; i < num; ++i) {
221
0
        added += c->add_workers(1, i % FLAGS_task_group_ntags);
222
0
    }
223
0
    return added;
224
0
}
225
226
0
static bool validate_bthread_min_concurrency(const char*, int32_t val) {
227
0
    if (val <= 0) {
228
0
        return true;
229
0
    }
230
0
    if (val < BTHREAD_MIN_CONCURRENCY || val > FLAGS_bthread_concurrency) {
231
0
        return false;
232
0
    }
233
0
    TaskControl* c = get_task_control();
234
0
    if (!c) {
235
0
        return true;
236
0
    }
237
0
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
238
0
    int concurrency = c->concurrency();
239
0
    if (val > concurrency) {
240
0
        int added = bthread::add_workers_for_each_tag(val - concurrency);
241
0
        return added == (val - concurrency);
242
0
    } else {
243
0
        return true;
244
0
    }
245
0
}
246
247
0
static bool validate_bthread_current_tag(const char*, int32_t val) {
248
0
    if (val == BTHREAD_TAG_INVALID) {
249
0
        return true;
250
0
    } else if (val < BTHREAD_TAG_DEFAULT || val >= FLAGS_task_group_ntags) {
251
0
        return false;
252
0
    }
253
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
254
0
    auto c = get_task_control();
255
0
    if (c == NULL) {
256
0
        FLAGS_bthread_concurrency_by_tag = 8 + BTHREAD_EPOLL_THREAD_NUM;
257
0
        return true;
258
0
    }
259
0
    FLAGS_bthread_concurrency_by_tag = c->concurrency(val);
260
0
    return true;
261
0
}
262
263
0
static bool validate_bthread_concurrency_by_tag(const char*, int32_t val) {
264
0
    return bthread_setconcurrency_by_tag(val, FLAGS_bthread_current_tag) == 0;
265
0
}
266
267
__thread TaskGroup* tls_task_group_nosignal = NULL;
268
269
BUTIL_FORCE_INLINE int
270
start_from_non_worker(bthread_t* __restrict tid,
271
                      const bthread_attr_t* __restrict attr,
272
                      void* (*fn)(void*),
273
0
                      void* __restrict arg) {
274
0
    TaskControl* c = get_or_new_task_control();
275
0
    if (NULL == c) {
276
0
        return ENOMEM;
277
0
    }
278
0
    auto tag = BTHREAD_TAG_DEFAULT;
279
0
    if (attr != NULL && attr->tag != BTHREAD_TAG_INVALID) {
280
0
        tag = attr->tag;
281
0
    }
282
0
    if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
283
        // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
284
        // 1. NOSIGNAL is often for creating many bthreads in batch,
285
        //    inserting into the same TaskGroup maximizes the batch.
286
        // 2. bthread_flush() needs to know which TaskGroup to flush.
287
0
        auto g = tls_task_group_nosignal;
288
0
        if (NULL == g) {
289
0
            g = c->choose_one_group(tag);
290
0
            tls_task_group_nosignal = g;
291
0
        }
292
0
        return g->start_background<true>(tid, attr, fn, arg);
293
0
    }
294
0
    return c->choose_one_group(tag)->start_background<true>(tid, attr, fn, arg);
295
0
}
296
297
// Meet one of the three conditions, can run in thread local
298
// attr is nullptr
299
// tag equal to thread local
300
// tag equal to BTHREAD_TAG_INVALID
301
0
BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict attr) {
302
0
    return attr == nullptr || attr->tag == bthread::tls_task_group->tag() ||
303
0
           attr->tag == BTHREAD_TAG_INVALID;
304
0
}
305
306
struct TidTraits {
307
    static const size_t BLOCK_SIZE = 63;
308
    static const size_t MAX_ENTRIES = 65536;
309
    static const size_t INIT_GC_SIZE = 65536;
310
    static const bthread_t ID_INIT;
311
0
    static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id); }
312
};
313
const bthread_t TidTraits::ID_INIT = INVALID_BTHREAD;
314
315
typedef ListOfABAFreeId<bthread_t, TidTraits> TidList;
316
317
struct TidStopper {
318
0
    void operator()(bthread_t id) const { bthread_stop(id); }
319
};
320
struct TidJoiner {
321
0
    void operator()(bthread_t & id) const {
322
0
        bthread_join(id, NULL);
323
0
        id = INVALID_BTHREAD;
324
0
    }
325
};
326
327
}  // namespace bthread
328
329
extern "C" {
330
331
int bthread_start_urgent(bthread_t* __restrict tid,
332
                         const bthread_attr_t* __restrict attr,
333
                         void * (*fn)(void*),
334
0
                         void* __restrict arg) {
335
0
    bthread::TaskGroup* g = bthread::tls_task_group;
336
0
    if (g) {
337
        // if attribute is null use thread local task group
338
0
        if (bthread::can_run_thread_local(attr)) {
339
0
            return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
340
0
        }
341
0
    }
342
0
    return bthread::start_from_non_worker(tid, attr, fn, arg);
343
0
}
344
345
int bthread_start_background(bthread_t* __restrict tid,
346
                             const bthread_attr_t* __restrict attr,
347
                             void * (*fn)(void*),
348
0
                             void* __restrict arg) {
349
0
    bthread::TaskGroup* g = bthread::tls_task_group;
350
0
    if (g) {
351
        // if attribute is null use thread local task group
352
0
        if (bthread::can_run_thread_local(attr)) {
353
0
            return g->start_background<false>(tid, attr, fn, arg);
354
0
        }
355
0
    }
356
0
    return bthread::start_from_non_worker(tid, attr, fn, arg);
357
0
}
358
359
0
void bthread_flush() {
360
0
    bthread::TaskGroup* g = bthread::tls_task_group;
361
0
    if (g) {
362
0
        return g->flush_nosignal_tasks();
363
0
    }
364
0
    g = bthread::tls_task_group_nosignal;
365
0
    if (g) {
366
        // NOSIGNAL tasks were created in this non-worker.
367
0
        bthread::tls_task_group_nosignal = NULL;
368
0
        return g->flush_nosignal_tasks_remote();
369
0
    }
370
0
}
371
372
0
int bthread_interrupt(bthread_t tid, bthread_tag_t tag) {
373
0
    return bthread::TaskGroup::interrupt(tid, bthread::get_task_control(), tag);
374
0
}
375
376
0
int bthread_stop(bthread_t tid) {
377
0
    bthread::TaskGroup::set_stopped(tid);
378
0
    return bthread_interrupt(tid);
379
0
}
380
381
0
int bthread_stopped(bthread_t tid) {
382
0
    return (int)bthread::TaskGroup::is_stopped(tid);
383
0
}
384
385
5.89k
bthread_t bthread_self(void) {
386
5.89k
    bthread::TaskGroup* g = bthread::tls_task_group;
387
    // note: return 0 for main tasks now, which include main thread and
388
    // all work threads. So that we can identify main tasks from logs
389
    // more easily. This is probably questionable in the future.
390
5.89k
    if (g != NULL && !g->is_current_main_task()/*note*/) {
391
0
        return g->current_tid();
392
0
    }
393
5.89k
    return INVALID_BTHREAD;
394
5.89k
}
395
396
0
int bthread_equal(bthread_t t1, bthread_t t2) {
397
0
    return t1 == t2;
398
0
}
399
400
#ifdef BUTIL_USE_ASAN
401
// Fixme!!!
402
// The noreturn `bthread_exit' may cause a warning of ASan, but does not abort the program.
403
//
404
// ==94463==WARNING: ASan is ignoring requested __asan_handle_no_return: stack type: default top: 0x00016dd7f000; bottom 0x00010b1a4000; size: 0x000062bdb000 (1656598528)
405
// False positive error reports may follow
406
#endif // BUTIL_USE_ASAN
407
0
void bthread_exit(void* retval) {
408
0
    bthread::TaskGroup* g = bthread::tls_task_group;
409
0
    if (g != NULL && !g->is_current_main_task()) {
410
0
        throw bthread::ExitException(retval);
411
0
    } else {
412
0
        pthread_exit(retval);
413
0
    }
414
0
}
415
416
0
int bthread_join(bthread_t tid, void** thread_return) {
417
0
    return bthread::TaskGroup::join(tid, thread_return);
418
0
}
419
420
0
int bthread_attr_init(bthread_attr_t* a) {
421
0
    *a = BTHREAD_ATTR_NORMAL;
422
0
    return 0;
423
0
}
424
425
0
int bthread_attr_destroy(bthread_attr_t*) {
426
0
    return 0;
427
0
}
428
429
0
int bthread_getattr(bthread_t tid, bthread_attr_t* attr) {
430
0
    return bthread::TaskGroup::get_attr(tid, attr);
431
0
}
432
433
0
int bthread_getconcurrency(void) {
434
0
    return bthread::FLAGS_bthread_concurrency;
435
0
}
436
437
0
int bthread_setconcurrency(int num) {
438
0
    if (num < BTHREAD_MIN_CONCURRENCY || num > BTHREAD_MAX_CONCURRENCY) {
439
0
        LOG(ERROR) << "Invalid concurrency=" << num;
440
0
        return EINVAL;
441
0
    }
442
0
    if (bthread::FLAGS_bthread_min_concurrency > 0) {
443
0
        if (num < bthread::FLAGS_bthread_min_concurrency) {
444
0
            return EINVAL;
445
0
        }
446
0
        if (bthread::never_set_bthread_concurrency) {
447
0
            bthread::never_set_bthread_concurrency = false;
448
0
        }
449
0
        bthread::FLAGS_bthread_concurrency = num;
450
0
        return 0;
451
0
    }
452
0
    bthread::TaskControl* c = bthread::get_task_control();
453
0
    if (c != NULL) {
454
0
        if (num < c->concurrency()) {
455
0
            return EPERM;
456
0
        } else if (num == c->concurrency()) {
457
0
            return 0;
458
0
        }
459
0
    }
460
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
461
0
    c = bthread::get_task_control();
462
0
    if (c == NULL) {
463
0
        if (bthread::never_set_bthread_concurrency) {
464
0
            bthread::never_set_bthread_concurrency = false;
465
0
            bthread::FLAGS_bthread_concurrency = num;
466
0
        } else if (num > bthread::FLAGS_bthread_concurrency) {
467
0
            bthread::FLAGS_bthread_concurrency = num;
468
0
        }
469
0
        return 0;
470
0
    }
471
0
    if (bthread::FLAGS_bthread_concurrency != c->concurrency()) {
472
0
        LOG(ERROR) << "CHECK failed: bthread_concurrency="
473
0
                   << bthread::FLAGS_bthread_concurrency
474
0
                   << " != tc_concurrency=" << c->concurrency();
475
0
        bthread::FLAGS_bthread_concurrency = c->concurrency();
476
0
    }
477
0
    if (num > bthread::FLAGS_bthread_concurrency) {
478
        // Create more workers if needed.
479
0
        auto added = bthread::add_workers_for_each_tag(num - bthread::FLAGS_bthread_concurrency);
480
0
        bthread::FLAGS_bthread_concurrency += added;
481
0
    }
482
0
    return (num == bthread::FLAGS_bthread_concurrency ? 0 : EPERM);
483
0
}
484
485
0
int bthread_getconcurrency_by_tag(bthread_tag_t tag) {
486
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
487
0
    auto c = bthread::get_task_control();
488
0
    if (c == NULL) {
489
0
        return EPERM;
490
0
    }
491
0
    return c->concurrency(tag);
492
0
}
493
494
0
int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
495
0
    if (tag == BTHREAD_TAG_INVALID) {
496
0
        return 0;
497
0
    } else if (tag < BTHREAD_TAG_DEFAULT || tag >= FLAGS_task_group_ntags) {
498
0
        return EINVAL;
499
0
    }
500
0
    if (num < BTHREAD_MIN_CONCURRENCY || num > BTHREAD_MAX_CONCURRENCY) {
501
0
        LOG(ERROR) << "Invalid concurrency_by_tag=" << num;
502
0
        return EINVAL;
503
0
    }
504
0
    auto c = bthread::get_or_new_task_control();
505
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
506
0
    auto tag_ngroup = c->concurrency(tag);
507
0
    auto add = num - tag_ngroup;
508
509
0
    if (add >= 0) {
510
0
        auto added = c->add_workers(add, tag);
511
0
        bthread::FLAGS_bthread_concurrency += added;
512
0
        return (add == added ? 0 : EPERM);
513
0
    } else {
514
0
        LOG(WARNING) << "Fail to set concurrency by tag: " << tag
515
0
                     << ", tag concurrency should be larger than old oncurrency. old concurrency: "
516
0
                     << tag_ngroup << ", new concurrency: " << num;
517
0
        return EPERM;
518
0
    }
519
0
}
520
521
0
int bthread_about_to_quit() {
522
0
    bthread::TaskGroup* g = bthread::tls_task_group;
523
0
    if (g != NULL) {
524
0
        bthread::TaskMeta* current_task = g->current_task();
525
0
        if(!(current_task->attr.flags & BTHREAD_NEVER_QUIT)) {
526
0
            current_task->about_to_quit = true;
527
0
        }
528
0
        return 0;
529
0
    }
530
0
    return EPERM;
531
0
}
532
533
int bthread_timer_add(bthread_timer_t* id, timespec abstime,
534
0
                      void (*on_timer)(void*), void* arg) {
535
0
    bthread::TaskControl* c = bthread::get_or_new_task_control();
536
0
    if (c == NULL) {
537
0
        return ENOMEM;
538
0
    }
539
0
    bthread::TimerThread* tt = bthread::get_or_create_global_timer_thread();
540
0
    if (tt == NULL) {
541
0
        return ENOMEM;
542
0
    }
543
0
    bthread_timer_t tmp = tt->schedule(on_timer, arg, abstime);
544
0
    if (tmp != 0) {
545
0
        *id = tmp;
546
0
        return 0;
547
0
    }
548
0
    return ESTOP;
549
0
}
550
551
0
int bthread_timer_del(bthread_timer_t id) {
552
0
    bthread::TaskControl* c = bthread::get_task_control();
553
0
    if (c != NULL) {
554
0
        bthread::TimerThread* tt = bthread::get_global_timer_thread();
555
0
        if (tt == NULL) {
556
0
            return EINVAL;
557
0
        }
558
0
        const int state = tt->unschedule(id);
559
0
        if (state >= 0) {
560
0
            return state;
561
0
        }
562
0
    }
563
0
    return EINVAL;
564
0
}
565
566
0
int bthread_usleep(uint64_t microseconds) {
567
0
    bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
568
0
    if (NULL != g && !g->is_current_pthread_task()) {
569
0
        return bthread::TaskGroup::usleep(&g, microseconds);
570
0
    }
571
0
    return ::usleep(microseconds);
572
0
}
573
574
0
int bthread_yield(void) {
575
0
    bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
576
0
    if (NULL != g && !g->is_current_pthread_task()) {
577
0
        bthread::TaskGroup::yield(&g);
578
0
        return 0;
579
0
    }
580
    // pthread_yield is not available on MAC
581
0
    return sched_yield();
582
0
}
583
584
0
int bthread_set_worker_startfn(void (*start_fn)()) {
585
0
    if (start_fn == NULL) {
586
0
        return EINVAL;
587
0
    }
588
0
    bthread::g_worker_startfn = start_fn;
589
0
    return 0;
590
0
}
591
592
0
int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
593
0
    if (start_fn == NULL) {
594
0
        return EINVAL;
595
0
    }
596
0
    bthread::g_tagged_worker_startfn = start_fn;
597
0
    return 0;
598
0
}
599
600
0
int bthread_set_create_span_func(void* (*func)()) {
601
0
    if (func == NULL) {
602
0
        return EINVAL;
603
0
    }
604
0
    bthread::g_create_span_func = func;
605
0
    return 0;
606
0
}
607
608
0
void bthread_stop_world() {
609
0
    bthread::TaskControl* c = bthread::get_task_control();
610
0
    if (c != NULL) {
611
0
        c->stop_and_join();
612
0
    }
613
0
}
614
615
int bthread_list_init(bthread_list_t* list,
616
                      unsigned /*size*/,
617
0
                      unsigned /*conflict_size*/) {
618
0
    list->impl = new (std::nothrow) bthread::TidList;
619
0
    if (NULL == list->impl) {
620
0
        return ENOMEM;
621
0
    }
622
    // Set unused fields to zero as well.
623
0
    list->head = 0;
624
0
    list->size = 0;
625
0
    list->conflict_head = 0;
626
0
    list->conflict_size = 0;
627
0
    return 0;
628
0
}
629
630
0
void bthread_list_destroy(bthread_list_t* list) {
631
0
    delete static_cast<bthread::TidList*>(list->impl);
632
0
    list->impl = NULL;
633
0
}
634
635
0
int bthread_list_add(bthread_list_t* list, bthread_t id) {
636
0
    if (list->impl == NULL) {
637
0
        return EINVAL;
638
0
    }
639
0
    return static_cast<bthread::TidList*>(list->impl)->add(id);
640
0
}
641
642
0
int bthread_list_stop(bthread_list_t* list) {
643
0
    if (list->impl == NULL) {
644
0
        return EINVAL;
645
0
    }
646
0
    static_cast<bthread::TidList*>(list->impl)->apply(bthread::TidStopper());
647
0
    return 0;
648
0
}
649
650
0
int bthread_list_join(bthread_list_t* list) {
651
0
    if (list->impl == NULL) {
652
0
        return EINVAL;
653
0
    }
654
0
    static_cast<bthread::TidList*>(list->impl)->apply(bthread::TidJoiner());
655
0
    return 0;
656
0
}
657
658
0
bthread_tag_t bthread_self_tag(void) {
659
0
    return bthread::tls_task_group != nullptr ? bthread::tls_task_group->tag()
660
0
                                              : BTHREAD_TAG_DEFAULT;
661
0
}
662
663
0
uint64_t bthread_cpu_clock_ns(void) {
664
0
     bthread::TaskGroup* g = bthread::tls_task_group;
665
0
    if (g != NULL && !g->is_current_main_task()) {
666
0
        return g->current_task_cpu_clock_ns();
667
0
    }
668
0
    return 0;
669
0
}
670
671
}  // extern "C"