Coverage Report

Created: 2025-11-01 07:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/task_control.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#include <pthread.h>
23
#include <set>
24
#include <regex>
25
#include <sys/syscall.h>                   // SYS_gettid
26
#include "butil/scoped_lock.h"             // BAIDU_SCOPED_LOCK
27
#include "butil/errno.h"                   // berror
28
#include "butil/logging.h"
29
#include "butil/threading/platform_thread.h"
30
#include "butil/third_party/murmurhash3/murmurhash3.h"
31
#include "bthread/sys_futex.h"            // futex_wake_private
32
#include "bthread/interrupt_pthread.h"
33
#include "bthread/processor.h"            // cpu_relax
34
#include "bthread/task_group.h"           // TaskGroup
35
#include "bthread/task_control.h"
36
#include "bthread/timer_thread.h"         // global_timer_thread
37
#include <gflags/gflags.h>
38
#include "bthread/log.h"
39
#if defined(OS_MACOSX)
40
#include <mach/mach.h>
41
#endif
42
43
// Command-line flags consumed by TaskControl in this file.

// Delay (seconds) used by _destroy_group() when scheduling the deletion of a
// removed TaskGroup on the global timer thread.
DEFINE_int32(task_group_delete_delay, 1,
44
             "delay deletion of TaskGroup for so many seconds");
45
// Passed to TaskGroup::init() by create_group().
DEFINE_int32(task_group_runqueue_capacity, 4096,
46
             "capacity of runqueue in each TaskGroup");
47
// Number of tags; sizes the per-tag containers in the constructor and is used
// by init() to spread the initial workers over tags (i % ntags).
DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
48
// When true, worker_thread() names each worker "brpc_wkr:<tag>-<worker_id>".
DEFINE_bool(task_group_set_worker_name, true,
49
            "Whether to set the name of the worker thread");
50
// Parsed by parse_cpuset() in init(); an empty value disables CPU binding.
DEFINE_string(cpu_set, "",
51
              "Set of CPUs to which cores are bound. "
52
              "for example, 0-3,5,7; default: disable");
53
54
namespace bthread {
55
56
// NOTE(review): not read anywhere in the visible part of this file;
// presumably consumed by ParkingLot -- confirm.
DEFINE_bool(parking_lot_no_signal_when_no_waiter, false,
57
            "ParkingLot doesn't signal when there is no waiter. "
58
            "In busy worker scenarios, signal overhead can be reduced.");
59
// Copied into TaskControl::_enable_priority_queue by the constructor.
DEFINE_bool(enable_bthread_priority_queue, false, "Whether to enable priority queue");
60
61
// Flags defined in another translation unit. signal_task() compares
// _concurrency against bthread_concurrency and checks bthread_min_concurrency;
// the constructor copies bthread_parking_lot_of_each_tag.
DECLARE_int32(bthread_concurrency);
62
DECLARE_int32(bthread_min_concurrency);
63
DECLARE_int32(bthread_parking_lot_of_each_tag);
64
65
// Locked by signal_task() around its call to add_workers().
extern pthread_mutex_t g_task_control_mutex;
66
// Per-thread TaskGroup: set by worker_thread() while the worker runs and
// read by steal_task().
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
67
// Optional start hooks, invoked (when non-NULL) by run_worker_startfn() and
// run_tagged_worker_startfn() respectively.
void (*g_worker_startfn)() = NULL;
68
void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL;
69
70
// May be called in other modules to run startfn in non-worker pthreads.
71
0
void run_worker_startfn() {
72
0
    if (g_worker_startfn) {
73
0
        g_worker_startfn();
74
0
    }
75
0
}
76
77
0
// Invokes the global tagged start hook (g_tagged_worker_startfn) with |tag|
// if one is set; does nothing otherwise.
void run_tagged_worker_startfn(bthread_tag_t tag) {
    if (g_tagged_worker_startfn == NULL) {
        return;
    }
    g_tagged_worker_startfn(tag);
}
82
83
struct WorkerThreadArgs {
84
0
    WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
85
    TaskControl* c;
86
    bthread_tag_t tag;
87
};
88
89
0
void* TaskControl::worker_thread(void* arg) {
90
0
    run_worker_startfn();
91
#ifdef BAIDU_INTERNAL
92
    logging::ComlogInitializer comlog_initializer;
93
#endif
94
95
0
    auto dummy = static_cast<WorkerThreadArgs*>(arg);
96
0
    auto c = dummy->c;
97
0
    auto tag = dummy->tag;
98
0
    delete dummy;
99
0
    run_tagged_worker_startfn(tag);
100
101
0
    TaskGroup* g = c->create_group(tag);
102
0
    TaskStatistics stat;
103
0
    if (NULL == g) {
104
0
        LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
105
0
        return NULL;
106
0
    }
107
108
0
    g->_tid = pthread_self();
109
110
0
    int worker_id = c->_next_worker_id.fetch_add(
111
0
                        1, butil::memory_order_relaxed);
112
0
    if (!c->_cpus.empty()) {
113
0
        bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]);
114
0
    }
