Coverage Report

Created: 2024-09-08 06:43

/src/brpc/src/bthread/bthread.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#include <gflags/gflags.h>
23
#include "butil/macros.h"                       // BAIDU_CASSERT
24
#include "butil/logging.h"
25
#include "bthread/task_group.h"                // TaskGroup
26
#include "bthread/task_control.h"              // TaskControl
27
#include "bthread/timer_thread.h"
28
#include "bthread/list_of_abafree_id.h"
29
#include "bthread/bthread.h"
30
31
namespace bthread {
32
33
DEFINE_int32(bthread_concurrency, 8 + BTHREAD_EPOLL_THREAD_NUM,
34
             "Number of pthread workers");
35
36
DEFINE_int32(bthread_min_concurrency, 0,
37
            "Initial number of pthread workers which will be added on-demand."
38
            " The laziness is disabled when this value is non-positive,"
39
            " and workers will be created eagerly according to -bthread_concurrency and bthread_setconcurrency(). ");
40
41
DEFINE_int32(bthread_current_tag, BTHREAD_TAG_INVALID, "Set bthread concurrency for this tag");
42
43
DEFINE_int32(bthread_concurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM,
44
             "Number of pthread workers of FLAGS_bthread_current_tag");
45
46
static bool never_set_bthread_concurrency = true;
47
48
0
static bool validate_bthread_concurrency(const char*, int32_t val) {
49
    // bthread_setconcurrency sets the flag on success path which should
50
    // not be strictly in a validator. But it's OK for a int flag.
51
0
    return bthread_setconcurrency(val) == 0;
52
0
}
53
const int ALLOW_UNUSED register_FLAGS_bthread_concurrency = 
54
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_concurrency,
55
                                    validate_bthread_concurrency);
56
57
static bool validate_bthread_min_concurrency(const char*, int32_t val);
58
59
const int ALLOW_UNUSED register_FLAGS_bthread_min_concurrency =
60
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_min_concurrency,
61
                                    validate_bthread_min_concurrency);
62
63
static bool validate_bthread_current_tag(const char*, int32_t val);
64
65
const int ALLOW_UNUSED register_FLAGS_bthread_current_tag =
66
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_current_tag, validate_bthread_current_tag);
67
68
static bool validate_bthread_concurrency_by_tag(const char*, int32_t val);
69
70
const int ALLOW_UNUSED register_FLAGS_bthread_concurrency_by_tag =
71
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_concurrency_by_tag,
72
                                       validate_bthread_concurrency_by_tag);
