/src/brpc/src/bthread/task_group.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // bthread - An M:N threading library to make applications more concurrent. |
19 | | |
20 | | // Date: Tue Jul 10 17:40:58 CST 2012 |
21 | | |
22 | | #include <sys/types.h> |
23 | | #include <stddef.h> // size_t |
24 | | #include <gflags/gflags.h> |
25 | | #include "butil/compat.h" // OS_MACOSX |
26 | | #include "butil/macros.h" // ARRAY_SIZE |
27 | | #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK |
28 | | #include "butil/fast_rand.h" |
29 | | #include "butil/unique_ptr.h" |
30 | | #include "butil/third_party/murmurhash3/murmurhash3.h" // fmix64 |
31 | | #include "butil/reloadable_flags.h" |
32 | | #include "bthread/errno.h" // ESTOP |
33 | | #include "bthread/butex.h" // butex_* |
34 | | #include "bthread/sys_futex.h" // futex_wake_private |
35 | | #include "bthread/processor.h" // cpu_relax |
36 | | #include "bthread/task_control.h" |
37 | | #include "bthread/task_group.h" |
38 | | #include "bthread/timer_thread.h" |
39 | | #include "bthread/bthread.h" |
40 | | |
41 | | #ifdef __x86_64__ |
42 | | #include <x86intrin.h> |
43 | | #endif // __x86_64__ |
44 | | |
45 | | #ifdef __ARM_NEON |
46 | | #include <arm_neon.h> |
47 | | #endif // __ARM_NEON |
48 | | |
49 | | namespace bthread { |
50 | | |
51 | | static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = { |
52 | | BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_INVALID, {0} }; |
53 | | |
54 | | DEFINE_bool(show_bthread_creation_in_vars, false, "When this flag is on, the time " |
55 | | "from bthread creation to first run will be recorded and shown in /vars"); |
56 | | BUTIL_VALIDATE_GFLAG(show_bthread_creation_in_vars, butil::PassValidate); |
57 | | |
58 | | DEFINE_bool(show_per_worker_usage_in_vars, false, |
59 | | "Show per-worker usage in /vars/bthread_per_worker_usage_<tid>"); |
60 | | BUTIL_VALIDATE_GFLAG(show_per_worker_usage_in_vars, butil::PassValidate); |
61 | | |
62 | | DEFINE_bool(bthread_enable_cpu_clock_stat, false, |
63 | | "Enable CPU clock statistics for bthread"); |
64 | | BUTIL_VALIDATE_GFLAG(bthread_enable_cpu_clock_stat, butil::PassValidate); |
65 | | |
66 | | BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group, NULL); |
67 | | // Sync with TaskMeta::local_storage when a bthread is created or destroyed. |
68 | | // While the bthread is running, the two fields may be inconsistent; use |
69 | | // tls_bls as the ground truth. |
70 | | BAIDU_VOLATILE_THREAD_LOCAL(LocalStorage, tls_bls, BTHREAD_LOCAL_STORAGE_INITIALIZER); |
71 | | |
72 | | // defined in bthread/key.cpp |
73 | | extern void return_keytable(bthread_keytable_pool_t*, KeyTable*); |
74 | | |
75 | | // [Hacky] This is a special TLS set by bthread-rpc privately... to save |
76 | | // the overhead of creating a keytable; may be removed later. |
77 | | BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL); |
78 | | |
79 | | const TaskStatistics EMPTY_STAT = { 0, 0, 0 }; |
80 | | |
81 | | void* (*g_create_span_func)() = NULL; |
82 | | |
83 | 0 | void* run_create_span_func() { |
84 | 0 | if (g_create_span_func) { |
85 | 0 | return g_create_span_func(); |
86 | 0 | } |
87 | 0 | return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span; |
88 | 0 | } |
89 | | |
90 | 0 | AtomicInteger128::Value AtomicInteger128::load() const { |
91 | 0 | #if __x86_64__ || __ARM_NEON |
92 | | // Suppress compiler warning. |
93 | 0 | (void)_mutex; |
94 | 0 | #endif // __x86_64__ || __ARM_NEON |
95 | |
96 | 0 | #if __x86_64__ || __ARM_NEON |
97 | 0 | #ifdef __x86_64__ |
98 | 0 | __m128i value = _mm_load_si128(reinterpret_cast<const __m128i*>(&_value)); |
99 | | #else // __ARM_NEON |
100 | | int64x2_t value = vld1q_s64(reinterpret_cast<const int64_t*>(&_value)); |
101 | | #endif // __x86_64__ |
102 | 0 | return {value[0], value[1]}; |
103 | | #else // __x86_64__ || __ARM_NEON |
104 | | // RISC-V and other architectures use mutex fallback |
105 | | BAIDU_SCOPED_LOCK(const_cast<FastPthreadMutex&>(_mutex)); |
106 | | return _value; |
107 | | #endif // __x86_64__ || __ARM_NEON |
108 | 0 | } |
109 | | |
110 | 0 | void AtomicInteger128::store(Value value) { |
111 | 0 | #if __x86_64__ |
112 | 0 | __m128i v = _mm_load_si128(reinterpret_cast<__m128i*>(&value)); |
113 | 0 | _mm_store_si128(reinterpret_cast<__m128i*>(&_value), v); |
114 | | #elif __ARM_NEON |
115 | | int64x2_t v = vld1q_s64(reinterpret_cast<int64_t*>(&value)); |
116 | | vst1q_s64(reinterpret_cast<int64_t*>(&_value), v); |
117 | | #else |
118 | | // RISC-V and other architectures use mutex fallback |
119 | | BAIDU_SCOPED_LOCK(const_cast<FastPthreadMutex&>(_mutex)); |
120 | | _value = value; |
121 | | #endif // __x86_64__ || __ARM_NEON |
122 | 0 | } |
123 | | |
124 | | |
125 | 0 | int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) { |
126 | 0 | TaskMeta* const m = address_meta(tid); |
127 | 0 | if (m != NULL) { |
128 | 0 | const uint32_t given_ver = get_version(tid); |
129 | 0 | BAIDU_SCOPED_LOCK(m->version_lock); |
130 | 0 | if (given_ver == *m->version_butex) { |
131 | 0 | *out = m->attr; |
132 | 0 | return 0; |
133 | 0 | } |
134 | 0 | } |
135 | 0 | errno = EINVAL; |
136 | 0 | return -1; |
137 | 0 | } |
138 | | |
139 | 0 | void TaskGroup::set_stopped(bthread_t tid) { |
140 | 0 | TaskMeta* const m = address_meta(tid); |
141 | 0 | if (m != NULL) { |
142 | 0 | const uint32_t given_ver = get_version(tid); |
143 | 0 | BAIDU_SCOPED_LOCK(m->version_lock); |
144 | 0 | if (given_ver == *m->version_butex) { |
145 | 0 | m->stop = true; |
146 | 0 | } |
147 | 0 | } |
148 | 0 | } |
149 | | |
150 | 0 | bool TaskGroup::is_stopped(bthread_t tid) { |
151 | 0 | TaskMeta* const m = address_meta(tid); |
152 | 0 | if (m != NULL) { |
153 | 0 | const uint32_t given_ver = get_version(tid); |
154 | 0 | BAIDU_SCOPED_LOCK(m->version_lock); |
155 | 0 | if (given_ver == *m->version_butex) { |
156 | 0 | return m->stop; |
157 | 0 | } |
158 | 0 | } |
159 | | // If the tid does not exist or version does not match, it's intuitive |
160 | | // to treat the thread as "stopped". |
161 | 0 | return true; |
162 | 0 | } |
163 | | |
164 | 0 | bool TaskGroup::wait_task(bthread_t* tid) { |
165 | 0 | do { |
166 | 0 | #ifndef BTHREAD_DONT_SAVE_PARKING_STATE |
167 | 0 | if (_last_pl_state.stopped()) { |
168 | 0 | return false; |
169 | 0 | } |
170 | 0 | _pl->wait(_last_pl_state); |
171 | 0 | if (steal_task(tid)) { |
172 | 0 | return true; |
173 | 0 | } |
174 | | #else |
175 | | const ParkingLot::State st = _pl->get_state(); |
176 | | if (st.stopped()) { |
177 | | return false; |
178 | | } |
179 | | if (steal_task(tid)) { |
180 | | return true; |
181 | | } |
182 | | _pl->wait(st); |
183 | | #endif |
184 | 0 | } while (true); |
185 | 0 | } |
186 | | |
187 | 0 | static double get_cumulated_cputime_from_this(void* arg) { |
188 | 0 | return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0; |
189 | 0 | } |
190 | | |
191 | 0 | int64_t TaskGroup::cumulated_cputime_ns() const { |
192 | 0 | CPUTimeStat cpu_time_stat = _cpu_time_stat.load(); |
193 | | // Add the elapsed time of running bthread. |
194 | 0 | int64_t cumulated_cputime_ns = cpu_time_stat.cumulated_cputime_ns(); |
195 | 0 | if (!cpu_time_stat.is_main_task()) { |
196 | 0 | cumulated_cputime_ns += butil::cpuwide_time_ns() - cpu_time_stat.last_run_ns(); |
197 | 0 | } |
198 | 0 | return cumulated_cputime_ns; |
199 | 0 | } |
200 | | |
201 | 0 | void TaskGroup::run_main_task() { |
202 | 0 | bvar::PassiveStatus<double> cumulated_cputime( |
203 | 0 | get_cumulated_cputime_from_this, this); |
204 | 0 | std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar; |
205 | |
206 | 0 | TaskGroup* dummy = this; |
207 | 0 | bthread_t tid; |
208 | 0 | while (wait_task(&tid)) { |
209 | 0 | sched_to(&dummy, tid); |
210 | 0 | DCHECK_EQ(this, dummy); |
211 | 0 | DCHECK_EQ(_cur_meta->stack, _main_stack); |
212 | 0 | if (_cur_meta->tid != _main_tid) { |
213 | 0 | task_runner(1/*skip remained*/); |
214 | 0 | } |
215 | 0 | if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) { |
216 | 0 | char name[32]; |
217 | | #if defined(OS_MACOSX) |
218 | | snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64, |
219 | | pthread_numeric_id()); |
220 | | #else |
221 | 0 | snprintf(name, sizeof(name), "bthread_worker_usage_%ld", |
222 | 0 | (long)syscall(SYS_gettid)); |
223 | 0 | #endif |
224 | 0 | usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> > |
225 | 0 | (name, &cumulated_cputime, 1)); |
226 | 0 | } |
227 | 0 | } |
228 | | // Don't forget to add the elapsed time of the last wait_task. |
229 | 0 | current_task()->stat.cputime_ns += |
230 | 0 | butil::cpuwide_time_ns() - _cpu_time_stat.load_unsafe().last_run_ns(); |
231 | 0 | } |
232 | | |
233 | | TaskGroup::TaskGroup(TaskControl* c) |
234 | 0 | : _control(c) { |
235 | 0 | CHECK(c); |
236 | 0 | } |
237 | | |
238 | 0 | TaskGroup::~TaskGroup() { |
239 | 0 | if (_main_tid) { |
240 | 0 | TaskMeta* m = address_meta(_main_tid); |
241 | 0 | CHECK(_main_stack == m->stack); |
242 | | #ifdef BUTIL_USE_ASAN |
243 | | _main_stack->storage.bottom = NULL; |
244 | | _main_stack->storage.stacksize = 0; |
245 | | #endif // BUTIL_USE_ASAN |
246 | 0 | return_stack(m->release_stack()); |
247 | 0 | return_resource(get_slot(_main_tid)); |
248 | 0 | _main_tid = 0; |
249 | 0 | } |
250 | 0 | } |
251 | | |
252 | | #ifdef BUTIL_USE_ASAN |
253 | | int PthreadAttrGetStack(void*& stack_addr, size_t& stack_size) { |
254 | | #if defined(OS_MACOSX) |
255 | | stack_addr = pthread_get_stackaddr_np(pthread_self()); |
256 | | stack_size = pthread_get_stacksize_np(pthread_self()); |
257 | | return 0; |
258 | | #else |
259 | | pthread_attr_t attr; |
260 | | int rc = pthread_getattr_np(pthread_self(), &attr); |
261 | | if (0 != rc) { |
262 | | LOG(ERROR) << "Fail to get pthread attributes: " << berror(rc); |
263 | | return rc; |
264 | | } |
265 | | rc = pthread_attr_getstack(&attr, &stack_addr, &stack_size); |
266 | | if (0 != rc) { |
267 | | LOG(ERROR) << "Fail to get pthread stack: " << berror(rc); |
268 | | } |
269 | | pthread_attr_destroy(&attr); |
270 | | return rc; |
271 | | #endif // OS_MACOSX |
272 | | } |
273 | | #endif // BUTIL_USE_ASAN |
274 | | |
275 | 0 | int TaskGroup::init(size_t runqueue_capacity) { |
276 | 0 | if (_rq.init(runqueue_capacity) != 0) { |
277 | 0 | LOG(FATAL) << "Fail to init _rq"; |
278 | 0 | return -1; |
279 | 0 | } |
280 | 0 | if (_remote_rq.init(runqueue_capacity / 2) != 0) { |
281 | 0 | LOG(FATAL) << "Fail to init _remote_rq"; |
282 | 0 | return -1; |
283 | 0 | } |
284 | | |
285 | | #ifdef BUTIL_USE_ASAN |
286 | | void* stack_addr = NULL; |
287 | | size_t stack_size = 0; |
288 | | if (0 != PthreadAttrGetStack(stack_addr, stack_size)) { |
289 | | return -1; |
290 | | } |
291 | | #endif // BUTIL_USE_ASAN |
292 | | |
293 | 0 | ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL); |
294 | 0 | if (NULL == stk) { |
295 | 0 | LOG(FATAL) << "Fail to get main stack container"; |
296 | 0 | return -1; |
297 | 0 | } |
298 | 0 | butil::ResourceId<TaskMeta> slot; |
299 | 0 | TaskMeta* m = butil::get_resource<TaskMeta>(&slot); |
300 | 0 | if (NULL == m) { |
301 | 0 | LOG(FATAL) << "Fail to get TaskMeta"; |
302 | 0 | return -1; |
303 | 0 | } |
304 | 0 | m->sleep_failed = false; |
305 | 0 | m->stop = false; |
306 | 0 | m->interrupted = false; |
307 | 0 | m->about_to_quit = false; |
308 | 0 | m->fn = NULL; |
309 | 0 | m->arg = NULL; |
310 | 0 | m->local_storage = LOCAL_STORAGE_INIT; |
311 | 0 | m->cpuwide_start_ns = butil::cpuwide_time_ns(); |
312 | 0 | m->stat = EMPTY_STAT; |
313 | 0 | m->attr = BTHREAD_ATTR_TASKGROUP; |
314 | 0 | m->tid = make_tid(*m->version_butex, slot); |
315 | 0 | m->set_stack(stk); |
316 | |
317 | | #ifdef BUTIL_USE_ASAN |
318 | | stk->storage.bottom = stack_addr; |
319 | | stk->storage.stacksize = stack_size; |
320 | | // No guard size required for ASan. |
321 | | #endif // BUTIL_USE_ASAN |
322 | |
323 | 0 | _cur_meta = m; |
324 | 0 | _main_tid = m->tid; |
325 | 0 | _main_stack = stk; |
326 | |
327 | 0 | CPUTimeStat cpu_time_stat; |
328 | 0 | cpu_time_stat.set_last_run_ns(m->cpuwide_start_ns, true); |
329 | 0 | _cpu_time_stat.store(cpu_time_stat); |
330 | 0 | _last_cpu_clock_ns = 0; |
331 | |
332 | 0 | return 0; |
333 | 0 | } |
334 | | |
335 | | #ifdef BUTIL_USE_ASAN |
336 | | void TaskGroup::asan_task_runner(intptr_t) { |
337 | | // This is a new thread, and it doesn't have the fake stack yet. ASan will |
338 | | // create it lazily; for now just pass NULL. |
339 | | internal::FinishSwitchFiber(NULL); |
340 | | task_runner(0); |
341 | | } |
342 | | #endif // BUTIL_USE_ASAN |
343 | | |
344 | 0 | void TaskGroup::task_runner(intptr_t skip_remained) { |
345 | | // NOTE: tls_task_group is volatile since tasks are moved around |
346 | | // different groups. |
347 | 0 | TaskGroup* g = tls_task_group; |
348 | | #ifdef BRPC_BTHREAD_TRACER |
349 | | TaskTracer::set_running_status(g->tid(), g->_cur_meta); |
350 | | #endif // BRPC_BTHREAD_TRACER |
351 | |
352 | 0 | if (!skip_remained) { |
353 | 0 | while (g->_last_context_remained) { |
354 | 0 | RemainedFn fn = g->_last_context_remained; |
355 | 0 | g->_last_context_remained = NULL; |
356 | 0 | fn(g->_last_context_remained_arg); |
357 | 0 | g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); |
358 | 0 | } |
359 | |
360 | 0 | #ifndef NDEBUG |
361 | 0 | --g->_sched_recursive_guard; |
362 | 0 | #endif |
363 | 0 | } |
364 | |
365 | 0 | do { |
366 | | // A task can be stopped before it gets running, in which case |
367 | | // we may skip the user function, but that may confuse the user: |
368 | | // most tasks have variables to remember the running result of the task, |
369 | | // which are often initialized to values indicating success. If the |
370 | | // user function is never called, the variables will be unchanged; |
371 | | // however, they'd better reflect failure because the task was stopped |
372 | | // abnormally. |
373 | | |
374 | | // The meta and identifier of the task are persistent in this run. |
375 | 0 | TaskMeta* const m = g->_cur_meta; |
376 | |
377 | 0 | if (FLAGS_show_bthread_creation_in_vars) { |
378 | | // NOTE: the thread triggering exposure of pending time may spend |
379 | | // considerable time because a single bvar::LatencyRecorder |
380 | | // contains many bvar. |
381 | 0 | g->_control->exposed_pending_time() << |
382 | 0 | (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L; |
383 | 0 | } |
384 | | |
385 | | // Do not catch exceptions except ExitException, which is for implementing |
386 | | // bthread_exit(). User code is intended to crash when an exception is |
387 | | // not caught explicitly. This is consistent with other threading |
388 | | // libraries. |
389 | 0 | void* thread_return; |
390 | 0 | try { |
391 | 0 | thread_return = m->fn(m->arg); |
392 | 0 | } catch (ExitException& e) { |
393 | 0 | thread_return = e.value(); |
394 | 0 | } |
395 | | |
396 | | // TODO: Save thread_return |
397 | 0 | (void)thread_return; |
398 | | |
399 | | // Logging must be done before returning the keytable, since the logging lib |
400 | | // uses bthread local storage internally; otherwise it will cause a memory leak. |
401 | | // FIXME: the time from quitting fn to here is not counted into cputime |
402 | 0 | if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) { |
403 | 0 | LOG(INFO) << "Finished bthread " << m->tid << ", cputime=" |
404 | 0 | << m->stat.cputime_ns / 1000000.0 << "ms"; |
405 | 0 | } |
406 | | |
407 | | // Clean up TLS variables. This must be done before changing version_butex, |
408 | | // otherwise another thread that just joined this thread may not see the |
409 | | // side effects of destructing the TLS variables. |
410 | 0 | LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls); |
411 | 0 | KeyTable* kt = tls_bls_ptr->keytable; |
412 | 0 | if (kt != NULL) { |
413 | 0 | return_keytable(m->attr.keytable_pool, kt); |
414 | | // TLS may have been set again during deletion, so re-read the pointer. |
415 | 0 | tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls); |
416 | 0 | tls_bls_ptr->keytable = NULL; |
417 | 0 | m->local_storage.keytable = NULL; // optional |
418 | 0 | } |
419 | | |
420 | | // While running the function in TaskMeta and deleting the KeyTable in |
421 | | // return_keytable, the group may have changed. |
422 | 0 | g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); |
423 | | |
424 | | // Increase the version and wake up all joiners. If the resulting version |
425 | | // is 0, change it to 1 so that a bthread_t is never 0. Any access to |
426 | | // or join on the bthread after the version change will be rejected. |
427 | | // The spinlock is for visibility of TaskGroup::get_attr. |
428 | | #ifdef BRPC_BTHREAD_TRACER |
429 | | bool tracing = false; |
430 | | #endif // BRPC_BTHREAD_TRACER |
431 | 0 | { |
432 | 0 | BAIDU_SCOPED_LOCK(m->version_lock); |
433 | | #ifdef BRPC_BTHREAD_TRACER |
434 | | tracing = TaskTracer::set_end_status_unsafe(m); |
435 | | #endif // BRPC_BTHREAD_TRACER |
436 | 0 | if (0 == ++*m->version_butex) { |
437 | 0 | ++*m->version_butex; |
438 | 0 | } |
439 | 0 | } |
440 | 0 | butex_wake_except(m->version_butex, 0); |
441 | |
442 | | #ifdef BRPC_BTHREAD_TRACER |
443 | | if (tracing) { |
444 | | // Wait for tracing completion. |
445 | | g->_control->_task_tracer.WaitForTracing(m); |
446 | | } |
447 | | g->_control->_task_tracer.set_status(TASK_STATUS_UNKNOWN, m); |
448 | | #endif // BRPC_BTHREAD_TRACER |
449 | |
450 | 0 | g->_control->_nbthreads << -1; |
451 | 0 | g->_control->tag_nbthreads(g->tag()) << -1; |
452 | 0 | g->set_remained(_release_last_context, m); |
453 | 0 | ending_sched(&g); |
454 | |
455 | 0 | } while (g->_cur_meta->tid != g->_main_tid); |
456 | | |
457 | | // We were called from a pthread and there are no BTHREAD_STACKTYPE_PTHREAD |
458 | | // tasks to run; quit to wait for more tasks. |
459 | 0 | } |
460 | | |
461 | 0 | void TaskGroup::_release_last_context(void* arg) { |
462 | 0 | TaskMeta* m = static_cast<TaskMeta*>(arg); |
463 | 0 | if (m->stack_type() != STACK_TYPE_PTHREAD) { |
464 | 0 | return_stack(m->release_stack()/*may be NULL*/); |
465 | 0 | } else { |
466 | | // It's the _main_stack; don't return it. |
467 | 0 | m->set_stack(NULL); |
468 | 0 | } |
469 | 0 | return_resource(get_slot(m->tid)); |
470 | 0 | } |
471 | | |
472 | | int TaskGroup::start_foreground(TaskGroup** pg, |
473 | | bthread_t* __restrict th, |
474 | | const bthread_attr_t* __restrict attr, |
475 | | void * (*fn)(void*), |
476 | 0 | void* __restrict arg) { |
477 | 0 | if (__builtin_expect(!fn, 0)) { |
478 | 0 | return EINVAL; |
479 | 0 | } |
480 | 0 | const int64_t start_ns = butil::cpuwide_time_ns(); |
481 | 0 | const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL); |
482 | 0 | butil::ResourceId<TaskMeta> slot; |
483 | 0 | TaskMeta* m = butil::get_resource(&slot); |
484 | 0 | if (BAIDU_UNLIKELY(NULL == m)) { |
485 | 0 | return ENOMEM; |
486 | 0 | } |
487 | 0 | CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL); |
488 | 0 | m->sleep_failed = false; |
489 | 0 | m->stop = false; |
490 | 0 | m->interrupted = false; |
491 | 0 | m->about_to_quit = false; |
492 | 0 | m->fn = fn; |
493 | 0 | m->arg = arg; |
494 | 0 | CHECK(m->stack == NULL); |
495 | 0 | m->attr = using_attr; |
496 | 0 | m->local_storage = LOCAL_STORAGE_INIT; |
497 | 0 | if (using_attr.flags & BTHREAD_INHERIT_SPAN) { |
498 | 0 | m->local_storage.rpcz_parent_span = run_create_span_func(); |
499 | 0 | } |
500 | 0 | m->cpuwide_start_ns = start_ns; |
501 | 0 | m->stat = EMPTY_STAT; |
502 | 0 | m->tid = make_tid(*m->version_butex, slot); |
503 | 0 | *th = m->tid; |
504 | 0 | if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) { |
505 | 0 | LOG(INFO) << "Started bthread " << m->tid; |
506 | 0 | } |
507 | |
508 | 0 | TaskGroup* g = *pg; |
509 | 0 | g->_control->_nbthreads << 1; |
510 | 0 | g->_control->tag_nbthreads(g->tag()) << 1; |
511 | | #ifdef BRPC_BTHREAD_TRACER |
512 | | g->_control->_task_tracer.set_status(TASK_STATUS_CREATED, m); |
513 | | #endif // BRPC_BTHREAD_TRACER |
514 | 0 | if (g->is_current_pthread_task()) { |
515 | | // never create foreground task in pthread. |
516 | 0 | g->ready_to_run(m, using_attr.flags & BTHREAD_NOSIGNAL); |
517 | 0 | } else { |
518 | | // NOSIGNAL affects current task, not the new task. |
519 | 0 | RemainedFn fn = NULL; |
520 | 0 | auto& cur_attr = g->_cur_meta->attr; |
521 | 0 | if (g->_control->_enable_priority_queue && cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) { |
522 | 0 | fn = priority_to_run; |
523 | 0 | } else if (g->current_task()->about_to_quit) { |
524 | 0 | fn = ready_to_run_in_worker_ignoresignal; |
525 | 0 | } else { |
526 | 0 | fn = ready_to_run_in_worker; |
527 | 0 | } |
528 | 0 | ReadyToRunArgs args = { |
529 | 0 | g->tag(), g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL) |
530 | 0 | }; |
531 | 0 | g->set_remained(fn, &args); |
532 | 0 | sched_to(pg, m->tid); |
533 | 0 | } |
534 | 0 | return 0; |
535 | 0 | } |
536 | | |
537 | | template <bool REMOTE> |
538 | | int TaskGroup::start_background(bthread_t* __restrict th, |
539 | | const bthread_attr_t* __restrict attr, |
540 | | void * (*fn)(void*), |
541 | 0 | void* __restrict arg) { |
542 | 0 | if (__builtin_expect(!fn, 0)) { |
543 | 0 | return EINVAL; |
544 | 0 | } |
545 | 0 | const int64_t start_ns = butil::cpuwide_time_ns(); |
546 | 0 | const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL); |
547 | 0 | butil::ResourceId<TaskMeta> slot; |
548 | 0 | TaskMeta* m = butil::get_resource(&slot); |
549 | 0 | if (BAIDU_UNLIKELY(NULL == m)) { |
550 | 0 | return ENOMEM; |
551 | 0 | } |
552 | 0 | CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL); |
553 | 0 | m->sleep_failed = false; |
554 | 0 | m->stop = false; |
555 | 0 | m->interrupted = false; |
556 | 0 | m->about_to_quit = false; |
557 | 0 | m->fn = fn; |
558 | 0 | m->arg = arg; |
559 | 0 | CHECK(m->stack == NULL); |
560 | 0 | m->attr = using_attr; |
561 | 0 | m->local_storage = LOCAL_STORAGE_INIT; |
562 | 0 | if (using_attr.flags & BTHREAD_INHERIT_SPAN) { |
563 | 0 | m->local_storage.rpcz_parent_span = run_create_span_func(); |
564 | 0 | } |
565 | 0 | m->cpuwide_start_ns = start_ns; |
566 | 0 | m->stat = EMPTY_STAT; |
567 | 0 | m->tid = make_tid(*m->version_butex, slot); |
568 | 0 | *th = m->tid; |
569 | 0 | if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) { |
570 | 0 | LOG(INFO) << "Started bthread " << m->tid; |
571 | 0 | } |
572 | 0 | _control->_nbthreads << 1; |
573 | 0 | _control->tag_nbthreads(tag()) << 1; |
574 | | #ifdef BRPC_BTHREAD_TRACER |
575 | | _control->_task_tracer.set_status(TASK_STATUS_CREATED, m); |
576 | | #endif // BRPC_BTHREAD_TRACER |
577 | 0 | if (REMOTE) { |
578 | 0 | ready_to_run_remote(m, (using_attr.flags & BTHREAD_NOSIGNAL)); |
579 | 0 | } else { |
580 | 0 | ready_to_run(m, (using_attr.flags & BTHREAD_NOSIGNAL)); |
581 | 0 | } |
582 | 0 | return 0; |
583 | 0 | } |
Unexecuted instantiation: int bthread::TaskGroup::start_background<true>(unsigned long*, bthread_attr_t const*, void* (*)(void*), void*)
Unexecuted instantiation: int bthread::TaskGroup::start_background<false>(unsigned long*, bthread_attr_t const*, void* (*)(void*), void*)
584 | | |
585 | | // Explicit instantiations. |
586 | | template int |
587 | | TaskGroup::start_background<true>(bthread_t* __restrict th, |
588 | | const bthread_attr_t* __restrict attr, |
589 | | void * (*fn)(void*), |
590 | | void* __restrict arg); |
591 | | template int |
592 | | TaskGroup::start_background<false>(bthread_t* __restrict th, |
593 | | const bthread_attr_t* __restrict attr, |
594 | | void * (*fn)(void*), |
595 | | void* __restrict arg); |
596 | | |
597 | 0 | int TaskGroup::join(bthread_t tid, void** return_value) { |
598 | 0 | if (__builtin_expect(!tid, 0)) { // tid of bthread is never 0. |
599 | 0 | return EINVAL; |
600 | 0 | } |
601 | 0 | TaskMeta* m = address_meta(tid); |
602 | 0 | if (BAIDU_UNLIKELY(NULL == m)) { |
603 | | // The bthread is not created yet, this join is definitely wrong. |
604 | 0 | return EINVAL; |
605 | 0 | } |
606 | 0 | TaskGroup* g = tls_task_group; |
607 | 0 | if (g != NULL && g->current_tid() == tid) { |
608 | | // joining self causes indefinite waiting. |
609 | 0 | return EINVAL; |
610 | 0 | } |
611 | 0 | const uint32_t expected_version = get_version(tid); |
612 | 0 | while (*m->version_butex == expected_version) { |
613 | 0 | if (butex_wait(m->version_butex, expected_version, NULL) < 0 && |
614 | 0 | errno != EWOULDBLOCK && errno != EINTR) { |
615 | 0 | return errno; |
616 | 0 | } |
617 | 0 | } |
618 | 0 | if (return_value) { |
619 | 0 | *return_value = NULL; |
620 | 0 | } |
621 | 0 | return 0; |
622 | 0 | } |
623 | | |
624 | 0 | bool TaskGroup::exists(bthread_t tid) { |
625 | 0 | if (tid != 0) { // tid of bthread is never 0. |
626 | 0 | TaskMeta* m = address_meta(tid); |
627 | 0 | if (m != NULL) { |
628 | 0 | return (*m->version_butex == get_version(tid)); |
629 | 0 | } |
630 | 0 | } |
631 | 0 | return false; |
632 | 0 | } |
633 | | |
634 | 0 | TaskStatistics TaskGroup::main_stat() const { |
635 | 0 | TaskMeta* m = address_meta(_main_tid); |
636 | 0 | return m ? m->stat : EMPTY_STAT; |
637 | 0 | } |
638 | | |
639 | 0 | void TaskGroup::ending_sched(TaskGroup** pg) { |
640 | 0 | TaskGroup* g = *pg; |
641 | 0 | bthread_t next_tid = 0; |
642 | | // Find the next task to run; if none, switch to the idle thread of the group. |
643 | 0 | #ifndef BTHREAD_FAIR_WSQ |
644 | | // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of |
645 | | // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9% |
646 | | // to 2.9% |
647 | 0 | const bool popped = g->_rq.pop(&next_tid); |
648 | | #else |
649 | | const bool popped = g->_rq.steal(&next_tid); |
650 | | #endif |
651 | 0 | if (!popped && !g->steal_task(&next_tid)) { |
652 | | // Jump to main task if there's no task to run. |
653 | 0 | next_tid = g->_main_tid; |
654 | 0 | } |
655 | |
656 | 0 | TaskMeta* const cur_meta = g->_cur_meta; |
657 | 0 | TaskMeta* next_meta = address_meta(next_tid); |
658 | 0 | if (next_meta->stack == NULL) { |
659 | 0 | if (next_meta->stack_type() == cur_meta->stack_type()) { |
660 | | // Reuse the stack of the current ending task. |
661 | | // |
662 | | // This also works when a pthread_task schedules to another pthread_task; |
663 | | // the transferred stack is just _main_stack. |
664 | 0 | next_meta->set_stack(cur_meta->release_stack()); |
665 | 0 | } else { |
666 | | #ifdef BUTIL_USE_ASAN |
667 | | ContextualStack* stk = get_stack( |
668 | | next_meta->stack_type(), asan_task_runner); |
669 | | #else |
670 | 0 | ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner); |
671 | 0 | #endif // BUTIL_USE_ASAN |
672 | 0 | if (stk) { |
673 | 0 | next_meta->set_stack(stk); |
674 | 0 | } else { |
675 | | // stack_type is BTHREAD_STACKTYPE_PTHREAD or we're out of memory. |
676 | | // In the latter case, attr is forced to BTHREAD_STACKTYPE_PTHREAD. |
677 | | // This basically means that if we can't allocate a stack, we run |
678 | | // the task directly in the pthread. |
679 | 0 | next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD; |
680 | 0 | next_meta->set_stack(g->_main_stack); |
681 | 0 | } |
682 | 0 | } |
683 | 0 | } |
684 | 0 | sched_to(pg, next_meta); |
685 | 0 | } |
686 | | |
687 | 0 | void TaskGroup::sched(TaskGroup** pg) { |
688 | 0 | TaskGroup* g = *pg; |
689 | 0 | bthread_t next_tid = 0; |
690 | | // Find the next task to run; if none, switch to the idle thread of the group. |
691 | 0 | #ifndef BTHREAD_FAIR_WSQ |
692 | 0 | const bool popped = g->_rq.pop(&next_tid); |
693 | | #else |
694 | | const bool popped = g->_rq.steal(&next_tid); |
695 | | #endif |
696 | 0 | if (!popped && !g->steal_task(&next_tid)) { |
697 | | // Jump to main task if there's no task to run. |
698 | 0 | next_tid = g->_main_tid; |
699 | 0 | } |
700 | 0 | sched_to(pg, next_tid); |
701 | 0 | } |
702 | | |
703 | | extern void CheckBthreadScheSafety(); |
704 | | |
705 | 0 | void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) { |
706 | 0 | TaskGroup* g = *pg; |
707 | 0 | #ifndef NDEBUG |
708 | 0 | if ((++g->_sched_recursive_guard) > 1) { |
709 | 0 | LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1 |
710 | 0 | << ") call sched_to(" << g << ")"; |
711 | 0 | } |
712 | 0 | #endif |
713 | | // Save errno so that errno is bthread-specific. |
714 | 0 | int saved_errno = errno; |
715 | 0 | void* saved_unique_user_ptr = tls_unique_user_ptr; |
716 | |
|
717 | 0 | TaskMeta* const cur_meta = g->_cur_meta; |
718 | 0 | int64_t now = butil::cpuwide_time_ns(); |
719 | 0 | CPUTimeStat cpu_time_stat = g->_cpu_time_stat.load_unsafe(); |
720 | 0 | int64_t elp_ns = now - cpu_time_stat.last_run_ns(); |
721 | 0 | cur_meta->stat.cputime_ns += elp_ns; |
722 | | // Update cpu_time_stat. |
723 | 0 | cpu_time_stat.set_last_run_ns(now, is_main_task(g, next_meta->tid)); |
724 | 0 | cpu_time_stat.add_cumulated_cputime_ns(elp_ns, is_main_task(g, cur_meta->tid)); |
725 | 0 | g->_cpu_time_stat.store(cpu_time_stat); |
726 | |
727 | 0 | if (FLAGS_bthread_enable_cpu_clock_stat) { |
728 | 0 | const int64_t cpu_thread_time = butil::cputhread_time_ns(); |
729 | 0 | if (g->_last_cpu_clock_ns != 0) { |
730 | 0 | cur_meta->stat.cpu_usage_ns += cpu_thread_time - g->_last_cpu_clock_ns; |
731 | 0 | } |
732 | 0 | g->_last_cpu_clock_ns = cpu_thread_time; |
733 | 0 | } else { |
734 | 0 | g->_last_cpu_clock_ns = 0; |
735 | 0 | } |
736 | |
737 | 0 | ++cur_meta->stat.nswitch; |
738 | 0 | ++ g->_nswitch; |
739 | | // Switch to the task |
740 | 0 | if (__builtin_expect(next_meta != cur_meta, 1)) { |
741 | 0 | g->_cur_meta = next_meta; |
742 | | // Switch tls_bls |
743 | 0 | cur_meta->local_storage = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls); |
744 | 0 | BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, next_meta->local_storage); |
745 | | |
746 | | // Logging must be done after switching the local storage, since the logging lib |
747 | | // uses bthread local storage internally; otherwise it will cause a memory leak. |
748 | 0 | if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) || |
749 | 0 | (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) { |
750 | 0 | LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> " |
751 | 0 | << next_meta->tid; |
752 | 0 | } |
753 | |
754 | 0 | if (cur_meta->stack != NULL) { |
755 | 0 | if (next_meta->stack != cur_meta->stack) { |
756 | 0 | CheckBthreadScheSafety(); |
757 | | #ifdef BRPC_BTHREAD_TRACER |
758 | | g->_control->_task_tracer.set_status(TASK_STATUS_JUMPING, cur_meta); |
759 | | g->_control->_task_tracer.set_status(TASK_STATUS_JUMPING, next_meta); |
760 | | #endif // BRPC_BTHREAD_TRACER |
761 | 0 | { |
762 | 0 | BTHREAD_SCOPED_ASAN_FIBER_SWITCHER(next_meta->stack->storage); |
763 | 0 | jump_stack(cur_meta->stack, next_meta->stack); |
764 | 0 | } |
765 | | // probably went to another group, need to assign g again. |
766 | 0 | g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); |
767 | | #ifdef BRPC_BTHREAD_TRACER |
768 | | TaskTracer::set_running_status(g->tid(), g->_cur_meta); |
769 | | #endif // BRPC_BTHREAD_TRACER |
770 | 0 | } |
771 | 0 | #ifndef NDEBUG |
772 | 0 | else { |
773 | | // else a pthread_task is switching to another pthread_task; the stacks |
774 | | // can only be equal when they're both _main_stack |
775 | 0 | CHECK(cur_meta->stack == g->_main_stack); |
776 | 0 | } |
777 | 0 | #endif |
778 | 0 | } /* else because of ending_sched(including pthread_task->pthread_task). */ |
779 | | #ifdef BRPC_BTHREAD_TRACER |
780 | | else { |
781 | | // _cur_meta: TASK_STATUS_FIRST_READY -> TASK_STATUS_RUNNING. |
782 | | TaskTracer::set_running_status(g->tid(), g->_cur_meta); |
783 | | } |
784 | | #endif // BRPC_BTHREAD_TRACER |
785 | 0 | } else { |
786 | 0 | LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!"; |
787 | 0 | } |
788 | |
789 | 0 | while (g->_last_context_remained) { |
790 | 0 | RemainedFn fn = g->_last_context_remained; |
791 | 0 | g->_last_context_remained = NULL; |
792 | 0 | fn(g->_last_context_remained_arg); |
793 | 0 | g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); |
794 | 0 | } |
795 | | |
796 | | // Restore errno |
797 | 0 | errno = saved_errno; |
798 | | // tls_unique_user_ptr probably changed. |
799 | 0 | BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_unique_user_ptr, saved_unique_user_ptr); |
800 | |
801 | 0 | #ifndef NDEBUG |
802 | 0 | --g->_sched_recursive_guard; |
803 | 0 | #endif |
804 | 0 | *pg = g; |
805 | 0 | } |
806 | | |
807 | 0 | void TaskGroup::destroy_self() { |
808 | 0 | if (_control) { |
809 | 0 | _control->_destroy_group(this); |
810 | 0 | _control = NULL; |
811 | 0 | } else { |
812 | 0 | CHECK(false); |
813 | 0 | } |
814 | 0 | } |
815 | | |
816 | | |
817 | 0 | void TaskGroup::ready_to_run(TaskMeta* meta, bool nosignal) { |
818 | | #ifdef BRPC_BTHREAD_TRACER |
819 | | _control->_task_tracer.set_status(TASK_STATUS_READY, meta); |
820 | | #endif // BRPC_BTHREAD_TRACER |
821 | 0 | push_rq(meta->tid); |
822 | 0 | if (nosignal) { |
823 | 0 | ++_num_nosignal; |
824 | 0 | } else { |
825 | 0 | const int additional_signal = _num_nosignal; |
826 | 0 | _num_nosignal = 0; |
827 | 0 | _nsignaled += 1 + additional_signal; |
828 | 0 | _control->signal_task(1 + additional_signal, _tag); |
829 | 0 | } |
830 | 0 | } |
831 | | |
832 | 0 | void TaskGroup::flush_nosignal_tasks() { |
833 | 0 | const int val = _num_nosignal; |
834 | 0 | if (val) { |
835 | 0 | _num_nosignal = 0; |
836 | 0 | _nsignaled += val; |
837 | 0 | _control->signal_task(val, _tag); |
838 | 0 | } |
839 | 0 | } |
840 | | |
841 | 0 | void TaskGroup::ready_to_run_remote(TaskMeta* meta, bool nosignal) { |
842 | | #ifdef BRPC_BTHREAD_TRACER |
843 | | _control->_task_tracer.set_status(TASK_STATUS_READY, meta); |
844 | | #endif // BRPC_BTHREAD_TRACER |
845 | 0 | _remote_rq._mutex.lock(); |
846 | 0 | while (!_remote_rq.push_locked(meta->tid)) { |
847 | 0 | flush_nosignal_tasks_remote_locked(_remote_rq._mutex); |
848 | 0 | LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity=" |
849 | 0 | << _remote_rq.capacity(); |
850 | 0 | ::usleep(1000); |
851 | 0 | _remote_rq._mutex.lock(); |
852 | 0 | } |
853 | 0 | if (nosignal) { |
854 | 0 | ++_remote_num_nosignal; |
855 | 0 | _remote_rq._mutex.unlock(); |
856 | 0 | } else { |
857 | 0 | const int additional_signal = _remote_num_nosignal; |
858 | 0 | _remote_num_nosignal = 0; |
859 | 0 | _remote_nsignaled += 1 + additional_signal; |
860 | 0 | _remote_rq._mutex.unlock(); |
861 | 0 | _control->signal_task(1 + additional_signal, _tag); |
862 | 0 | } |
863 | 0 | } |
864 | | |
865 | 0 | void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) { |
866 | 0 | const int val = _remote_num_nosignal; |
867 | 0 | if (!val) { |
868 | 0 | locked_mutex.unlock(); |
869 | 0 | return; |
870 | 0 | } |
871 | 0 | _remote_num_nosignal = 0; |
872 | 0 | _remote_nsignaled += val; |
873 | 0 | locked_mutex.unlock(); |
874 | 0 | _control->signal_task(val, _tag); |
875 | 0 | } |
876 | | |
877 | 0 | void TaskGroup::ready_to_run_general(TaskMeta* meta, bool nosignal) { |
878 | 0 | if (tls_task_group == this) { |
879 | 0 | return ready_to_run(meta, nosignal); |
880 | 0 | } |
881 | 0 | return ready_to_run_remote(meta, nosignal); |
882 | 0 | } |
883 | | |
884 | 0 | void TaskGroup::flush_nosignal_tasks_general() { |
885 | 0 | if (tls_task_group == this) { |
886 | 0 | return flush_nosignal_tasks(); |
887 | 0 | } |
888 | 0 | return flush_nosignal_tasks_remote(); |
889 | 0 | } |
890 | | |
891 | 0 | void TaskGroup::ready_to_run_in_worker(void* args_in) { |
892 | 0 | ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in); |
893 | 0 | return tls_task_group->ready_to_run(args->meta, args->nosignal); |
894 | 0 | } |
895 | | |
896 | 0 | void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) { |
897 | 0 | ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in); |
898 | | #ifdef BRPC_BTHREAD_TRACER |
899 | | tls_task_group->_control->_task_tracer.set_status( |
900 | | TASK_STATUS_READY, args->meta); |
901 | | #endif // BRPC_BTHREAD_TRACER |
902 | 0 | return tls_task_group->push_rq(args->meta->tid); |
903 | 0 | } |
904 | | |
905 | 0 | void TaskGroup::priority_to_run(void* args_in) { |
906 | 0 | ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in); |
907 | | #ifdef BRPC_BTHREAD_TRACER |
908 | | tls_task_group->_control->_task_tracer.set_status( |
909 | | TASK_STATUS_READY, args->meta); |
910 | | #endif // BRPC_BTHREAD_TRACER |
911 | 0 | return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid); |
912 | 0 | } |
913 | | |
914 | | struct SleepArgs { |
915 | | uint64_t timeout_us; |
916 | | bthread_t tid; |
917 | | TaskMeta* meta; |
918 | | TaskGroup* group; |
919 | | }; |
920 | | |
921 | 0 | static void ready_to_run_from_timer_thread(void* arg) { |
922 | 0 | CHECK(tls_task_group == NULL); |
923 | 0 | const SleepArgs* e = static_cast<const SleepArgs*>(arg); |
924 | 0 | auto g = e->group; |
925 | 0 | auto tag = g->tag(); |
926 | 0 | g->control()->choose_one_group(tag)->ready_to_run_remote(e->meta); |
927 | 0 | } |
928 | | |
929 | 0 | void TaskGroup::_add_sleep_event(void* void_args) { |
930 | | // Must copy SleepArgs. After calling TimerThread::schedule(), previous |
931 | | // thread may be stolen by a worker immediately and the on-stack SleepArgs |
932 | | // will be gone. |
933 | 0 | SleepArgs e = *static_cast<SleepArgs*>(void_args); |
934 | 0 | TaskGroup* g = e.group; |
935 | | #ifdef BRPC_BTHREAD_TRACER |
936 | | g->_control->_task_tracer.set_status(TASK_STATUS_SUSPENDED, e.meta); |
937 | | #endif // BRPC_BTHREAD_TRACER |
938 | |
939 | 0 | TimerThread::TaskId sleep_id; |
940 | 0 | sleep_id = get_global_timer_thread()->schedule( |
941 | 0 | ready_to_run_from_timer_thread, void_args, |
942 | 0 | butil::microseconds_from_now(e.timeout_us)); |
943 | |
944 | 0 | if (!sleep_id) { |
945 | 0 | e.meta->sleep_failed = true; |
946 | | // Fail to schedule timer, go back to previous thread. |
947 | 0 | g->ready_to_run(e.meta); |
948 | 0 | return; |
949 | 0 | } |
950 | | |
951 | | // Set TaskMeta::current_sleep which is for interruption. |
952 | 0 | const uint32_t given_ver = get_version(e.tid); |
953 | 0 | { |
954 | 0 | BAIDU_SCOPED_LOCK(e.meta->version_lock); |
955 | 0 | if (given_ver == *e.meta->version_butex && !e.meta->interrupted) { |
956 | 0 | e.meta->current_sleep = sleep_id; |
957 | 0 | return; |
958 | 0 | } |
959 | 0 | } |
960 | | // The thread is stopped or interrupted. |
961 | | // interrupt() always sees that current_sleep == 0. It will not schedule |
962 | | // the calling thread. The race is between current thread and timer thread. |
963 | 0 | if (get_global_timer_thread()->unschedule(sleep_id) == 0) { |
964 | | // Added to the timer: the previous thread may already have been woken up |
965 | | // by the timer and even stopped. It's safe to schedule the previous thread |
966 | | // when unschedule() returns 0, which means "the not-run-yet sleep_id was |
967 | | // removed". If the sleep_id is running (returns 1), ready_to_run_in_worker() |
968 | | // will schedule the previous thread as well. If the sleep_id does not exist, |
969 | | // the previous thread was already scheduled by the timer thread and we don't |
970 | | // have to do it again. |
971 | 0 | g->ready_to_run(e.meta); |
972 | 0 | } |
973 | 0 | } |
974 | | |
975 | | // To be consistent with sys_usleep, set errno and return -1 on error. |
976 | 0 | int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) { |
977 | 0 | if (0 == timeout_us) { |
978 | 0 | yield(pg); |
979 | 0 | return 0; |
980 | 0 | } |
981 | 0 | TaskGroup* g = *pg; |
982 | | // We have to schedule the timer after switching to the next bthread; |
983 | | // otherwise the timer may wake up (jump to) the current still-running context. |
984 | 0 | SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g }; |
985 | 0 | g->set_remained(_add_sleep_event, &e); |
986 | 0 | sched(pg); |
987 | 0 | g = *pg; |
988 | 0 | if (e.meta->sleep_failed) { |
989 | | // Fail to schedule timer, return error. |
990 | 0 | e.meta->sleep_failed = false; |
991 | 0 | errno = ESTOP; |
992 | 0 | return -1; |
993 | 0 | } |
994 | 0 | e.meta->current_sleep = 0; |
995 | 0 | if (e.meta->interrupted) { |
996 | | // Race with set and may consume multiple interruptions, which are OK. |
997 | 0 | e.meta->interrupted = false; |
998 | | // NOTE: setting errno to ESTOP is not necessary from bthread's |
999 | | // perspective; however, much RPC code expects bthread_usleep to set |
1000 | | // errno to ESTOP when the thread is stopping, and prints FATAL |
1001 | | // otherwise. To make the transition smooth, ESTOP is still set instead |
1002 | | // of EINTR when the thread is stopping. |
1003 | 0 | errno = (e.meta->stop ? ESTOP : EINTR); |
1004 | 0 | return -1; |
1005 | 0 | } |
1006 | 0 | return 0; |
1007 | 0 | } |
1008 | | |
1009 | | // Defined in butex.cpp |
1010 | | bool erase_from_butex_because_of_interruption(ButexWaiter* bw); |
1011 | | |
1012 | | static int interrupt_and_consume_waiters( |
1013 | 0 | bthread_t tid, ButexWaiter** pw, uint64_t* sleep_id) { |
1014 | 0 | TaskMeta* const m = TaskGroup::address_meta(tid); |
1015 | 0 | if (m == NULL) { |
1016 | 0 | return EINVAL; |
1017 | 0 | } |
1018 | 0 | const uint32_t given_ver = get_version(tid); |
1019 | 0 | BAIDU_SCOPED_LOCK(m->version_lock); |
1020 | 0 | if (given_ver == *m->version_butex) { |
1021 | 0 | *pw = m->current_waiter.exchange(NULL, butil::memory_order_acquire); |
1022 | 0 | *sleep_id = m->current_sleep; |
1023 | 0 | m->current_sleep = 0; // only one stopper gets the sleep_id |
1024 | 0 | m->interrupted = true; |
1025 | 0 | return 0; |
1026 | 0 | } |
1027 | 0 | return EINVAL; |
1028 | 0 | } |
1029 | | |
1030 | 0 | static int set_butex_waiter(bthread_t tid, ButexWaiter* w) { |
1031 | 0 | TaskMeta* const m = TaskGroup::address_meta(tid); |
1032 | 0 | if (m != NULL) { |
1033 | 0 | const uint32_t given_ver = get_version(tid); |
1034 | 0 | BAIDU_SCOPED_LOCK(m->version_lock); |
1035 | 0 | if (given_ver == *m->version_butex) { |
1036 | | // Release fence makes m->interrupted visible to butex_wait |
1037 | 0 | m->current_waiter.store(w, butil::memory_order_release); |
1038 | 0 | return 0; |
1039 | 0 | } |
1040 | 0 | } |
1041 | 0 | return EINVAL; |
1042 | 0 | } |
1043 | | |
1044 | | // The interruption is "persistent" compared to the ones caused by signals: |
1045 | | // if a bthread is interrupted when it's not blocked, the interruption |
1046 | | // is still remembered and will be checked at the next blocking call. This |
1047 | | // design choice simplifies the implementation and reduces notification loss |
1048 | | // caused by race conditions. |
1049 | | // TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep() |
1050 | | // can't be interrupted. |
1051 | 0 | int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) { |
1052 | | // Consume current_waiter in the TaskMeta, wake it up then set it back. |
1053 | 0 | ButexWaiter* w = NULL; |
1054 | 0 | uint64_t sleep_id = 0; |
1055 | 0 | int rc = interrupt_and_consume_waiters(tid, &w, &sleep_id); |
1056 | 0 | if (rc) { |
1057 | 0 | return rc; |
1058 | 0 | } |
1059 | | // A bthread cannot wait on a butex and be sleeping at the same time. |
1060 | 0 | CHECK(!sleep_id || !w); |
1061 | 0 | if (w != NULL) { |
1062 | 0 | erase_from_butex_because_of_interruption(w); |
1063 | | // If butex_wait() has already woken up before we set current_waiter back, |
1064 | | // it will spin until current_waiter becomes non-NULL. |
1065 | 0 | rc = set_butex_waiter(tid, w); |
1066 | 0 | if (rc) { |
1067 | 0 | LOG(FATAL) << "butex_wait should spin until setting back waiter"; |
1068 | 0 | return rc; |
1069 | 0 | } |
1070 | 0 | } else if (sleep_id != 0) { |
1071 | 0 | if (get_global_timer_thread()->unschedule(sleep_id) == 0) { |
1072 | 0 | TaskGroup* g = tls_task_group; |
1073 | 0 | TaskMeta* m = address_meta(tid); |
1074 | 0 | if (g) { |
1075 | 0 | g->ready_to_run(m); |
1076 | 0 | } else { |
1077 | 0 | if (!c) { |
1078 | 0 | return EINVAL; |
1079 | 0 | } |
1080 | 0 | c->choose_one_group(tag)->ready_to_run_remote(m); |
1081 | 0 | } |
1082 | 0 | } |
1083 | 0 | } |
1084 | 0 | return 0; |
1085 | 0 | } |
1086 | | |
1087 | 0 | void TaskGroup::yield(TaskGroup** pg) { |
1088 | 0 | TaskGroup* g = *pg; |
1089 | 0 | ReadyToRunArgs args = { g->tag(), g->_cur_meta, false }; |
1090 | 0 | g->set_remained(ready_to_run_in_worker, &args); |
1091 | 0 | sched(pg); |
1092 | 0 | } |
1093 | | |
1094 | | void print_task(std::ostream& os, bthread_t tid, bool enable_trace, |
1095 | 0 | bool ignore_not_matched = false) { |
1096 | 0 | TaskMeta* const m = TaskGroup::address_meta(tid); |
1097 | 0 | if (m == NULL) { |
1098 | 0 | os << "bthread=" << tid << " : never existed\n"; |
1099 | 0 | return; |
1100 | 0 | } |
1101 | 0 | const uint32_t given_ver = get_version(tid); |
1102 | 0 | bool matched = false; |
1103 | 0 | bool stop = false; |
1104 | 0 | bool interrupted = false; |
1105 | 0 | bool about_to_quit = false; |
1106 | 0 | void* (*fn)(void*) = NULL; |
1107 | 0 | void* arg = NULL; |
1108 | 0 | bthread_attr_t attr = BTHREAD_ATTR_NORMAL; |
1109 | 0 | bool has_tls = false; |
1110 | 0 | int64_t cpuwide_start_ns = 0; |
1111 | 0 | TaskStatistics stat = {0, 0, 0}; |
1112 | 0 | TaskStatus status = TASK_STATUS_UNKNOWN; |
1113 | 0 | bool traced = false; |
1114 | 0 | pthread_t worker_tid{}; |
1115 | 0 | { |
1116 | 0 | BAIDU_SCOPED_LOCK(m->version_lock); |
1117 | 0 | if (given_ver == *m->version_butex) { |
1118 | 0 | matched = true; |
1119 | 0 | stop = m->stop; |
1120 | 0 | interrupted = m->interrupted; |
1121 | 0 | about_to_quit = m->about_to_quit; |
1122 | 0 | fn = m->fn; |
1123 | 0 | arg = m->arg; |
1124 | 0 | attr = m->attr; |
1125 | 0 | has_tls = m->local_storage.keytable; |
1126 | 0 | cpuwide_start_ns = m->cpuwide_start_ns; |
1127 | 0 | stat = m->stat; |
1128 | 0 | status = m->status; |
1129 | 0 | traced = m->traced; |
1130 | 0 | worker_tid = m->worker_tid; |
1131 | 0 | } |
1132 | 0 | } |
1133 | 0 | if (!matched) { |
1134 | 0 | if (!ignore_not_matched) { |
1135 | 0 | os << "bthread=" << tid << " : not exist now\n"; |
1136 | 0 | } |
1137 | 0 | } else { |
1138 | 0 | os << "bthread=" << tid << " :\nstop=" << stop |
1139 | 0 | << "\ninterrupted=" << interrupted |
1140 | 0 | << "\nabout_to_quit=" << about_to_quit |
1141 | 0 | << "\nfn=" << (void*)fn |
1142 | 0 | << "\narg=" << (void*)arg |
1143 | 0 | << "\nattr={stack_type=" << attr.stack_type |
1144 | 0 | << " flags=" << attr.flags |
1145 | 0 | << " specified_tag=" << attr.tag |
1146 | 0 | << " name=" << attr.name |
1147 | 0 | << " keytable_pool=" << attr.keytable_pool |
1148 | 0 | << "}\nhas_tls=" << has_tls |
1149 | 0 | << "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns |
1150 | 0 | << "\ncputime_ns=" << stat.cputime_ns |
1151 | 0 | << "\nnswitch=" << stat.nswitch |
1152 | | #ifdef BRPC_BTHREAD_TRACER |
1153 | | << "\nstatus=" << status |
1154 | | << "\ntraced=" << traced |
1155 | | << "\nworker_tid=" << worker_tid; |
1156 | | if (enable_trace) { |
1157 | | os << "\nbthread call stack:\n"; |
1158 | | stack_trace(os, tid); |
1159 | | } |
1160 | | os << "\n\n"; |
1161 | | #else |
1162 | 0 | << "\n\n"; |
1163 | 0 | (void)status;(void)traced;(void)worker_tid; |
1164 | 0 | #endif // BRPC_BTHREAD_TRACER |
1165 | 0 | } |
1166 | 0 | } |
1167 | | |
1168 | | } // namespace bthread |
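
The functions in this listing are driven through the public bthread API declared in bthread/bthread.h: bthread_start_background() lands in TaskGroup::start_background(), bthread_join() waits on the version butex in TaskGroup::join(), and bthread_usleep() goes through TaskGroup::usleep() and the timer thread. The standalone sketch below (not part of the file above; the task function, its arguments, and the counts are purely illustrative, and it assumes brpc/bthread is installed and linked) exercises those code paths.

#include <bthread/bthread.h>  // bthread_start_background, bthread_join, bthread_usleep
#include <cstdio>

// Illustrative task body: runs on a worker's TaskGroup via task_runner().
static void* sleepy_task(void* arg) {
    const long id = reinterpret_cast<long>(arg);
    // Suspends the bthread through TaskGroup::usleep(); a timer is scheduled
    // and the worker switches to another task in the meantime.
    bthread_usleep(10 * 1000);  // 10 ms
    printf("task %ld done\n", id);
    return NULL;
}

int main() {
    bthread_t tids[4];
    for (long i = 0; i < 4; ++i) {
        // NULL attr means BTHREAD_ATTR_NORMAL, as in start_background() above.
        if (bthread_start_background(&tids[i], NULL, sleepy_task,
                                     reinterpret_cast<void*>(i)) != 0) {
            fprintf(stderr, "fail to start bthread\n");
            return 1;
        }
    }
    for (int i = 0; i < 4; ++i) {
        // Blocks in TaskGroup::join() until task_runner() bumps the version
        // butex after sleepy_task() returns.
        bthread_join(tids[i], NULL);
    }
    return 0;
}

Running a program like this (or the bthread unit tests under the repository's test/ directory) is what turns the 0 counts in this report into non-zero coverage.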