115
0
    if (FLAGS_task_group_set_worker_name) {
116
0
        std::string worker_thread_name = butil::string_printf(
117
0
            "brpc_wkr:%d-%d", g->tag(), worker_id);
118
0
        butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
119
0
    }
120
0
    BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
121
0
            << " bthread=" << g->main_tid() << " tag=" << g->tag();
122
0
    tls_task_group = g;
123
0
    c->_nworkers << 1;
124
0
    c->tag_nworkers(g->tag()) << 1;
125
126
0
    g->run_main_task();
127
128
0
    stat = g->main_stat();
129
0
    BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
130
0
            << g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
131
0
            << "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
132
0
    tls_task_group = NULL;
133
0
    g->destroy_self();
134
0
    c->_nworkers << -1;
135
0
    c->tag_nworkers(g->tag()) << -1;
136
0
    return NULL;
137
0
}
138
139
0
// Allocates a TaskGroup, initializes it with the configured run-queue
// capacity and registers it under |tag|.
// Returns the new group, or NULL if allocation, init() or _add_group() fails
// (the group is deleted in the latter two cases).
TaskGroup* TaskControl::create_group(bthread_tag_t tag) {
    TaskGroup* group = new (std::nothrow) TaskGroup(this);
    if (group == NULL) {
        LOG(FATAL) << "Fail to new TaskGroup";
        return NULL;
    }

    const bool inited = (group->init(FLAGS_task_group_runqueue_capacity) == 0);
    if (!inited) {
        LOG(ERROR) << "Fail to init TaskGroup";
    }
    // _add_group() is only attempted when init() succeeded.
    if (!inited || _add_group(group, tag) != 0) {
        delete group;
        return NULL;
    }
    return group;
}
156
157
0
static void print_rq_sizes_in_the_tc(std::ostream &os, void *arg) {
158
0
    TaskControl *tc = (TaskControl *)arg;
159
0
    tc->print_rq_sizes(os);
160
0
}
161
162
0
static double get_cumulated_worker_time_from_this(void *arg) {
163
0
    return static_cast<TaskControl*>(arg)->get_cumulated_worker_time();
164
0
}
165
166
struct CumulatedWithTagArgs {
167
0
    CumulatedWithTagArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), t(_t) {}
168
    TaskControl* c;
169
    bthread_tag_t t;
170
};
171
172
0
static double get_cumulated_worker_time_from_this_with_tag(void* arg) {
173
0
    auto a = static_cast<CumulatedWithTagArgs*>(arg);
174
0
    auto c = a->c;
175
0
    auto t = a->t;
176
0
    return c->get_cumulated_worker_time(t);
177
0
}
178
179
0
static int64_t get_cumulated_switch_count_from_this(void *arg) {
180
0
    return static_cast<TaskControl*>(arg)->get_cumulated_switch_count();
181
0
}
182
183
0
static int64_t get_cumulated_signal_count_from_this(void *arg) {
184
0
    return static_cast<TaskControl*>(arg)->get_cumulated_signal_count();
185
0
}
186
187
// Constructs an idle TaskControl: no workers are started here (see init()).
// Per-tag containers are sized by FLAGS_task_group_ntags.
TaskControl::TaskControl()
188
    // NOTE: all fields must be initialized before the vars.
189
0
    : _tagged_ngroup(FLAGS_task_group_ntags)
190
0
    , _tagged_groups(FLAGS_task_group_ntags)
191
0
    , _init(false)
192
0
    , _stop(false)
193
0
    , _concurrency(0)
194
0
    , _next_worker_id(0)
195
0
    , _nworkers("bthread_worker_count")
