Coverage Report

Created: 2025-10-28 07:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/task_control.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#ifndef BTHREAD_TASK_CONTROL_H
23
#define BTHREAD_TASK_CONTROL_H
24
25
#ifndef NDEBUG
26
#include <iostream>                             // std::ostream
27
#endif
28
#include <signal.h>
29
#include <stddef.h>                             // size_t
30
#include <vector>
31
#include <array>
32
#include <memory>
33
#include "butil/atomicops.h"                     // butil::atomic
34
#include "bvar/bvar.h"                          // bvar::PassiveStatus
35
#include "bthread/task_tracer.h"
36
#include "bthread/task_meta.h"                  // TaskMeta
37
#include "bthread/work_stealing_queue.h"        // WorkStealingQueue
38
#include "bthread/parking_lot.h"
39
40
DECLARE_int32(task_group_ntags);
41
namespace bthread {
42
43
class TaskGroup;
44
45
// Control all task groups
46
class TaskControl {
47
friend class TaskGroup;
48
friend void wait_for_butex(void*);
49
#ifdef BRPC_BTHREAD_TRACER
50
friend bthread_t init_for_pthread_stack_trace();
51
#endif // BRPC_BTHREAD_TRACER
52
53
public:
54
    TaskControl();
55
    ~TaskControl();
56
57
    // Must be called before using. `nconcurrency' is # of worker pthreads.
58
    int init(int nconcurrency);
59
    
60
    // Create a TaskGroup in this control.
61
    TaskGroup* create_group(bthread_tag_t tag);
62
63
    // Steal a task from a "random" group.
64
    bool steal_task(bthread_t* tid, size_t* seed, size_t offset);
65
66
    // Tell other groups that `n' tasks was just added to caller's runqueue
67
    void signal_task(int num_task, bthread_tag_t tag);
68
69
    // Stop and join worker threads in TaskControl.
70
    void stop_and_join();
71
    
72
    // Get # of worker threads.
73
    int concurrency() const 
74
0
    { return _concurrency.load(butil::memory_order_acquire); }
75
76
    int concurrency(bthread_tag_t tag) const 
77
0
    { return _tagged_ngroup[tag].load(butil::memory_order_acquire); }
78
79
    void print_rq_sizes(std::ostream& os);
80
81
    double get_cumulated_worker_time();
82
    double get_cumulated_worker_time(bthread_tag_t tag);
83
    int64_t get_cumulated_switch_count();
84
    int64_t get_cumulated_signal_count();
85
86
    // [Not thread safe] Add more worker threads.
87
    // Return the number of workers actually added, which may be less than |num|
88
    int add_workers(int num, bthread_tag_t tag);
89
90
    // Choose one TaskGroup (randomly right now).
91
    // If this method is called after init(), it never returns NULL.
92
    TaskGroup* choose_one_group(bthread_tag_t tag);
93
94
#ifdef BRPC_BTHREAD_TRACER
95
    // A stacktrace of bthread can be helpful in debugging.
96
    void stack_trace(std::ostream& os, bthread_t tid);
97
    std::string stack_trace(bthread_t tid);
98
#endif // BRPC_BTHREAD_TRACER
99
100
0
    void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
101
0
        _priority_queues[tag].push(tid);
102
0
    }
103
104
    std::vector<bthread_t> get_living_bthreads();
105
private:
106
    typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
107
    typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
108
    // Add/Remove a TaskGroup.
109
    // Returns 0 on success, -1 otherwise.
110
    int _add_group(TaskGroup*, bthread_tag_t tag);
111
    int _destroy_group(TaskGroup*);
112
113
    // Tag group
114
0
    TaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; }
115
116
    // Tag ngroup
117
0
    butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }
118
119
    // Tag parking slot
120
0
    TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }
121
122
    static void delete_task_group(void* arg);
123
124
    static void* worker_thread(void* task_control);
125
126
    template <typename F>
127
    void for_each_task_group(F const& f);
128
129
    bvar::LatencyRecorder& exposed_pending_time();
130
    bvar::LatencyRecorder* create_exposed_pending_time();
