Coverage Report

Created: 2025-10-10 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/task_group.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#ifndef BTHREAD_TASK_GROUP_H
23
#define BTHREAD_TASK_GROUP_H
24
25
#include "butil/time.h"                             // cpuwide_time_ns
26
#include "bthread/task_control.h"
27
#include "bthread/task_meta.h"                     // bthread_t, TaskMeta
28
#include "bthread/work_stealing_queue.h"           // WorkStealingQueue
29
#include "bthread/remote_task_queue.h"             // RemoteTaskQueue
30
#include "butil/resource_pool.h"                    // ResourceId
31
#include "bthread/parking_lot.h"
32
#include "bthread/prime_offset.h"
33
34
namespace bthread {
35
36
// For exiting a bthread.
37
class ExitException : public std::exception {
38
public:
39
0
    explicit ExitException(void* value) : _value(value) {}
40
0
    ~ExitException() throw() {}
41
0
    const char* what() const throw() override {
42
0
        return "ExitException";
43
0
    }
44
0
    void* value() const {
45
0
        return _value;
46
0
    }
47
private:
48
    void* _value;
49
};
50
51
// Refer to https://rigtorp.se/isatomic/, On the modern CPU microarchitectures
52
// (Skylake and Zen 2) AVX/AVX2 128b/256b aligned loads and stores are atomic
53
// even though Intel and AMD officially doesn’t guarantee this.
54
// On X86, SSE instructions can ensure atomic loads and stores.
55
// Starting from Armv8.4-A, neon can ensure atomic loads and stores.
56
// Otherwise, use mutex to guarantee atomicity.
57
class AtomicInteger128 {
58
public:
59
    struct BAIDU_CACHELINE_ALIGNMENT Value {
60
        int64_t v1;
61
        int64_t v2;
62
    };
63
64
0
    AtomicInteger128() = default;
65
0
    explicit AtomicInteger128(Value value) : _value(value) {}
66
67
    Value load() const;
68
0
    Value load_unsafe() const {
69
0
        return _value;
70
0
    }
71
72
    void store(Value value);
73
74
private:
75
    Value _value{};
76
    // Used to protect `_cpu_time_stat' when __x86_64__ and __ARM_NEON is not defined.
77
    FastPthreadMutex _mutex;
78
};
79
80
// Thread-local group of tasks.
81
// Notice that most methods involving context switching are static otherwise
82
// pointer `this' may change after wakeup. The **pg parameters in following
83
// function are updated before returning.
84
class TaskGroup {
85
public:
86
    // Create task `fn(arg)' with attributes `attr' in TaskGroup *pg and put
87
    // the identifier into `tid'. Switch to the new task and schedule old task
88
    // to run.
89
    // Return 0 on success, errno otherwise.
90
    static int start_foreground(TaskGroup** pg,
91
                                bthread_t* __restrict tid,
92
                                const bthread_attr_t* __restrict attr,
93
                                void * (*fn)(void*),
94
                                void* __restrict arg);
95
96
    // Create task `fn(arg)' with attributes `attr' in this TaskGroup, put the
97
    // identifier into `tid'. Schedule the new thread to run.
98
    //   Called from worker: start_background<false>
99
    //   Called from non-worker: start_background<true>
100
    // Return 0 on success, errno otherwise.
101
    template <bool REMOTE>
102
    int start_background(bthread_t* __restrict tid,
103
                         const bthread_attr_t* __restrict attr,
104
                         void * (*fn)(void*),
105
                         void* __restrict arg);
106
107
    // Suspend caller and run next bthread in TaskGroup *pg.
108
    static void sched(TaskGroup** pg);
109
    static void ending_sched(TaskGroup** pg);
110
111
    // Suspend caller and run bthread `next_tid' in TaskGroup *pg.
112
    // Purpose of this function is to avoid pushing `next_tid' to _rq and
113
    // then being popped by sched(pg), which is not necessary.
114
    static void sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending);
115
    static void sched_to(TaskGroup** pg, bthread_t next_tid);
116
    static void exchange(TaskGroup** pg, TaskMeta* next_meta);
117
118
    // The callback will be run in the beginning of next-run bthread.
119
    // Can't be called by current bthread directly because it often needs
120
    // the target to be suspended already.
121
    typedef void (*RemainedFn)(void*);
122
0
    void set_remained(RemainedFn cb, void* arg) {
123
0
        _last_context_remained = cb;
124
0
        _last_context_remained_arg = arg;
125
0
    }
126
    
127
    // Suspend caller for at least |timeout_us| microseconds.
128
    // If |timeout_us| is 0, this function does nothing.
129
    // If |group| is NULL or current thread is non-bthread, call usleep(3)
130
    // instead. This function does not create thread-local TaskGroup.
131
    // Returns: 0 on success, -1 otherwise and errno is set.
132
    static int usleep(TaskGroup** pg, uint64_t timeout_us);
133
134
    // Suspend caller and run another bthread. When the caller will resume
135
    // is undefined.
136
    static void yield(TaskGroup** pg);
137
138
    // Suspend caller until bthread `tid' terminates.
139
    static int join(bthread_t tid, void** return_value);
140
141
    // Returns true iff the bthread `tid' still exists. Notice that it is
142
    // just the result at this very moment which may change soon.
143
    // Don't use this function unless you have to. Never write code like this:
144
    //    if (exists(tid)) {
145
    //        Wait for events of the thread.   // Racy, may block indefinitely.
146
    //    }
147
    static bool exists(bthread_t tid);
148
149
    // Put attribute associated with `tid' into `*attr'.
150
    // Returns 0 on success, -1 otherwise and errno is set.
151
    static int get_attr(bthread_t tid, bthread_attr_t* attr);
152
153
    // Get/set TaskMeta.stop of the tid.
154
    static void set_stopped(bthread_t tid);
155
    static bool is_stopped(bthread_t tid);
156
157
    // The bthread running run_main_task();
158
0
    bthread_t main_tid() const { return _main_tid; }
159
    TaskStatistics main_stat() const;
160
    // Routine of the main task which should be called from a dedicated pthread.
161
    void run_main_task();
162
163
    // current_task is a function in macOS 10.0+
164
#ifdef current_task
165
#undef current_task
166
#endif
167
    // Meta/Identifier of current task in this group.
168
0
    TaskMeta* current_task() const { return _cur_meta; }
169
0
    bthread_t current_tid() const { return _cur_meta->tid; }
170
    // Uptime of current task in nanoseconds.
171
    int64_t current_uptime_ns() const
172
0
    { return butil::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns; }
173
174
    // True iff current task is the one running run_main_task()
175
0
    bool is_current_main_task() const { return current_tid() == _main_tid; }
176
    // True iff current task is in pthread-mode.
177
    bool is_current_pthread_task() const
178
0
    { return _cur_meta->stack == _main_stack; }
179
180
    // Active time in nanoseconds spent by this TaskGroup.
181
    int64_t cumulated_cputime_ns() const;
182
183
    // Push a bthread into the runqueue
184
    void ready_to_run(TaskMeta* meta, bool nosignal = false);
185
    // Flush tasks pushed to rq but signalled.
186
    void flush_nosignal_tasks();
187
188
    // Push a bthread into the runqueue from another non-worker thread.
189
    void ready_to_run_remote(TaskMeta* meta, bool nosignal = false);
190
    void flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex);
