Coverage Report

Created: 2025-05-08 06:06

/src/brpc/src/bthread/bthread.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#include <sys/syscall.h>
23
#include <gflags/gflags.h>
24
#include "butil/macros.h"                       // BAIDU_CASSERT
25
#include "butil/logging.h"
26
#include "butil/thread_local.h"
27
#include "butil/reloadable_flags.h"
28
#include "bthread/task_group.h"                // TaskGroup
29
#include "bthread/task_control.h"              // TaskControl
30
#include "bthread/timer_thread.h"
31
#include "bthread/list_of_abafree_id.h"
32
#include "bthread/bthread.h"
33
34
namespace bthread {
35
36
0
static bool validate_bthread_concurrency(const char*, int32_t val) {
37
    // bthread_setconcurrency sets the flag on success path which should
38
    // not be strictly in a validator. But it's OK for a int flag.
39
0
    return bthread_setconcurrency(val) == 0;
40
0
}
41
static bool validate_bthread_min_concurrency(const char*, int32_t val);
42
static bool validate_bthread_current_tag(const char*, int32_t val);
43
static bool validate_bthread_concurrency_by_tag(const char*, int32_t val);
44
45
DEFINE_int32(bthread_concurrency, 8 + BTHREAD_EPOLL_THREAD_NUM,
46
             "Number of pthread workers");
47
BUTIL_VALIDATE_GFLAG(bthread_concurrency, validate_bthread_concurrency);
48
49
DEFINE_int32(bthread_min_concurrency, 0,
50
            "Initial number of pthread workers which will be added on-demand."
51
            " The laziness is disabled when this value is non-positive,"
52
            " and workers will be created eagerly according to -bthread_concurrency and bthread_setconcurrency(). ");
53
BUTIL_VALIDATE_GFLAG(bthread_min_concurrency, validate_bthread_min_concurrency);
54
55
DEFINE_int32(bthread_current_tag, BTHREAD_TAG_INVALID, "Set bthread concurrency for this tag");
56
BUTIL_VALIDATE_GFLAG(bthread_current_tag, validate_bthread_current_tag);
57
58
DEFINE_int32(bthread_concurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM,
59
             "Number of pthread workers of FLAGS_bthread_current_tag");
60
BUTIL_VALIDATE_GFLAG(bthread_concurrency_by_tag, validate_bthread_concurrency_by_tag);
61
62
static bool never_set_bthread_concurrency = true;
63
64
BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);
65
66
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
67
// Referenced in rpc, needs to be extern.
68
// Notice that we can't declare the variable as atomic<TaskControl*> which
69
// are not constructed before main().
70
TaskControl* g_task_control = NULL;
71
72
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
73
EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
74
extern void (*g_worker_startfn)();
75
extern void (*g_tagged_worker_startfn)(bthread_tag_t);
76
extern void* (*g_create_span_func)();
77
78
0
inline TaskControl* get_task_control() {
79
0
    return g_task_control;
80
0
}
81
82
0
inline TaskControl* get_or_new_task_control() {
83
0
    butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
84
0
    TaskControl* c = p->load(butil::memory_order_consume);
85
0
    if (c != NULL) {
86
0
        return c;
87
0
    }
88
0
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
89
0
    c = p->load(butil::memory_order_consume);
90
0
    if (c != NULL) {
91
0
        return c;
92
0
    }
93
0
    c = new (std::nothrow) TaskControl;
94
0
    if (NULL == c) {
95
0
        return NULL;
96
0
    }
97
0
    int concurrency = FLAGS_bthread_min_concurrency > 0 ?
98
0
        FLAGS_bthread_min_concurrency :
99
0
        FLAGS_bthread_concurrency;
100
0
    if (c->init(concurrency) != 0) {
101
0
        LOG(ERROR) << "Fail to init g_task_control";
102
0
        delete c;
103
0
        return NULL;
104
0
    }
105
0
    p->store(c, butil::memory_order_release);
106
0
    return c;
107
0
}
108
109
#ifdef BRPC_BTHREAD_TRACER
110
BAIDU_THREAD_LOCAL TaskMeta* pthread_fake_meta = NULL;
111
112
bthread_t init_for_pthread_stack_trace() {
113
    if (NULL != pthread_fake_meta) {
114
        return pthread_fake_meta->tid;
115
    }
116
117
    TaskControl* c = get_task_control();
118
    if (NULL == c) {
119
        LOG(ERROR) << "TaskControl has not been created, "
120
                      "please use bthread_start_xxx before call this function";
121
        return INVALID_BTHREAD;
122
    }
123
124
    butil::ResourceId<TaskMeta> slot;
125
    pthread_fake_meta = butil::get_resource(&slot);
126
    if (BAIDU_UNLIKELY(NULL == pthread_fake_meta)) {
127
        LOG(ERROR) << "Fail to get TaskMeta";
128
        return INVALID_BTHREAD;
129
    }
130
131
    pthread_fake_meta->attr = BTHREAD_ATTR_PTHREAD;
132
    pthread_fake_meta->tid = make_tid(*pthread_fake_meta->version_butex, slot);
133
    // Make TaskTracer use signal trace mode for pthread.
134
    c->_task_tracer.set_running_status(syscall(SYS_gettid), pthread_fake_meta);
135
136
    // Release the TaskMeta at exit of pthread.
137
    butil::thread_atexit([]() {
138
        // Similar to TaskGroup::task_runner.
139
        bool tracing;
140
        {
141
            BAIDU_SCOPED_LOCK(pthread_fake_meta->version_lock);
142
            tracing = TaskTracer::set_end_status_unsafe(pthread_fake_meta);
143
            // If resulting version is 0,
144
            // change it to 1 to make bthread_t never be 0.
145
            if (0 == ++*pthread_fake_meta->version_butex) {
146
                ++*pthread_fake_meta->version_butex;
147
            }
148
        }
149
150
        if (tracing) {
151
            // Wait for tracing completion.
152
            get_task_control()->_task_tracer.WaitForTracing(pthread_fake_meta);
153
        }
154
        get_task_control()->_task_tracer.set_status(
155
            TASK_STATUS_UNKNOWN, pthread_fake_meta);
156
157
        butil::return_resource(get_slot(pthread_fake_meta->tid));
158
        pthread_fake_meta = NULL;
159
    });
160
161
    return pthread_fake_meta->tid;
162
}
163
164
void stack_trace(std::ostream& os, bthread_t tid) {
165
    TaskControl* c = get_task_control();
166
    if (NULL == c) {
167
        os << "TaskControl has not been created";
168
        return;
169
    }
170
    c->stack_trace(os, tid);
171
}
172
173
std::string stack_trace(bthread_t tid) {
174
    TaskControl* c = get_task_control();
175
    if (NULL == c) {
176
        return "TaskControl has not been created";
177
    }
178
    return c->stack_trace(tid);
179
}
180
#endif // BRPC_BTHREAD_TRACER
181
182
0
static int add_workers_for_each_tag(int num) {
183
0
    int added = 0;
184
0
    auto c = get_task_control();
185
0
    for (auto i = 0; i < num; ++i) {
186
0
        added += c->add_workers(1, i % FLAGS_task_group_ntags);
187
0
    }
188
0
    return added;
189
0
}
190
191
0
static bool validate_bthread_min_concurrency(const char*, int32_t val) {
192
0
    if (val <= 0) {
193
0
        return true;
194
0
    }
195
0
    if (val < BTHREAD_MIN_CONCURRENCY || val > FLAGS_bthread_concurrency) {
196
0
        return false;
197
0
    }
198
0
    TaskControl* c = get_task_control();
199
0
    if (!c) {
200
0
        return true;
201
0
    }
202
0
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
203
0
    int concurrency = c->concurrency();
204
0
    if (val > concurrency) {
205
0
        int added = bthread::add_workers_for_each_tag(val - concurrency);
206
0
        return added == (val - concurrency);
207
0
    } else {
208
0
        return true;
209
0
    }
210
0
}
211
212
0
static bool validate_bthread_current_tag(const char*, int32_t val) {
213
0
    if (val == BTHREAD_TAG_INVALID) {
214
0
        return true;
215
0
    } else if (val < BTHREAD_TAG_DEFAULT || val >= FLAGS_task_group_ntags) {
216
0
        return false;
217
0
    }
218
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
219
0
    auto c = bthread::get_task_control();
220
0
    if (c == NULL) {
221
0
        FLAGS_bthread_concurrency_by_tag = 8 + BTHREAD_EPOLL_THREAD_NUM;
222
0
        return true;
223
0
    }
224
0
    FLAGS_bthread_concurrency_by_tag = c->concurrency(val);
225
0
    return true;
226
0
}
227
228
0
static bool validate_bthread_concurrency_by_tag(const char*, int32_t val) {
229
0
    return bthread_setconcurrency_by_tag(val, FLAGS_bthread_current_tag) == 0;
230
0
}
231
232
__thread TaskGroup* tls_task_group_nosignal = NULL;
233
234
BUTIL_FORCE_INLINE int
235
start_from_non_worker(bthread_t* __restrict tid,
236
                      const bthread_attr_t* __restrict attr,
237
                      void* (*fn)(void*),
238
0
                      void* __restrict arg) {
239
0
    TaskControl* c = get_or_new_task_control();
240
0
    if (NULL == c) {
241
0
        return ENOMEM;
242
0
    }
243
0
    auto tag = BTHREAD_TAG_DEFAULT;
244
0
    if (attr != NULL && attr->tag != BTHREAD_TAG_INVALID) {
245
0
        tag = attr->tag;
246
0
    }
247
0
    if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
248
        // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
249
        // 1. NOSIGNAL is often for creating many bthreads in batch,
250
        //    inserting into the same TaskGroup maximizes the batch.
251
        // 2. bthread_flush() needs to know which TaskGroup to flush.
252
0
        auto g = tls_task_group_nosignal;
253
0
        if (NULL == g) {
254
0
            g = c->choose_one_group(tag);
255
0
            tls_task_group_nosignal = g;
256
0
        }
257
0
        return g->start_background<true>(tid, attr, fn, arg);
258
0
    }
