Coverage Report

Created: 2024-02-11 06:26

/src/brpc/src/bthread/bthread.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#include <gflags/gflags.h>
23
#include "butil/macros.h"                       // BAIDU_CASSERT
24
#include "butil/logging.h"
25
#include "bthread/task_group.h"                // TaskGroup
26
#include "bthread/task_control.h"              // TaskControl
27
#include "bthread/timer_thread.h"
28
#include "bthread/list_of_abafree_id.h"
29
#include "bthread/bthread.h"
30
31
namespace bthread {
32
33
DEFINE_int32(bthread_concurrency, 8 + BTHREAD_EPOLL_THREAD_NUM,
34
             "Number of pthread workers");
35
36
DEFINE_int32(bthread_min_concurrency, 0,
37
            "Initial number of pthread workers which will be added on-demand."
38
            " The laziness is disabled when this value is non-positive,"
39
            " and workers will be created eagerly according to -bthread_concurrency and bthread_setconcurrency(). ");
40
41
DEFINE_int32(bthread_current_tag, BTHREAD_TAG_DEFAULT, "Set bthread concurrency for this tag");
42
43
DEFINE_int32(bthread_concurrency_by_tag, 0,
44
             "Number of pthread workers of FLAGS_bthread_current_tag");
45
46
static bool never_set_bthread_concurrency = true;
47
static bool never_set_bthread_concurrency_by_tag = true;
48
49
0
static bool validate_bthread_concurrency(const char*, int32_t val) {
50
    // bthread_setconcurrency sets the flag on success path which should
51
    // not be strictly in a validator. But it's OK for a int flag.
52
0
    return bthread_setconcurrency(val) == 0;
53
0
}
54
const int ALLOW_UNUSED register_FLAGS_bthread_concurrency = 
55
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_concurrency,
56
                                    validate_bthread_concurrency);
57
58
static bool validate_bthread_min_concurrency(const char*, int32_t val);
59
60
const int ALLOW_UNUSED register_FLAGS_bthread_min_concurrency =
61
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_min_concurrency,
62
                                    validate_bthread_min_concurrency);
63
64
static bool validate_bthread_current_tag(const char*, int32_t val);
65
66
const int ALLOW_UNUSED register_FLAGS_bthread_current_tag =
67
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_current_tag, validate_bthread_current_tag);
68
69
static bool validate_bthread_concurrency_by_tag(const char*, int32_t val);
70
71
const int ALLOW_UNUSED register_FLAGS_bthread_concurrency_by_tag =
72
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_concurrency_by_tag,
73
                                       validate_bthread_concurrency_by_tag);