191
    void flush_nosignal_tasks_remote();
192
193
    // Automatically decide the caller is remote or local, and call
194
    // the corresponding function.
195
    void ready_to_run_general(TaskMeta* meta, bool nosignal = false);
196
    void flush_nosignal_tasks_general();
197
198
    // The TaskControl that this TaskGroup belongs to.
199
0
    TaskControl* control() const { return _control; }
200
201
    // Call this instead of delete.
202
    void destroy_self();
203
204
    // Wake up blocking ops in the thread.
205
    // Returns 0 on success, errno otherwise.
206
    static int interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag);
207
208
    // Get the meta associate with the task.
209
    static TaskMeta* address_meta(bthread_t tid);
210
211
    // Push a task into _rq, if _rq is full, retry after some time. This
212
    // process make go on indefinitely.
213
    void push_rq(bthread_t tid);
214
215
    // Returns size of local run queue.
216
0
    size_t rq_size() const {
217
0
        return _rq.volatile_size();
218
0
    }
219
220
0
    bthread_tag_t tag() const { return _tag; }
221
222
0
    pthread_t tid() const { return _tid; }
223
224
0
    int64_t current_task_cpu_clock_ns() {
225
0
        if (_last_cpu_clock_ns == 0) {
226
0
            return 0;
227
0
        }
228
0
        int64_t total_ns = _cur_meta->stat.cpu_usage_ns;
229
0
        total_ns += butil::cputhread_time_ns() - _last_cpu_clock_ns;
230
0
        return total_ns;
231
0
    }
232
233
private:
234
friend class TaskControl;
235
236
    // Last scheduling time, task type and cumulated CPU time.