131
    bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag);
132
    bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag);
133
134
    std::vector<butil::atomic<size_t>> _tagged_ngroup;
135
    std::vector<TaggedGroups> _tagged_groups;
136
    butil::Mutex _modify_group_mutex;
137
138
    butil::atomic<bool> _init;  // if not init, bvar will case coredump
139
    bool _stop;
140
    butil::atomic<int> _concurrency;
141
    std::vector<pthread_t> _workers;
142
    butil::atomic<int> _next_worker_id;
143
144
    bvar::Adder<int64_t> _nworkers;
145
    butil::Mutex _pending_time_mutex;
146
    butil::atomic<bvar::LatencyRecorder*> _pending_time;
147
    bvar::PassiveStatus<double> _cumulated_worker_time;
148
    bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;
149
    bvar::PassiveStatus<int64_t> _cumulated_switch_count;
150
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;
151
    bvar::PassiveStatus<int64_t> _cumulated_signal_count;
152
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;
153
    bvar::PassiveStatus<std::string> _status;
154
    bvar::Adder<int64_t> _nbthreads;
155
156
    std::vector<bvar::Adder<int64_t>*> _tagged_nworkers;
157
    std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
158
    std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;
159
    std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
160
161
    bool _enable_priority_queue;
162
    std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
163
164
    size_t _pl_num_of_each_tag;
165
    std::vector<TaggedParkingLot> _tagged_pl;
166
167
#ifdef BRPC_BTHREAD_TRACER
168
    TaskTracer _task_tracer;
169
#endif // BRPC_BTHREAD_TRACER
170
171
};
172
173
0
inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {
174
0
    bvar::LatencyRecorder* pt = _pending_time.load(butil::memory_order_consume);
175
0
    if (!pt) {
176
0
        pt = create_exposed_pending_time();
177
0
    }
178
0
    return *pt;
179
0
}
180
181
0
inline bvar::Adder<int64_t>& TaskControl::tag_nworkers(bthread_tag_t tag) {
182
0
    return *_tagged_nworkers[tag];
183
0
}
184
185
0
inline bvar::Adder<int64_t>& TaskControl::tag_nbthreads(bthread_tag_t tag) {
186
0
    return *_tagged_nbthreads[tag];
187
0
}
188
189
template <typename F>
190
0
inline void TaskControl::for_each_task_group(F const& f) {
191
0
    if (_init.load(butil::memory_order_acquire) == false) {
192
0
        return;
193
0
    }
194
0
    for (size_t i = 0; i < _tagged_groups.size(); ++i) {
195
0
        auto ngroup = tag_ngroup(i).load(butil::memory_order_relaxed);
196
0
        auto& groups = tag_group(i);
197
0
        for (size_t j = 0; j < ngroup; ++j) {
198
0
            f(groups[j]);
199
0
        }
200
0
    }
201
0
}
Unexecuted instantiation: task_control.cpp:void bthread::TaskControl::for_each_task_group<bthread::TaskControl::print_rq_sizes(std::basic_ostream<char, std::char_traits<char> >&)::$_1>(bthread::TaskControl::print_rq_sizes(std::basic_ostream<char, std::char_traits<char> >&)::$_1 const&)
Unexecuted instantiation: task_control.cpp:void bthread::TaskControl::for_each_task_group<bthread::TaskControl::get_cumulated_worker_time()::$_0>(bthread::TaskControl::get_cumulated_worker_time()::$_0 const&)
Unexecuted instantiation: task_control.cpp:void bthread::TaskControl::for_each_task_group<bthread::TaskControl::get_cumulated_switch_count()::$_0>(bthread::TaskControl::get_cumulated_switch_count()::$_0 const&)
Unexecuted instantiation: task_control.cpp:void bthread::TaskControl::for_each_task_group<bthread::TaskControl::get_cumulated_signal_count()::$_0>(bthread::TaskControl::get_cumulated_signal_count()::$_0 const&)
202
203
}  // namespace bthread
204
205
#endif  // BTHREAD_TASK_CONTROL_H