196
0
    , _pending_time(NULL)
197
      // The vars below are constructed without a name and only exposed at the
198
      // end of init(): their callbacks read from this TaskControl, which is
      // not fully initialized yet.
199
0
    , _cumulated_worker_time(get_cumulated_worker_time_from_this, this)
200
0
    , _worker_usage_second(&_cumulated_worker_time, 1)
201
0
    , _cumulated_switch_count(get_cumulated_switch_count_from_this, this)
202
0
    , _switch_per_second(&_cumulated_switch_count)
203
0
    , _cumulated_signal_count(get_cumulated_signal_count_from_this, this)
204
0
    , _signal_per_second(&_cumulated_signal_count)
205
0
    , _status(print_rq_sizes_in_the_tc, this)
206
0
    , _nbthreads("bthread_count")
207
0
    , _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
208
0
    , _priority_queues(FLAGS_task_group_ntags)
209
0
    , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
210
0
    , _tagged_pl(FLAGS_task_group_ntags)
211
0
{}
212
213
0
int TaskControl::init(int concurrency) {
214
0
    if (_concurrency != 0) {
215
0
        LOG(ERROR) << "Already initialized";
216
0
        return -1;
217
0
    }
218
0
    if (concurrency <= 0) {
219
0
        LOG(ERROR) << "Invalid concurrency=" << concurrency;
220
0
        return -1;
221
0
    }
222
0
    _concurrency = concurrency;
223
224
0
    if (!FLAGS_cpu_set.empty()) {
225
0
        if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) {
226
0
            LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
227
0
            return -1;
228
0
        }
229
0
    }
230
231
    // task group group by tags
232
0
    for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
233
0
        _tagged_ngroup[i].store(0, std::memory_order_relaxed);
234
0
        auto tag_str = std::to_string(i);
235
0
        _tagged_nworkers.push_back(new bvar::Adder<int64_t>("bthread_worker_count", tag_str));
236
0
        _tagged_cumulated_worker_time.push_back(new bvar::PassiveStatus<double>(
237
0
            get_cumulated_worker_time_from_this_with_tag, new CumulatedWithTagArgs{this, i}));
238
0
        _tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
239
0
            "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
240
0
        _tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
241
0
        if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
242
0
            LOG(ERROR) << "Fail to init _priority_q";
243
0
            return -1;
244
0
        }
245
0
    }
246
247
    // Make sure TimerThread is ready.
248
0
    if (get_or_create_global_timer_thread() == NULL) {
249
0
        LOG(ERROR) << "Fail to get global_timer_thread";
250
0
        return -1;
251
0
    }
252
253
#ifdef BRPC_BTHREAD_TRACER
254
    if (!_task_tracer.Init()) {
255
        LOG(ERROR) << "Fail to init TaskTracer";
256
        return -1;
257
    }
258
#endif // BRPC_BTHREAD_TRACER
259
    
260
0
    _workers.resize(_concurrency);   
261
0
    for (int i = 0; i < _concurrency; ++i) {
262
0
        auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
263
0
        const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
264
0
        if (rc) {
265
0
            delete arg;
266
0
            PLOG(ERROR) << "Fail to create _workers[" << i << "]";
267
0
            return -1;
268
0
        }
269
0
    }
270
0
    _worker_usage_second.expose("bthread_worker_usage");
271
0
    _switch_per_second.expose("bthread_switch_second");
272
0
    _signal_per_second.expose("bthread_signal_second");
273
0
    _status.expose("bthread_group_status");
274
275
    // Wait for at least one group is added so that choose_one_group()
276
    // never returns NULL.
277
    // TODO: Handle the case that worker quits before add_group
278
0
    for (int i = 0; i < FLAGS_task_group_ntags;) {
279
0
        if (_tagged_ngroup[i].load(std::memory_order_acquire) == 0) {
280
0
            usleep(100);  // TODO: Elaborate
281
0
            continue;
282
0
        }
283
0
        ++i;
284
0
    }
285
286
0
    _init.store(true, butil::memory_order_release);
287
288
0
    return 0;