237
    class CPUTimeStat {
238
        static constexpr int64_t LAST_SCHEDULING_TIME_MASK = 0x7FFFFFFFFFFFFFFFLL;
239
        static constexpr int64_t TASK_TYPE_MASK = 0x8000000000000000LL;
240
    public:
241
0
        CPUTimeStat() : _last_run_ns_and_type(0), _cumulated_cputime_ns(0) {}
242
        CPUTimeStat(AtomicInteger128::Value value)
243
0
            : _last_run_ns_and_type(value.v1), _cumulated_cputime_ns(value.v2) {}
244
245
        // Convert to AtomicInteger128::Value for atomic operations.
246
0
        explicit operator AtomicInteger128::Value() const {
247
0
            return {_last_run_ns_and_type, _cumulated_cputime_ns};
248
0
        }
249
250
0
        void set_last_run_ns(int64_t last_run_ns, bool main_task) {
251
0
            _last_run_ns_and_type = (last_run_ns & LAST_SCHEDULING_TIME_MASK) |
252
0
                                    (static_cast<int64_t>(main_task) << 63);
253
0
        }
254
0
        int64_t last_run_ns() const {
255
0
            return _last_run_ns_and_type & LAST_SCHEDULING_TIME_MASK;
256
0
        }
257
0
        int64_t last_run_ns_and_type() const {
258
0
            return _last_run_ns_and_type;
259
0
        }
260
261
0
        bool is_main_task() const {
262
0
            return _last_run_ns_and_type & TASK_TYPE_MASK;
263
0
        }
264
265
0
        void add_cumulated_cputime_ns(int64_t cputime_ns, bool main_task) {
266
0
            if (main_task) {
267
0
                return;
268
0
            }
269
0
            _cumulated_cputime_ns += cputime_ns;
270
0
        }
271
0
        int64_t cumulated_cputime_ns() const {
272
0
            return _cumulated_cputime_ns;
273
0
        }
274
275
    private:
276
        // The higher bit for task type, main task is 1, otherwise 0.
277
        // Lowest 63 bits for last scheduling time.
278
        int64_t _last_run_ns_and_type;
279
        // Cumulated CPU time in nanoseconds.
280
        int64_t _cumulated_cputime_ns;
281
    };
282
283
    class AtomicCPUTimeStat {
284
    public:
285
0
        CPUTimeStat load() const {
286
0
            return  _cpu_time_stat.load();
287
0
        }
288
0
        CPUTimeStat load_unsafe() const {
289
0
            return _cpu_time_stat.load_unsafe();
290
0
        }
291
292
0
        void store(CPUTimeStat cpu_time_stat) {
293
0
            _cpu_time_stat.store(AtomicInteger128::Value(cpu_time_stat));
294
0
        }
295
296
    private:
297
        AtomicInteger128 _cpu_time_stat;
298
    };
299
300
    // You shall use TaskControl::create_group to create new instance.
301
    explicit TaskGroup(TaskControl* c);
302
303
    int init(size_t runqueue_capacity);
304
305
    // You shall call destroy_selfm() instead of destructor because deletion
306
    // of groups are postponed to avoid race.
307
    ~TaskGroup();
308
309
#ifdef BUTIL_USE_ASAN
310
    static void asan_task_runner(intptr_t);
311
#endif // BUTIL_USE_ASAN
312
    static void task_runner(intptr_t skip_remained);
313
314
    // Callbacks for set_remained()
315
    static void _release_last_context(void*);
316
    static void _add_sleep_event(void*);
317
    struct ReadyToRunArgs {
318
        bthread_tag_t tag;
319
        TaskMeta* meta;
320
        bool nosignal;
321
    };
322
    static void ready_to_run_in_worker(void*);
323
    static void ready_to_run_in_worker_ignoresignal(void*);
324
    static void priority_to_run(void*);
325
326
    // Wait for a task to run.
327
    // Returns true on success, false is treated as permanent error and the
328
    // loop calling this function should end.
329
    bool wait_task(bthread_t* tid);
330
331
0
    bool steal_task(bthread_t* tid) {
332
0
        if (_remote_rq.pop(tid)) {
333
0
            return true;
334
0
        }
335
0
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
336
0
        _last_pl_state = _pl->get_state();
337
0
#endif
338
0
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
339
0
    }
340
341
0
    void set_tag(bthread_tag_t tag) { _tag = tag; }
342
343
0
    void set_pl(ParkingLot* pl) { _pl = pl; }
344
345
0
    static bool is_main_task(TaskGroup* g, bthread_t tid) {
346
0
        return g->_main_tid == tid;
347
0
    }
348
349
    TaskMeta* _cur_meta{NULL};
350
    
351
    // the control that this group belongs to
352
    TaskControl* _control{NULL};
353
    int _num_nosignal{0};
354
    int _nsignaled{0};
355
    AtomicCPUTimeStat _cpu_time_stat;
356
    // last thread cpu clock
357
    int64_t _last_cpu_clock_ns{0};
358
359
    size_t _nswitch{0};
360
    RemainedFn _last_context_remained{NULL};
361
    void* _last_context_remained_arg{NULL};
362
363
    ParkingLot* _pl{NULL};
364
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
365
    ParkingLot::State _last_pl_state;
366
#endif
367
    size_t _steal_seed{butil::fast_rand()};
368
    size_t _steal_offset{prime_offset(_steal_seed)};
369
    ContextualStack* _main_stack{NULL};
370
    bthread_t _main_tid{INVALID_BTHREAD};
371
    WorkStealingQueue<bthread_t> _rq;
372
    RemoteTaskQueue _remote_rq;
373
    int _remote_num_nosignal{0};
374
    int _remote_nsignaled{0};
375
376
    int _sched_recursive_guard{0};
377
    // tag of this taskgroup
378
    bthread_tag_t _tag{BTHREAD_TAG_DEFAULT};
379
380
    // Worker thread id.
381
    pthread_t _tid{};
382
};
383
384
}  // namespace bthread
385
386
#include "task_group_inl.h"
387
388
#endif  // BTHREAD_TASK_GROUP_H