Coverage Report

Created: 2026-01-09 06:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/timer_thread.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
21
#include <queue>                           // heap functions
22
#include <gflags/gflags.h>
23
#include "butil/scoped_lock.h"
24
#include "butil/logging.h"
25
#include "butil/third_party/murmurhash3/murmurhash3.h"   // fmix64
26
#include "butil/resource_pool.h"
27
#include "butil/threading/platform_thread.h"
28
#include "bvar/bvar.h"
29
#include "bthread/sys_futex.h"
30
#include "bthread/timer_thread.h"
31
#include "bthread/log.h"
32
33
namespace bthread {
34
35
DEFINE_uint32(brpc_timer_num_buckets, 13, "brpc timer num buckets");
36
37
// Defined in task_control.cpp
38
void run_worker_startfn();
39
40
const TimerThread::TaskId TimerThread::INVALID_TASK_ID = 0;
41
42
TimerThreadOptions::TimerThreadOptions()
43
0
    : num_buckets(13) {
44
0
}
45
46
// A task contains the necessary information for running fn(arg).
47
// Tasks are created in Bucket::schedule and destroyed in TimerThread::run
48
struct BAIDU_CACHELINE_ALIGNMENT TimerThread::Task {
49
    Task* next;                 // For linking tasks in a Bucket.
50
    int64_t run_time;           // run the task at this realtime
51
    void (*fn)(void*);          // the fn(arg) to run
52
    void* arg;
53
    // Current TaskId, checked against version in TimerThread::run to test
54
    // if this task is unscheduled.
55
    TaskId task_id;
56
    // initial_version:     not run yet
57
    // initial_version + 1: running
58
    // initial_version + 2: removed (also the version of next Task reused
59
    //                      this struct)
60
    butil::atomic<uint32_t> version;
61
62
0
    Task() : version(2/*skip 0*/) {}
63
64
    // Run this task and delete this struct.
65
    // Returns true if fn(arg) did run.
66
    bool run_and_delete();
67
68
    // Delete this struct if this task was unscheduled.
69
    // Returns true on deletion.
70
    bool try_delete();
71
};
72
73
// Timer tasks are sharded into different Buckets to reduce contentions.
74
class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket {
75
public:
76
    Bucket()
77
0
        : _nearest_run_time(std::numeric_limits<int64_t>::max())
78
0
        , _task_head(NULL) {
79
0
    }
80
81
0
    ~Bucket() {}
82
83
    struct ScheduleResult {
84
        TimerThread::TaskId task_id;
85
        bool earlier;
86
    };
87
    
88
    // Schedule a task into this bucket.
89
    // Returns the TaskId and if it has the nearest run time.
90
    ScheduleResult schedule(void (*fn)(void*), void* arg,
91
                            const timespec& abstime);
92
93
    // Pull all scheduled tasks.
94
    // This function is called in timer thread.
95
    Task* consume_tasks();
96
97
private:
98
    FastPthreadMutex _mutex;
99
    int64_t _nearest_run_time;
100
    Task* _task_head;
101
};
102
103
// Utilities for making and extracting TaskId.
104
inline TimerThread::TaskId make_task_id(
105
0
    butil::ResourceId<TimerThread::Task> slot, uint32_t version) {
106
0
    return TimerThread::TaskId((((uint64_t)version) << 32) | slot.value);
107
0
}
108
109
inline
110
0
butil::ResourceId<TimerThread::Task> slot_of_task_id(TimerThread::TaskId id) {
111
0
    butil::ResourceId<TimerThread::Task> slot = { (id & 0xFFFFFFFFul) };
112
0
    return slot;
113
0
}
114
115
0
inline uint32_t version_of_task_id(TimerThread::TaskId id) {
116
0
    return (uint32_t)(id >> 32);
117
0
}
118
119
0
inline bool task_greater(const TimerThread::Task* a, const TimerThread::Task* b) {
120
0
    return a->run_time > b->run_time;
121
0
}
122
123
0
void* TimerThread::run_this(void* arg) {
124
0
    butil::PlatformThread::SetNameSimple("brpc_timer");
125
0
    static_cast<TimerThread*>(arg)->run();
126
0
    return NULL;
127
0
}
128
129
TimerThread::TimerThread()
130
0
    : _started(false)
131
0
    , _stop(false)
132
0
    , _buckets(NULL)
133
0
    , _nearest_run_time(std::numeric_limits<int64_t>::max())
134
0
    , _nsignals(0)
135
0
    , _thread(0) {
136
0
}
137
138
0
TimerThread::~TimerThread() {
139
0
    stop_and_join();
140
0
    delete [] _buckets;
141
0
    _buckets = NULL;
142
0
}
143
144
0
int TimerThread::start(const TimerThreadOptions* options_in) {
145
0
    if (_started) {
146
0
        return 0;
147
0
    }
148
0
    if (options_in) {
149
0
        _options = *options_in;
150
0
    }
151
0
    if (_options.num_buckets == 0) {
152
0
        LOG(ERROR) << "num_buckets can't be 0";
153
0
        return EINVAL;
154
0
    }
155
0
    if (_options.num_buckets > 1024) {
156
0
        LOG(ERROR) << "num_buckets=" << _options.num_buckets << " is too big";
157
0
        return EINVAL;
158
0
    }
159
0
    _buckets = new (std::nothrow) Bucket[_options.num_buckets];
160
0
    if (NULL == _buckets) {
161
0
        LOG(ERROR) << "Fail to new _buckets";
162
0
        return ENOMEM;
163
0
    }        
164
0
    const int ret = pthread_create(&_thread, NULL, TimerThread::run_this, this);
165
0
    if (ret) {
166
0
        return ret;
167
0
    }
168
0
    _started = true;
169
0
    return 0;
170
0
}
171
172
0
TimerThread::Task* TimerThread::Bucket::consume_tasks() {
173
0
    Task* head = NULL;
174
0
    if (_task_head) { // NOTE: schedule() and consume_tasks() are sequenced
175
        // by TimerThread._nearest_run_time and fenced by TimerThread._mutex.
176
        // We can avoid touching the mutex and related cacheline when the
177
        // bucket is actually empty.
178
0
        BAIDU_SCOPED_LOCK(_mutex);
179
0
        if (_task_head) {
180
0
            head = _task_head;
181
0
            _task_head = NULL;
182
0
            _nearest_run_time = std::numeric_limits<int64_t>::max();
183
0
        }
184
0
    }
185
0
    return head;
186
0
}
187
188
TimerThread::Bucket::ScheduleResult
189
TimerThread::Bucket::schedule(void (*fn)(void*), void* arg,
190
0
                              const timespec& abstime) {
191
0
    butil::ResourceId<Task> slot_id;
192
0
    Task* task = butil::get_resource<Task>(&slot_id);
193
0
    if (task == NULL) {
194
0
        ScheduleResult result = { INVALID_TASK_ID, false };
195
0
        return result;
196
0
    }
197
0
    task->next = NULL;
198
0
    task->fn = fn;
199
0
    task->arg = arg;
200
0
    task->run_time = butil::timespec_to_microseconds(abstime);
201
0
    uint32_t version = task->version.load(butil::memory_order_relaxed);
202
0
    if (version == 0) {  // skip 0.
203
0
        task->version.fetch_add(2, butil::memory_order_relaxed);
204
0
        version = 2;
205
0
    }
206
0
    const TaskId id = make_task_id(slot_id, version);
207
0
    task->task_id = id;
208
0
    bool earlier = false;
209
0
    {
210
0
        BAIDU_SCOPED_LOCK(_mutex);
211
0
        task->next = _task_head;
212
0
        _task_head = task;
213
0
        if (task->run_time < _nearest_run_time) {
214
0
            _nearest_run_time = task->run_time;
215
0
            earlier = true;
216
0
        }
217
0
    }
218
0
    ScheduleResult result = { id, earlier };
219
0
    return result;
220
0
}
221
222
TimerThread::TaskId TimerThread::schedule(
223
0
    void (*fn)(void*), void* arg, const timespec& abstime) {
224
0
    if (_stop.load(butil::memory_order_relaxed) || !_started) {
225
        // Not add tasks when TimerThread is about to stop.
226
0
        return INVALID_TASK_ID;
227
0
    }
228
    // Hashing by pthread id is better for cache locality.
229
0
    const Bucket::ScheduleResult result = 
230
0
        _buckets[butil::fmix64(pthread_numeric_id()) % _options.num_buckets]
231
0
        .schedule(fn, arg, abstime);
232
0
    if (result.earlier) {
233
0
        bool earlier = false;
234
0
        const int64_t run_time = butil::timespec_to_microseconds(abstime);
235
0
        {
236
0
            BAIDU_SCOPED_LOCK(_mutex);
237
0
            if (run_time < _nearest_run_time) {
238
0
                _nearest_run_time = run_time;
239
0
                ++_nsignals;
240
0
                earlier = true;
241
0
            }
242
0
        }
243
0
        if (earlier) {
244
0
            futex_wake_private(&_nsignals, 1);
245
0
        }
246
0
    }
247
0
    return result.task_id;
248
0
}
249
250
// Notice that we don't recycle the Task in this function, let TimerThread::run
251
// do it. The side effect is that we may allocate many unscheduled tasks before
252
// TimerThread wakes up. The number is approximately qps * timeout_s. Under the
253
// precondition that ResourcePool<Task> caches 128K for each thread, with some
254
// further calculations, we can conclude that in a RPC scenario:
255
//   when timeout / latency < 2730 (128K / sizeof(Task))
256
// unscheduled tasks do not occupy additional memory. 2730 is a large ratio
257
// between timeout and latency in most RPC scenarios, this is why we don't
258
// try to reuse tasks right now inside unschedule() with more complicated code.
259
0
int TimerThread::unschedule(TaskId task_id) {
260
0
    const butil::ResourceId<Task> slot_id = slot_of_task_id(task_id);
261
0
    Task* const task = butil::address_resource(slot_id);
262
0
    if (task == NULL) {
263
0
        LOG(ERROR) << "Invalid task_id=" << task_id;
264
0
        return -1;
265
0
    }
266
0
    const uint32_t id_version = version_of_task_id(task_id);
267
0
    uint32_t expected_version = id_version;
268
    // This CAS is rarely contended, should be fast.
269
    // The acquire fence is paired with release fence in Task::run_and_delete
270
    // to make sure that we see all changes brought by fn(arg).
271
0
    if (task->version.compare_exchange_strong(
272
0
            expected_version, id_version + 2,
273
0
            butil::memory_order_acquire)) {
274
0
        return 0;
275
0
    }
276
0
    return (expected_version == id_version + 1) ? 1 : -1;
277
0
}
278
279
0
bool TimerThread::Task::run_and_delete() {
280
0
    const uint32_t id_version = version_of_task_id(task_id);
281
0
    uint32_t expected_version = id_version;
282
    // This CAS is rarely contended, should be fast.
283
0
    if (version.compare_exchange_strong(
284
0
            expected_version, id_version + 1, butil::memory_order_relaxed)) {
285
0
        fn(arg);
286
        // The release fence is paired with acquire fence in
287
        // TimerThread::unschedule to make changes of fn(arg) visible.
288
0
        version.store(id_version + 2, butil::memory_order_release);
289
0
        butil::return_resource(slot_of_task_id(task_id));
290
0
        return true;
291
0
    } else if (expected_version == id_version + 2) {
292
        // already unscheduled.
293
0
        butil::return_resource(slot_of_task_id(task_id));
294
0
        return false;
295
0
    } else {
296
        // Impossible.
297
0
        LOG(ERROR) << "Invalid version=" << expected_version
298
0
                   << ", expecting " << id_version + 2;
299
0
        return false;
300
0
    }
301
0
}
302
303
0
bool TimerThread::Task::try_delete() {
304
0
    const uint32_t id_version = version_of_task_id(task_id);
305
0
    if (version.load(butil::memory_order_relaxed) != id_version) {
306
0
        CHECK_EQ(version.load(butil::memory_order_relaxed), id_version + 2);
307
0
        butil::return_resource(slot_of_task_id(task_id));
308
0
        return true;
309
0
    }
310
0
    return false;
311
0
}
312
313
template <typename T>
314
0
static T deref_value(void* arg) {
315
0
    return *(T*)arg;
316
0
}
Unexecuted instantiation: timer_thread.cpp:unsigned long bthread::deref_value<unsigned long>(void*)
Unexecuted instantiation: timer_thread.cpp:double bthread::deref_value<double>(void*)
317
318
0
void TimerThread::run() {
319
0
    run_worker_startfn();
320
#ifdef BAIDU_INTERNAL
321
    logging::ComlogInitializer comlog_initializer;
322
#endif
323
324
0
    int64_t last_sleep_time = butil::gettimeofday_us();
325
0
    BT_VLOG << "Started TimerThread=" << pthread_self();
326
327
    // min heap of tasks (ordered by run_time)
328
0
    std::vector<Task*> tasks;
329
0
    tasks.reserve(4096);
330
331
    // vars
332
0
    size_t nscheduled = 0;
333
0
    bvar::PassiveStatus<size_t> nscheduled_var(deref_value<size_t>, &nscheduled);
334
0
    bvar::PerSecond<bvar::PassiveStatus<size_t> > nscheduled_second(&nscheduled_var);
335
0
    size_t ntriggered = 0;
336
0
    bvar::PassiveStatus<size_t> ntriggered_var(deref_value<size_t>, &ntriggered);
337
0
    bvar::PerSecond<bvar::PassiveStatus<size_t> > ntriggered_second(&ntriggered_var);
338
0
    double busy_seconds = 0;
339
0
    bvar::PassiveStatus<double> busy_seconds_var(deref_value<double>, &busy_seconds);
340
0
    bvar::PerSecond<bvar::PassiveStatus<double> > busy_seconds_second(&busy_seconds_var);
341
0
    if (!_options.bvar_prefix.empty()) {
342
0
        nscheduled_second.expose_as(_options.bvar_prefix, "scheduled_second");
343
0
        ntriggered_second.expose_as(_options.bvar_prefix, "triggered_second");
344
0
        busy_seconds_second.expose_as(_options.bvar_prefix, "usage");
345
0
    }
346
    
347
0
    while (!_stop.load(butil::memory_order_relaxed)) {
348
        // Clear _nearest_run_time before consuming tasks from buckets.
349
        // This helps us to be aware of earliest task of the new tasks before we
350
        // would run the consumed tasks.
351
0
        {
352
0
            BAIDU_SCOPED_LOCK(_mutex);
353
            // This check of _stop ensures we won't miss the reset of _nearest_run_time
354
            // to 0 in stop_and_join, avoiding potential race conditions.
355
0
            if (BAIDU_UNLIKELY(_stop.load(butil::memory_order_relaxed))) {
356
0
                break;
357
0
            }
358
0
            _nearest_run_time = std::numeric_limits<int64_t>::max();
359
0
        }
360
        
361
        // Pull tasks from buckets.
362
0
        for (size_t i = 0; i < _options.num_buckets; ++i) {
363
0
            Bucket& bucket = _buckets[i];
364
0
            for (Task* p = bucket.consume_tasks(); p != nullptr; ++nscheduled) {
365
                // p->next should be kept first
366
                // in case of the deletion of Task p which is unscheduled
367
0
                Task* next_task = p->next;
368
369
0
                if (!p->try_delete()) { // remove the task if it's unscheduled
370
0
                    tasks.push_back(p);
371
0
                    std::push_heap(tasks.begin(), tasks.end(), task_greater);
372
0
                }
373
0
                p = next_task;
374
0
            }
375
0
        }
376
377
0
        bool pull_again = false;
378
0
        while (!tasks.empty()) {
379
0
            Task* task1 = tasks[0];  // the about-to-run task
380
0
            if (butil::gettimeofday_us() < task1->run_time) {  // not ready yet.
381
0
                break;
382
0
            }
383
            // Each time before we run the earliest task (that we think), 
384
            // check the globally shared _nearest_run_time. If a task earlier
385
            // than task1 was scheduled during pulling from buckets, we'll
386
            // know. In RPC scenarios, _nearest_run_time is not often changed by
387
            // threads because the task needs to be the earliest in its bucket,
388
            // since run_time of scheduled tasks are often in ascending order,
389
            // most tasks are unlikely to be "earliest". (If run_time of tasks
390
            // are in descending orders, all tasks are "earliest" after every
391
            // insertion, and they'll grab _mutex and change _nearest_run_time
392
            // frequently, fortunately this is not true at most of time).
393
0
            {
394
0
                BAIDU_SCOPED_LOCK(_mutex);
395
0
                if (task1->run_time > _nearest_run_time) {
396
                    // a task is earlier than task1. We need to check buckets.
397
0
                    pull_again = true;
398
0
                    break;
399
0
                }
400
0
            }
401
0
            std::pop_heap(tasks.begin(), tasks.end(), task_greater);
402
0
            tasks.pop_back();
403
0
            if (task1->run_and_delete()) {
404
0
                ++ntriggered;
405
0
            }
406
0
        }
407
0
        if (pull_again) {
408
0
            BT_VLOG << "pull again, tasks=" << tasks.size();
409
0
            continue;
410
0
        }
411
412
        // The realtime to wait for.
413
0
        int64_t next_run_time = std::numeric_limits<int64_t>::max();
414
0
        if (!tasks.empty()) {
415
0
            next_run_time = tasks[0]->run_time;
416
0
        }
417
        // Similarly with the situation before running tasks, we check
418
        // _nearest_run_time to prevent us from waiting on a non-earliest
419
        // task. We also use the _nsignals to make sure that if new task 
420
        // is earlier than the realtime that we wait for, we'll wake up.
421
0
        int expected_nsignals = 0;
422
0
        {
423
0
            BAIDU_SCOPED_LOCK(_mutex);
424
0
            if (next_run_time > _nearest_run_time) {
425
                // a task is earlier than what we would wait for.
426
                // We need to check the buckets.
427
0
                continue;
428
0
            } else {
429
0
                _nearest_run_time = next_run_time;
430
0
                expected_nsignals = _nsignals;
431
0
            }
432
0
        }
433
0
        timespec* ptimeout = NULL;
434
0
        timespec next_timeout = { 0, 0 };
435
0
        const int64_t now = butil::gettimeofday_us();
436
0
        if (next_run_time != std::numeric_limits<int64_t>::max()) {
437
0
            next_timeout = butil::microseconds_to_timespec(next_run_time - now);
438
0
            ptimeout = &next_timeout;
439
0
        }
440
0
        busy_seconds += (now - last_sleep_time) / 1000000.0;
441
0
        futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
442
0
        last_sleep_time = butil::gettimeofday_us();
443
0
    }
444
0
    BT_VLOG << "Ended TimerThread=" << pthread_self();
445
0
}
446
447
0
void TimerThread::stop_and_join() {
448
0
    _stop.store(true, butil::memory_order_relaxed);
449
0
    if (_started) {
450
0
        {
451
0
            BAIDU_SCOPED_LOCK(_mutex);
452
             // trigger pull_again and wake up TimerThread
453
0
            _nearest_run_time = 0;
454
0
            ++_nsignals;
455
0
        }
456
0
        if (pthread_self() != _thread) {
457
            // stop_and_join was not called from a running task.
458
            // wake up the timer thread in case it is sleeping.
459
0
            futex_wake_private(&_nsignals, 1);
460
0
            pthread_join(_thread, NULL);
461
0
        }
462
0
    }
463
0
}
464
465
static pthread_once_t g_timer_thread_once = PTHREAD_ONCE_INIT;
466
static TimerThread* g_timer_thread = NULL;
467
0
static void init_global_timer_thread() {
468
0
    g_timer_thread = new (std::nothrow) TimerThread;
469
0
    if (g_timer_thread == NULL) {
470
0
        LOG(FATAL) << "Fail to new g_timer_thread";
471
0
        return;
472
0
    }
473
0
    TimerThreadOptions options;
474
0
    options.bvar_prefix = "bthread_timer";
475
0
    options.num_buckets = FLAGS_brpc_timer_num_buckets;
476
0
    const int rc = g_timer_thread->start(&options);
477
0
    if (rc != 0) {
478
0
        LOG(FATAL) << "Fail to start timer_thread, " << berror(rc);
479
0
        delete g_timer_thread;
480
0
        g_timer_thread = NULL;
481
0
        return;
482
0
    }
483
0
}
484
0
TimerThread* get_or_create_global_timer_thread() {
485
0
    pthread_once(&g_timer_thread_once, init_global_timer_thread);
486
0
    return g_timer_thread;
487
0
}
488
0
TimerThread* get_global_timer_thread() {
489
0
    return g_timer_thread;
490
0
}
491
492
}  // end namespace bthread