Coverage Report

Created: 2026-03-20 06:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/execution_queue.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: 2015/10/23 18:16:16
21
22
#ifndef  BTHREAD_EXECUTION_QUEUE_H
23
#define  BTHREAD_EXECUTION_QUEUE_H
24
25
#include "bthread/bthread.h"
26
#include "butil/type_traits.h"
27
28
namespace bthread {
29
30
// ExecutionQueue is a special wait-free MPSC queue of which the consumer thread
31
// is auto started by the execute operation and auto quits if there are no more 
32
// tasks; in other words, there isn't a daemon bthread waiting to consume tasks.
33
34
template <typename T> struct ExecutionQueueId;  // typed id taken by the execution_queue_* functions below
35
template <typename T> class ExecutionQueue;  // referenced by execution_queue_address()
36
struct TaskNode;  // node type pointed to by TaskIteratorBase and TaskHandle
37
class ExecutionQueueBase;  // declared a friend of TaskIteratorBase
38
39
class TaskIteratorBase {  // type-independent base class of TaskIterator<T>
40
DISALLOW_COPY_AND_ASSIGN(TaskIteratorBase);
41
friend class ExecutionQueueBase;
42
public:
43
    // Returns true when the ExecutionQueue has been stopped: there will never
44
    // be more tasks, so all the related resources can safely be released from
45
    // this point on.
46
0
    bool is_queue_stopped() const { return _is_stopped; }
47
    explicit operator bool() const;  // NOTE(review): presumably true while a task is available to visit — confirm in the implementation
48
protected:
49
    TaskIteratorBase(TaskNode* head, ExecutionQueueBase* queue,
50
                     bool is_stopped, bool high_priority)
51
0
        : _cur_node(head)
52
0
        , _head(head)
53
0
        , _q(queue)
54
0
        , _is_stopped(is_stopped)
55
0
        , _high_priority(high_priority)
56
0
        , _should_break(false)
57
0
        , _num_iterated(0)
58
0
    { operator++(); }  // advances once right after construction
59
    ~TaskIteratorBase();
60
    void operator++();  // defined outside this header section
61
0
    TaskNode* cur_node() const { return _cur_node; }
62
private:
63
0
    int num_iterated() const { return _num_iterated; }
64
    bool should_break_for_high_priority_tasks();
65
66
    TaskNode*               _cur_node;  // node returned by cur_node(); starts at |head|
67
    TaskNode*               _head;  // the |head| passed to the constructor
68
    ExecutionQueueBase*     _q;  // the |queue| passed to the constructor
69
    bool                    _is_stopped;  // value reported by is_queue_stopped()
70
    bool                    _high_priority;  // the |high_priority| passed to the constructor
71
    bool                    _should_break;  // starts as false
72
    int                     _num_iterated;  // starts at 0; reported by num_iterated()
73
};
74
75
// Iterates over the tasks handed to the |execute| callback.
76
//
77
// Example:
78
// int demo_execute(void* meta, TaskIterator<T>& iter) {
79
//     if (iter.is_queue_stopped()) {
80
//         // destroy meta and related resources
81
//         return 0;
82
//     }
83
//     for (; iter; ++iter) {
84
//         // do_something(*iter)
85
//         // or do_something(iter->a_member_of_T)
86
//     }
87
//     return 0;
88
// }
89
template <typename T>
90
class TaskIterator : public TaskIteratorBase {
91
public:
92
    typedef T*          pointer;
93
    typedef T&          reference;
94
95
    TaskIterator() = delete;  // not default-constructible
96
    reference operator*() const;  // defined outside this header section
97
    pointer operator->() const { return &(operator*()); }  // address of what operator*() returns
98
    TaskIterator& operator++();  // pre-increment
99
    void operator++(int);  // post-increment; returns nothing
100
};
101
102
struct TaskHandle {  // handle of a submitted task; accepted by execution_queue_cancel()
103
    TaskHandle();
104
    TaskNode* node;  // NOTE(review): presumably the node holding the submitted task — confirm
105
    int64_t version;  // NOTE(review): presumably what makes cancellation ABA-free — confirm
106
};
107
108
struct TaskOptions {
109
    TaskOptions();  // NOTE(review): presumably applies the defaults documented on the fields below — confirm
110
    TaskOptions(bool high_priority, bool in_place_if_possible);
111
112
    // The executor runs high-priority tasks in FIFO order, but before all
113
    // pending normal-priority tasks.
114
    // NOTE: No real-time guarantee is given, as there might be tasks still in
115
    // progress which are uninterruptible.
116
    //
117
    // Default: false
118
    bool high_priority;
119
120
    // If |in_place_if_possible| is true, execution_queue_execute calls
121
    // |execute| immediately instead of starting a bthread, if possible.
122
    //
123
    // Note: Running callbacks in place might cause deadlocks, so be very
124
    // careful when turning this flag on.
125
    //
126
    // Default: false
127
    bool in_place_if_possible;
128
};
129
130
const static TaskOptions TASK_OPTIONS_NORMAL = TaskOptions(false, false);  // high_priority=false, in_place_if_possible=false
131
const static TaskOptions TASK_OPTIONS_URGENT = TaskOptions(true, false);  // high_priority=true, in_place_if_possible=false
132
const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(false, true);  // high_priority=false, in_place_if_possible=true
133
134
class Executor {  // abstract interface; see ExecutionQueueOptions::executor
135
public:
136
    virtual ~Executor() = default;
137
138
    // Submits |fn| together with its argument |args|. Returns 0 on success.
139
    virtual int submit(void * (*fn)(void*), void* args) = 0;
140
};
141
142
struct ExecutionQueueOptions {
143
    ExecutionQueueOptions();
144
145
    // Execute in a resident pthread instead of a bthread. Default: false.
146
    bool use_pthread;
147
148
    // Attribute of the bthread that |execute| runs on. Default: BTHREAD_ATTR_NORMAL.
149
    // A bthread is used when executor == NULL and use_pthread == false.
150
    bthread_attr_t bthread_attr;
151
152
    // Executor that tasks run on. Default: NULL.
153
    // Note that TaskOptions.in_place_if_possible = false will not work if the
154
    // implementation of Executor is in-place (synchronous).
155
    Executor * executor;
156
};
157
158
// Start an ExecutionQueue. If |options| is NULL, the queue is created with
159
// the default options.
160
// Returns 0 on success, errno otherwise.
161
// NOTE: type |T| can be non-POD but must be copy-constructible.
162
template <typename T>
163
int execution_queue_start(
164
        ExecutionQueueId<T>* id, 
165
        const ExecutionQueueOptions* options,
166
        int (*execute)(void* meta, TaskIterator<T>& iter),
167
        void* meta);
168
169
// Stop the ExecutionQueue.
170
// After this function is called:
171
//  - All subsequent calls to execution_queue_execute fail immediately.
172
//  - The executor calls |execute| with TaskIterator::is_queue_stopped() being
173
//    true exactly once, after all the pending tasks have been executed; from
174
//    that point on it is OK to release the resource referenced by |meta|.
175
// Returns 0 on success, errno otherwise.
176
template <typename T>
177
int execution_queue_stop(ExecutionQueueId<T> id);
178
179
// Wait until the stop task (TaskIterator::is_queue_stopped() returns true) has
180
// been executed. NOTE(review): the return value is not documented here — confirm.
181
template <typename T>
182
int execution_queue_join(ExecutionQueueId<T> id);
183
184
// Thread-safe and Wait-free.
185
// Execute a task with the default TaskOptions (normal task).
186
template <typename T>
187
int execution_queue_execute(ExecutionQueueId<T> id, 
188
                            typename butil::add_const_reference<T>::type task);
189
190
// Thread-safe and Wait-free.
191
// Execute a task with options, e.g.
192
// bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT)
193
// If |options| is NULL, the default options (normal task) are used.
194
// If |handle| is not NULL, it is assigned the handle of this task.
195
template <typename T>
196
int execution_queue_execute(ExecutionQueueId<T> id, 
197
                            typename butil::add_const_reference<T>::type task,
198
                            const TaskOptions* options);
199
template <typename T>
200
int execution_queue_execute(ExecutionQueueId<T> id, 
201
                            typename butil::add_const_reference<T>::type task,
202
                            const TaskOptions* options,
203
                            TaskHandle* handle);
204
205
template <typename T>
206
int execution_queue_execute(ExecutionQueueId<T> id,
207
                            T&& task);  // rvalue overload; NOTE(review): presumably moves |task| into the queue — confirm in the implementation
208
209
template <typename T>
210
int execution_queue_execute(ExecutionQueueId<T> id,
211
                            T&& task,
212
                            const TaskOptions* options);  // rvalue overload taking |options|
213
214
template <typename T>
215
int execution_queue_execute(ExecutionQueueId<T> id,
216
                            T&& task,
217
                            const TaskOptions* options,
218
                            TaskHandle* handle);  // rvalue overload taking |options| and |handle|
219
220
// [Thread safe and ABA free] Cancel the task referred to by |h|.
221
// Returns:
222
//  -1: The task was executed or |h| is an invalid handle
223
//  0: Success
224
//  1: The task is executing
225
int execution_queue_cancel(const TaskHandle& h);
226
227
// Thread-safe and Wait-free
228
// Address a reference of the ExecutionQueue if |id| refers to a valid
229
// ExecutionQueue.
230
//
231
// |execution_queue_execute| internally fetches a reference of ExecutionQueue at
232
// the beginning and releases it at the end, which makes 2 additional cache
233
// updates. In some critical situations where the overhead of
234
// execution_queue_execute matters, you can avoid this by addressing the
235
// reference at the beginning of every producer, and execute tasks exactly
236
// through the reference instead of id.
237
//
238
// Note: It makes |execution_queue_stop| a little complicated at the user level,
239
// as we don't pass the `stop task' to |execute| until no one holds any reference.
240
// If you are not sure about the ownership of the return value (which releases
241
// the reference of the very ExecutionQueue in the destructor) and don't care
242
// that much about the overhead of ExecutionQueue, DON'T use this function.
243
template <typename T>
244
typename ExecutionQueue<T>::scoped_ptr_t 
245
execution_queue_address(ExecutionQueueId<T> id);
246
247
}  // namespace bthread
248
249
#include "bthread/execution_queue_inl.h"
250
251
#endif  //BTHREAD_EXECUTION_QUEUE_H