74
75
BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);
76
77
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
78
// Referenced in rpc, needs to be extern.
79
// Notice that we can't declare the variable as atomic<TaskControl*> which
80
// are not constructed before main().
81
TaskControl* g_task_control = NULL;
82
83
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
84
extern void (*g_worker_startfn)();
85
extern void (*g_tagged_worker_startfn)(bthread_tag_t);
86
87
0
inline TaskControl* get_task_control() {
88
0
    return g_task_control;
89
0
}
90
91
0
inline TaskControl* get_or_new_task_control() {
92
0
    butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
93
0
    TaskControl* c = p->load(butil::memory_order_consume);
94
0
    if (c != NULL) {
95
0
        return c;
96
0
    }
97
0
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
98
0
    c = p->load(butil::memory_order_consume);
99
0
    if (c != NULL) {
100
0
        return c;
101
0
    }
102
0
    c = new (std::nothrow) TaskControl;
103
0
    if (NULL == c) {
104
0
        return NULL;
105
0
    }
106
0
    int concurrency = FLAGS_bthread_min_concurrency > 0 ?
107
0
        FLAGS_bthread_min_concurrency :
108
0
        FLAGS_bthread_concurrency;
109
0
    if (c->init(concurrency) != 0) {
110
0
        LOG(ERROR) << "Fail to init g_task_control";
111
0
        delete c;
112
0
        return NULL;
113
0
    }
114
0
    p->store(c, butil::memory_order_release);
115
0
    return c;
116
0
}
117
118
0
static int add_workers_for_each_tag(int num) {
119
0
    int added = 0;
120
0
    auto c = get_task_control();
121
0
    for (auto i = 0; i < num; ++i) {
122
0
        added += c->add_workers(1, i % FLAGS_task_group_ntags);
123
0
    }
124
0
    return added;
125
0
}
126
127
0
static bool validate_bthread_min_concurrency(const char*, int32_t val) {
128
0
    if (val <= 0) {
129
0
        return true;
130
0
    }
131
0
    if (val < BTHREAD_MIN_CONCURRENCY || val > FLAGS_bthread_concurrency) {
132
0
        return false;
133
0
    }
134
0
    TaskControl* c = get_task_control();
135
0
    if (!c) {
136
0
        return true;
137
0
    }
138
0
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
139
0
    int concurrency = c->concurrency();
140
0
    if (val > concurrency) {
141
0
        int added = bthread::add_workers_for_each_tag(val - concurrency);
142
0
        return added == (val - concurrency);
143
0
    } else {
144
0
        return true;
145
0
    }
146
0
}
147
148
0
static bool validate_bthread_current_tag(const char*, int32_t val) {
149
0
    if (val < BTHREAD_TAG_DEFAULT || val >= FLAGS_task_group_ntags) {
150
0
        return false;
151
0
    }
152
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
153
0
    auto c = bthread::get_task_control();
154
0
    if (c == NULL) {
155
0
        FLAGS_bthread_concurrency_by_tag = 0;
156
0
        return true;
157
0
    }
158
0
    FLAGS_bthread_concurrency_by_tag = c->concurrency(val);
159
0
    return true;
160
0
}
161
162
0
static bool validate_bthread_concurrency_by_tag(const char*, int32_t val) {
163
0
    return bthread_setconcurrency_by_tag(val, FLAGS_bthread_current_tag) == 0;
164
0
}
165
166
__thread TaskGroup* tls_task_group_nosignal = NULL;
167
168
BUTIL_FORCE_INLINE int
169
start_from_non_worker(bthread_t* __restrict tid,
170
                      const bthread_attr_t* __restrict attr,
171
                      void* (*fn)(void*),
172
0
                      void* __restrict arg) {
173
0
    TaskControl* c = get_or_new_task_control();
174
0
    if (NULL == c) {
175
0
        return ENOMEM;
176
0
    }
177
0
    auto tag = BTHREAD_TAG_DEFAULT;
178
0
    if (attr != NULL && attr->tag != BTHREAD_TAG_INVALID) {
179
0
        tag = attr->tag;
180
0
    }
181
0
    if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
182
        // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
183
        // 1. NOSIGNAL is often for creating many bthreads in batch,
184
        //    inserting into the same TaskGroup maximizes the batch.
185
        // 2. bthread_flush() needs to know which TaskGroup to flush.
186
0
        auto g = tls_task_group_nosignal;
187
0
        if (NULL == g) {
188
0
            g = c->choose_one_group(tag);
189
0
            tls_task_group_nosignal = g;
190
0
        }
191
0
        return g->start_background<true>(tid, attr, fn, arg);
192
0
    }
193
0
    return c->choose_one_group(tag)->start_background<true>(tid, attr, fn, arg);
194
0
}
195
196
// Meet one of the three conditions, can run in thread local
197
// attr is nullptr
198
// tag equal to thread local
199
// tag equal to BTHREAD_TAG_INVALID
200
0
BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict attr) {
201
0
    return attr == nullptr || attr->tag == bthread::tls_task_group->tag() ||
202
0
           attr->tag == BTHREAD_TAG_INVALID;
203
0
}
204
205
struct TidTraits {
206
    static const size_t BLOCK_SIZE = 63;
207
    static const size_t MAX_ENTRIES = 65536;
208
    static const bthread_t ID_INIT;
209
0
    static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id); }
