/src/brpc/src/bthread/task_control.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // bthread - An M:N threading library to make applications more concurrent. |
19 | | |
20 | | // Date: Tue Jul 10 17:40:58 CST 2012 |
21 | | |
22 | | #ifndef BTHREAD_TASK_CONTROL_H |
23 | | #define BTHREAD_TASK_CONTROL_H |
24 | | |
25 | | #ifndef NDEBUG |
26 | | #include <iostream> // std::ostream |
27 | | #endif |
28 | | #include <signal.h> |
29 | | #include <stddef.h> // size_t |
30 | | #include <vector> |
31 | | #include <array> |
32 | | #include <memory> |
33 | | #include "butil/atomicops.h" // butil::atomic |
34 | | #include "bvar/bvar.h" // bvar::PassiveStatus |
35 | | #include "bthread/task_tracer.h" |
36 | | #include "bthread/task_meta.h" // TaskMeta |
37 | | #include "bthread/work_stealing_queue.h" // WorkStealingQueue |
38 | | #include "bthread/parking_lot.h" |
39 | | |
40 | | DECLARE_int32(task_group_ntags); /* Flag declared here and defined elsewhere; presumably the number of bthread tags - confirm at the definition. */ |
41 | | namespace bthread { |
42 | | |
43 | | class TaskGroup; /* Forward declaration: this header only names TaskGroup as a friend and handles it through pointers. */ |
44 | | |
45 | | // Control all task groups. Holds the per-tag TaskGroup tables, per-tag parking lots, worker pthread ids and the bvar counters declared below. |
46 | | class TaskControl { |
47 | | friend class TaskGroup; |
48 | | friend void wait_for_butex(void*); |
49 | | #ifdef BRPC_BTHREAD_TRACER |
50 | | friend bthread_t init_for_pthread_stack_trace(); |
51 | | #endif // BRPC_BTHREAD_TRACER |
52 | | |
53 | | public: |
54 | | TaskControl(); |
55 | | ~TaskControl(); |
56 | | |
57 | | // Must be called before using. `nconcurrency' is # of worker pthreads. |
58 | | int init(int nconcurrency); |
59 | | |
60 | | // Create a TaskGroup for `tag' in this control. |
61 | | TaskGroup* create_group(bthread_tag_t tag); |
62 | | |
63 | | // Steal a task from a "random" group. Returns whether a task was stolen; presumably its id is written to `*tid' (confirm in task_control.cpp). |
64 | | bool steal_task(bthread_t* tid, size_t* seed, size_t offset); |
65 | | |
66 | | // Tell other groups that `num_task' tasks were just added to caller's runqueue |
67 | | void signal_task(int num_task, bthread_tag_t tag); |
68 | | |
69 | | // Stop and join worker threads in TaskControl. |
70 | | void stop_and_join(); |
71 | | |
72 | | // Get # of worker threads (acquire-load of _concurrency). |
73 | | int concurrency() const |
74 | 0 | { return _concurrency.load(butil::memory_order_acquire); } |
75 | | |
76 | | int concurrency(bthread_tag_t tag) const /* Per-tag overload: acquire-load of _tagged_ngroup[tag], converted from size_t to int; `tag' is not range-checked here. */ |
77 | 0 | { return _tagged_ngroup[tag].load(butil::memory_order_acquire); } |
78 | | |
79 | | void print_rq_sizes(std::ostream& os); |
80 | | |
81 | | double get_cumulated_worker_time(); |
82 | | double get_cumulated_worker_time(bthread_tag_t tag); |
83 | | int64_t get_cumulated_switch_count(); |
84 | | int64_t get_cumulated_signal_count(); |
85 | | |
86 | | // [Not thread safe] Add more worker threads. |
87 | | // Return the number of workers actually added, which may be less than |num| |
88 | | int add_workers(int num, bthread_tag_t tag); |
89 | | |
90 | | // Choose one TaskGroup (randomly right now). |
91 | | // If this method is called after init(), it never returns NULL. |
92 | | TaskGroup* choose_one_group(bthread_tag_t tag); |
93 | | |
94 | | #ifdef BRPC_BTHREAD_TRACER |
95 | | // A stacktrace of bthread can be helpful in debugging. |
96 | | void stack_trace(std::ostream& os, bthread_t tid); |
97 | | std::string stack_trace(bthread_t tid); |
98 | | #endif // BRPC_BTHREAD_TRACER |
99 | | |
100 | 0 | void push_priority_queue(bthread_tag_t tag, bthread_t tid) { /* Push `tid' onto the priority queue of `tag'; the index is not range-checked and nothing is reported back to the caller. */ |
101 | 0 | _priority_queues[tag].push(tid); |
102 | 0 | } |
103 | | |
104 | | std::vector<bthread_t> get_living_bthreads(); |
105 | | private: |
106 | | typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups; /* Fixed-size table of group pointers for one tag. */ |
107 | | typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot; /* Fixed-size table of parking lots for one tag. */ |
108 | | // Add/Remove a TaskGroup. |
109 | | // Returns 0 on success, -1 otherwise. |
110 | | int _add_group(TaskGroup*, bthread_tag_t tag); |
111 | | int _destroy_group(TaskGroup*); |
112 | | |
113 | | // Group table of `tag' (unchecked index into _tagged_groups). |
114 | 0 | TaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; } |
115 | | |
116 | | // Group counter of `tag' (unchecked index into _tagged_ngroup). NOTE(review): takes int while the sibling accessors take bthread_tag_t. |
117 | 0 | butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; } |
118 | | |
119 | | // Parking lots of `tag' (unchecked index into _tagged_pl). |
120 | 0 | TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; } |
121 | | |
122 | | static void delete_task_group(void* arg); |
123 | | |
124 | | static void* worker_thread(void* task_control); |
125 | | |
126 | | template <typename F> |
127 | | void for_each_task_group(F const& f); /* Defined inline after the class. */ |
128 | | |
129 | | bvar::LatencyRecorder& exposed_pending_time(); |
130 | | bvar::LatencyRecorder* create_exposed_pending_time(); |
131 | | bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag); |
132 | | bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag); |
133 | | |
134 | | std::vector<butil::atomic<size_t>> _tagged_ngroup; /* Indexed by tag; used by for_each_task_group as the number of entries to visit in _tagged_groups[tag]. */ |
135 | | std::vector<TaggedGroups> _tagged_groups; /* Indexed by tag. */ |
136 | | butil::Mutex _modify_group_mutex; |
137 | | |
138 | | butil::atomic<bool> _init; // if not init, bvar will cause coredump |
139 | | bool _stop; |
140 | | butil::atomic<int> _concurrency; |
141 | | std::vector<pthread_t> _workers; |
142 | | butil::atomic<int> _next_worker_id; |
143 | | |
144 | | bvar::Adder<int64_t> _nworkers; |
145 | | butil::Mutex _pending_time_mutex; |
146 | | butil::atomic<bvar::LatencyRecorder*> _pending_time; /* Read by exposed_pending_time(); NULL until a recorder has been created. */ |
147 | | bvar::PassiveStatus<double> _cumulated_worker_time; |
148 | | bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second; |
149 | | bvar::PassiveStatus<int64_t> _cumulated_switch_count; |
150 | | bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second; |
151 | | bvar::PassiveStatus<int64_t> _cumulated_signal_count; |
152 | | bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second; |
153 | | bvar::PassiveStatus<std::string> _status; |
154 | | bvar::Adder<int64_t> _nbthreads; |
155 | | |
156 | | std::vector<bvar::Adder<int64_t>*> _tagged_nworkers; /* Indexed by tag; dereferenced by tag_nworkers(). */ |
157 | | std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time; |
158 | | std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second; |
159 | | std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads; /* Indexed by tag; dereferenced by tag_nbthreads(). */ |
160 | | |
161 | | bool _enable_priority_queue; |
162 | | std::vector<WorkStealingQueue<bthread_t>> _priority_queues; /* Indexed by tag; filled by push_priority_queue(). */ |
163 | | |
164 | | size_t _pl_num_of_each_tag; |
165 | | std::vector<TaggedParkingLot> _tagged_pl; /* Indexed by tag. */ |
166 | | |
167 | | #ifdef BRPC_BTHREAD_TRACER |
168 | | TaskTracer _task_tracer; |
169 | | #endif // BRPC_BTHREAD_TRACER |
170 | | |
171 | | }; |
172 | | |
173 | 0 | inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { /* Return the recorder held in _pending_time, asking create_exposed_pending_time() for one when none is set yet. */ |
174 | 0 | bvar::LatencyRecorder* pt = _pending_time.load(butil::memory_order_consume); |
175 | 0 | if (!pt) { /* Nothing stored yet: delegate to the creator, which is defined outside this header. */ |
176 | 0 | pt = create_exposed_pending_time(); |
177 | 0 | } |
178 | 0 | return *pt; /* NOTE(review): assumes create_exposed_pending_time() never returns NULL - confirm. */ |
179 | 0 | } |
180 | | |
181 | 0 | inline bvar::Adder<int64_t>& TaskControl::tag_nworkers(bthread_tag_t tag) { /* Adder stored for `tag' in _tagged_nworkers. */ |
182 | 0 | return *_tagged_nworkers[tag]; /* Neither the index nor the stored pointer is checked here. */ |
183 | 0 | } |
184 | | |
185 | 0 | inline bvar::Adder<int64_t>& TaskControl::tag_nbthreads(bthread_tag_t tag) { /* Adder stored for `tag' in _tagged_nbthreads. */ |
186 | 0 | return *_tagged_nbthreads[tag]; /* Neither the index nor the stored pointer is checked here. */ |
187 | 0 | } |
188 | | |
189 | | template <typename F> /* Calls f(group) on every TaskGroup pointer counted in each tag's table; does nothing until _init is set and takes no lock itself. */ |
190 | 0 | inline void TaskControl::for_each_task_group(F const& f) { |
191 | 0 | if (_init.load(butil::memory_order_acquire) == false) { /* The tables are not touched before _init is observed as true. */ |
192 | 0 | return; |
193 | 0 | } |
194 | 0 | for (size_t i = 0; i < _tagged_groups.size(); ++i) { /* One iteration per tag. */ |
195 | 0 | auto ngroup = tag_ngroup(i).load(butil::memory_order_relaxed); /* Relaxed snapshot of this tag's group count. NOTE(review): groups added or removed concurrently may not be reflected - confirm callers tolerate this. */ |
196 | 0 | auto& groups = tag_group(i); |
197 | 0 | for (size_t j = 0; j < ngroup; ++j) { |
198 | 0 | f(groups[j]); /* f receives the raw TaskGroup* stored in slot j. */ |
199 | 0 | } |
200 | 0 | } |
201 | 0 | } Unexecuted instantiation: task_control.cpp:void bthread::TaskControl::for_each_task_group<bthread::TaskControl::print_rq_sizes(std::basic_ostream<char, std::char_traits<char> >&)::$_1>(bthread::TaskControl::print_rq_sizes(std::basic_ostream<char, std::char_traits<char> >&)::$_1 const&) Unexecuted instantiation: task_control.cpp:void bthread::TaskControl::for_each_task_group<bthread::TaskControl::get_cumulated_worker_time()::$_0>(bthread::TaskControl::get_cumulated_worker_time()::$_0 const&) Unexecuted instantiation: task_control.cpp:void bthread::TaskControl::for_each_task_group<bthread::TaskControl::get_cumulated_switch_count()::$_0>(bthread::TaskControl::get_cumulated_switch_count()::$_0 const&) Unexecuted instantiation: task_control.cpp:void bthread::TaskControl::for_each_task_group<bthread::TaskControl::get_cumulated_signal_count()::$_0>(bthread::TaskControl::get_cumulated_signal_count()::$_0 const&) |
202 | | |
203 | | } // namespace bthread |
204 | | |
205 | | #endif // BTHREAD_TASK_CONTROL_H |