289
0
}
290
291
0
int TaskControl::add_workers(int num, bthread_tag_t tag) {
292
0
    if (num <= 0) {
293
0
        return 0;
294
0
    }
295
0
    try {
296
0
        _workers.resize(_concurrency + num);
297
0
    } catch (...) {
298
0
        return 0;
299
0
    }
300
0
    const int old_concurency = _concurrency.load(butil::memory_order_relaxed);
301
0
    for (int i = 0; i < num; ++i) {
302
        // Worker will add itself to _idle_workers, so we have to add
303
        // _concurrency before create a worker.
304
0
        _concurrency.fetch_add(1);
305
0
        auto arg = new WorkerThreadArgs(this, tag);
306
0
        const int rc = pthread_create(
307
0
                &_workers[i + old_concurency], NULL, worker_thread, arg);
308
0
        if (rc) {
309
0
            delete arg;
310
0
            PLOG(WARNING) << "Fail to create _workers[" << i + old_concurency << "]";
311
0
            _concurrency.fetch_sub(1, butil::memory_order_release);
312
0
            break;
313
0
        }
314
0
    }
315
    // Cannot fail
316
0
    _workers.resize(_concurrency.load(butil::memory_order_relaxed));
317
0
    return _concurrency.load(butil::memory_order_relaxed) - old_concurency;
318
0
}
319
320
0
// Picks a random registered TaskGroup of |tag|.
// |tag| must lie in [BTHREAD_TAG_DEFAULT, FLAGS_task_group_ntags).
// Returns NULL (after a failed CHECK) if the tag currently has no group.
TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
    CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags);
    auto& groups = tag_group(tag);
    const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
    if (ngroup == 0) {
        CHECK(false) << "Impossible: ngroup is 0";
        return NULL;
    }
    return groups[butil::fast_rand_less_than(ngroup)];
}
330
331
0
int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
332
0
    static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
333
0
    std::smatch match;
334
0
    std::set<unsigned> cpuset;
335
0
    if (value.empty()) {
336
0
        return -1;
337
0
    }
338
0
    if (std::regex_match(value, match, r)) {
339
0
        for (butil::StringSplitter split(value.data(), ','); split; ++split) {
340
0
            butil::StringPiece cpu_ids(split.field(), split.length());
341
0
            cpu_ids.trim_spaces();
342
0
            butil::StringPiece begin = cpu_ids;
343
0
            butil::StringPiece end = cpu_ids;
344
0
            auto dash = cpu_ids.find('-');
345
0
            if (dash != cpu_ids.npos) {
346
0
                begin = cpu_ids.substr(0, dash);
347
0
                end = cpu_ids.substr(dash + 1);
348
0
            }
349
0
            unsigned first = UINT_MAX;
350
0
            unsigned last = 0;
351
0
            int ret;
352
0
            ret = butil::StringSplitter(begin, '\t').to_uint(&first);
353
0
            ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
354
0
            if (ret != 0 || first > last) {
355
0
                return -1;
356
0
            }
357
0
            for (auto i = first; i <= last; ++i) {
358
0
                cpuset.insert(i);
359
0
            }
360
0
        }
361
0
        cpus.assign(cpuset.begin(), cpuset.end());
362
0
        return 0;
363
0
    }
364
0
    return -1;
365
0
}
366
367
0
// Best-effort binding of |pthread| to CPU |cpu_id|.
//
// On Linux the thread's affinity mask is set to exactly |cpu_id|. On macOS
// |cpu_id| is used as the affinity tag of a THREAD_AFFINITY_POLICY. On other
// platforms this is a no-op. Failures are only logged as warnings; nothing is
// reported to the caller.
void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
#if defined(OS_LINUX)
    cpu_set_t cs;
    CPU_ZERO(&cs);
    CPU_SET(cpu_id, &cs);
    const int r = pthread_setaffinity_np(pthread, sizeof(cs), &cs);
    if (r != 0) {
        LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
    }