73
74
BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);
75
76
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
77
// Referenced in rpc, needs to be extern.
78
// Notice that we can't declare the variable as atomic<TaskControl*> which
79
// is not guaranteed to be constructed before main().
80
TaskControl* g_task_control = NULL;
81
82
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
83
extern void (*g_worker_startfn)();
84
extern void (*g_tagged_worker_startfn)(bthread_tag_t);
85
extern void* (*g_create_span_func)();
86
87
0
// Returns the current value of g_task_control without creating anything.
// The result is NULL while no TaskControl has been published.
inline TaskControl* get_task_control() {
    TaskControl* const current = g_task_control;
    return current;
}
90
91
0
// Returns the global TaskControl, creating and initializing it on first use.
// Returns NULL when the allocation or TaskControl::init() fails.
inline TaskControl* get_or_new_task_control() {
    // g_task_control is declared as a plain pointer, so it is read and
    // written through an atomic view of the same storage.
    butil::atomic<TaskControl*>* const slot =
        reinterpret_cast<butil::atomic<TaskControl*>*>(&g_task_control);

    // First look without the mutex.
    TaskControl* tc = slot->load(butil::memory_order_consume);
    if (tc != NULL) {
        return tc;
    }

    BAIDU_SCOPED_LOCK(g_task_control_mutex);
    // Look again now that the mutex is held: another thread may have
    // stored a TaskControl in the meantime.
    tc = slot->load(butil::memory_order_consume);
    if (tc != NULL) {
        return tc;
    }

    tc = new (std::nothrow) TaskControl;
    if (tc == NULL) {
        return NULL;
    }

    // A positive -bthread_min_concurrency takes precedence over
    // -bthread_concurrency as the value handed to init().
    const int initial_workers = (FLAGS_bthread_min_concurrency > 0)
        ? FLAGS_bthread_min_concurrency
        : FLAGS_bthread_concurrency;
    if (tc->init(initial_workers) == 0) {
        slot->store(tc, butil::memory_order_release);
        return tc;
    }

    LOG(ERROR) << "Fail to init g_task_control";
    delete tc;
    return NULL;
}
117
118
0
static int add_workers_for_each_tag(int num) {
119
0
    int added = 0;
120
0
    auto c = get_task_control();
121
0
    for (auto i = 0; i < num; ++i) {
122
0
        added += c->add_workers(1, i % FLAGS_task_group_ntags);
123
0
    }
124
0
    return added;
125
0
}
126
127
0
static bool validate_bthread_min_concurrency(const char*, int32_t val) {
128
0
    if (val <= 0) {
129
0
        return true;
130
0
    }
131
0
    if (val < BTHREAD_MIN_CONCURRENCY || val > FLAGS_bthread_concurrency) {
132
0
        return false;
133
0
    }
134
0
    TaskControl* c = get_task_control();
135
0
    if (!c) {
136
0
        return true;
137
0
    }
138
0
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
139
0
    int concurrency = c->concurrency();
140
0
    if (val > concurrency) {
141
0
        int added = bthread::add_workers_for_each_tag(val - concurrency);
142
0
        return added == (val - concurrency);
143
0
    } else {
144
0
        return true;
145
0
    }
146
0
}
147
148
0
static bool validate_bthread_current_tag(const char*, int32_t val) {
149
0
    if (val == BTHREAD_TAG_INVALID) {
150
0
        return true;
151
0
    } else if (val < BTHREAD_TAG_DEFAULT || val >= FLAGS_task_group_ntags) {
152
0
        return false;
153
0
    }
154
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
155
0
    auto c = bthread::get_task_control();
156
0
    if (c == NULL) {
157
0
        FLAGS_bthread_concurrency_by_tag = 8 + BTHREAD_EPOLL_THREAD_NUM;
158
0
        return true;
159
0
    }
160
0
    FLAGS_bthread_concurrency_by_tag = c->concurrency(val);
161
0
    return true;
162
0
}
163
164
0
static bool validate_bthread_concurrency_by_tag(const char*, int32_t val) {
165
0
    return bthread_setconcurrency_by_tag(val, FLAGS_bthread_current_tag) == 0;
166
0
}
167
168
__thread TaskGroup* tls_task_group_nosignal = NULL;
169
170
// Starts a bthread through the global TaskControl (created on demand)
// instead of through the caller's thread-local TaskGroup.
// The tag is attr->tag when attr is non-NULL and the tag is not
// BTHREAD_TAG_INVALID, otherwise BTHREAD_TAG_DEFAULT.
// Returns ENOMEM when no TaskControl can be obtained, otherwise the result
// of TaskGroup::start_background<true>().
BUTIL_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
                      const bthread_attr_t* __restrict attr,
                      void* (*fn)(void*),
                      void* __restrict arg) {
    TaskControl* const tc = get_or_new_task_control();
    if (tc == NULL) {
        return ENOMEM;
    }

    const bool has_attr = (attr != NULL);
    auto tag = BTHREAD_TAG_DEFAULT;
    if (has_attr && attr->tag != BTHREAD_TAG_INVALID) {
        tag = attr->tag;
    }

    if (!has_attr || !(attr->flags & BTHREAD_NOSIGNAL)) {
        // Ordinary case: pick a group for the tag and start there.
        return tc->choose_one_group(tag)->start_background<true>(tid, attr, fn, arg);
    }

    // NOSIGNAL: remember the chosen TaskGroup in thread-local storage because
    // 1. NOSIGNAL is often used to create many bthreads in a batch, and
    //    inserting them into the same TaskGroup maximizes the batch;
    // 2. bthread_flush() needs to know which TaskGroup to flush.
    TaskGroup* group = tls_task_group_nosignal;
    if (group == NULL) {
        group = tc->choose_one_group(tag);
        tls_task_group_nosignal = group;
    }
    return group->start_background<true>(tid, attr, fn, arg);
}
197
198
// Meet one of the three conditions, can run in thread local
199
// attr is nullptr
200
// tag equal to thread local
201
// tag equal to BTHREAD_TAG_INVALID
202
0
BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict attr) {
203
0
    return attr == nullptr || attr->tag == bthread::tls_task_group->tag() ||
204
0
           attr->tag == BTHREAD_TAG_INVALID;
205
0
}
206
207
struct TidTraits {
208
    static const size_t BLOCK_SIZE = 63;
209
    static const size_t MAX_ENTRIES = 65536;
210
    static const size_t INIT_GC_SIZE = 65536;
211
    static const bthread_t ID_INIT;
212
0
    static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id); }