259
0
    return c->choose_one_group(tag)->start_background<true>(tid, attr, fn, arg);
260
0
}
261
262
// Meet one of the three conditions, can run in thread local
263
// attr is nullptr
264
// tag equal to thread local
265
// tag equal to BTHREAD_TAG_INVALID
266
0
BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict attr) {
267
0
    return attr == nullptr || attr->tag == bthread::tls_task_group->tag() ||
268
0
           attr->tag == BTHREAD_TAG_INVALID;
269
0
}
270
271
struct TidTraits {
272
    static const size_t BLOCK_SIZE = 63;
273
    static const size_t MAX_ENTRIES = 65536;
274
    static const size_t INIT_GC_SIZE = 65536;
275
    static const bthread_t ID_INIT;
276
0
    static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id); }
277
};
278
const bthread_t TidTraits::ID_INIT = INVALID_BTHREAD;
279
280
typedef ListOfABAFreeId<bthread_t, TidTraits> TidList;
281
282
struct TidStopper {
283
0
    void operator()(bthread_t id) const { bthread_stop(id); }
284
};
285
struct TidJoiner {
286
0
    void operator()(bthread_t & id) const {
287
0
        bthread_join(id, NULL);
288
0
        id = INVALID_BTHREAD;
289
0
    }
290
};
291
292
}  // namespace bthread
293
294
extern "C" {
295
296
int bthread_start_urgent(bthread_t* __restrict tid,
297
                         const bthread_attr_t* __restrict attr,
298
                         void * (*fn)(void*),
299
0
                         void* __restrict arg) {
300
0
    bthread::TaskGroup* g = bthread::tls_task_group;
301
0
    if (g) {
302
        // if attribute is null use thread local task group
303
0
        if (bthread::can_run_thread_local(attr)) {
304
0
            return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
305
0
        }
306
0
    }
307
0
    return bthread::start_from_non_worker(tid, attr, fn, arg);
308
0
}
309
310
int bthread_start_background(bthread_t* __restrict tid,
311
                             const bthread_attr_t* __restrict attr,
312
                             void * (*fn)(void*),
313
0
                             void* __restrict arg) {
314
0
    bthread::TaskGroup* g = bthread::tls_task_group;
315
0
    if (g) {
316
        // if attribute is null use thread local task group
317
0
        if (bthread::can_run_thread_local(attr)) {
318
0
            return g->start_background<false>(tid, attr, fn, arg);
319
0
        }
320
0
    }
321
0
    return bthread::start_from_non_worker(tid, attr, fn, arg);
322
0
}
323
324
0
void bthread_flush() {
325
0
    bthread::TaskGroup* g = bthread::tls_task_group;
326
0
    if (g) {
327
0
        return g->flush_nosignal_tasks();
328
0
    }
329
0
    g = bthread::tls_task_group_nosignal;
330
0
    if (g) {
331
        // NOSIGNAL tasks were created in this non-worker.
332
0
        bthread::tls_task_group_nosignal = NULL;
333
0
        return g->flush_nosignal_tasks_remote();
334
0
    }
335
0
}
336
337
0
int bthread_interrupt(bthread_t tid, bthread_tag_t tag) {
338
0
    return bthread::TaskGroup::interrupt(tid, bthread::get_task_control(), tag);
339
0
}
340
341
0
int bthread_stop(bthread_t tid) {
342
0
    bthread::TaskGroup::set_stopped(tid);
343
0
    return bthread_interrupt(tid);
344
0
}
345
346
0
int bthread_stopped(bthread_t tid) {
347
0
    return (int)bthread::TaskGroup::is_stopped(tid);
348
0
}
349
350
24.4k
bthread_t bthread_self(void) {
351
24.4k
    bthread::TaskGroup* g = bthread::tls_task_group;
352
    // note: return 0 for main tasks now, which include main thread and
353
    // all work threads. So that we can identify main tasks from logs
354
    // more easily. This is probably questionable in the future.
355
24.4k
    if (g != NULL && !g->is_current_main_task()/*note*/) {
356
0
        return g->current_tid();
357
0
    }
358
24.4k
    return INVALID_BTHREAD;
359
24.4k
}
360
361
0
int bthread_equal(bthread_t t1, bthread_t t2) {
362
0
    return t1 == t2;
363
0
}
364
365
#ifdef BUTIL_USE_ASAN
366
// Fixme!!!
367
// The noreturn `bthread_exit' may cause a warning of ASan, but does not abort the program.
368
//
369
// ==94463==WARNING: ASan is ignoring requested __asan_handle_no_return: stack type: default top: 0x00016dd7f000; bottom 0x00010b1a4000; size: 0x000062bdb000 (1656598528)
370
// False positive error reports may follow
371
#endif // BUTIL_USE_ASAN
372
0
void bthread_exit(void* retval) {
373
0
    bthread::TaskGroup* g = bthread::tls_task_group;
374
0
    if (g != NULL && !g->is_current_main_task()) {
375
0
        throw bthread::ExitException(retval);
376
0
    } else {
377
0
        pthread_exit(retval);
378
0
    }
379
0
}
380
381
0
int bthread_join(bthread_t tid, void** thread_return) {
382
0
    return bthread::TaskGroup::join(tid, thread_return);
383
0
}
384
385
0
int bthread_attr_init(bthread_attr_t* a) {
386
0
    *a = BTHREAD_ATTR_NORMAL;
387
0
    return 0;
388
0
}
389
390
0
int bthread_attr_destroy(bthread_attr_t*) {
391
0
    return 0;
392
0
}
393
394
0
int bthread_getattr(bthread_t tid, bthread_attr_t* attr) {
395
0
    return bthread::TaskGroup::get_attr(tid, attr);
396
0
}
397
398
0
int bthread_getconcurrency(void) {
399
0
    return bthread::FLAGS_bthread_concurrency;
400
0
}
401
402
0
int bthread_setconcurrency(int num) {
403
0
    if (num < BTHREAD_MIN_CONCURRENCY || num > BTHREAD_MAX_CONCURRENCY) {
404
0
        LOG(ERROR) << "Invalid concurrency=" << num;
405
0
        return EINVAL;
406
0
    }
407
0
    if (bthread::FLAGS_bthread_min_concurrency > 0) {
408
0
        if (num < bthread::FLAGS_bthread_min_concurrency) {
409
0
            return EINVAL;
410
0
        }
411
0
        if (bthread::never_set_bthread_concurrency) {
412
0
            bthread::never_set_bthread_concurrency = false;
413
0
        }
414
0
        bthread::FLAGS_bthread_concurrency = num;
415
0
        return 0;
416
0
    }
417
0
    bthread::TaskControl* c = bthread::get_task_control();
418
0
    if (c != NULL) {
419
0
        if (num < c->concurrency()) {
420
0
            return EPERM;
421
0
        } else if (num == c->concurrency()) {
422
0
            return 0;
423
0
        }
424
0
    }
425
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
426
0
    c = bthread::get_task_control();
427
0
    if (c == NULL) {
428
0
        if (bthread::never_set_bthread_concurrency) {
429
0
            bthread::never_set_bthread_concurrency = false;
430
0
            bthread::FLAGS_bthread_concurrency = num;
431
0
        } else if (num > bthread::FLAGS_bthread_concurrency) {
432
0
            bthread::FLAGS_bthread_concurrency = num;
433
0
        }
434
0
        return 0;
435
0
    }
436
0
    if (bthread::FLAGS_bthread_concurrency != c->concurrency()) {
437
0
        LOG(ERROR) << "CHECK failed: bthread_concurrency="
438
0
                   << bthread::FLAGS_bthread_concurrency
439
0
                   << " != tc_concurrency=" << c->concurrency();
440
0
        bthread::FLAGS_bthread_concurrency = c->concurrency();
441
0
    }
442
0
    if (num > bthread::FLAGS_bthread_concurrency) {
443
        // Create more workers if needed.
444
0
        auto added = bthread::add_workers_for_each_tag(num - bthread::FLAGS_bthread_concurrency);
445
0
        bthread::FLAGS_bthread_concurrency += added;
446
0
    }
447
0
    return (num == bthread::FLAGS_bthread_concurrency ? 0 : EPERM);
448
0
}
449
450
0
int bthread_getconcurrency_by_tag(bthread_tag_t tag) {
451
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
452
0
    auto c = bthread::get_task_control();
453
0
    if (c == NULL) {
454
0
        return EPERM;
455
0
    }
456
0
    return c->concurrency(tag);
457
0
}
458
459
0
int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
460
0
    if (tag == BTHREAD_TAG_INVALID) {
461
0
        return 0;
462
0
    } else if (tag < BTHREAD_TAG_DEFAULT || tag >= FLAGS_task_group_ntags) {
463
0
        return EINVAL;
464
0
    }
465
0
    if (num < BTHREAD_MIN_CONCURRENCY || num > BTHREAD_MAX_CONCURRENCY) {
466
0
        LOG(ERROR) << "Invalid concurrency_by_tag=" << num;
467
0
        return EINVAL;
468
0
    }
469
0
    auto c = bthread::get_or_new_task_control();
470
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
471
0
    auto tag_ngroup = c->concurrency(tag);
472
0
    auto add = num - tag_ngroup;
473
474
0
    if (add >= 0) {
475
0
        auto added = c->add_workers(add, tag);
476
0
        bthread::FLAGS_bthread_concurrency += added;
477
0
        return (add == added ? 0 : EPERM);
478
0
    } else {
479
0
        LOG(WARNING) << "Fail to set concurrency by tag: " << tag
480
0
                     << ", tag concurrency should be larger than old oncurrency. old concurrency: "
481
0
                     << tag_ngroup << ", new concurrency: " << num;
482
0
        return EPERM;
483
0
    }
484
0
}
485
486
0
int bthread_about_to_quit() {
487
0
    bthread::TaskGroup* g = bthread::tls_task_group;
488
0
    if (g != NULL) {
489
0
        bthread::TaskMeta* current_task = g->current_task();
490
0
        if(!(current_task->attr.flags & BTHREAD_NEVER_QUIT)) {
491
0
            current_task->about_to_quit = true;
492
0
        }
493
0
        return 0;
494
0
    }
495
0
    return EPERM;
496
0
}
497
498
int bthread_timer_add(bthread_timer_t* id, timespec abstime,
499
0
                      void (*on_timer)(void*), void* arg) {
500
0
    bthread::TaskControl* c = bthread::get_or_new_task_control();
501
0
    if (c == NULL) {
502
0
        return ENOMEM;
503
0
    }
504
0
    bthread::TimerThread* tt = bthread::get_or_create_global_timer_thread();
505
0
    if (tt == NULL) {
506
0
        return ENOMEM;
507
0
    }
508
0
    bthread_timer_t tmp = tt->schedule(on_timer, arg, abstime);
509
0
    if (tmp != 0) {
510
0
        *id = tmp;
511
0
        return 0;
512
0
    }
513
0
    return ESTOP;
514
0
}
515
516
0
int bthread_timer_del(bthread_timer_t id) {
517
0
    bthread::TaskControl* c = bthread::get_task_control();
518
0
    if (c != NULL) {
519
0
        bthread::TimerThread* tt = bthread::get_global_timer_thread();
520
0
        if (tt == NULL) {
521
0
            return EINVAL;
522
0
        }
523
0
        const int state = tt->unschedule(id);
524
0
        if (state >= 0) {
525
0
            return state;
526
0
        }
527
0
    }
528
0
    return EINVAL;
529
0
}
530
531
0
int bthread_usleep(uint64_t microseconds) {
532
0
    bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
533
0
    if (NULL != g && !g->is_current_pthread_task()) {
534
0
        return bthread::TaskGroup::usleep(&g, microseconds);
535
0
    }
536
0
    return ::usleep(microseconds);
537
0
}
538
539
0
int bthread_yield(void) {
540
0
    bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
541
0
    if (NULL != g && !g->is_current_pthread_task()) {
542
0
        bthread::TaskGroup::yield(&g);
543
0
        return 0;
544
0
    }
545
    // pthread_yield is not available on MAC
546
0
    return sched_yield();
547
0
}
548
549
0
int bthread_set_worker_startfn(void (*start_fn)()) {
550
0
    if (start_fn == NULL) {
551
0
        return EINVAL;
552
0
    }
553
0
    bthread::g_worker_startfn = start_fn;
554
0
    return 0;
555
0
}
556
557
0
int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
558
0
    if (start_fn == NULL) {
559
0
        return EINVAL;
560
0
    }
