/src/brpc/src/bthread/execution_queue.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // bthread - An M:N threading library to make applications more concurrent. |
19 | | |
20 | | // Date: 2015/10/23 18:16:16 |
21 | | |
22 | | #ifndef BTHREAD_EXECUTION_QUEUE_H |
23 | | #define BTHREAD_EXECUTION_QUEUE_H |
24 | | |
25 | | #include "bthread/bthread.h" |
26 | | #include "butil/type_traits.h" |
27 | | |
28 | | namespace bthread { |
29 | | |
30 | | // ExecutionQueue is a special wait-free MPSC queue of which the consumer thread |
31 | | // is auto started by the execute operation and auto quits if there are no more |
32 | | // tasks, in another word there isn't a daemon bthread waiting to consume tasks. |
33 | | |
34 | | template <typename T> struct ExecutionQueueId; |
35 | | template <typename T> class ExecutionQueue; |
36 | | struct TaskNode; |
37 | | class ExecutionQueueBase; |
38 | | |
39 | | class TaskIteratorBase { |
40 | | DISALLOW_COPY_AND_ASSIGN(TaskIteratorBase); |
41 | | friend class ExecutionQueueBase; |
42 | | public: |
43 | | // Returns true when the ExecutionQueue is stopped and there will never be |
44 | | // more tasks and you can safely release all the related resources ever |
45 | | // after. |
46 | 0 | bool is_queue_stopped() const { return _is_stopped; } |
47 | | explicit operator bool() const; |
48 | | protected: |
49 | | TaskIteratorBase(TaskNode* head, ExecutionQueueBase* queue, |
50 | | bool is_stopped, bool high_priority) |
51 | 0 | : _cur_node(head) |
52 | 0 | , _head(head) |
53 | 0 | , _q(queue) |
54 | 0 | , _is_stopped(is_stopped) |
55 | 0 | , _high_priority(high_priority) |
56 | 0 | , _should_break(false) |
57 | 0 | , _num_iterated(0) |
58 | 0 | { operator++(); } |
59 | | ~TaskIteratorBase(); |
60 | | void operator++(); |
61 | 0 | TaskNode* cur_node() const { return _cur_node; } |
62 | | private: |
63 | 0 | int num_iterated() const { return _num_iterated; } |
64 | | bool should_break_for_high_priority_tasks(); |
65 | | |
66 | | TaskNode* _cur_node; |
67 | | TaskNode* _head; |
68 | | ExecutionQueueBase* _q; |
69 | | bool _is_stopped; |
70 | | bool _high_priority; |
71 | | bool _should_break; |
72 | | int _num_iterated; |
73 | | }; |
74 | | |
75 | | // Iterate over the given tasks |
76 | | // |
77 | | // Examples: |
78 | | // int demo_execute(void* meta, TaskIterator<T>& iter) { |
79 | | // if (iter.is_queue_stopped()) { |
80 | | // // destroy meta and related resources |
81 | | // return 0; |
82 | | // } |
83 | | // for (; iter; ++iter) { |
84 | | // // do_something(*iter) |
85 | | // // or do_something(iter->a_member_of_T) |
86 | | // } |
87 | | // return 0; |
88 | | // } |
89 | | template <typename T> |
90 | | class TaskIterator : public TaskIteratorBase { |
91 | | public: |
92 | | typedef T* pointer; |
93 | | typedef T& reference; |
94 | | |
95 | | TaskIterator() = delete; |
96 | | reference operator*() const; |
97 | | pointer operator->() const { return &(operator*()); } |
98 | | TaskIterator& operator++(); |
99 | | void operator++(int); |
100 | | }; |
101 | | |
102 | | struct TaskHandle { |
103 | | TaskHandle(); |
104 | | TaskNode* node; |
105 | | int64_t version; |
106 | | }; |
107 | | |
108 | | struct TaskOptions { |
109 | | TaskOptions(); |
110 | | TaskOptions(bool high_priority, bool in_place_if_possible); |
111 | | |
112 | | // Executor would execute high-priority tasks in the FIFO order but before |
113 | | // all pending normal-priority tasks. |
114 | | // NOTE: We don't guarantee any kind of real-time as there might be tasks still |
115 | | // in process which are uninterruptible. |
116 | | // |
117 | | // Default: false |
118 | | bool high_priority; |
119 | | |
120 | | // If |in_place_if_possible| is true, execution_queue_execute would call |
121 | | // execute immediately instead of starting a bthread if possible |
122 | | // |
123 | | // Note: Running callbacks in place might cause the deadlock issue, you |
124 | | // should be very careful turning this flag on. |
125 | | // |
126 | | // Default: false |
127 | | bool in_place_if_possible; |
128 | | }; |
129 | | |
130 | | const static TaskOptions TASK_OPTIONS_NORMAL = TaskOptions(false, false); |
131 | | const static TaskOptions TASK_OPTIONS_URGENT = TaskOptions(true, false); |
132 | | const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(false, true); |
133 | | |
134 | | class Executor { |
135 | | public: |
136 | | virtual ~Executor() = default; |
137 | | |
138 | | // Return 0 on success. |
139 | | virtual int submit(void * (*fn)(void*), void* args) = 0; |
140 | | }; |
141 | | |
142 | | struct ExecutionQueueOptions { |
143 | | ExecutionQueueOptions(); |
144 | | |
145 | | // Execute in resident pthread instead of bthread. default: false. |
146 | | bool use_pthread; |
147 | | |
148 | | // Attribute of the bthread which execute runs on. default: BTHREAD_ATTR_NORMAL |
149 | | // Bthread will be used when executor = NULL and use_pthread == false. |
150 | | bthread_attr_t bthread_attr; |
151 | | |
152 | | // Executor that tasks run on. default: NULL |
153 | | // Note that TaskOptions.in_place_if_possible = false will not work, if implementation of |
154 | | // Executor is in-place(synchronous). |
155 | | Executor * executor; |
156 | | }; |
157 | | |
158 | | // Start an ExecutionQueue. If |options| is NULL, the queue will be created with |
159 | | // the default options. |
160 | | // Returns 0 on success, errno otherwise |
161 | | // NOTE: type |T| can be non-POD but must be copy-constructive |
162 | | template <typename T> |
163 | | int execution_queue_start( |
164 | | ExecutionQueueId<T>* id, |
165 | | const ExecutionQueueOptions* options, |
166 | | int (*execute)(void* meta, TaskIterator<T>& iter), |
167 | | void* meta); |
168 | | |
169 | | // Stop the ExecutionQueue. |
170 | | // After this function is called: |
171 | | // - All the following calls to execution_queue_execute would fail immediately. |
172 | | // - The executor will call |execute| with TaskIterator::is_queue_stopped() being |
173 | | // true exactly once when all the pending tasks have been executed, and after |
174 | | // this point it's ok to release the resource referenced by |meta|. |
175 | | // Returns 0 on success, errno otherwise. |
176 | | template <typename T> |
177 | | int execution_queue_stop(ExecutionQueueId<T> id); |
178 | | |
179 | | // Wait until the stop task (Iterator::is_queue_stopped() returns true) has |
180 | | // been executed |
181 | | template <typename T> |
182 | | int execution_queue_join(ExecutionQueueId<T> id); |
183 | | |
184 | | // Thread-safe and Wait-free. |
185 | | // Execute a task with default TaskOptions (normal task); |
186 | | template <typename T> |
187 | | int execution_queue_execute(ExecutionQueueId<T> id, |
188 | | typename butil::add_const_reference<T>::type task); |
189 | | |
190 | | // Thread-safe and Wait-free. |
191 | | // Execute a task with options. e.g |
192 | | // bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT) |
193 | | // If |options| is NULL, we will use default options (normal task) |
194 | | // If |handle| is not NULL, we will assign it with the handler of this task. |
195 | | template <typename T> |
196 | | int execution_queue_execute(ExecutionQueueId<T> id, |
197 | | typename butil::add_const_reference<T>::type task, |
198 | | const TaskOptions* options); |
199 | | template <typename T> |
200 | | int execution_queue_execute(ExecutionQueueId<T> id, |
201 | | typename butil::add_const_reference<T>::type task, |
202 | | const TaskOptions* options, |
203 | | TaskHandle* handle); |
204 | | |
205 | | template <typename T> |
206 | | int execution_queue_execute(ExecutionQueueId<T> id, |
207 | | T&& task); |
208 | | |
209 | | template <typename T> |
210 | | int execution_queue_execute(ExecutionQueueId<T> id, |
211 | | T&& task, |
212 | | const TaskOptions* options); |
213 | | |
214 | | template <typename T> |
215 | | int execution_queue_execute(ExecutionQueueId<T> id, |
216 | | T&& task, |
217 | | const TaskOptions* options, |
218 | | TaskHandle* handle); |
219 | | |
220 | | // [Thread safe and ABA free] Cancel the corresponding task. |
221 | | // Returns: |
222 | | // -1: The task was executed or h is an invalid handle |
223 | | // 0: Success |
224 | | // 1: The task is executing |
225 | | int execution_queue_cancel(const TaskHandle& h); |
226 | | |
227 | | // Thread-safe and Wait-free |
228 | | // Address a reference of ExecutionQueue if |id| references to a valid |
229 | | // ExecutionQueue |
230 | | // |
231 | | // |execution_queue_execute| internally fetches a reference of ExecutionQueue at |
232 | | // the beginning and releases it at the end, which makes 2 additional cache |
233 | | // updates. In some critical situation where the overhead of |
234 | | // execution_queue_execute matters, you can avoid this by addressing the |
235 | | // reference at the beginning of every producer, and execute tasks execatly |
236 | | // through the reference instead of id. |
237 | | // |
238 | | // Note: It makes |execution_queue_stop| a little complicated in the user level, |
239 | | // as we don't pass the `stop task' to |execute| until no one holds any reference. |
240 | | // If you are not sure about the ownership of the return value (which releases |
241 | | // the reference of the very ExecutionQueue in the destructor) and don't that |
242 | | // care the overhead of ExecutionQueue, DON'T use this function |
243 | | template <typename T> |
244 | | typename ExecutionQueue<T>::scoped_ptr_t |
245 | | execution_queue_address(ExecutionQueueId<T> id); |
246 | | |
247 | | } // namespace bthread |
248 | | |
249 | | #include "bthread/execution_queue_inl.h" |
250 | | |
251 | | #endif //BTHREAD_EXECUTION_QUEUE_H |