213
};
214
const bthread_t TidTraits::ID_INIT = INVALID_BTHREAD;
215
216
typedef ListOfABAFreeId<bthread_t, TidTraits> TidList;
217
218
struct TidStopper {
219
0
    void operator()(bthread_t id) const { bthread_stop(id); }
220
};
221
struct TidJoiner {
222
0
    void operator()(bthread_t & id) const {
223
0
        bthread_join(id, NULL);
224
0
        id = INVALID_BTHREAD;
225
0
    }
226
};
227
228
}  // namespace bthread
229
230
extern "C" {
231
232
int bthread_start_urgent(bthread_t* __restrict tid,
233
                         const bthread_attr_t* __restrict attr,
234
                         void * (*fn)(void*),
235
0
                         void* __restrict arg) {
236
0
    bthread::TaskGroup* g = bthread::tls_task_group;
237
0
    if (g) {
238
        // if attribute is null use thread local task group
239
0
        if (bthread::can_run_thread_local(attr)) {
240
0
            return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
241
0
        }
242
0
    }
243
0
    return bthread::start_from_non_worker(tid, attr, fn, arg);
244
0
}
245
246
int bthread_start_background(bthread_t* __restrict tid,
247
                             const bthread_attr_t* __restrict attr,
248
                             void * (*fn)(void*),
249
0
                             void* __restrict arg) {
250
0
    bthread::TaskGroup* g = bthread::tls_task_group;
251
0
    if (g) {
252
        // if attribute is null use thread local task group
253
0
        if (bthread::can_run_thread_local(attr)) {
254
0
            return g->start_background<false>(tid, attr, fn, arg);
255
0
        }
256
0
    }
257
0
    return bthread::start_from_non_worker(tid, attr, fn, arg);
258
0
}
259
260
0
void bthread_flush() {
261
0
    bthread::TaskGroup* g = bthread::tls_task_group;
262
0
    if (g) {
263
0
        return g->flush_nosignal_tasks();
264
0
    }
265
0
    g = bthread::tls_task_group_nosignal;
266
0
    if (g) {
267
        // NOSIGNAL tasks were created in this non-worker.
268
0
        bthread::tls_task_group_nosignal = NULL;
269
0
        return g->flush_nosignal_tasks_remote();
270
0
    }
271
0
}
272
273
0
// Forwards to TaskGroup::interrupt() with the current global TaskControl
// (which may be NULL) and the given tag; returns its result.
int bthread_interrupt(bthread_t tid, bthread_tag_t tag) {
    bthread::TaskControl* const tc = bthread::get_task_control();
    return bthread::TaskGroup::interrupt(tid, tc, tag);
}
276
277
0
// Marks `tid` as stopped via TaskGroup::set_stopped(), then interrupts it.
// Returns the result of bthread_interrupt().
int bthread_stop(bthread_t tid) {
    bthread::TaskGroup::set_stopped(tid);
    const int rc = bthread_interrupt(tid);
    return rc;
}
281
282
0
// Returns TaskGroup::is_stopped(tid) converted to int.
int bthread_stopped(bthread_t tid) {
    return static_cast<int>(bthread::TaskGroup::is_stopped(tid));
}
285
286
23.2k
// Returns the id of the calling bthread, or INVALID_BTHREAD when the caller
// has no thread-local TaskGroup or is running that group's main task.
bthread_t bthread_self(void) {
    bthread::TaskGroup* const group = bthread::tls_task_group;
    // note: main tasks (the main thread and all worker threads) report
    // INVALID_BTHREAD for now, so that they are easier to tell apart in
    // logs. This is probably questionable in the future.
    if (group == NULL || group->is_current_main_task()/*note*/) {
        return INVALID_BTHREAD;
    }
    return group->current_tid();
}
296
297
0
// Returns 1 when the two ids compare equal, 0 otherwise.
int bthread_equal(bthread_t t1, bthread_t t2) {
    return (t1 == t2) ? 1 : 0;
}
300
301
0
void bthread_exit(void* retval) {
302
0
    bthread::TaskGroup* g = bthread::tls_task_group;
303
0
    if (g != NULL && !g->is_current_main_task()) {
304
0
        throw bthread::ExitException(retval);
305
0
    } else {
306
0
        pthread_exit(retval);
307
0
    }
308
0
}
309
310
0
// Forwards to TaskGroup::join(tid, thread_return) and returns its result.
int bthread_join(bthread_t tid, void** thread_return) {
    const int rc = bthread::TaskGroup::join(tid, thread_return);
    return rc;
}
313
314
0
// Overwrites *a with BTHREAD_ATTR_NORMAL. Always returns 0.
// `a` is dereferenced unconditionally and must not be NULL.
int bthread_attr_init(bthread_attr_t* a) {
    bthread_attr_t& target = *a;
    target = BTHREAD_ATTR_NORMAL;
    return 0;
}
318
319
0
// No-op: the argument is ignored and 0 is always returned.
int bthread_attr_destroy(bthread_attr_t* /*attr*/) {
    return 0;
}
322
323
0
// Forwards to TaskGroup::get_attr(tid, attr) and returns its result.
int bthread_getattr(bthread_t tid, bthread_attr_t* attr) {
    const int rc = bthread::TaskGroup::get_attr(tid, attr);
    return rc;
}
326
327
0
int bthread_getconcurrency(void) {
328
0
    return bthread::FLAGS_bthread_concurrency;
329
0
}
330
331
0
int bthread_setconcurrency(int num) {
332
0
    if (num < BTHREAD_MIN_CONCURRENCY || num > BTHREAD_MAX_CONCURRENCY) {
333
0
        LOG(ERROR) << "Invalid concurrency=" << num;
334
0
        return EINVAL;
335
0
    }
336
0
    if (bthread::FLAGS_bthread_min_concurrency > 0) {
337
0
        if (num < bthread::FLAGS_bthread_min_concurrency) {
338
0
            return EINVAL;
339
0
        }
340
0
        if (bthread::never_set_bthread_concurrency) {
341
0
            bthread::never_set_bthread_concurrency = false;
342
0
        }
343
0
        bthread::FLAGS_bthread_concurrency = num;
344
0
        return 0;
345
0
    }
346
0
    bthread::TaskControl* c = bthread::get_task_control();
347
0
    if (c != NULL) {
348
0
        if (num < c->concurrency()) {
349
0
            return EPERM;
350
0
        } else if (num == c->concurrency()) {
351
0
            return 0;
352
0
        }
353
0
    }
354
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
355
0
    c = bthread::get_task_control();
356
0
    if (c == NULL) {
357
0
        if (bthread::never_set_bthread_concurrency) {
358
0
            bthread::never_set_bthread_concurrency = false;
359
0
            bthread::FLAGS_bthread_concurrency = num;
360
0
        } else if (num > bthread::FLAGS_bthread_concurrency) {
361
0
            bthread::FLAGS_bthread_concurrency = num;
362
0
        }
363
0
        return 0;
364
0
    }
365
0
    if (bthread::FLAGS_bthread_concurrency != c->concurrency()) {
366
0
        LOG(ERROR) << "CHECK failed: bthread_concurrency="
367
0
                   << bthread::FLAGS_bthread_concurrency
368
0
                   << " != tc_concurrency=" << c->concurrency();
369
0
        bthread::FLAGS_bthread_concurrency = c->concurrency();
370
0
    }
371
0
    if (num > bthread::FLAGS_bthread_concurrency) {
372
        // Create more workers if needed.
373
0
        auto added = bthread::add_workers_for_each_tag(num - bthread::FLAGS_bthread_concurrency);
374
0
        bthread::FLAGS_bthread_concurrency += added;
375
0
    }
376
0
    return (num == bthread::FLAGS_bthread_concurrency ? 0 : EPERM);
377
0
}
378
379
0
// Returns the concurrency the global TaskControl reports for `tag`.
// When no TaskControl exists the function returns EPERM.
// NOTE(review): EPERM is returned in-band through the same int as a
// concurrency value; callers should confirm how they tell the two apart.
int bthread_getconcurrency_by_tag(bthread_tag_t tag) {
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
    auto tc = bthread::get_task_control();
    if (tc != NULL) {
        return tc->concurrency(tag);
    }
    return EPERM;
}
387
388
0
int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
389
0
    if (tag == BTHREAD_TAG_INVALID) {
390
0
        return 0;
391
0
    } else if (tag < BTHREAD_TAG_DEFAULT || tag >= FLAGS_task_group_ntags) {
392
0
        return EINVAL;
393
0
    }
394
0
    auto c = bthread::get_or_new_task_control();
395
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
396
0
    auto ngroup = c->concurrency();
397
0
    auto tag_ngroup = c->concurrency(tag);
398
0
    auto add = num - tag_ngroup;
399
0
    if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
400
0
        LOG(ERROR) << "Fail to set concurrency by tag " << tag
401
0
                   << ", Total concurrency larger than bthread_concurrency";
402
0
        return EPERM;
403
0
    }