#elif defined(OS_MACOSX)
    thread_port_t mach_thread = pthread_mach_thread_np(pthread);
    // Bail out only when there is no Mach port for the thread. (The check
    // used to be inverted, which skipped the binding for every valid thread.)
    if (mach_thread == MACH_PORT_NULL) {
        LOG(WARNING) << "mach_thread is null. "
                     << "Failed to bind thread to cpu: " << cpu_id;
        return;
    }
    thread_affinity_policy_data_t policy;
    policy.affinity_tag = cpu_id;
    if (thread_policy_set(mach_thread,
            THREAD_AFFINITY_POLICY,
            (thread_policy_t)&policy,
            THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) {
        LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
    }
#endif
}
394
395
#ifdef BRPC_BTHREAD_TRACER
396
// Writes the stack trace of bthread |tid| to |os| by forwarding to
// _task_tracer.Trace(). Only built when BRPC_BTHREAD_TRACER is defined.
void TaskControl::stack_trace(std::ostream& os, bthread_t tid) {
397
    _task_tracer.Trace(os, tid);
398
}
399
400
// Same as above, but returns whatever _task_tracer.Trace(tid) returns as a
// string.
std::string TaskControl::stack_trace(bthread_t tid) {
401
    return _task_tracer.Trace(tid);
402
}
403
#endif // BRPC_BTHREAD_TRACER
404
405
extern int stop_and_join_epoll_threads();
406
407
0
void TaskControl::stop_and_join() {
408
    // Close epoll threads so that worker threads are not waiting on epoll(
409
    // which cannot be woken up by signal_task below)
410
0
    CHECK_EQ(0, stop_and_join_epoll_threads());
411
412
    // Stop workers
413
0
    {
414
0
        BAIDU_SCOPED_LOCK(_modify_group_mutex);
415
0
        _stop = true;
416
0
        std::for_each(
417
0
            _tagged_ngroup.begin(), _tagged_ngroup.end(),
418
0
            [](butil::atomic<size_t>& index) { index.store(0, butil::memory_order_relaxed); });
419
0
    }
420
0
    for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
421
0
        for (auto& pl : _tagged_pl[i]) {
422
0
            pl.stop();
423
0
        }
424
0
    }
425
426
0
    for (auto worker: _workers) {
427
        // Interrupt blocking operations.
428
#ifdef BRPC_BTHREAD_TRACER
429
        // TaskTracer has registered signal handler for SIGURG.
430
        pthread_kill(worker, SIGURG);
431
#else
432
0
        interrupt_pthread(worker);
433
0
#endif // BRPC_BTHREAD_TRACER
434
0
    }
435
    // Join workers
436
0
    for (auto worker : _workers) {
437
0
        pthread_join(worker, NULL);
438
0
    }