561
0
    bthread::g_tagged_worker_startfn = start_fn;
562
0
    return 0;
563
0
}
564
565
0
int bthread_set_create_span_func(void* (*func)()) {
566
0
    if (func == NULL) {
567
0
        return EINVAL;
568
0
    }
569
0
    bthread::g_create_span_func = func;
570
0
    return 0;
571
0
}
572
573
0
void bthread_stop_world() {
574
0
    bthread::TaskControl* c = bthread::get_task_control();
575
0
    if (c != NULL) {
576
0
        c->stop_and_join();
577
0
    }
578
0
}
579
580
int bthread_list_init(bthread_list_t* list,
581
                      unsigned /*size*/,
582
0
                      unsigned /*conflict_size*/) {
583
0
    list->impl = new (std::nothrow) bthread::TidList;
584
0
    if (NULL == list->impl) {
585
0
        return ENOMEM;
586
0
    }
587
    // Set unused fields to zero as well.
588
0
    list->head = 0;
589
0
    list->size = 0;
590
0
    list->conflict_head = 0;
591
0
    list->conflict_size = 0;
592
0
    return 0;
593
0
}
594
595
0
void bthread_list_destroy(bthread_list_t* list) {
596
0
    delete static_cast<bthread::TidList*>(list->impl);
597
0
    list->impl = NULL;
598
0
}
599
600
0
int bthread_list_add(bthread_list_t* list, bthread_t id) {
601
0
    if (list->impl == NULL) {
602
0
        return EINVAL;
603
0
    }
604
0
    return static_cast<bthread::TidList*>(list->impl)->add(id);
605
0
}
606
607
0
int bthread_list_stop(bthread_list_t* list) {
608
0
    if (list->impl == NULL) {
609
0
        return EINVAL;
610
0
    }
611
0
    static_cast<bthread::TidList*>(list->impl)->apply(bthread::TidStopper());
612
0
    return 0;
613
0
}
614
615
0
int bthread_list_join(bthread_list_t* list) {
616
0
    if (list->impl == NULL) {
617
0
        return EINVAL;
618
0
    }
619
0
    static_cast<bthread::TidList*>(list->impl)->apply(bthread::TidJoiner());
620
0
    return 0;
621
0
}
622
623
0
bthread_tag_t bthread_self_tag(void) {
624
0
    return bthread::tls_task_group != nullptr ? bthread::tls_task_group->tag()
625
0
                                              : BTHREAD_TAG_DEFAULT;
626
0
}
627
628
0
uint64_t bthread_cpu_clock_ns(void) {
629
0
     bthread::TaskGroup* g = bthread::tls_task_group;
630
0
    if (g != NULL && !g->is_current_main_task()) {
631
0
        return g->current_task_cpu_clock_ns();
632
0
    }
633
0
    return 0;
634
0
}
635
636
}  // extern "C"