404
0
    auto added = 0;
405
0
    if (add > 0) {
406
0
        added = c->add_workers(add, tag);
407
0
        return (add == added ? 0 : EPERM);
408
0
    }
409
0
    return (num == tag_ngroup ? 0 : EPERM);
410
0
}
411
412
0
int bthread_about_to_quit() {
413
0
    bthread::TaskGroup* g = bthread::tls_task_group;
414
0
    if (g != NULL) {
415
0
        bthread::TaskMeta* current_task = g->current_task();
416
0
        if(!(current_task->attr.flags & BTHREAD_NEVER_QUIT)) {
417
0
            current_task->about_to_quit = true;
418
0
        }
419
0
        return 0;
420
0
    }
421
0
    return EPERM;
422
0
}
423
424
int bthread_timer_add(bthread_timer_t* id, timespec abstime,
425
0
                      void (*on_timer)(void*), void* arg) {
426
0
    bthread::TaskControl* c = bthread::get_or_new_task_control();
427
0
    if (c == NULL) {
428
0
        return ENOMEM;
429
0
    }
430
0
    bthread::TimerThread* tt = bthread::get_or_create_global_timer_thread();
431
0
    if (tt == NULL) {
432
0
        return ENOMEM;
433
0
    }
434
0
    bthread_timer_t tmp = tt->schedule(on_timer, arg, abstime);
435
0
    if (tmp != 0) {
436
0
        *id = tmp;
437
0
        return 0;
438
0
    }
439
0
    return ESTOP;
440
0
}
441
442
0
// Unschedules timer `id` on the global timer thread.
// Returns the non-negative state reported by TimerThread::unschedule(), or
// EINVAL when there is no TaskControl, no global timer thread, or
// unschedule() reported a negative value.
int bthread_timer_del(bthread_timer_t id) {
    if (bthread::get_task_control() == NULL) {
        return EINVAL;
    }
    bthread::TimerThread* const timer = bthread::get_global_timer_thread();
    if (timer == NULL) {
        return EINVAL;
    }
    const int state = timer->unschedule(id);
    return (state >= 0) ? state : EINVAL;
}
456
457
0
int bthread_usleep(uint64_t microseconds) {
458
0
    bthread::TaskGroup* g = bthread::tls_task_group;
459
0
    if (NULL != g && !g->is_current_pthread_task()) {
460
0
        return bthread::TaskGroup::usleep(&g, microseconds);
461
0
    }
462
0
    return ::usleep(microseconds);
463
0
}
464
465
0
int bthread_yield(void) {
466
0
    bthread::TaskGroup* g = bthread::tls_task_group;
467
0
    if (NULL != g && !g->is_current_pthread_task()) {
468
0
        bthread::TaskGroup::yield(&g);
469
0
        return 0;
470
0
    }
471
    // pthread_yield is not available on MAC
472
0
    return sched_yield();
473
0
}
474
475
0
int bthread_set_worker_startfn(void (*start_fn)()) {
476
0
    if (start_fn == NULL) {
477
0
        return EINVAL;
478
0
    }
479
0
    bthread::g_worker_startfn = start_fn;
480
0
    return 0;
481
0
}
482
483
0
// Stores `start_fn` in bthread::g_tagged_worker_startfn.
// Returns 0 on success, or EINVAL (leaving the global untouched) when
// start_fn is NULL.
int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
    if (start_fn != NULL) {
        bthread::g_tagged_worker_startfn = start_fn;
        return 0;
    }
    return EINVAL;
}
490
491
0
int bthread_set_create_span_func(void* (*func)()) {
492
0
    if (func == NULL) {
493
0
        return EINVAL;
494
0
    }
495
0
    bthread::g_create_span_func = func;
496
0
    return 0;
497
0
}
498
499
0
void bthread_stop_world() {
500
0
    bthread::TaskControl* c = bthread::get_task_control();
501
0
    if (c != NULL) {
502
0
        c->stop_and_join();
503
0
    }
504
0
}
505
506
int bthread_list_init(bthread_list_t* list,
507
                      unsigned /*size*/,
508
0
                      unsigned /*conflict_size*/) {
509
0
    list->impl = new (std::nothrow) bthread::TidList;
510
0
    if (NULL == list->impl) {
511
0
        return ENOMEM;
512
0
    }
513
    // Set unused fields to zero as well.
514
0
    list->head = 0;
515
0
    list->size = 0;
516
0
    list->conflict_head = 0;
517
0
    list->conflict_size = 0;
518
0
    return 0;
519
0
}
520
521
0
// Deletes the TidList behind `list` and resets list->impl to NULL.
// A NULL impl is harmless here because deleting a null pointer is a no-op.
void bthread_list_destroy(bthread_list_t* list) {
    bthread::TidList* const impl = static_cast<bthread::TidList*>(list->impl);
    delete impl;
    list->impl = NULL;
}
525
526
0
// Adds `id` to the list. Returns EINVAL when list->impl is NULL, otherwise
// the result of TidList::add().
int bthread_list_add(bthread_list_t* list, bthread_t id) {
    bthread::TidList* const impl = static_cast<bthread::TidList*>(list->impl);
    if (impl == NULL) {
        return EINVAL;
    }
    return impl->add(id);
}
532
533
0
// Applies TidStopper (bthread_stop) to the ids in the list.
// Returns EINVAL when list->impl is NULL, otherwise 0.
int bthread_list_stop(bthread_list_t* list) {
    bthread::TidList* const impl = static_cast<bthread::TidList*>(list->impl);
    if (impl == NULL) {
        return EINVAL;
    }
    impl->apply(bthread::TidStopper());
    return 0;
}
540
541
0
// Applies TidJoiner (bthread_join, then reset to INVALID_BTHREAD) to the ids
// in the list.
// Returns EINVAL when list->impl is NULL, otherwise 0.
int bthread_list_join(bthread_list_t* list) {
    bthread::TidList* const impl = static_cast<bthread::TidList*>(list->impl);
    if (impl == NULL) {
        return EINVAL;
    }
    impl->apply(bthread::TidJoiner());
    return 0;
}
548
549
0
// Returns the tag of the caller's thread-local TaskGroup, or
// BTHREAD_TAG_DEFAULT when the caller has none.
bthread_tag_t bthread_self_tag(void) {
    bthread::TaskGroup* const group = bthread::tls_task_group;
    if (group == nullptr) {
        return BTHREAD_TAG_DEFAULT;
    }
    return group->tag();
}
553
554
}  // extern "C"