439
0
}
440
441
0
// Releases the pending-time recorder, hides the exposed vars, then stops and
// joins all workers.
TaskControl::~TaskControl() {
    // NOTE: g_task_control is not destructed now because the situation
    //       is extremely racy.
    auto pending_time = _pending_time.exchange(NULL, butil::memory_order_relaxed);
    delete pending_time;

    _worker_usage_second.hide();
    _switch_per_second.hide();
    _signal_per_second.hide();
    _status.hide();

    stop_and_join();
}
452
453
0
// Registers |g| under |tag|: assigns its tag and parking lot and appends it
// to the tag's group table, all under _modify_group_mutex.
// Returns -1 if |g| is NULL or the TaskControl is stopping, 0 otherwise.
// NOTE(review): when the table already holds BTHREAD_MAX_CONCURRENCY groups,
// |g| is not stored but 0 is still returned -- confirm this is intended.
int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) {
    if (__builtin_expect(NULL == g, 0)) {
        return -1;
    }
    {
        BAIDU_SCOPED_LOCK(_modify_group_mutex);
        if (_stop) {
            return -1;
        }
        g->set_tag(tag);
        // The parking lot is chosen by hashing the calling pthread's id.
        g->set_pl(&_tagged_pl[tag][butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag]);

        const size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed);
        if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) {
            _tagged_groups[tag][ngroup] = g;
            // Release-store publishes the filled slot to steal_task(), which
            // reads the counter with acquire.
            _tagged_ngroup[tag].store(ngroup + 1, butil::memory_order_release);
        }
    }
    // See the comments in _destroy_group
    // TODO: Not needed anymore since non-worker pthread cannot have TaskGroup
    // signal_task(65536, tag);
    return 0;
}
474
475
0
void TaskControl::delete_task_group(void* arg) {
476
0
    delete(TaskGroup*)arg;
477
0
}
478
479
0
// Unregisters |g| from its tag's group table and schedules its deletion.
// Returns -1 if |g| is NULL or belongs to another TaskControl; otherwise 0,
// whether or not |g| was found in the table.
int TaskControl::_destroy_group(TaskGroup* g) {
    if (NULL == g) {
        LOG(ERROR) << "Param[g] is NULL";
        return -1;
    }
    if (g->_control != this) {
        LOG(ERROR) << "TaskGroup=" << g
                   << " does not belong to this TaskControl=" << this;
        return -1;
    }

    bool erased = false;
    {
        BAIDU_SCOPED_LOCK(_modify_group_mutex);
        const auto tag = g->tag();
        auto& groups = tag_group(tag);
        const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);

        // Locate g in the live prefix of the table.
        size_t pos = 0;
        while (pos < ngroup && groups[pos] != g) {
            ++pos;
        }
        if (pos < ngroup) {
            // No need for atomic_thread_fence because lock did it.
            groups[pos] = groups[ngroup - 1];
            // Change _ngroup and keep _groups unchanged at last so that:
            //  - If steal_task sees the newest _ngroup, it would not touch
            //    _groups[ngroup -1]
            //  - If steal_task sees old _ngroup and is still iterating on
            //    _groups, it would not miss _groups[ngroup - 1] which was
            //    swapped to _groups[i]. Although adding new group would
            //    overwrite it, since we do signal_task in _add_group(),
            //    we think the pending tasks of _groups[ngroup - 1] would
            //    not miss.
            tag_ngroup(tag).store(ngroup - 1, butil::memory_order_release);
            //_groups[ngroup - 1] = NULL;
            erased = true;
        }
    }

    if (!erased) {
        return 0;
    }
    // Can't delete g immediately because for performance consideration,
    // we don't lock _modify_group_mutex in steal_task which may
    // access the removed group concurrently. We use simple strategy here:
    // Schedule a function which deletes the TaskGroup after
    // FLAGS_task_group_delete_delay seconds
    get_global_timer_thread()->schedule(
        delete_task_group, g,
        butil::microseconds_from_now(FLAGS_task_group_delete_delay * 1000000L));
    return 0;
}
528
529
0
// Tries to obtain one runnable bthread for the calling worker.
//
// The priority queue of the caller's tag is tried first; then the groups of
// that tag are probed starting at *seed and advancing by |offset|, trying
// each group's run queue and then its remote run queue.
// On success the id is stored in *tid and true is returned. *seed is updated
// with the last probed position whenever the group scan ran.
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    const auto tag = tls_task_group->tag();

    if (_priority_queues[tag].steal(tid)) {
        return true;
    }

    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);
    if (ngroup == 0) {
        return false;
    }

    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    auto& groups = tag_group(tag);
    size_t cursor = *seed;
    bool stolen = false;
    for (size_t probed = 0; probed < ngroup; ++probed, cursor += offset) {
        TaskGroup* const victim = groups[cursor % ngroup];
        // victim is possibly NULL because of concurrent _destroy_group
        if (victim == NULL) {
            continue;
        }
        // Local run queue first, remote run queue second.
        if (victim->_rq.steal(tid) || victim->_remote_rq.pop(tid)) {
            stolen = true;
            break;
        }
    }
    *seed = cursor;
    return stolen;
}
564
565
0
void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
566
0
    if (num_task <= 0) {
567
0
        return;
568
0
    }
569
    // TODO(gejun): Current algorithm does not guarantee enough threads will
570
    // be created to match caller's requests. But in another side, there's also
571
    // many useless signalings according to current impl. Capping the concurrency
572
    // is a good balance between performance and timeliness of scheduling.
573
0
    if (num_task > 2) {
574
0
        num_task = 2;
575
0
    }
576
0
    auto& pl = tag_pl(tag);
577
0
    size_t start_index = butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag;
578
0
    for (size_t i = 0; i < _pl_num_of_each_tag && num_task > 0; ++i) {
579
0
        num_task -= pl[start_index].signal(1);
580
0
        if (++start_index >= _pl_num_of_each_tag) {
581
0
            start_index = 0;
582
0
        }
583
0
    }
584
0
    if (num_task > 0 &&
585
0
        FLAGS_bthread_min_concurrency > 0 &&    // test min_concurrency for performance
586
0
        _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
587
        // TODO: Reduce this lock
588
0
        BAIDU_SCOPED_LOCK(g_task_control_mutex);
589
0
        if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
590
0
            add_workers(1, tag);
591
0
        }
592
0
    }
