/src/brpc/src/bthread/task_group.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // bthread - An M:N threading library to make applications more concurrent. |
19 | | |
20 | | // Date: Tue Jul 10 17:40:58 CST 2012 |
21 | | |
22 | | #ifndef BTHREAD_TASK_GROUP_H |
23 | | #define BTHREAD_TASK_GROUP_H |
24 | | |
25 | | #include "butil/time.h" // cpuwide_time_ns |
26 | | #include "bthread/task_control.h" |
27 | | #include "bthread/task_meta.h" // bthread_t, TaskMeta |
28 | | #include "bthread/work_stealing_queue.h" // WorkStealingQueue |
29 | | #include "bthread/remote_task_queue.h" // RemoteTaskQueue |
30 | | #include "butil/resource_pool.h" // ResourceId |
31 | | #include "bthread/parking_lot.h" |
32 | | #include "bthread/prime_offset.h" |
33 | | |
34 | | namespace bthread { |
35 | | |
36 | | // For exiting a bthread. |
37 | | class ExitException : public std::exception { |
38 | | public: |
39 | 0 | explicit ExitException(void* value) : _value(value) {} |
40 | 0 | ~ExitException() throw() {} |
41 | 0 | const char* what() const throw() override { |
42 | 0 | return "ExitException"; |
43 | 0 | } |
44 | 0 | void* value() const { |
45 | 0 | return _value; |
46 | 0 | } |
47 | | private: |
48 | | void* _value; |
49 | | }; |
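 | |
 | | // Editor's illustration (not part of the original header): ExitException is the
 | | // mechanism for unwinding a bthread while still carrying a return value, such as
 | | // an early exit via bthread_exit(). A hypothetical task runner could consume it
 | | // roughly like this:
 | | inline void* run_and_catch_exit(void* (*fn)(void*), void* arg) {
 | |     try {
 | |         return fn(arg);           // normal completion: use fn's return value
 | |     } catch (ExitException& e) {
 | |         return e.value();         // early exit: value carried by the exception
 | |     }
 | | }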
50 | | |
51 | | // Refer to https://rigtorp.se/isatomic/: on modern CPU microarchitectures
52 | | // (Skylake and Zen 2), AVX/AVX2 128-bit/256-bit aligned loads and stores are
53 | | // atomic even though Intel and AMD do not officially guarantee this.
54 | | // On x86-64, SSE instructions can provide atomic 128-bit loads and stores.
55 | | // Starting from Armv8.4-A, NEON can provide atomic loads and stores.
56 | | // Otherwise, a mutex is used to guarantee atomicity.
57 | | class AtomicInteger128 { |
58 | | public: |
59 | | struct BAIDU_CACHELINE_ALIGNMENT Value { |
60 | | int64_t v1; |
61 | | int64_t v2; |
62 | | }; |
63 | | |
64 | 0 | AtomicInteger128() = default; |
65 | 0 | explicit AtomicInteger128(Value value) : _value(value) {} |
66 | | |
67 | | Value load() const; |
68 | 0 | Value load_unsafe() const { |
69 | 0 | return _value; |
70 | 0 | } |
71 | | |
72 | | void store(Value value); |
73 | | |
74 | | private: |
75 | | Value _value{}; |
76 | | // Used to protect `_value' when neither __x86_64__ nor __ARM_NEON is defined.
77 | | FastPthreadMutex _mutex; |
78 | | }; |
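 | |
 | | // Editor's sketch (comment only, not part of the original header): when none of
 | | // the wide-load/store paths above are available, the fallback amounts to guarding
 | | // a plain 16-byte struct with a lock. A standalone equivalent using std::mutex
 | | // (the real class uses FastPthreadMutex) would look like this:
 | | //   #include <mutex>
 | | //   struct Int128WithLock {
 | | //       struct Value { int64_t v1; int64_t v2; };
 | | //       Value load() const { std::lock_guard<std::mutex> g(_mu); return _value; }
 | | //       void store(Value v) { std::lock_guard<std::mutex> g(_mu); _value = v; }
 | | //   private:
 | | //       Value _value{};
 | | //       mutable std::mutex _mu;
 | | //   };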
79 | | |
80 | | // Thread-local group of tasks. |
81 | | // Notice that most methods involving context switching are static, otherwise
82 | | // the pointer `this' may change after wakeup. The **pg parameters in the
83 | | // following functions are updated before returning.
84 | | class TaskGroup { |
85 | | public: |
86 | | // Create task `fn(arg)' with attributes `attr' in TaskGroup *pg and put |
87 | | // the identifier into `tid'. Switch to the new task and schedule old task |
88 | | // to run. |
89 | | // Return 0 on success, errno otherwise. |
90 | | static int start_foreground(TaskGroup** pg, |
91 | | bthread_t* __restrict tid, |
92 | | const bthread_attr_t* __restrict attr, |
93 | | void * (*fn)(void*), |
94 | | void* __restrict arg); |
95 | | |
96 | | // Create task `fn(arg)' with attributes `attr' in this TaskGroup, put the |
97 | | // identifier into `tid'. Schedule the new thread to run. |
98 | | // Called from worker: start_background<false> |
99 | | // Called from non-worker: start_background<true> |
100 | | // Return 0 on success, errno otherwise. |
101 | | template <bool REMOTE> |
102 | | int start_background(bthread_t* __restrict tid, |
103 | | const bthread_attr_t* __restrict attr, |
104 | | void * (*fn)(void*), |
105 | | void* __restrict arg); |
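 | |
 | | // Editor's usage sketch (comment only, not part of the original header): these
 | | // two entry points are normally reached through the public API in
 | | // bthread/bthread.h, assuming the usual wrappers:
 | | //   static void* Echo(void* arg) { return arg; }
 | | //   bthread_t tid;
 | | //   bthread_start_background(&tid, NULL, Echo, NULL);  // -> start_background<>
 | | //   bthread_start_urgent(&tid, NULL, Echo, NULL);      // -> start_foreground()
 | | //   bthread_join(tid, NULL);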
106 | | |
107 | | // Suspend caller and run next bthread in TaskGroup *pg. |
108 | | static void sched(TaskGroup** pg); |
109 | | static void ending_sched(TaskGroup** pg); |
110 | | |
111 | | // Suspend caller and run bthread `next_tid' in TaskGroup *pg. |
112 | | // The purpose of this function is to avoid pushing `next_tid' into _rq and
113 | | // then popping it in sched(pg), which is unnecessary.
114 | | static void sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending); |
115 | | static void sched_to(TaskGroup** pg, bthread_t next_tid); |
116 | | static void exchange(TaskGroup** pg, TaskMeta* next_meta); |
117 | | |
118 | | // The callback will be run at the beginning of the next-run bthread.
119 | | // It can't be run by the current bthread directly because it often requires
120 | | // the target bthread to be suspended already.
121 | | typedef void (*RemainedFn)(void*); |
122 | 0 | void set_remained(RemainedFn cb, void* arg) { |
123 | 0 | _last_context_remained = cb; |
124 | 0 | _last_context_remained_arg = arg; |
125 | 0 | } |
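 | |
 | | // Editor's sketch (comment only, hypothetical callback): a typical use is to
 | | // defer work that must not run on the stack being switched away from, e.g.
 | | // releasing a lock only after the switch has completed:
 | | //   static void unlock_after_switch(void* m) {
 | | //       static_cast<butil::Mutex*>(m)->unlock();
 | | //   }
 | | //   g->set_remained(unlock_after_switch, &mutex);
 | | // The callback then runs at the start of the next bthread scheduled on `g'.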
126 | | |
127 | | // Suspend caller for at least |timeout_us| microseconds. |
128 | | // If |timeout_us| is 0, this function does nothing. |
129 | | // If |group| is NULL or current thread is non-bthread, call usleep(3) |
130 | | // instead. This function does not create thread-local TaskGroup. |
131 | | // Returns: 0 on success, -1 otherwise and errno is set. |
132 | | static int usleep(TaskGroup** pg, uint64_t timeout_us); |
133 | | |
134 | | // Suspend caller and run another bthread. When the caller will resume |
135 | | // is undefined. |
136 | | static void yield(TaskGroup** pg); |
137 | | |
138 | | // Suspend caller until bthread `tid' terminates. |
139 | | static int join(bthread_t tid, void** return_value); |
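 | |
 | | // Editor's note (comment only): application code reaches these through the
 | | // public calls bthread_usleep(), bthread_yield() and bthread_join(), e.g.
 | | //   bthread_usleep(1000);      // suspend the caller for >= 1000us
 | | //   bthread_yield();           // let another bthread run; resume later
 | | //   bthread_join(tid, NULL);   // block until `tid' terminates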
140 | | |
141 | | // Returns true iff the bthread `tid' still exists. Notice that it is |
142 | | // just the result at this very moment which may change soon. |
143 | | // Don't use this function unless you have to. Never write code like this: |
144 | | // if (exists(tid)) { |
145 | | // Wait for events of the thread. // Racy, may block indefinitely. |
146 | | // } |
147 | | static bool exists(bthread_t tid); |
148 | | |
149 | | // Put attribute associated with `tid' into `*attr'. |
150 | | // Returns 0 on success, -1 otherwise and errno is set. |
151 | | static int get_attr(bthread_t tid, bthread_attr_t* attr); |
152 | | |
153 | | // Get/set TaskMeta.stop of the tid. |
154 | | static void set_stopped(bthread_t tid); |
155 | | static bool is_stopped(bthread_t tid); |
156 | | |
157 | | // The bthread running run_main_task().
158 | 0 | bthread_t main_tid() const { return _main_tid; } |
159 | | TaskStatistics main_stat() const; |
160 | | // Routine of the main task which should be called from a dedicated pthread. |
161 | | void run_main_task(); |
162 | | |
163 | | // current_task is a function in macOS 10.0+ |
164 | | #ifdef current_task |
165 | | #undef current_task |
166 | | #endif |
167 | | // Meta/Identifier of current task in this group. |
168 | 0 | TaskMeta* current_task() const { return _cur_meta; } |
169 | 0 | bthread_t current_tid() const { return _cur_meta->tid; } |
170 | | // Uptime of current task in nanoseconds. |
171 | | int64_t current_uptime_ns() const |
172 | 0 | { return butil::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns; } |
173 | | |
174 | | // True iff current task is the one running run_main_task() |
175 | 0 | bool is_current_main_task() const { return current_tid() == _main_tid; } |
176 | | // True iff current task is in pthread-mode. |
177 | | bool is_current_pthread_task() const |
178 | 0 | { return _cur_meta->stack == _main_stack; } |
179 | | |
180 | | // Active time in nanoseconds spent by this TaskGroup. |
181 | | int64_t cumulated_cputime_ns() const; |
182 | | |
183 | | // Push a bthread into the runqueue |
184 | | void ready_to_run(TaskMeta* meta, bool nosignal = false); |
185 | | // Flush tasks pushed to rq but not signalled yet.
186 | | void flush_nosignal_tasks(); |
187 | | |
188 | | // Push a bthread into the runqueue from another non-worker thread. |
189 | | void ready_to_run_remote(TaskMeta* meta, bool nosignal = false); |
190 | | void flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex); |
191 | | void flush_nosignal_tasks_remote(); |
192 | | |
193 | | // Automatically decide the caller is remote or local, and call |
194 | | // the corresponding function. |
195 | | void ready_to_run_general(TaskMeta* meta, bool nosignal = false); |
196 | | void flush_nosignal_tasks_general(); |
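 | |
 | | // Editor's note (comment only): `nosignal' lets a caller batch wakeups -- push
 | | // several bthreads without signalling workers, then signal once:
 | | //   g->ready_to_run(meta1, /*nosignal=*/true);
 | | //   g->ready_to_run(meta2, /*nosignal=*/true);
 | | //   g->flush_nosignal_tasks();   // one signal for the whole batch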
197 | | |
198 | | // The TaskControl that this TaskGroup belongs to. |
199 | 0 | TaskControl* control() const { return _control; } |
200 | | |
201 | | // Call this instead of delete. |
202 | | void destroy_self(); |
203 | | |
204 | | // Wake up blocking ops in the thread. |
205 | | // Returns 0 on success, errno otherwise. |
206 | | static int interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag); |
207 | | |
208 | | // Get the meta associate with the task. |
209 | | static TaskMeta* address_meta(bthread_t tid); |
210 | | |
211 | | // Push a task into _rq; if _rq is full, retry after some time. This
212 | | // process may go on indefinitely.
213 | | void push_rq(bthread_t tid); |
214 | | |
215 | | // Returns size of local run queue. |
216 | 0 | size_t rq_size() const { |
217 | 0 | return _rq.volatile_size(); |
218 | 0 | } |
219 | | |
220 | 0 | bthread_tag_t tag() const { return _tag; } |
221 | | |
222 | 0 | pthread_t tid() const { return _tid; } |
223 | | |
224 | 0 | int64_t current_task_cpu_clock_ns() { |
225 | 0 | if (_last_cpu_clock_ns == 0) { |
226 | 0 | return 0; |
227 | 0 | } |
228 | 0 | int64_t total_ns = _cur_meta->stat.cpu_usage_ns; |
229 | 0 | total_ns += butil::cputhread_time_ns() - _last_cpu_clock_ns; |
230 | 0 | return total_ns; |
231 | 0 | } |
232 | | |
233 | | private: |
234 | | friend class TaskControl; |
235 | | |
236 | | // Last scheduling time, task type and cumulated CPU time. |
237 | | class CPUTimeStat { |
238 | | static constexpr int64_t LAST_SCHEDULING_TIME_MASK = 0x7FFFFFFFFFFFFFFFLL; |
239 | | static constexpr int64_t TASK_TYPE_MASK = 0x8000000000000000LL; |
240 | | public: |
241 | 0 | CPUTimeStat() : _last_run_ns_and_type(0), _cumulated_cputime_ns(0) {} |
242 | | CPUTimeStat(AtomicInteger128::Value value) |
243 | 0 | : _last_run_ns_and_type(value.v1), _cumulated_cputime_ns(value.v2) {} |
244 | | |
245 | | // Convert to AtomicInteger128::Value for atomic operations. |
246 | 0 | explicit operator AtomicInteger128::Value() const { |
247 | 0 | return {_last_run_ns_and_type, _cumulated_cputime_ns}; |
248 | 0 | } |
249 | | |
250 | 0 | void set_last_run_ns(int64_t last_run_ns, bool main_task) { |
251 | 0 | _last_run_ns_and_type = (last_run_ns & LAST_SCHEDULING_TIME_MASK) | |
252 | 0 | (static_cast<int64_t>(main_task) << 63); |
253 | 0 | } |
254 | 0 | int64_t last_run_ns() const { |
255 | 0 | return _last_run_ns_and_type & LAST_SCHEDULING_TIME_MASK; |
256 | 0 | } |
257 | 0 | int64_t last_run_ns_and_type() const { |
258 | 0 | return _last_run_ns_and_type; |
259 | 0 | } |
260 | | |
261 | 0 | bool is_main_task() const { |
262 | 0 | return _last_run_ns_and_type & TASK_TYPE_MASK; |
263 | 0 | } |
264 | | |
265 | 0 | void add_cumulated_cputime_ns(int64_t cputime_ns, bool main_task) { |
266 | 0 | if (main_task) { |
267 | 0 | return; |
268 | 0 | } |
269 | 0 | _cumulated_cputime_ns += cputime_ns; |
270 | 0 | } |
271 | 0 | int64_t cumulated_cputime_ns() const { |
272 | 0 | return _cumulated_cputime_ns; |
273 | 0 | } |
274 | | |
275 | | private: |
276 | | // The highest bit is the task type: 1 for the main task, 0 otherwise.
277 | | // The lowest 63 bits are the last scheduling time.
278 | | int64_t _last_run_ns_and_type; |
279 | | // Cumulated CPU time in nanoseconds. |
280 | | int64_t _cumulated_cputime_ns; |
281 | | }; |
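 | |
 | | // Editor's worked example (comment only, not part of the original header),
 | | // showing the packing above -- bit 63 holds the task type, bits 0..62 hold the
 | | // last scheduling time:
 | | //   CPUTimeStat s;
 | | //   s.set_last_run_ns(12345, /*main_task=*/true);
 | | //   s.last_run_ns();            // == 12345 (low 63 bits)
 | | //   s.is_main_task();           // == true  (bit 63)
 | | //   s.add_cumulated_cputime_ns(500, /*main_task=*/false);
 | | //   s.cumulated_cputime_ns();   // == 500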
282 | | |
283 | | class AtomicCPUTimeStat { |
284 | | public: |
285 | 0 | CPUTimeStat load() const { |
286 | 0 | return _cpu_time_stat.load(); |
287 | 0 | } |
288 | 0 | CPUTimeStat load_unsafe() const { |
289 | 0 | return _cpu_time_stat.load_unsafe(); |
290 | 0 | } |
291 | | |
292 | 0 | void store(CPUTimeStat cpu_time_stat) { |
293 | 0 | _cpu_time_stat.store(AtomicInteger128::Value(cpu_time_stat)); |
294 | 0 | } |
295 | | |
296 | | private: |
297 | | AtomicInteger128 _cpu_time_stat; |
298 | | }; |
299 | | |
300 | | // You shall use TaskControl::create_group to create a new instance.
301 | | explicit TaskGroup(TaskControl* c); |
302 | | |
303 | | int init(size_t runqueue_capacity); |
304 | | |
305 | | // You shall call destroy_self() instead of the destructor because deletion
306 | | // of groups is postponed to avoid races.
307 | | ~TaskGroup(); |
308 | | |
309 | | #ifdef BUTIL_USE_ASAN |
310 | | static void asan_task_runner(intptr_t); |
311 | | #endif // BUTIL_USE_ASAN |
312 | | static void task_runner(intptr_t skip_remained); |
313 | | |
314 | | // Callbacks for set_remained() |
315 | | static void _release_last_context(void*); |
316 | | static void _add_sleep_event(void*); |
317 | | struct ReadyToRunArgs { |
318 | | bthread_tag_t tag; |
319 | | TaskMeta* meta; |
320 | | bool nosignal; |
321 | | }; |
322 | | static void ready_to_run_in_worker(void*); |
323 | | static void ready_to_run_in_worker_ignoresignal(void*); |
324 | | static void priority_to_run(void*); |
325 | | |
326 | | // Wait for a task to run. |
327 | | // Returns true on success; false is treated as a permanent error and the
328 | | // loop calling this function should end.
329 | | bool wait_task(bthread_t* tid); |
330 | | |
331 | 0 | bool steal_task(bthread_t* tid) { |
332 | 0 | if (_remote_rq.pop(tid)) { |
333 | 0 | return true; |
334 | 0 | } |
335 | 0 | #ifndef BTHREAD_DONT_SAVE_PARKING_STATE |
336 | 0 | _last_pl_state = _pl->get_state(); |
337 | 0 | #endif |
338 | 0 | return _control->steal_task(tid, &_steal_seed, _steal_offset); |
339 | 0 | } |
340 | | |
341 | 0 | void set_tag(bthread_tag_t tag) { _tag = tag; } |
342 | | |
343 | 0 | void set_pl(ParkingLot* pl) { _pl = pl; } |
344 | | |
345 | 0 | static bool is_main_task(TaskGroup* g, bthread_t tid) { |
346 | 0 | return g->_main_tid == tid; |
347 | 0 | } |
348 | | |
349 | | TaskMeta* _cur_meta{NULL}; |
350 | | |
351 | | // the control that this group belongs to |
352 | | TaskControl* _control{NULL}; |
353 | | int _num_nosignal{0}; |
354 | | int _nsignaled{0}; |
355 | | AtomicCPUTimeStat _cpu_time_stat; |
356 | | // last thread cpu clock |
357 | | int64_t _last_cpu_clock_ns{0}; |
358 | | |
359 | | size_t _nswitch{0}; |
360 | | RemainedFn _last_context_remained{NULL}; |
361 | | void* _last_context_remained_arg{NULL}; |
362 | | |
363 | | ParkingLot* _pl{NULL}; |
364 | | #ifndef BTHREAD_DONT_SAVE_PARKING_STATE |
365 | | ParkingLot::State _last_pl_state; |
366 | | #endif |
367 | | size_t _steal_seed{butil::fast_rand()}; |
368 | | size_t _steal_offset{prime_offset(_steal_seed)}; |
369 | | ContextualStack* _main_stack{NULL}; |
370 | | bthread_t _main_tid{INVALID_BTHREAD}; |
371 | | WorkStealingQueue<bthread_t> _rq; |
372 | | RemoteTaskQueue _remote_rq; |
373 | | int _remote_num_nosignal{0}; |
374 | | int _remote_nsignaled{0}; |
375 | | |
376 | | int _sched_recursive_guard{0}; |
377 | | // tag of this taskgroup |
378 | | bthread_tag_t _tag{BTHREAD_TAG_DEFAULT}; |
379 | | |
380 | | // Worker thread id. |
381 | | pthread_t _tid{}; |
382 | | }; |
383 | | |
384 | | } // namespace bthread |
385 | | |
386 | | #include "task_group_inl.h" |
387 | | |
388 | | #endif // BTHREAD_TASK_GROUP_H |