/src/samba/lib/pthreadpool/pthreadpool_tevent.c
Line | Count | Source |
1 | | /* |
2 | | * Unix SMB/CIFS implementation. |
3 | | * threadpool implementation based on pthreads |
4 | | * Copyright (C) Volker Lendecke 2009,2011 |
5 | | * |
6 | | * This program is free software; you can redistribute it and/or modify |
7 | | * it under the terms of the GNU General Public License as published by |
8 | | * the Free Software Foundation; either version 3 of the License, or |
9 | | * (at your option) any later version. |
10 | | * |
11 | | * This program is distributed in the hope that it will be useful, |
12 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | | * GNU General Public License for more details. |
15 | | * |
16 | | * You should have received a copy of the GNU General Public License |
17 | | * along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | | */ |
19 | | |
20 | | #include "replace.h" |
21 | | #include "system/filesys.h" |
22 | | #include "system/threads.h" |
23 | | #include "pthreadpool_tevent.h" |
24 | | #include "pthreadpool.h" |
25 | | #include "lib/util/tevent_unix.h" |
26 | | #include "lib/util/dlinklist.h" |
27 | | |
28 | | struct pthreadpool_tevent_job_state; |
29 | | |
30 | | /* |
31 | | * We need one pthreadpool_tevent_glue object per unique combination of tevent |
32 | | * contexts and pthreadpool_tevent objects. Maintain a list of used tevent |
33 | | * contexts in a pthreadpool_tevent. |
34 | | */ |
35 | | struct pthreadpool_tevent_glue { |
36 | | struct pthreadpool_tevent_glue *prev, *next; |
37 | | struct pthreadpool_tevent *pool; /* back-pointer to owning object. */ |
38 | | /* Tuple we are keeping track of in this list. */ |
39 | | struct tevent_context *ev; |
40 | | struct tevent_threaded_context *tctx; |
41 | | /* Pointer to link object owned by *ev. */ |
42 | | struct pthreadpool_tevent_glue_ev_link *ev_link; |
43 | | }; |
44 | | |
45 | | /* |
46 | | * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the |
47 | | * tevent context from our list of active event contexts if the event context |
48 | | * is destroyed. |
49 | | * This structure is talloc()'ed from the struct tevent_context *, and is a |
50 | | * back-pointer allowing the related struct pthreadpool_tevent_glue object |
51 | | * to be removed from the struct pthreadpool_tevent glue list if the owning |
52 | | * tevent_context is talloc_free()'ed. |
53 | | */ |
54 | | struct pthreadpool_tevent_glue_ev_link { |
55 | | struct pthreadpool_tevent_glue *glue; |
56 | | }; |
57 | | |
58 | | struct pthreadpool_tevent { |
59 | | struct pthreadpool *pool; |
60 | | struct pthreadpool_tevent_glue *glue_list; |
61 | | /* |
62 | | * Control access to the glue_list |
63 | | */ |
64 | | pthread_mutex_t glue_mutex; |
65 | | |
66 | | struct pthreadpool_tevent_job_state *jobs; |
67 | | /* |
68 | | * Control access to the jobs |
69 | | */ |
70 | | pthread_mutex_t jobs_mutex; |
71 | | |
72 | | }; |
73 | | |
74 | | struct pthreadpool_tevent_job_state { |
75 | | struct pthreadpool_tevent_job_state *prev, *next; |
76 | | struct pthreadpool_tevent *pool; |
77 | | struct tevent_context *ev; |
78 | | struct tevent_immediate *im; |
79 | | struct tevent_req *req; |
80 | | |
81 | | void (*fn)(void *private_data); |
82 | | void *private_data; |
83 | | }; |
84 | | |
85 | | static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool); |
86 | | |
87 | | static int pthreadpool_tevent_job_signal(int jobid, |
88 | | void (*job_fn)(void *private_data), |
89 | | void *job_private_data, |
90 | | void *private_data); |
91 | | |
92 | | int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads, |
93 | | struct pthreadpool_tevent **presult) |
94 | 0 | { |
95 | 0 | struct pthreadpool_tevent *pool; |
96 | 0 | int ret; |
97 | |
|
98 | 0 | pool = talloc_zero(mem_ctx, struct pthreadpool_tevent); |
99 | 0 | if (pool == NULL) { |
100 | 0 | return ENOMEM; |
101 | 0 | } |
102 | | |
103 | 0 | ret = pthreadpool_init(max_threads, &pool->pool, |
104 | 0 | pthreadpool_tevent_job_signal, pool); |
105 | 0 | if (ret != 0) { |
106 | 0 | TALLOC_FREE(pool); |
107 | 0 | return ret; |
108 | 0 | } |
109 | | |
110 | 0 | ret = pthread_mutex_init(&pool->glue_mutex, NULL); |
111 | 0 | if (ret != 0) { |
112 | 0 | TALLOC_FREE(pool); |
113 | 0 | return ret; |
114 | 0 | } |
115 | | |
116 | 0 | ret = pthread_mutex_init(&pool->jobs_mutex, NULL); |
117 | 0 | if (ret != 0) { |
118 | 0 | pthread_mutex_destroy(&pool->glue_mutex); |
119 | 0 | TALLOC_FREE(pool); |
120 | 0 | return ret; |
121 | 0 | } |
122 | | |
123 | 0 | talloc_set_destructor(pool, pthreadpool_tevent_destructor); |
124 | |
|
125 | 0 | *presult = pool; |
126 | 0 | return 0; |
127 | 0 | } |
128 | | |
129 | | size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool) |
130 | 0 | { |
131 | 0 | if (pool->pool == NULL) { |
132 | 0 | return 0; |
133 | 0 | } |
134 | | |
135 | 0 | return pthreadpool_max_threads(pool->pool); |
136 | 0 | } |
137 | | |
138 | | size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool) |
139 | 0 | { |
140 | 0 | if (pool->pool == NULL) { |
141 | 0 | return 0; |
142 | 0 | } |
143 | | |
144 | 0 | return pthreadpool_queued_jobs(pool->pool); |
145 | 0 | } |
146 | | |
147 | | static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) |
148 | 0 | { |
149 | 0 | struct pthreadpool_tevent_job_state *state, *next; |
150 | 0 | struct pthreadpool_tevent_glue *glue = NULL; |
151 | 0 | int ret; |
152 | |
|
153 | 0 | ret = pthreadpool_stop(pool->pool); |
154 | 0 | if (ret != 0) { |
155 | 0 | return ret; |
156 | 0 | } |
157 | | |
158 | 0 | ret = pthread_mutex_lock(&pool->jobs_mutex); |
159 | 0 | if (ret != 0 ) { |
160 | 0 | return ret; |
161 | 0 | } |
162 | 0 | for (state = pool->jobs; state != NULL; state = next) { |
163 | 0 | next = state->next; |
164 | 0 | DLIST_REMOVE(pool->jobs, state); |
165 | 0 | state->pool = NULL; |
166 | 0 | } |
167 | |
|
168 | 0 | ret = pthread_mutex_unlock(&pool->jobs_mutex); |
169 | 0 | if (ret != 0 ) { |
170 | 0 | return ret; |
171 | 0 | } |
172 | | |
173 | 0 | ret = pthread_mutex_lock(&pool->glue_mutex); |
174 | 0 | if (ret != 0) { |
175 | 0 | return ret; |
176 | 0 | } |
177 | | |
178 | | /* |
179 | | * Delete all the registered |
180 | | * tevent_context/tevent_threaded_context |
181 | | * pairs. |
182 | | */ |
183 | 0 | for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) { |
184 | | /* The glue destructor removes it from the list */ |
185 | 0 | TALLOC_FREE(glue); |
186 | 0 | } |
187 | |
|
188 | 0 | pthread_mutex_unlock(&pool->glue_mutex); |
189 | 0 | pthread_mutex_destroy(&pool->jobs_mutex); |
190 | 0 | pthread_mutex_destroy(&pool->glue_mutex); |
191 | |
|
192 | 0 | pool->glue_list = NULL; |
193 | |
|
194 | 0 | ret = pthreadpool_destroy(pool->pool); |
195 | 0 | if (ret != 0) { |
196 | 0 | return ret; |
197 | 0 | } |
198 | 0 | pool->pool = NULL; |
199 | |
|
200 | 0 | return 0; |
201 | 0 | } |
202 | | |
203 | | /* |
204 | | * glue destruction is only called with |
205 | | * glue_mutex already locked either from |
206 | | * a) pthreadpool_tevent_destructor or |
207 | | * b) pthreadpool_tevent_glue_link_destructor |
208 | | * pthreadpool_tevent_destructor accesses |
209 | | * the glue_list while calling pthreadpool_tevent_glue_destructor |
210 | | * which modifies the glue_list so it needs the lock held while |
211 | | * doing that. |
212 | | */ |
213 | | static int pthreadpool_tevent_glue_destructor( |
214 | | struct pthreadpool_tevent_glue *glue) |
215 | 0 | { |
216 | 0 | if (glue->pool->glue_list != NULL) { |
217 | 0 | DLIST_REMOVE(glue->pool->glue_list, glue); |
218 | 0 | } |
219 | | |
220 | | /* Ensure the ev_link destructor knows we're gone */ |
221 | 0 | glue->ev_link->glue = NULL; |
222 | |
|
223 | 0 | TALLOC_FREE(glue->ev_link); |
224 | 0 | TALLOC_FREE(glue->tctx); |
225 | |
|
226 | 0 | return 0; |
227 | 0 | } |
228 | | |
229 | | /* |
230 | | * Destructor called either explicitly from |
231 | | * pthreadpool_tevent_glue_destructor(), or indirectly |
232 | | * when owning tevent_context is destroyed. |
233 | | * |
234 | | * When called from pthreadpool_tevent_glue_destructor() |
235 | | * ev_link->glue is already NULL, so this does nothing. |
236 | | * |
237 | | * When called from talloc_free() of the owning |
238 | | * tevent_context we must ensure we also remove the |
239 | | * linked glue object from the list inside |
240 | | * struct pthreadpool_tevent. |
241 | | */ |
242 | | static int pthreadpool_tevent_glue_link_destructor( |
243 | | struct pthreadpool_tevent_glue_ev_link *ev_link) |
244 | 0 | { |
245 | 0 | if (ev_link->glue) { |
246 | 0 | int ret; |
247 | | /* save the mutex */ |
248 | 0 | pthread_mutex_t *glue_mutex = |
249 | 0 | &ev_link->glue->pool->glue_mutex; |
250 | 0 | ret = pthread_mutex_lock(glue_mutex); |
251 | 0 | if (ret != 0) { |
252 | 0 | return ret; |
253 | 0 | } |
254 | 0 | TALLOC_FREE(ev_link->glue); |
255 | 0 | ret = pthread_mutex_unlock(glue_mutex); |
256 | 0 | if (ret != 0) { |
257 | 0 | return ret; |
258 | 0 | } |
259 | 0 | } |
260 | 0 | return 0; |
261 | 0 | } |
262 | | |
263 | | static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool, |
264 | | struct tevent_context *ev) |
265 | 0 | { |
266 | 0 | struct pthreadpool_tevent_glue *glue = NULL; |
267 | 0 | struct pthreadpool_tevent_glue_ev_link *ev_link = NULL; |
268 | 0 | int ret; |
269 | |
|
270 | 0 | ret = pthread_mutex_lock(&pool->glue_mutex); |
271 | 0 | if (ret != 0) { |
272 | 0 | return ret; |
273 | 0 | } |
274 | | /* |
275 | | * See if this tevent_context was already registered by |
276 | | * searching the glue object list. If so we have nothing |
277 | | * to do here - we already have a tevent_context/tevent_threaded_context |
278 | | * pair. |
279 | | */ |
280 | 0 | for (glue = pool->glue_list; glue != NULL; glue = glue->next) { |
281 | 0 | if (glue->ev == ev) { |
282 | 0 | ret = pthread_mutex_unlock(&pool->glue_mutex); |
283 | 0 | if (ret != 0) { |
284 | 0 | return ret; |
285 | 0 | } |
286 | 0 | return 0; |
287 | 0 | } |
288 | 0 | } |
289 | | |
290 | 0 | ret = pthread_mutex_unlock(&pool->glue_mutex); |
291 | 0 | if (ret != 0) { |
292 | 0 | return ret; |
293 | 0 | } |
294 | | /* |
295 | | * Event context not yet registered - create a new glue |
296 | | * object containing a tevent_context/tevent_threaded_context |
297 | | * pair and put it on the list to remember this registration. |
298 | | * We also need a link object to ensure the event context |
299 | | * can't go away without us knowing about it. |
300 | | */ |
301 | 0 | glue = talloc_zero(pool, struct pthreadpool_tevent_glue); |
302 | 0 | if (glue == NULL) { |
303 | 0 | return ENOMEM; |
304 | 0 | } |
305 | 0 | *glue = (struct pthreadpool_tevent_glue) { |
306 | 0 | .pool = pool, |
307 | 0 | .ev = ev, |
308 | 0 | }; |
309 | 0 | talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor); |
310 | | |
311 | | /* |
312 | | * Now allocate the link object to the event context. Note this |
313 | | * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event |
314 | | * context is freed we are able to cleanup the glue object |
315 | | * in the link object destructor. |
316 | | */ |
317 | |
|
318 | 0 | ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link); |
319 | 0 | if (ev_link == NULL) { |
320 | 0 | TALLOC_FREE(glue); |
321 | 0 | return ENOMEM; |
322 | 0 | } |
323 | 0 | ev_link->glue = glue; |
324 | 0 | talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor); |
325 | |
|
326 | 0 | glue->ev_link = ev_link; |
327 | |
|
328 | 0 | #ifdef HAVE_PTHREAD |
329 | 0 | glue->tctx = tevent_threaded_context_create(glue, ev); |
330 | 0 | if (glue->tctx == NULL) { |
331 | 0 | TALLOC_FREE(ev_link); |
332 | 0 | return ENOMEM; |
333 | 0 | } |
334 | 0 | #endif |
335 | | |
336 | 0 | ret = pthread_mutex_lock(&pool->glue_mutex); |
337 | 0 | if (ret != 0) { |
338 | 0 | return ret; |
339 | 0 | } |
340 | | |
341 | 0 | DLIST_ADD(pool->glue_list, glue); |
342 | |
|
343 | 0 | ret = pthread_mutex_unlock(&pool->glue_mutex); |
344 | 0 | if (ret != 0) { |
345 | 0 | return ret; |
346 | 0 | } |
347 | 0 | return 0; |
348 | 0 | } |
349 | | |
350 | | static void pthreadpool_tevent_job_fn(void *private_data); |
351 | | static void pthreadpool_tevent_job_done(struct tevent_context *ctx, |
352 | | struct tevent_immediate *im, |
353 | | void *private_data); |
354 | | |
355 | | static int pthreadpool_tevent_job_state_destructor( |
356 | | struct pthreadpool_tevent_job_state *state) |
357 | 0 | { |
358 | 0 | if (state->pool == NULL) { |
359 | 0 | return 0; |
360 | 0 | } |
361 | | |
362 | | /* |
363 | | * We should never be called with state->req == NULL, |
364 | | * state->pool must be cleared before the 2nd talloc_free(). |
365 | | */ |
366 | 0 | if (state->req == NULL) { |
367 | 0 | abort(); |
368 | 0 | } |
369 | | |
370 | | /* |
371 | | * We need to reparent to a long term context. |
372 | | */ |
373 | 0 | (void)talloc_reparent(state->req, NULL, state); |
374 | 0 | state->req = NULL; |
375 | 0 | return -1; |
376 | 0 | } |
377 | | |
378 | | struct tevent_req *pthreadpool_tevent_job_send( |
379 | | TALLOC_CTX *mem_ctx, struct tevent_context *ev, |
380 | | struct pthreadpool_tevent *pool, |
381 | | void (*fn)(void *private_data), void *private_data) |
382 | 0 | { |
383 | 0 | struct tevent_req *req; |
384 | 0 | struct pthreadpool_tevent_job_state *state; |
385 | 0 | int ret; |
386 | |
|
387 | 0 | req = tevent_req_create(mem_ctx, &state, |
388 | 0 | struct pthreadpool_tevent_job_state); |
389 | 0 | if (req == NULL) { |
390 | 0 | return NULL; |
391 | 0 | } |
392 | 0 | state->pool = pool; |
393 | 0 | state->ev = ev; |
394 | 0 | state->req = req; |
395 | 0 | state->fn = fn; |
396 | 0 | state->private_data = private_data; |
397 | |
|
398 | 0 | if (pool == NULL) { |
399 | 0 | tevent_req_error(req, EINVAL); |
400 | 0 | return tevent_req_post(req, ev); |
401 | 0 | } |
402 | 0 | if (pool->pool == NULL) { |
403 | 0 | tevent_req_error(req, EINVAL); |
404 | 0 | return tevent_req_post(req, ev); |
405 | 0 | } |
406 | | |
407 | 0 | state->im = tevent_create_immediate(state); |
408 | 0 | if (tevent_req_nomem(state->im, req)) { |
409 | 0 | return tevent_req_post(req, ev); |
410 | 0 | } |
411 | | |
412 | 0 | ret = pthreadpool_tevent_register_ev(pool, ev); |
413 | 0 | if (tevent_req_error(req, ret)) { |
414 | 0 | return tevent_req_post(req, ev); |
415 | 0 | } |
416 | | |
417 | 0 | ret = pthreadpool_add_job(pool->pool, 0, |
418 | 0 | pthreadpool_tevent_job_fn, |
419 | 0 | state); |
420 | 0 | if (tevent_req_error(req, ret)) { |
421 | 0 | return tevent_req_post(req, ev); |
422 | 0 | } |
423 | | |
424 | | /* |
425 | | * Once the job is scheduled, we need to protect |
426 | | * our memory. |
427 | | */ |
428 | 0 | talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor); |
429 | |
|
430 | 0 | ret = pthread_mutex_lock(&pool->jobs_mutex); |
431 | 0 | if (tevent_req_error(req, ret)) { |
432 | 0 | return tevent_req_post(req, ev); |
433 | 0 | } |
434 | 0 | DLIST_ADD_END(pool->jobs, state); |
435 | |
|
436 | 0 | ret = pthread_mutex_unlock(&pool->jobs_mutex); |
437 | 0 | if (tevent_req_error(req, ret)) { |
438 | 0 | return tevent_req_post(req, ev); |
439 | 0 | } |
440 | | |
441 | 0 | return req; |
442 | 0 | } |
443 | | |
444 | | static void pthreadpool_tevent_job_fn(void *private_data) |
445 | 0 | { |
446 | 0 | struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( |
447 | 0 | private_data, struct pthreadpool_tevent_job_state); |
448 | 0 | state->fn(state->private_data); |
449 | 0 | } |
450 | | |
451 | | static int pthreadpool_tevent_job_signal(int jobid, |
452 | | void (*job_fn)(void *private_data), |
453 | | void *job_private_data, |
454 | | void *private_data) |
455 | 0 | { |
456 | 0 | struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( |
457 | 0 | job_private_data, struct pthreadpool_tevent_job_state); |
458 | 0 | struct tevent_threaded_context *tctx = NULL; |
459 | 0 | struct pthreadpool_tevent_glue *g = NULL; |
460 | 0 | int ret; |
461 | |
|
462 | 0 | if (state->pool == NULL) { |
463 | | /* The pthreadpool_tevent is already gone */ |
464 | 0 | return 0; |
465 | 0 | } |
466 | | |
467 | 0 | #ifdef HAVE_PTHREAD |
468 | 0 | ret = pthread_mutex_lock(&state->pool->glue_mutex); |
469 | 0 | if (ret != 0) { |
470 | 0 | return ret; |
471 | 0 | } |
472 | | |
473 | 0 | for (g = state->pool->glue_list; g != NULL; g = g->next) { |
474 | 0 | if (g->ev == state->ev) { |
475 | 0 | tctx = g->tctx; |
476 | 0 | break; |
477 | 0 | } |
478 | 0 | } |
479 | |
|
480 | 0 | ret = pthread_mutex_unlock(&state->pool->glue_mutex); |
481 | 0 | if (ret != 0) { |
482 | 0 | return ret; |
483 | 0 | } |
484 | | |
485 | 0 | if (tctx == NULL) { |
486 | 0 | abort(); |
487 | 0 | } |
488 | 0 | #endif |
489 | | |
490 | 0 | if (tctx != NULL) { |
491 | | /* with HAVE_PTHREAD */ |
492 | 0 | tevent_threaded_schedule_immediate(tctx, state->im, |
493 | 0 | pthreadpool_tevent_job_done, |
494 | 0 | state); |
495 | 0 | } else { |
496 | | /* without HAVE_PTHREAD */ |
497 | 0 | tevent_schedule_immediate(state->im, state->ev, |
498 | 0 | pthreadpool_tevent_job_done, |
499 | 0 | state); |
500 | 0 | } |
501 | |
|
502 | 0 | return 0; |
503 | 0 | } |
504 | | |
505 | | static void pthreadpool_tevent_job_done(struct tevent_context *ctx, |
506 | | struct tevent_immediate *im, |
507 | | void *private_data) |
508 | 0 | { |
509 | 0 | struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( |
510 | 0 | private_data, struct pthreadpool_tevent_job_state); |
511 | |
|
512 | 0 | if (state->pool != NULL) { |
513 | 0 | int ret; |
514 | 0 | ret = pthread_mutex_lock(&state->pool->jobs_mutex); |
515 | 0 | if (tevent_req_error(state->req, ret)) { |
516 | 0 | return; |
517 | 0 | } |
518 | 0 | DLIST_REMOVE(state->pool->jobs, state); |
519 | 0 | ret = pthread_mutex_unlock(&state->pool->jobs_mutex); |
520 | 0 | state->pool = NULL; |
521 | 0 | if (tevent_req_error(state->req, ret)) { |
522 | 0 | return; |
523 | 0 | } |
524 | 0 | } |
525 | | |
526 | 0 | if (state->req == NULL) { |
527 | | /* |
528 | | * There was a talloc_free() state->req |
529 | | * while the job was pending, |
530 | | * which mean we're reparented on a longterm |
531 | | * talloc context. |
532 | | * |
533 | | * We just cleanup here... |
534 | | */ |
535 | 0 | talloc_free(state); |
536 | 0 | return; |
537 | 0 | } |
538 | | |
539 | 0 | tevent_req_done(state->req); |
540 | 0 | } |
541 | | |
542 | | int pthreadpool_tevent_job_recv(struct tevent_req *req) |
543 | 0 | { |
544 | 0 | return tevent_req_simple_recv_unix(req); |
545 | 0 | } |