593
0
}
594
595
0
void TaskControl::print_rq_sizes(std::ostream& os) {
596
0
    size_t ngroup = 0;
597
0
    std::for_each(_tagged_ngroup.begin(), _tagged_ngroup.end(), [&](butil::atomic<size_t>& index) {
598
0
        ngroup += index.load(butil::memory_order_relaxed);
599
0
    });
600
0
    DEFINE_SMALL_ARRAY(int, nums, ngroup, 128);
601
0
    {
602
0
        BAIDU_SCOPED_LOCK(_modify_group_mutex);
603
        // ngroup > _ngroup: nums[_ngroup ... ngroup-1] = 0
604
        // ngroup < _ngroup: just ignore _groups[_ngroup ... ngroup-1]
605
0
        int i = 0;
606
0
        for_each_task_group([&](TaskGroup* g) {
607
0
            nums[i] = (g ? g->_rq.volatile_size() : 0);
608
0
            ++i;
609
0
        });
610
0
    }
611
0
    for (size_t i = 0; i < ngroup; ++i) {
612
0
        os << nums[i] << ' ';
613
0
    }
614
0
}
615
616
0
double TaskControl::get_cumulated_worker_time() {
617
0
    int64_t cputime_ns = 0;
618
0
    BAIDU_SCOPED_LOCK(_modify_group_mutex);
619
0
    for_each_task_group([&](TaskGroup* g) {
620
0
        cputime_ns += g->cumulated_cputime_ns();
621
0
    });
622
0
    return cputime_ns / 1000000000.0;
623
0
}
624
625
0
// Returns the cumulated CPU time of the task groups registered under |tag|,
// in seconds. The sum is taken under _modify_group_mutex.
double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) {
    BAIDU_SCOPED_LOCK(_modify_group_mutex);
    const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);
    auto& groups = tag_group(tag);

    int64_t total_ns = 0;
    size_t idx = 0;
    while (idx < ngroup) {
        total_ns += groups[idx]->cumulated_cputime_ns();
        ++idx;
    }
    // Nanoseconds -> seconds.
    return total_ns / 1000000000.0;
}
635
636
0
int64_t TaskControl::get_cumulated_switch_count() {
637
0
    int64_t c = 0;
638
0
    BAIDU_SCOPED_LOCK(_modify_group_mutex);
639
0
    for_each_task_group([&](TaskGroup* g) {
640
0
        if (g) {
641
0
            c += g->_nswitch;
642
0
        }
643
0
    });
644
0
    return c;
645
0
}
646
647
0
int64_t TaskControl::get_cumulated_signal_count() {
648
0
    int64_t c = 0;
649
0
    BAIDU_SCOPED_LOCK(_modify_group_mutex);
650
0
    for_each_task_group([&](TaskGroup* g) {
651
0
        if (g) {
652
0
            c += g->_nsignaled + g->_remote_nsignaled;
653
0
        }
654
0
    });
655
0
    return c;
656
0
}
657
658
0
// Returns the pending-time LatencyRecorder, creating it on first use.
// Creation is serialized by _pending_time_mutex; the thread that creates the
// recorder also exposes it as "bthread_creation", after releasing the mutex.
bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() {
    _pending_time_mutex.lock();
    bvar::LatencyRecorder* recorder = _pending_time.load(butil::memory_order_consume);
    if (recorder) {
        // Somebody else already created it.
        _pending_time_mutex.unlock();
        return recorder;
    }
    recorder = new bvar::LatencyRecorder;
    _pending_time.store(recorder, butil::memory_order_release);
    _pending_time_mutex.unlock();

    recorder->expose("bthread_creation");
    return recorder;
}
673
674
0
std::vector<bthread_t> TaskControl::get_living_bthreads() {
675
0
    std::vector<bthread_t> living_bthread_ids;
676
0
    living_bthread_ids.reserve(1024);
677
0
    butil::for_each_resource<TaskMeta>([&living_bthread_ids](TaskMeta* m) {
678
        // filter out those bthreads created by bthread_start* functions,
679
        // i.e. not those created internally to run main task as they are
680
        // opaque to user.
681
0
        if (m && m->fn) {
682
            // determine whether the bthread is living by checking version
683
0
            const uint32_t given_ver = get_version(m->tid);
684
0
            BAIDU_SCOPED_LOCK(m->version_lock);
685
0
            if (given_ver == *m->version_butex) {
686
0
                living_bthread_ids.push_back(m->tid);
687
0
            }
688
0
        }
689
0
    });
690
0
    return living_bthread_ids;
691
0
}
692
693
}  // namespace bthread