/src/brpc/src/bthread/task_control.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // bthread - An M:N threading library to make applications more concurrent. |
19 | | |
20 | | // Date: Tue Jul 10 17:40:58 CST 2012 |
21 | | |
22 | | #include <pthread.h> |
23 | | #include <set> |
24 | | #include <regex> |
25 | | #include <sys/syscall.h> // SYS_gettid |
26 | | #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK |
27 | | #include "butil/errno.h" // berror |
28 | | #include "butil/logging.h" |
29 | | #include "butil/threading/platform_thread.h" |
30 | | #include "butil/third_party/murmurhash3/murmurhash3.h" |
31 | | #include "bthread/sys_futex.h" // futex_wake_private |
32 | | #include "bthread/interrupt_pthread.h" |
33 | | #include "bthread/processor.h" // cpu_relax |
34 | | #include "bthread/task_group.h" // TaskGroup |
35 | | #include "bthread/task_control.h" |
36 | | #include "bthread/timer_thread.h" // global_timer_thread |
37 | | #include <gflags/gflags.h> |
38 | | #include "bthread/log.h" |
39 | | #if defined(OS_MACOSX) |
40 | | #include <mach/mach.h> |
41 | | #endif |
42 | | |
43 | | DEFINE_int32(task_group_delete_delay, 1, |
44 | | "Delay deletion of a TaskGroup for this many seconds");
45 | | DEFINE_int32(task_group_runqueue_capacity, 4096, |
46 | | "capacity of runqueue in each TaskGroup"); |
47 | | DEFINE_int32(task_group_ntags, 1, "Number of tags that TaskGroups are grouped by");
48 | | DEFINE_bool(task_group_set_worker_name, true, |
49 | | "Whether to set the name of the worker thread"); |
50 | | DEFINE_string(cpu_set, "",
51 | | "Set of CPUs that worker threads are bound to, "
52 | | "e.g. 0-3,5,7; empty (default) disables binding");
53 | | |
54 | | namespace bthread { |
55 | | |
56 | | DEFINE_bool(parking_lot_no_signal_when_no_waiter, false, |
57 | | "ParkingLot doesn't signal when there is no waiter. "
58 | | "This reduces signaling overhead when workers are busy.");
59 | | DEFINE_bool(enable_bthread_priority_queue, false, "Whether to enable priority queue"); |
60 | | |
61 | | DECLARE_int32(bthread_concurrency); |
62 | | DECLARE_int32(bthread_min_concurrency); |
63 | | DECLARE_int32(bthread_parking_lot_of_each_tag); |
64 | | |
65 | | extern pthread_mutex_t g_task_control_mutex; |
66 | | extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; |
67 | | void (*g_worker_startfn)() = NULL; |
68 | | void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL; |
69 | | |
70 | | // May be called in other modules to run startfn in non-worker pthreads. |
71 | 0 | void run_worker_startfn() { |
72 | 0 | if (g_worker_startfn) { |
73 | 0 | g_worker_startfn(); |
74 | 0 | } |
75 | 0 | } |
76 | | |
77 | 0 | void run_tagged_worker_startfn(bthread_tag_t tag) { |
78 | 0 | if (g_tagged_worker_startfn) { |
79 | 0 | g_tagged_worker_startfn(tag); |
80 | 0 | } |
81 | 0 | } |
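 | | // NOTE (illustrative, not part of this file): applications normally
 | | // install these hooks through bthread's public setter before any
 | | // worker starts, e.g.:
 | | //   static void per_worker_init() { /* thread-local setup */ }
 | | //   bthread_set_worker_startfn(per_worker_init);
 | | // (bthread_set_worker_startfn() is assumed here to be the public API
 | | // that assigns g_worker_startfn; the tagged hook is set analogously.)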
82 | | |
83 | | struct WorkerThreadArgs { |
84 | 0 | WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {} |
85 | | TaskControl* c; |
86 | | bthread_tag_t tag; |
87 | | }; |
88 | | |
89 | 0 | void* TaskControl::worker_thread(void* arg) { |
90 | 0 | run_worker_startfn(); |
91 | | #ifdef BAIDU_INTERNAL |
92 | | logging::ComlogInitializer comlog_initializer; |
93 | | #endif |
94 | |
95 | 0 | auto dummy = static_cast<WorkerThreadArgs*>(arg); |
96 | 0 | auto c = dummy->c; |
97 | 0 | auto tag = dummy->tag; |
98 | 0 | delete dummy; |
99 | 0 | run_tagged_worker_startfn(tag); |
100 | |
101 | 0 | TaskGroup* g = c->create_group(tag); |
102 | 0 | TaskStatistics stat; |
103 | 0 | if (NULL == g) { |
104 | 0 | LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self(); |
105 | 0 | return NULL; |
106 | 0 | } |
107 | | |
108 | 0 | g->_tid = pthread_self(); |
109 | |
110 | 0 | int worker_id = c->_next_worker_id.fetch_add( |
111 | 0 | 1, butil::memory_order_relaxed); |
112 | 0 | if (!c->_cpus.empty()) { |
113 | 0 | bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]); |
114 | 0 | } |
115 | 0 | if (FLAGS_task_group_set_worker_name) { |
116 | 0 | std::string worker_thread_name = butil::string_printf( |
117 | 0 | "brpc_wkr:%d-%d", g->tag(), worker_id); |
118 | 0 | butil::PlatformThread::SetNameSimple(worker_thread_name.c_str()); |
119 | 0 | } |
120 | 0 | BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid |
121 | 0 | << " bthread=" << g->main_tid() << " tag=" << g->tag(); |
122 | 0 | tls_task_group = g; |
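 | | // NOTE (added annotation): _nworkers and the per-tag counter below
 | | // are bvar::Adder instances; operator<< adds the operand (+1 here,
 | | // -1 on worker exit), it is not a bit shift.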
123 | 0 | c->_nworkers << 1; |
124 | 0 | c->tag_nworkers(g->tag()) << 1; |
125 | |
126 | 0 | g->run_main_task(); |
127 | |
128 | 0 | stat = g->main_stat(); |
129 | 0 | BT_VLOG << "Destroying worker=" << pthread_self() << " bthread=" |
130 | 0 | << g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0 |
131 | 0 | << "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms"; |
132 | 0 | tls_task_group = NULL; |
133 | 0 | g->destroy_self(); |
134 | 0 | c->_nworkers << -1; |
135 | 0 | c->tag_nworkers(g->tag()) << -1; |
136 | 0 | return NULL; |
137 | 0 | } |
138 | | |
139 | 0 | TaskGroup* TaskControl::create_group(bthread_tag_t tag) { |
140 | 0 | TaskGroup* g = new (std::nothrow) TaskGroup(this); |
141 | 0 | if (NULL == g) { |
142 | 0 | LOG(FATAL) << "Fail to new TaskGroup"; |
143 | 0 | return NULL; |
144 | 0 | } |
145 | 0 | if (g->init(FLAGS_task_group_runqueue_capacity) != 0) { |
146 | 0 | LOG(ERROR) << "Fail to init TaskGroup"; |
147 | 0 | delete g; |
148 | 0 | return NULL; |
149 | 0 | } |
150 | 0 | if (_add_group(g, tag) != 0) { |
151 | 0 | delete g; |
152 | 0 | return NULL; |
153 | 0 | } |
154 | 0 | return g; |
155 | 0 | } |
156 | | |
157 | 0 | static void print_rq_sizes_in_the_tc(std::ostream &os, void *arg) { |
158 | 0 | TaskControl *tc = (TaskControl *)arg; |
159 | 0 | tc->print_rq_sizes(os); |
160 | 0 | } |
161 | | |
162 | 0 | static double get_cumulated_worker_time_from_this(void *arg) { |
163 | 0 | return static_cast<TaskControl*>(arg)->get_cumulated_worker_time(); |
164 | 0 | } |
165 | | |
166 | | struct CumulatedWithTagArgs { |
167 | 0 | CumulatedWithTagArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), t(_t) {} |
168 | | TaskControl* c; |
169 | | bthread_tag_t t; |
170 | | }; |
171 | | |
172 | 0 | static double get_cumulated_worker_time_from_this_with_tag(void* arg) { |
173 | 0 | auto a = static_cast<CumulatedWithTagArgs*>(arg); |
174 | 0 | auto c = a->c; |
175 | 0 | auto t = a->t; |
176 | 0 | return c->get_cumulated_worker_time(t); |
177 | 0 | } |
178 | | |
179 | 0 | static int64_t get_cumulated_switch_count_from_this(void *arg) { |
180 | 0 | return static_cast<TaskControl*>(arg)->get_cumulated_switch_count(); |
181 | 0 | } |
182 | | |
183 | 0 | static int64_t get_cumulated_signal_count_from_this(void *arg) { |
184 | 0 | return static_cast<TaskControl*>(arg)->get_cumulated_signal_count(); |
185 | 0 | } |
186 | | |
187 | | TaskControl::TaskControl() |
188 | | // NOTE: all fields must be initialized before the vars.
189 | 0 | : _tagged_ngroup(FLAGS_task_group_ntags) |
190 | 0 | , _tagged_groups(FLAGS_task_group_ntags) |
191 | 0 | , _init(false) |
192 | 0 | , _stop(false) |
193 | 0 | , _concurrency(0) |
194 | 0 | , _next_worker_id(0) |
195 | 0 | , _nworkers("bthread_worker_count") |
196 | 0 | , _pending_time(NULL) |
197 | | // Delay exposure of the following two vars because they rely on the
198 | | // TaskControl, which is not initialized yet.
199 | 0 | , _cumulated_worker_time(get_cumulated_worker_time_from_this, this) |
200 | 0 | , _worker_usage_second(&_cumulated_worker_time, 1) |
201 | 0 | , _cumulated_switch_count(get_cumulated_switch_count_from_this, this) |
202 | 0 | , _switch_per_second(&_cumulated_switch_count) |
203 | 0 | , _cumulated_signal_count(get_cumulated_signal_count_from_this, this) |
204 | 0 | , _signal_per_second(&_cumulated_signal_count) |
205 | 0 | , _status(print_rq_sizes_in_the_tc, this) |
206 | 0 | , _nbthreads("bthread_count") |
207 | 0 | , _enable_priority_queue(FLAGS_enable_bthread_priority_queue) |
208 | 0 | , _priority_queues(FLAGS_task_group_ntags) |
209 | 0 | , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag) |
210 | 0 | , _tagged_pl(FLAGS_task_group_ntags) |
211 | 0 | {} |
212 | | |
213 | 0 | int TaskControl::init(int concurrency) { |
214 | 0 | if (_concurrency != 0) { |
215 | 0 | LOG(ERROR) << "Already initialized"; |
216 | 0 | return -1; |
217 | 0 | } |
218 | 0 | if (concurrency <= 0) { |
219 | 0 | LOG(ERROR) << "Invalid concurrency=" << concurrency; |
220 | 0 | return -1; |
221 | 0 | } |
222 | 0 | _concurrency = concurrency; |
223 | |
|
224 | 0 | if (!FLAGS_cpu_set.empty()) { |
225 | 0 | if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) { |
226 | 0 | LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set; |
227 | 0 | return -1; |
228 | 0 | } |
229 | 0 | } |
230 | | |
231 | | // Task groups are grouped by tag.
232 | 0 | for (int i = 0; i < FLAGS_task_group_ntags; ++i) { |
233 | 0 | _tagged_ngroup[i].store(0, std::memory_order_relaxed); |
234 | 0 | auto tag_str = std::to_string(i); |
235 | 0 | _tagged_nworkers.push_back(new bvar::Adder<int64_t>("bthread_worker_count", tag_str)); |
236 | 0 | _tagged_cumulated_worker_time.push_back(new bvar::PassiveStatus<double>( |
237 | 0 | get_cumulated_worker_time_from_this_with_tag, new CumulatedWithTagArgs{this, i})); |
238 | 0 | _tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>( |
239 | 0 | "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1)); |
240 | 0 | _tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str)); |
241 | 0 | if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) { |
242 | 0 | LOG(ERROR) << "Fail to init _priority_q"; |
243 | 0 | return -1; |
244 | 0 | } |
245 | 0 | } |
246 | | |
247 | | // Make sure TimerThread is ready. |
248 | 0 | if (get_or_create_global_timer_thread() == NULL) { |
249 | 0 | LOG(ERROR) << "Fail to get global_timer_thread"; |
250 | 0 | return -1; |
251 | 0 | } |
252 | | |
253 | | #ifdef BRPC_BTHREAD_TRACER |
254 | | if (!_task_tracer.Init()) { |
255 | | LOG(ERROR) << "Fail to init TaskTracer"; |
256 | | return -1; |
257 | | } |
258 | | #endif // BRPC_BTHREAD_TRACER |
259 | | |
260 | 0 | _workers.resize(_concurrency); |
261 | 0 | for (int i = 0; i < _concurrency; ++i) { |
262 | 0 | auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags); |
263 | 0 | const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg); |
264 | 0 | if (rc) { |
265 | 0 | delete arg; |
266 | 0 | PLOG(ERROR) << "Fail to create _workers[" << i << "]"; |
267 | 0 | return -1; |
268 | 0 | } |
269 | 0 | } |
270 | 0 | _worker_usage_second.expose("bthread_worker_usage"); |
271 | 0 | _switch_per_second.expose("bthread_switch_second"); |
272 | 0 | _signal_per_second.expose("bthread_signal_second"); |
273 | 0 | _status.expose("bthread_group_status"); |
274 | | |
275 | | // Wait until at least one group has been added so that choose_one_group()
276 | | // never returns NULL.
277 | | // TODO: Handle the case that worker quits before add_group |
278 | 0 | for (int i = 0; i < FLAGS_task_group_ntags;) { |
279 | 0 | if (_tagged_ngroup[i].load(std::memory_order_acquire) == 0) { |
280 | 0 | usleep(100); // TODO: Elaborate |
281 | 0 | continue; |
282 | 0 | } |
283 | 0 | ++i; |
284 | 0 | } |
285 | |
286 | 0 | _init.store(true, butil::memory_order_release); |
287 | |
288 | 0 | return 0; |
289 | 0 | } |
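 | | // NOTE (usage sketch, assuming callers follow bthread's own setup path):
 | | //   TaskControl* c = new (std::nothrow) TaskControl;
 | | //   if (NULL == c || c->init(FLAGS_bthread_concurrency) != 0) {
 | | //       /* handle failure */
 | | //   }
 | | // init() spawns |concurrency| workers, giving worker i the tag
 | | // i % FLAGS_task_group_ntags, then spins until every tag has at least
 | | // one TaskGroup so choose_one_group() can never return NULL.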
290 | | |
291 | 0 | int TaskControl::add_workers(int num, bthread_tag_t tag) { |
292 | 0 | if (num <= 0) { |
293 | 0 | return 0; |
294 | 0 | } |
295 | 0 | try { |
296 | 0 | _workers.resize(_concurrency + num); |
297 | 0 | } catch (...) { |
298 | 0 | return 0; |
299 | 0 | } |
300 | 0 | const int old_concurrency = _concurrency.load(butil::memory_order_relaxed);
301 | 0 | for (int i = 0; i < num; ++i) { |
302 | | // Worker will add itself to _idle_workers, so we have to add
303 | | // _concurrency before creating a worker.
304 | 0 | _concurrency.fetch_add(1); |
305 | 0 | auto arg = new WorkerThreadArgs(this, tag); |
306 | 0 | const int rc = pthread_create( |
307 | 0 | &_workers[i + old_concurrency], NULL, worker_thread, arg);
308 | 0 | if (rc) { |
309 | 0 | delete arg; |
310 | 0 | PLOG(WARNING) << "Fail to create _workers[" << i + old_concurrency << "]";
311 | 0 | _concurrency.fetch_sub(1, butil::memory_order_release); |
312 | 0 | break; |
313 | 0 | } |
314 | 0 | } |
315 | | // Cannot fail |
316 | 0 | _workers.resize(_concurrency.load(butil::memory_order_relaxed)); |
317 | 0 | return _concurrency.load(butil::memory_order_relaxed) - old_concurrency;
318 | 0 | } |
319 | | |
320 | 0 | TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) { |
321 | 0 | CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags); |
322 | 0 | auto& groups = tag_group(tag); |
323 | 0 | const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire); |
324 | 0 | if (ngroup != 0) { |
325 | 0 | return groups[butil::fast_rand_less_than(ngroup)]; |
326 | 0 | } |
327 | 0 | CHECK(false) << "Impossible: ngroup is 0"; |
328 | 0 | return NULL; |
329 | 0 | } |
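 | | // NOTE (added annotation): choose_one_group() returns a uniformly
 | | // random group of the tag; presumably it is used to pick a home group
 | | // for work submitted from threads that have no tls_task_group.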
330 | | |
331 | 0 | int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) { |
332 | 0 | static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*"); |
333 | 0 | std::smatch match; |
334 | 0 | std::set<unsigned> cpuset; |
335 | 0 | if (value.empty()) { |
336 | 0 | return -1; |
337 | 0 | } |
338 | 0 | if (std::regex_match(value, match, r)) { |
339 | 0 | for (butil::StringSplitter split(value.data(), ','); split; ++split) { |
340 | 0 | butil::StringPiece cpu_ids(split.field(), split.length()); |
341 | 0 | cpu_ids.trim_spaces(); |
342 | 0 | butil::StringPiece begin = cpu_ids; |
343 | 0 | butil::StringPiece end = cpu_ids; |
344 | 0 | auto dash = cpu_ids.find('-'); |
345 | 0 | if (dash != cpu_ids.npos) { |
346 | 0 | begin = cpu_ids.substr(0, dash); |
347 | 0 | end = cpu_ids.substr(dash + 1); |
348 | 0 | } |
349 | 0 | unsigned first = UINT_MAX; |
350 | 0 | unsigned last = 0; |
351 | 0 | int ret; |
352 | 0 | ret = butil::StringSplitter(begin, '\t').to_uint(&first); |
353 | 0 | ret = ret | butil::StringSplitter(end, '\t').to_uint(&last); |
354 | 0 | if (ret != 0 || first > last) { |
355 | 0 | return -1; |
356 | 0 | } |
357 | 0 | for (auto i = first; i <= last; ++i) { |
358 | 0 | cpuset.insert(i); |
359 | 0 | } |
360 | 0 | } |
361 | 0 | cpus.assign(cpuset.begin(), cpuset.end()); |
362 | 0 | return 0; |
363 | 0 | } |
364 | 0 | return -1; |
365 | 0 | } |
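 | | // NOTE (added annotation): for example, parse_cpuset("0-2,5", cpus)
 | | // leaves cpus = {0, 1, 2, 5}. Overlapping or duplicated entries
 | | // collapse via the std::set and the result is sorted; an empty string
 | | // or a reversed range such as "3-1" fails with -1.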
366 | | |
367 | 0 | void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) { |
368 | 0 | #if defined(OS_LINUX) |
369 | 0 | cpu_set_t cs; |
370 | 0 | CPU_ZERO(&cs); |
371 | 0 | CPU_SET(cpu_id, &cs); |
372 | 0 | auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs); |
373 | 0 | if (r != 0) { |
374 | 0 | LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id; |
375 | 0 | } |
376 | 0 | (void)r; |
377 | | #elif defined(OS_MACOSX) |
378 | | thread_port_t mach_thread = pthread_mach_thread_np(pthread); |
379 | | if (mach_thread == MACH_PORT_NULL) {
380 | | LOG(WARNING) << "mach_thread is null, "
381 | | << "failed to bind thread to cpu: " << cpu_id;
382 | | return;
383 | | }
384 | | thread_affinity_policy_data_t policy; |
385 | | policy.affinity_tag = cpu_id; |
386 | | if (thread_policy_set(mach_thread, |
387 | | THREAD_AFFINITY_POLICY, |
388 | | (thread_policy_t)&policy, |
389 | | THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) { |
390 | | LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id; |
391 | | } |
392 | | #endif |
393 | 0 | } |
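 | | // NOTE (platform behavior, stated as an assumption): macOS's
 | | // THREAD_AFFINITY_POLICY is only a scheduler hint that co-locates
 | | // threads sharing an affinity tag; it does not hard-pin a thread to a
 | | // given core the way pthread_setaffinity_np does on Linux.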
394 | | |
395 | | #ifdef BRPC_BTHREAD_TRACER |
396 | | void TaskControl::stack_trace(std::ostream& os, bthread_t tid) { |
397 | | _task_tracer.Trace(os, tid); |
398 | | } |
399 | | |
400 | | std::string TaskControl::stack_trace(bthread_t tid) { |
401 | | return _task_tracer.Trace(tid); |
402 | | } |
403 | | #endif // BRPC_BTHREAD_TRACER |
404 | | |
405 | | extern int stop_and_join_epoll_threads(); |
406 | | |
407 | 0 | void TaskControl::stop_and_join() { |
408 | | // Close epoll threads so that worker threads are not waiting on epoll
409 | | // (which cannot be woken up by signal_task below).
410 | 0 | CHECK_EQ(0, stop_and_join_epoll_threads()); |
411 | | |
412 | | // Stop workers |
413 | 0 | { |
414 | 0 | BAIDU_SCOPED_LOCK(_modify_group_mutex); |
415 | 0 | _stop = true; |
416 | 0 | std::for_each( |
417 | 0 | _tagged_ngroup.begin(), _tagged_ngroup.end(), |
418 | 0 | [](butil::atomic<size_t>& index) { index.store(0, butil::memory_order_relaxed); }); |
419 | 0 | } |
420 | 0 | for (int i = 0; i < FLAGS_task_group_ntags; ++i) { |
421 | 0 | for (auto& pl : _tagged_pl[i]) { |
422 | 0 | pl.stop(); |
423 | 0 | } |
424 | 0 | } |
425 | |
426 | 0 | for (auto worker: _workers) { |
427 | | // Interrupt blocking operations. |
428 | | #ifdef BRPC_BTHREAD_TRACER |
429 | | // TaskTracer has registered signal handler for SIGURG. |
430 | | pthread_kill(worker, SIGURG); |
431 | | #else |
432 | 0 | interrupt_pthread(worker); |
433 | 0 | #endif // BRPC_BTHREAD_TRACER |
434 | 0 | } |
435 | | // Join workers |
436 | 0 | for (auto worker : _workers) { |
437 | 0 | pthread_join(worker, NULL); |
438 | 0 | } |
439 | 0 | } |
440 | | |
441 | 0 | TaskControl::~TaskControl() { |
442 | | // NOTE: g_task_control is not destructed now because the situation |
443 | | // is extremely racy. |
444 | 0 | delete _pending_time.exchange(NULL, butil::memory_order_relaxed); |
445 | 0 | _worker_usage_second.hide(); |
446 | 0 | _switch_per_second.hide(); |
447 | 0 | _signal_per_second.hide(); |
448 | 0 | _status.hide(); |
449 | | |
450 | 0 | stop_and_join(); |
451 | 0 | } |
452 | | |
453 | 0 | int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) { |
454 | 0 | if (__builtin_expect(NULL == g, 0)) { |
455 | 0 | return -1; |
456 | 0 | } |
457 | 0 | std::unique_lock<butil::Mutex> mu(_modify_group_mutex); |
458 | 0 | if (_stop) { |
459 | 0 | return -1; |
460 | 0 | } |
461 | 0 | g->set_tag(tag); |
462 | 0 | g->set_pl(&_tagged_pl[tag][butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag]); |
463 | 0 | size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed); |
464 | 0 | if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) { |
465 | 0 | _tagged_groups[tag][ngroup] = g; |
466 | 0 | _tagged_ngroup[tag].store(ngroup + 1, butil::memory_order_release); |
467 | 0 | } |
468 | 0 | mu.unlock(); |
469 | | // See the comments in _destroy_group |
470 | | // TODO: Not needed anymore since non-worker pthread cannot have TaskGroup |
471 | | // signal_task(65536, tag); |
472 | 0 | return 0; |
473 | 0 | } |
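 | | // NOTE (added annotation): parking lots are sharded per tag into
 | | // _pl_num_of_each_tag slots, and a group is mapped to a shard by
 | | // hashing its worker's pthread id. This spreads the wait/wake traffic
 | | // of signal_task across several futexes instead of one.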
474 | | |
475 | 0 | void TaskControl::delete_task_group(void* arg) { |
476 | 0 | delete(TaskGroup*)arg; |
477 | 0 | } |
478 | | |
479 | 0 | int TaskControl::_destroy_group(TaskGroup* g) { |
480 | 0 | if (NULL == g) { |
481 | 0 | LOG(ERROR) << "Param[g] is NULL"; |
482 | 0 | return -1; |
483 | 0 | } |
484 | 0 | if (g->_control != this) { |
485 | 0 | LOG(ERROR) << "TaskGroup=" << g |
486 | 0 | << " does not belong to this TaskControl=" << this; |
487 | 0 | return -1; |
488 | 0 | } |
489 | 0 | bool erased = false; |
490 | 0 | { |
491 | 0 | BAIDU_SCOPED_LOCK(_modify_group_mutex); |
492 | 0 | auto tag = g->tag(); |
493 | 0 | auto& groups = tag_group(tag); |
494 | 0 | const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed); |
495 | 0 | for (size_t i = 0; i < ngroup; ++i) { |
496 | 0 | if (groups[i] == g) { |
497 | | // No need for atomic_thread_fence because lock did it. |
498 | 0 | groups[i] = groups[ngroup - 1]; |
499 | | // Change _ngroup and keep _groups unchanged at last so that: |
500 | | // - If steal_task sees the newest _ngroup, it would not touch |
501 | | // _groups[ngroup -1] |
502 | | // - If steal_task sees old _ngroup and is still iterating on |
503 | | // _groups, it would not miss _groups[ngroup - 1] which was |
504 | | // swapped to _groups[i]. Although adding new group would |
505 | | // swapped to _groups[i]. Although adding a new group may
506 | | // overwrite it, since we do signal_task in _add_group(),
507 | | // we think the pending tasks of _groups[ngroup - 1] would
508 | | // not be missed.
509 | | //_groups[ngroup - 1] = NULL; |
510 | 0 | erased = true; |
511 | 0 | break; |
512 | 0 | } |
513 | 0 | } |
514 | 0 | } |
515 | | |
516 | | // Can't delete g immediately: for performance, steal_task does not
517 | | // lock _modify_group_mutex and may access the removed group
518 | | // concurrently. We use a simple strategy here: schedule a function
519 | | // that deletes the TaskGroup after FLAGS_task_group_delete_delay
520 | | // seconds.
521 | 0 | if (erased) { |
522 | 0 | get_global_timer_thread()->schedule( |
523 | 0 | delete_task_group, g, |
524 | 0 | butil::microseconds_from_now(FLAGS_task_group_delete_delay * 1000000L)); |
525 | 0 | } |
526 | 0 | return 0; |
527 | 0 | } |
528 | | |
529 | 0 | bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { |
530 | 0 | auto tag = tls_task_group->tag(); |
531 | |
532 | 0 | if (_priority_queues[tag].steal(tid)) { |
533 | 0 | return true; |
534 | 0 | } |
535 | | |
536 | | // 1: The acquiring fence is paired with the releasing fence in
537 | | // _add_group to avoid accessing an uninitialized slot of _groups.
538 | 0 | const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/); |
539 | 0 | if (0 == ngroup) { |
540 | 0 | return false; |
541 | 0 | } |
542 | | |
543 | | // NOTE: Don't return inside `for' iteration since we need to update |seed| |
544 | 0 | bool stolen = false; |
545 | 0 | size_t s = *seed; |
546 | 0 | auto& groups = tag_group(tag); |
547 | 0 | for (size_t i = 0; i < ngroup; ++i, s += offset) { |
548 | 0 | TaskGroup* g = groups[s % ngroup]; |
549 | | // g is possibly NULL because of concurrent _destroy_group |
550 | 0 | if (g) { |
551 | 0 | if (g->_rq.steal(tid)) { |
552 | 0 | stolen = true; |
553 | 0 | break; |
554 | 0 | } |
555 | 0 | if (g->_remote_rq.pop(tid)) { |
556 | 0 | stolen = true; |
557 | 0 | break; |
558 | 0 | } |
559 | 0 | } |
560 | 0 | } |
561 | 0 | *seed = s; |
562 | 0 | return stolen; |
563 | 0 | } |
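 | | // NOTE (added annotation): each worker passes its own |seed| plus a
 | | // fixed |offset| (in bRPC the offset is drawn from a small table of
 | | // primes in TaskGroup -- an assumption about the caller), so
 | | // successive probes visit the groups in a scattered order and
 | | // different workers start from different victims.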
564 | | |
565 | 0 | void TaskControl::signal_task(int num_task, bthread_tag_t tag) { |
566 | 0 | if (num_task <= 0) { |
567 | 0 | return; |
568 | 0 | } |
569 | | // TODO(gejun): The current algorithm does not guarantee that enough
570 | | // threads will be created to match callers' requests. On the other
571 | | // hand, the current impl also emits many useless signals. Capping the
572 | | // number is a good balance between performance and timeliness of scheduling.
573 | 0 | if (num_task > 2) { |
574 | 0 | num_task = 2; |
575 | 0 | } |
576 | 0 | auto& pl = tag_pl(tag); |
577 | 0 | size_t start_index = butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag; |
578 | 0 | for (size_t i = 0; i < _pl_num_of_each_tag && num_task > 0; ++i) { |
579 | 0 | num_task -= pl[start_index].signal(1); |
580 | 0 | if (++start_index >= _pl_num_of_each_tag) { |
581 | 0 | start_index = 0; |
582 | 0 | } |
583 | 0 | } |
584 | 0 | if (num_task > 0 && |
585 | 0 | FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance |
586 | 0 | _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) { |
587 | | // TODO: Reduce this lock |
588 | 0 | BAIDU_SCOPED_LOCK(g_task_control_mutex); |
589 | 0 | if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) { |
590 | 0 | add_workers(1, tag); |
591 | 0 | } |
592 | 0 | } |
593 | 0 | } |
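 | | // NOTE (added annotation): a typical call is signal_task(1, tag) right
 | | // after a task is queued (an assumption about callers in TaskGroup).
 | | // Requests are capped at two wake-ups; if the parking lots wake nobody
 | | // and bthread_min_concurrency is enabled, the pool grows by one
 | | // worker, bounded by FLAGS_bthread_concurrency.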
594 | | |
595 | 0 | void TaskControl::print_rq_sizes(std::ostream& os) { |
596 | 0 | size_t ngroup = 0; |
597 | 0 | std::for_each(_tagged_ngroup.begin(), _tagged_ngroup.end(), [&](butil::atomic<size_t>& index) { |
598 | 0 | ngroup += index.load(butil::memory_order_relaxed); |
599 | 0 | }); |
600 | 0 | DEFINE_SMALL_ARRAY(int, nums, ngroup, 128); |
601 | 0 | { |
602 | 0 | BAIDU_SCOPED_LOCK(_modify_group_mutex); |
603 | | // ngroup > _ngroup: nums[_ngroup ... ngroup-1] = 0 |
604 | | // ngroup < _ngroup: just ignore _groups[_ngroup ... ngroup-1] |
605 | 0 | int i = 0; |
606 | 0 | for_each_task_group([&](TaskGroup* g) { |
607 | 0 | nums[i] = (g ? g->_rq.volatile_size() : 0); |
608 | 0 | ++i; |
609 | 0 | }); |
610 | 0 | } |
611 | 0 | for (size_t i = 0; i < ngroup; ++i) { |
612 | 0 | os << nums[i] << ' '; |
613 | 0 | } |
614 | 0 | } |
615 | | |
616 | 0 | double TaskControl::get_cumulated_worker_time() { |
617 | 0 | int64_t cputime_ns = 0; |
618 | 0 | BAIDU_SCOPED_LOCK(_modify_group_mutex); |
619 | 0 | for_each_task_group([&](TaskGroup* g) { |
620 | 0 | cputime_ns += g->cumulated_cputime_ns(); |
621 | 0 | }); |
622 | 0 | return cputime_ns / 1000000000.0; |
623 | 0 | } |
624 | | |
625 | 0 | double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) { |
626 | 0 | int64_t cputime_ns = 0; |
627 | 0 | BAIDU_SCOPED_LOCK(_modify_group_mutex); |
628 | 0 | const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed); |
629 | 0 | auto& groups = tag_group(tag); |
630 | 0 | for (size_t i = 0; i < ngroup; ++i) { |
631 | 0 | cputime_ns += groups[i]->cumulated_cputime_ns(); |
632 | 0 | } |
633 | 0 | return cputime_ns / 1000000000.0; |
634 | 0 | } |
635 | | |
636 | 0 | int64_t TaskControl::get_cumulated_switch_count() { |
637 | 0 | int64_t c = 0; |
638 | 0 | BAIDU_SCOPED_LOCK(_modify_group_mutex); |
639 | 0 | for_each_task_group([&](TaskGroup* g) { |
640 | 0 | if (g) { |
641 | 0 | c += g->_nswitch; |
642 | 0 | } |
643 | 0 | }); |
644 | 0 | return c; |
645 | 0 | } |
646 | | |
647 | 0 | int64_t TaskControl::get_cumulated_signal_count() { |
648 | 0 | int64_t c = 0; |
649 | 0 | BAIDU_SCOPED_LOCK(_modify_group_mutex); |
650 | 0 | for_each_task_group([&](TaskGroup* g) { |
651 | 0 | if (g) { |
652 | 0 | c += g->_nsignaled + g->_remote_nsignaled; |
653 | 0 | } |
654 | 0 | }); |
655 | 0 | return c; |
656 | 0 | } |
657 | | |
658 | 0 | bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() { |
659 | 0 | bool is_creator = false; |
660 | 0 | _pending_time_mutex.lock(); |
661 | 0 | bvar::LatencyRecorder* pt = _pending_time.load(butil::memory_order_consume); |
662 | 0 | if (!pt) { |
663 | 0 | pt = new bvar::LatencyRecorder; |
664 | 0 | _pending_time.store(pt, butil::memory_order_release); |
665 | 0 | is_creator = true; |
666 | 0 | } |
667 | 0 | _pending_time_mutex.unlock(); |
668 | 0 | if (is_creator) { |
669 | 0 | pt->expose("bthread_creation"); |
670 | 0 | } |
671 | 0 | return pt; |
672 | 0 | } |
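 | | // NOTE (added annotation): only the creating caller sees
 | | // is_creator == true, so "bthread_creation" is exposed exactly once;
 | | // expose() runs after unlocking, presumably to keep the critical
 | | // section short.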
673 | | |
674 | 0 | std::vector<bthread_t> TaskControl::get_living_bthreads() { |
675 | 0 | std::vector<bthread_t> living_bthread_ids; |
676 | 0 | living_bthread_ids.reserve(1024); |
677 | 0 | butil::for_each_resource<TaskMeta>([&living_bthread_ids](TaskMeta* m) { |
678 | | // Only include bthreads created by the bthread_start* functions;
679 | | // those created internally to run the main task are opaque to the
680 | | // user and are filtered out.
681 | 0 | if (m && m->fn) { |
682 | | // determine whether the bthread is living by checking version |
683 | 0 | const uint32_t given_ver = get_version(m->tid); |
684 | 0 | BAIDU_SCOPED_LOCK(m->version_lock); |
685 | 0 | if (given_ver == *m->version_butex) { |
686 | 0 | living_bthread_ids.push_back(m->tid); |
687 | 0 | } |
688 | 0 | } |
689 | 0 | }); |
690 | 0 | return living_bthread_ids; |
691 | 0 | } |
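 | | // NOTE (assumption from bthread's id scheme): a bthread_t packs a
 | | // version next to the TaskMeta slot, and *m->version_butex advances
 | | // when the bthread ends or the slot is recycled, so a version match
 | | // means the id still refers to a live bthread.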
692 | | |
693 | | } // namespace bthread |