210
};
211
const bthread_t TidTraits::ID_INIT = INVALID_BTHREAD;
212
213
typedef ListOfABAFreeId<bthread_t, TidTraits> TidList;
214
215
struct TidStopper {
216
0
    void operator()(bthread_t id) const { bthread_stop(id); }
217
};
218
struct TidJoiner {
219
0
    void operator()(bthread_t & id) const {
220
0
        bthread_join(id, NULL);
221
0
        id = INVALID_BTHREAD;
222
0
    }
223
};
224
225
}  // namespace bthread
226
227
extern "C" {
228
229
int bthread_start_urgent(bthread_t* __restrict tid,
230
                         const bthread_attr_t* __restrict attr,
231
                         void * (*fn)(void*),
232
0
                         void* __restrict arg) {
233
0
    bthread::TaskGroup* g = bthread::tls_task_group;
234
0
    if (g) {
235
        // if attribute is null use thread local task group
236
0
        if (bthread::can_run_thread_local(attr)) {
237
0
            return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
238
0
        }
239
0
    }
240
0
    return bthread::start_from_non_worker(tid, attr, fn, arg);
241
0
}
242
243
int bthread_start_background(bthread_t* __restrict tid,
244
                             const bthread_attr_t* __restrict attr,
245
                             void * (*fn)(void*),
246
0
                             void* __restrict arg) {
247
0
    bthread::TaskGroup* g = bthread::tls_task_group;
248
0
    if (g) {
249
        // if attribute is null use thread local task group
250
0
        if (bthread::can_run_thread_local(attr)) {
251
0
            return g->start_background<false>(tid, attr, fn, arg);
252
0
        }
253
0
    }
254
0
    return bthread::start_from_non_worker(tid, attr, fn, arg);
255
0
}
256
257
0
void bthread_flush() {
258
0
    bthread::TaskGroup* g = bthread::tls_task_group;
259
0
    if (g) {
260
0
        return g->flush_nosignal_tasks();
261
0
    }
262
0
    g = bthread::tls_task_group_nosignal;
263
0
    if (g) {
264
        // NOSIGNAL tasks were created in this non-worker.
265
0
        bthread::tls_task_group_nosignal = NULL;
266
0
        return g->flush_nosignal_tasks_remote();
267
0
    }
268
0
}
269
270
0
int bthread_interrupt(bthread_t tid) {
271
0
    return bthread::TaskGroup::interrupt(tid, bthread::get_task_control());
272
0
}
273
274
0
int bthread_stop(bthread_t tid) {
275
0
    bthread::TaskGroup::set_stopped(tid);
276
0
    return bthread_interrupt(tid);
277
0
}
278
279
0
int bthread_stopped(bthread_t tid) {
280
0
    return (int)bthread::TaskGroup::is_stopped(tid);
281
0
}
282
283
0
bthread_t bthread_self(void) {
284
0
    bthread::TaskGroup* g = bthread::tls_task_group;
285
    // note: return 0 for main tasks now, which include main thread and
286
    // all work threads. So that we can identify main tasks from logs
287
    // more easily. This is probably questionable in future.
288
0
    if (g != NULL && !g->is_current_main_task()/*note*/) {
289
0
        return g->current_tid();
290
0
    }
291
0
    return INVALID_BTHREAD;
292
0
}
293
294
0
int bthread_equal(bthread_t t1, bthread_t t2) {
295
0
    return t1 == t2;
296
0
}
297
298
0
void bthread_exit(void* retval) {
299
0
    bthread::TaskGroup* g = bthread::tls_task_group;
300
0
    if (g != NULL && !g->is_current_main_task()) {
301
0
        throw bthread::ExitException(retval);
302
0
    } else {
303
0
        pthread_exit(retval);
304
0
    }
305
0
}
306
307
0
int bthread_join(bthread_t tid, void** thread_return) {
308
0
    return bthread::TaskGroup::join(tid, thread_return);
309
0
}
310
311
0
int bthread_attr_init(bthread_attr_t* a) {
312
0
    *a = BTHREAD_ATTR_NORMAL;
313
0
    return 0;
314
0
}
315
316
0
int bthread_attr_destroy(bthread_attr_t*) {
317
0
    return 0;
318
0
}
319
320
0
int bthread_getattr(bthread_t tid, bthread_attr_t* attr) {
321
0
    return bthread::TaskGroup::get_attr(tid, attr);
322
0
}
323
324
0
int bthread_getconcurrency(void) {
325
0
    return bthread::FLAGS_bthread_concurrency;
326
0
}
327
328
0
int bthread_setconcurrency(int num) {
329
0
    if (num < BTHREAD_MIN_CONCURRENCY || num > BTHREAD_MAX_CONCURRENCY) {
330
0
        LOG(ERROR) << "Invalid concurrency=" << num;
331
0
        return EINVAL;
332
0
    }
333
0
    if (bthread::FLAGS_bthread_min_concurrency > 0) {
334
0
        if (num < bthread::FLAGS_bthread_min_concurrency) {
335
0
            return EINVAL;
336
0
        }
337
0
        if (bthread::never_set_bthread_concurrency) {
338
0
            bthread::never_set_bthread_concurrency = false;
339
0
        }
340
0
        bthread::FLAGS_bthread_concurrency = num;
341
0
        return 0;
342
0
    }
343
0
    bthread::TaskControl* c = bthread::get_task_control();
344
0
    if (c != NULL) {
345
0
        if (num < c->concurrency()) {
346
0
            return EPERM;
347
0
        } else if (num == c->concurrency()) {
348
0
            return 0;
349
0
        }
350
0
    }
351
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
352
0
    c = bthread::get_task_control();
353
0
    if (c == NULL) {
354
0
        if (bthread::never_set_bthread_concurrency) {
355
0
            bthread::never_set_bthread_concurrency = false;
356
0
            bthread::FLAGS_bthread_concurrency = num;
357
0
        } else if (num > bthread::FLAGS_bthread_concurrency) {
358
0
            bthread::FLAGS_bthread_concurrency = num;
359
0
        }
360
0
        return 0;
361
0
    }
362
0
    if (bthread::FLAGS_bthread_concurrency != c->concurrency()) {
363
0
        LOG(ERROR) << "CHECK failed: bthread_concurrency="
364
0
                   << bthread::FLAGS_bthread_concurrency
365
0
                   << " != tc_concurrency=" << c->concurrency();
366
0
        bthread::FLAGS_bthread_concurrency = c->concurrency();
367
0
    }
368
0
    if (num > bthread::FLAGS_bthread_concurrency) {
369
        // Create more workers if needed.
370
0
        auto added = bthread::add_workers_for_each_tag(num - bthread::FLAGS_bthread_concurrency);
371
0
        bthread::FLAGS_bthread_concurrency += added;
372
0
    }
373
0
    return (num == bthread::FLAGS_bthread_concurrency ? 0 : EPERM);
374
0
}
375
376
0
int bthread_getconcurrency_by_tag(bthread_tag_t tag) {
377
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
378
0
    auto c = bthread::get_task_control();
379
0
    if (c == NULL) {
380
0
        return EPERM;
381
0
    }
382
0
    return c->concurrency(tag);
383
0
}
384
385
0
int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
386
0
    if (bthread::never_set_bthread_concurrency_by_tag) {
387
0
        bthread::never_set_bthread_concurrency_by_tag = false;
388
0
        return 0;
389
0
    }
390
0
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
391
0
    auto c = bthread::get_task_control();
392
0
    if (c == NULL) {
393
0
        return EPERM;
394
0
    }
395
0
    auto ngroup = c->concurrency();
396
0
    auto tag_ngroup = c->concurrency(tag);
397
0
    auto add = num - tag_ngroup;
398
0
    if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
399
0
        LOG(ERROR) << "Fail to set concurrency by tag " << tag
400
0
                   << ", Whole concurrency larger than bthread_concurrency";
401
0
        return EPERM;
402
0
    }
