/src/samba/lib/tevent/tevent_queue.c
Line | Count | Source |
1 | | /* |
2 | | Unix SMB/CIFS implementation. |
3 | | Infrastructure for async requests |
4 | | Copyright (C) Volker Lendecke 2008 |
5 | | Copyright (C) Stefan Metzmacher 2009 |
6 | | |
7 | | ** NOTE! The following LGPL license applies to the tevent |
8 | | ** library. This does NOT imply that all of Samba is released |
9 | | ** under the LGPL |
10 | | |
11 | | This library is free software; you can redistribute it and/or |
12 | | modify it under the terms of the GNU Lesser General Public |
13 | | License as published by the Free Software Foundation; either |
14 | | version 3 of the License, or (at your option) any later version. |
15 | | |
16 | | This library is distributed in the hope that it will be useful, |
17 | | but WITHOUT ANY WARRANTY; without even the implied warranty of |
18 | | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
19 | | Lesser General Public License for more details. |
20 | | |
21 | | You should have received a copy of the GNU Lesser General Public |
22 | | License along with this library; if not, see <http://www.gnu.org/licenses/>. |
23 | | */ |
24 | | |
25 | | #include "replace.h" |
26 | | #include "tevent.h" |
27 | | #include "tevent_internal.h" |
28 | | #include "tevent_util.h" |
29 | | |
30 | | #undef tevent_queue_add |
31 | | #undef tevent_queue_add_entry |
32 | | #undef tevent_queue_add_optimize_empty |
33 | | |
34 | | struct tevent_queue_entry { |
35 | | struct tevent_queue_entry *prev, *next; |
36 | | struct tevent_queue *queue; |
37 | | |
38 | | bool triggered; |
39 | | |
40 | | struct tevent_req *req; |
41 | | struct tevent_context *ev; |
42 | | |
43 | | tevent_queue_trigger_fn_t trigger; |
44 | | const char *trigger_name; |
45 | | void *private_data; |
46 | | uint64_t tag; |
47 | | }; |
48 | | |
49 | | struct tevent_queue { |
50 | | const char *name; |
51 | | const char *location; |
52 | | |
53 | | bool running; |
54 | | struct tevent_immediate *immediate; |
55 | | |
56 | | size_t length; |
57 | | struct tevent_queue_entry *list; |
58 | | }; |
59 | | |
60 | | static void tevent_queue_immediate_trigger(struct tevent_context *ev, |
61 | | struct tevent_immediate *im, |
62 | | void *private_data); |
63 | | |
64 | | static int tevent_queue_entry_destructor(struct tevent_queue_entry *e) |
65 | 0 | { |
66 | 0 | struct tevent_queue *q = e->queue; |
67 | |
|
68 | 0 | if (!q) { |
69 | 0 | return 0; |
70 | 0 | } |
71 | | |
72 | 0 | tevent_trace_queue_callback(q->list->ev, e, TEVENT_EVENT_TRACE_DETACH); |
73 | 0 | tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_LEAVE, |
74 | 0 | q->list->req, |
75 | 0 | q->list->req->internal.call_depth, |
76 | 0 | e->trigger_name); |
77 | 0 | DLIST_REMOVE(q->list, e); |
78 | 0 | q->length--; |
79 | |
|
80 | 0 | if (!q->running) { |
81 | 0 | return 0; |
82 | 0 | } |
83 | | |
84 | 0 | if (!q->list) { |
85 | 0 | return 0; |
86 | 0 | } |
87 | | |
88 | 0 | if (q->list->triggered) { |
89 | 0 | return 0; |
90 | 0 | } |
91 | | |
92 | 0 | tevent_schedule_immediate(q->immediate, |
93 | 0 | q->list->ev, |
94 | 0 | tevent_queue_immediate_trigger, |
95 | 0 | q); |
96 | |
|
97 | 0 | return 0; |
98 | 0 | } |
99 | | |
100 | | static int tevent_queue_destructor(struct tevent_queue *q) |
101 | 0 | { |
102 | 0 | q->running = false; |
103 | |
|
104 | 0 | while (q->list) { |
105 | 0 | struct tevent_queue_entry *e = q->list; |
106 | 0 | talloc_free(e); |
107 | 0 | } |
108 | |
|
109 | 0 | return 0; |
110 | 0 | } |
111 | | |
112 | | struct tevent_queue *_tevent_queue_create(TALLOC_CTX *mem_ctx, |
113 | | const char *name, |
114 | | const char *location) |
115 | 0 | { |
116 | 0 | struct tevent_queue *queue; |
117 | |
|
118 | 0 | queue = talloc_zero(mem_ctx, struct tevent_queue); |
119 | 0 | if (!queue) { |
120 | 0 | return NULL; |
121 | 0 | } |
122 | | |
123 | 0 | queue->name = talloc_strdup(queue, name); |
124 | 0 | if (!queue->name) { |
125 | 0 | talloc_free(queue); |
126 | 0 | return NULL; |
127 | 0 | } |
128 | 0 | queue->immediate = tevent_create_immediate(queue); |
129 | 0 | if (!queue->immediate) { |
130 | 0 | talloc_free(queue); |
131 | 0 | return NULL; |
132 | 0 | } |
133 | | |
134 | 0 | queue->location = location; |
135 | | |
136 | | /* queue is running by default */ |
137 | 0 | queue->running = true; |
138 | |
|
139 | 0 | talloc_set_destructor(queue, tevent_queue_destructor); |
140 | 0 | return queue; |
141 | 0 | } |
142 | | |
143 | | static void tevent_queue_immediate_trigger(struct tevent_context *ev, |
144 | | struct tevent_immediate *im, |
145 | | void *private_data) |
146 | 0 | { |
147 | 0 | struct tevent_queue *q = |
148 | 0 | talloc_get_type_abort(private_data, |
149 | 0 | struct tevent_queue); |
150 | |
|
151 | 0 | if (!q->running) { |
152 | 0 | return; |
153 | 0 | } |
154 | | |
155 | 0 | if (!q->list) { |
156 | 0 | return; |
157 | 0 | } |
158 | | |
159 | 0 | tevent_trace_queue_callback(ev, q->list, |
160 | 0 | TEVENT_EVENT_TRACE_BEFORE_HANDLER); |
161 | | /* Set the call depth of the request coming from the queue. */ |
162 | 0 | tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_TRIGGER, |
163 | 0 | q->list->req, |
164 | 0 | q->list->req->internal.call_depth, |
165 | 0 | q->list->trigger_name); |
166 | 0 | q->list->triggered = true; |
167 | 0 | q->list->trigger(q->list->req, q->list->private_data); |
168 | 0 | } |
169 | | |
170 | | static void tevent_queue_noop_trigger(struct tevent_req *req, |
171 | | void *_private_data) |
172 | 0 | { |
173 | | /* this is doing nothing but blocking the queue */ |
174 | 0 | } |
175 | | |
176 | | static struct tevent_queue_entry *tevent_queue_add_internal( |
177 | | struct tevent_queue *queue, |
178 | | struct tevent_context *ev, |
179 | | struct tevent_req *req, |
180 | | tevent_queue_trigger_fn_t trigger, |
181 | | const char *trigger_name, |
182 | | void *private_data, |
183 | | bool allow_direct) |
184 | 0 | { |
185 | 0 | struct tevent_queue_entry *e; |
186 | |
|
187 | 0 | e = talloc_zero(req, struct tevent_queue_entry); |
188 | 0 | if (e == NULL) { |
189 | 0 | return NULL; |
190 | 0 | } |
191 | | |
192 | | /* |
193 | | * if there is no trigger, it is just a blocker |
194 | | */ |
195 | 0 | if (trigger == NULL) { |
196 | 0 | trigger = tevent_queue_noop_trigger; |
197 | 0 | } |
198 | |
|
199 | 0 | e->queue = queue; |
200 | 0 | e->req = req; |
201 | 0 | e->ev = ev; |
202 | 0 | e->trigger = trigger; |
203 | 0 | e->trigger_name = trigger_name; |
204 | 0 | e->private_data = private_data; |
205 | |
|
206 | 0 | if (queue->length > 0) { |
207 | | /* |
208 | | * if there are already entries in the |
209 | | * queue do not optimize. |
210 | | */ |
211 | 0 | allow_direct = false; |
212 | 0 | } |
213 | |
|
214 | 0 | if (req->async.fn != NULL) { |
215 | | /* |
216 | | * If the caller wants to optimize for the |
217 | | * empty queue case, call the trigger only |
218 | | * if there is no callback defined for the |
219 | | * request yet. |
220 | | */ |
221 | 0 | allow_direct = false; |
222 | 0 | } |
223 | |
|
224 | 0 | DLIST_ADD_END(queue->list, e); |
225 | 0 | queue->length++; |
226 | 0 | talloc_set_destructor(e, tevent_queue_entry_destructor); |
227 | 0 | tevent_trace_queue_callback(ev, e, TEVENT_EVENT_TRACE_ATTACH); |
228 | 0 | tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_ENTER, |
229 | 0 | req, |
230 | 0 | req->internal.call_depth, |
231 | 0 | e->trigger_name); |
232 | |
|
233 | 0 | if (!queue->running) { |
234 | 0 | return e; |
235 | 0 | } |
236 | | |
237 | 0 | if (queue->list->triggered) { |
238 | 0 | return e; |
239 | 0 | } |
240 | | |
241 | | /* |
242 | | * If allowed we directly call the trigger |
243 | | * avoiding possible delays caused by |
244 | | * an immediate event. |
245 | | */ |
246 | 0 | if (allow_direct) { |
247 | 0 | tevent_trace_queue_callback(ev, |
248 | 0 | queue->list, |
249 | 0 | TEVENT_EVENT_TRACE_BEFORE_HANDLER); |
250 | 0 | queue->list->triggered = true; |
251 | 0 | queue->list->trigger(queue->list->req, |
252 | 0 | queue->list->private_data); |
253 | 0 | return e; |
254 | 0 | } |
255 | | |
256 | 0 | tevent_schedule_immediate(queue->immediate, |
257 | 0 | queue->list->ev, |
258 | 0 | tevent_queue_immediate_trigger, |
259 | 0 | queue); |
260 | |
|
261 | 0 | return e; |
262 | 0 | } |
263 | | |
264 | | bool tevent_queue_add(struct tevent_queue *queue, |
265 | | struct tevent_context *ev, |
266 | | struct tevent_req *req, |
267 | | tevent_queue_trigger_fn_t trigger, |
268 | | void *private_data) |
269 | 0 | { |
270 | 0 | return _tevent_queue_add(queue, ev, req, trigger, NULL, private_data); |
271 | 0 | } |
272 | | |
273 | | bool _tevent_queue_add(struct tevent_queue *queue, |
274 | | struct tevent_context *ev, |
275 | | struct tevent_req *req, |
276 | | tevent_queue_trigger_fn_t trigger, |
277 | | const char* trigger_name, |
278 | | void *private_data) |
279 | 0 | { |
280 | 0 | struct tevent_queue_entry *e; |
281 | |
|
282 | 0 | e = tevent_queue_add_internal(queue, ev, req, |
283 | 0 | trigger, trigger_name, |
284 | 0 | private_data, false); |
285 | 0 | if (e == NULL) { |
286 | 0 | return false; |
287 | 0 | } |
288 | | |
289 | 0 | return true; |
290 | 0 | } |
291 | | |
292 | | struct tevent_queue_entry *tevent_queue_add_entry( |
293 | | struct tevent_queue *queue, |
294 | | struct tevent_context *ev, |
295 | | struct tevent_req *req, |
296 | | tevent_queue_trigger_fn_t trigger, |
297 | | void *private_data) |
298 | 0 | { |
299 | 0 | return _tevent_queue_add_entry(queue, ev, req, |
300 | 0 | trigger, NULL, |
301 | 0 | private_data); |
302 | 0 | } |
303 | | |
304 | | struct tevent_queue_entry *_tevent_queue_add_entry( |
305 | | struct tevent_queue *queue, |
306 | | struct tevent_context *ev, |
307 | | struct tevent_req *req, |
308 | | tevent_queue_trigger_fn_t trigger, |
309 | | const char* trigger_name, |
310 | | void *private_data) |
311 | 0 | { |
312 | 0 | return tevent_queue_add_internal(queue, ev, req, |
313 | 0 | trigger, trigger_name, |
314 | 0 | private_data, false); |
315 | 0 | } |
316 | | |
317 | | struct tevent_queue_entry *tevent_queue_add_optimize_empty( |
318 | | struct tevent_queue *queue, |
319 | | struct tevent_context *ev, |
320 | | struct tevent_req *req, |
321 | | tevent_queue_trigger_fn_t trigger, |
322 | | void *private_data) |
323 | 0 | { |
324 | 0 | return _tevent_queue_add_optimize_empty(queue, ev, req, |
325 | 0 | trigger, NULL, |
326 | 0 | private_data); |
327 | 0 | } |
328 | | |
329 | | struct tevent_queue_entry *_tevent_queue_add_optimize_empty( |
330 | | struct tevent_queue *queue, |
331 | | struct tevent_context *ev, |
332 | | struct tevent_req *req, |
333 | | tevent_queue_trigger_fn_t trigger, |
334 | | const char* trigger_name, |
335 | | void *private_data) |
336 | 0 | { |
337 | 0 | return tevent_queue_add_internal(queue, ev, req, |
338 | 0 | trigger, trigger_name, |
339 | 0 | private_data, true); |
340 | 0 | } |
341 | | |
342 | | void tevent_queue_entry_untrigger(struct tevent_queue_entry *entry) |
343 | 0 | { |
344 | 0 | if (entry->queue->running) { |
345 | 0 | abort(); |
346 | 0 | } |
347 | | |
348 | 0 | if (entry->queue->list != entry) { |
349 | 0 | abort(); |
350 | 0 | } |
351 | | |
352 | 0 | entry->triggered = false; |
353 | 0 | } |
354 | | |
355 | | void tevent_queue_start(struct tevent_queue *queue) |
356 | 0 | { |
357 | 0 | if (queue->running) { |
358 | | /* already started */ |
359 | 0 | return; |
360 | 0 | } |
361 | | |
362 | 0 | queue->running = true; |
363 | |
|
364 | 0 | if (!queue->list) { |
365 | 0 | return; |
366 | 0 | } |
367 | | |
368 | 0 | if (queue->list->triggered) { |
369 | 0 | return; |
370 | 0 | } |
371 | | |
372 | 0 | tevent_schedule_immediate(queue->immediate, |
373 | 0 | queue->list->ev, |
374 | 0 | tevent_queue_immediate_trigger, |
375 | 0 | queue); |
376 | 0 | } |
377 | | |
378 | | void tevent_queue_stop(struct tevent_queue *queue) |
379 | 0 | { |
380 | 0 | queue->running = false; |
381 | 0 | } |
382 | | |
383 | | size_t tevent_queue_length(struct tevent_queue *queue) |
384 | 0 | { |
385 | 0 | return queue->length; |
386 | 0 | } |
387 | | |
388 | | bool tevent_queue_running(struct tevent_queue *queue) |
389 | 0 | { |
390 | 0 | return queue->running; |
391 | 0 | } |
392 | | |
393 | | struct tevent_queue_wait_state { |
394 | | uint8_t dummy; |
395 | | }; |
396 | | |
397 | | static void tevent_queue_wait_trigger(struct tevent_req *req, |
398 | | void *private_data); |
399 | | |
400 | | struct tevent_req *tevent_queue_wait_send(TALLOC_CTX *mem_ctx, |
401 | | struct tevent_context *ev, |
402 | | struct tevent_queue *queue) |
403 | 0 | { |
404 | 0 | struct tevent_req *req; |
405 | 0 | struct tevent_queue_wait_state *state; |
406 | 0 | bool ok; |
407 | |
|
408 | 0 | req = tevent_req_create(mem_ctx, &state, |
409 | 0 | struct tevent_queue_wait_state); |
410 | 0 | if (req == NULL) { |
411 | 0 | return NULL; |
412 | 0 | } |
413 | | |
414 | 0 | ok = _tevent_queue_add(queue, ev, req, |
415 | 0 | tevent_queue_wait_trigger, |
416 | 0 | "tevent_queue_wait_trigger", |
417 | 0 | NULL); |
418 | 0 | if (!ok) { |
419 | 0 | tevent_req_oom(req); |
420 | 0 | return tevent_req_post(req, ev); |
421 | 0 | } |
422 | | |
423 | 0 | return req; |
424 | 0 | } |
425 | | |
426 | | static void tevent_queue_wait_trigger(struct tevent_req *req, |
427 | | void *private_data) |
428 | 0 | { |
429 | 0 | tevent_req_done(req); |
430 | 0 | } |
431 | | |
432 | | bool tevent_queue_wait_recv(struct tevent_req *req) |
433 | 0 | { |
434 | 0 | enum tevent_req_state state; |
435 | 0 | uint64_t err; |
436 | |
|
437 | 0 | if (tevent_req_is_error(req, &state, &err)) { |
438 | 0 | tevent_req_received(req); |
439 | 0 | return false; |
440 | 0 | } |
441 | | |
442 | 0 | tevent_req_received(req); |
443 | 0 | return true; |
444 | 0 | } |
445 | | |
446 | | void tevent_queue_entry_set_tag(struct tevent_queue_entry *qe, uint64_t tag) |
447 | 0 | { |
448 | 0 | if (qe == NULL) { |
449 | 0 | return; |
450 | 0 | } |
451 | | |
452 | 0 | qe->tag = tag; |
453 | 0 | } |
454 | | |
455 | | uint64_t tevent_queue_entry_get_tag(const struct tevent_queue_entry *qe) |
456 | 0 | { |
457 | 0 | if (qe == NULL) { |
458 | 0 | return 0; |
459 | 0 | } |
460 | | |
461 | 0 | return qe->tag; |
462 | 0 | } |