403
0
    auto added = 0;
404
0
    if (add > 0) {
405
0
        added = c->add_workers(add, tag);
406
0
        return (add == added ? 0 : EPERM);
407
0
    }
408
0
    return (num == tag_ngroup ? 0 : EPERM);
409
0
}
410
411
0
int bthread_about_to_quit() {
412
0
    bthread::TaskGroup* g = bthread::tls_task_group;
413
0
    if (g != NULL) {
414
0
        bthread::TaskMeta* current_task = g->current_task();
415
0
        if(!(current_task->attr.flags & BTHREAD_NEVER_QUIT)) {
416
0
            current_task->about_to_quit = true;
417
0
        }
418
0
        return 0;
419
0
    }
420
0
    return EPERM;
421
0
}
422
423
int bthread_timer_add(bthread_timer_t* id, timespec abstime,
424
0
                      void (*on_timer)(void*), void* arg) {
425
0
    bthread::TaskControl* c = bthread::get_or_new_task_control();
426
0
    if (c == NULL) {
427
0
        return ENOMEM;
428
0
    }
429
0
    bthread::TimerThread* tt = bthread::get_or_create_global_timer_thread();
430
0
    if (tt == NULL) {
431
0
        return ENOMEM;
432
0
    }
433
0
    bthread_timer_t tmp = tt->schedule(on_timer, arg, abstime);
434
0
    if (tmp != 0) {
435
0
        *id = tmp;
436
0
        return 0;
437
0
    }
438
0
    return ESTOP;
439
0
}
440
441
0
int bthread_timer_del(bthread_timer_t id) {
442
0
    bthread::TaskControl* c = bthread::get_task_control();
443
0
    if (c != NULL) {
444
0
        bthread::TimerThread* tt = bthread::get_global_timer_thread();
445
0
        if (tt == NULL) {
446
0
            return EINVAL;
447
0
        }
448
0
        const int state = tt->unschedule(id);
449
0
        if (state >= 0) {
450
0
            return state;
451
0
        }
452
0
    }
453
0
    return EINVAL;
454
0
}
455
456
0
int bthread_usleep(uint64_t microseconds) {
457
0
    bthread::TaskGroup* g = bthread::tls_task_group;
458
0
    if (NULL != g && !g->is_current_pthread_task()) {
459
0
        return bthread::TaskGroup::usleep(&g, microseconds);
460
0
    }
461
0
    return ::usleep(microseconds);
462
0
}
463
464
0
int bthread_yield(void) {
465
0
    bthread::TaskGroup* g = bthread::tls_task_group;
466
0
    if (NULL != g && !g->is_current_pthread_task()) {
467
0
        bthread::TaskGroup::yield(&g);
468
0
        return 0;
469
0
    }
470
    // pthread_yield is not available on MAC
471
0
    return sched_yield();
472
0
}
473
474
0
int bthread_set_worker_startfn(void (*start_fn)()) {
475
0
    if (start_fn == NULL) {
476
0
        return EINVAL;
477
0
    }
478
0
    bthread::g_worker_startfn = start_fn;
479
0
    return 0;
480
0
}
481
482
0
int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
483
0
    if (start_fn == NULL) {
484
0
        return EINVAL;
485
0
    }
486
0
    bthread::g_tagged_worker_startfn = start_fn;
487
0
    return 0;
488
0
}
489
490
0
void bthread_stop_world() {
491
0
    bthread::TaskControl* c = bthread::get_task_control();
492
0
    if (c != NULL) {
493
0
        c->stop_and_join();
494
0
    }
495
0
}
496
497
int bthread_list_init(bthread_list_t* list,
498
                      unsigned /*size*/,
499
0
                      unsigned /*conflict_size*/) {
500
0
    list->impl = new (std::nothrow) bthread::TidList;
501
0
    if (NULL == list->impl) {
502
0
        return ENOMEM;
503
0
    }
504
    // Set unused fields to zero as well.
505
0
    list->head = 0;
506
0
    list->size = 0;
507
0
    list->conflict_head = 0;
508
0
    list->conflict_size = 0;
509
0
    return 0;
510
0
}
511
512
0
void bthread_list_destroy(bthread_list_t* list) {
513
0
    delete static_cast<bthread::TidList*>(list->impl);
514
0
    list->impl = NULL;
515
0
}
516
517
0
int bthread_list_add(bthread_list_t* list, bthread_t id) {
518
0
    if (list->impl == NULL) {
519
0
        return EINVAL;
520
0
    }
521
0
    return static_cast<bthread::TidList*>(list->impl)->add(id);
522
0
}
523
524
0
int bthread_list_stop(bthread_list_t* list) {
525
0
    if (list->impl == NULL) {
526
0
        return EINVAL;
527
0
    }
528
0
    static_cast<bthread::TidList*>(list->impl)->apply(bthread::TidStopper());
529
0
    return 0;
530
0
}
531
532
0
int bthread_list_join(bthread_list_t* list) {
533
0
    if (list->impl == NULL) {
534
0
        return EINVAL;
535
0
    }
536
0
    static_cast<bthread::TidList*>(list->impl)->apply(bthread::TidJoiner());
537
0
    return 0;
538
0
}
539
540
0
bthread_tag_t bthread_self_tag(void) {
541
0
    return bthread::tls_task_group != nullptr ? bthread::tls_task_group->tag()
542
0
                                              : BTHREAD_TAG_DEFAULT;
543
0
}
544
545
}  // extern "C"