Coverage Report

Created: 2025-12-31 06:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/samba/lib/tevent/tevent_threads.c
Line
Count
Source
1
/*
2
   tevent event library.
3
4
   Copyright (C) Jeremy Allison 2015
5
6
     ** NOTE! The following LGPL license applies to the tevent
7
     ** library. This does NOT imply that all of Samba is released
8
     ** under the LGPL
9
10
   This library is free software; you can redistribute it and/or
11
   modify it under the terms of the GNU Lesser General Public
12
   License as published by the Free Software Foundation; either
13
   version 3 of the License, or (at your option) any later version.
14
15
   This library is distributed in the hope that it will be useful,
16
   but WITHOUT ANY WARRANTY; without even the implied warranty of
17
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18
   Lesser General Public License for more details.
19
20
   You should have received a copy of the GNU Lesser General Public
21
   License along with this library; if not, see <http://www.gnu.org/licenses/>.
22
*/
23
24
#include "replace.h"
25
#include "system/filesys.h"
26
#include "talloc.h"
27
#include "tevent.h"
28
#include "tevent_internal.h"
29
#include "tevent_util.h"
30
31
#ifdef HAVE_PTHREAD
32
#include "system/threads.h"
33
34
struct tevent_immediate_list {
35
  struct tevent_immediate_list *next, *prev;
36
  tevent_immediate_handler_t handler;
37
  struct tevent_immediate *im;
38
  void *private_ptr;
39
};
40
41
/*
 * State that allows other threads to hand immediate events over to
 * dest_ev_ctx. The functions in this file take the mutex before
 * touching the file descriptors or the two lists.
 */
struct tevent_thread_proxy {
  /* Serialises access to the members below. */
  pthread_mutex_t mutex;
  /* Event context the queued immediates are scheduled on. */
  struct tevent_context *dest_ev_ctx;
  /* Read end of the wakeup pipe, -1 when not open. */
  int read_fd;
  /* Write end of the wakeup pipe, -1 when not open. */
  int write_fd;
  /* fd event watching read_fd on dest_ev_ctx. */
  struct tevent_fd *pipe_read_fde;
  /* Pending events list. */
  struct tevent_immediate_list *im_list;
  /* Completed events list. */
  struct tevent_immediate_list *tofree_im_list;
  /* Immediate event used to run free_list_handler(). */
  struct tevent_immediate *free_im;
};
53
54
static void free_im_list(struct tevent_immediate_list **pp_list_head)
55
0
{
56
0
  struct tevent_immediate_list *im_entry = NULL;
57
0
  struct tevent_immediate_list *im_next = NULL;
58
59
0
  for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
60
0
    im_next = im_entry->next;
61
0
    DLIST_REMOVE(*pp_list_head, im_entry);
62
0
    TALLOC_FREE(im_entry);
63
0
  }
64
0
}
65
66
static void free_list_handler(struct tevent_context *ev,
67
        struct tevent_immediate *im,
68
        void *private_ptr)
69
0
{
70
0
  struct tevent_thread_proxy *tp =
71
0
    talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
72
0
  int ret;
73
74
0
  ret = pthread_mutex_lock(&tp->mutex);
75
0
  if (ret != 0) {
76
0
    abort();
77
    /* Notreached. */
78
0
    return;
79
0
  }
80
81
0
  free_im_list(&tp->tofree_im_list);
82
83
0
  ret = pthread_mutex_unlock(&tp->mutex);
84
0
  if (ret != 0) {
85
0
    abort();
86
    /* Notreached. */
87
0
    return;
88
0
  }
89
0
}
90
91
static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
92
0
{
93
0
  struct tevent_immediate_list *im_entry = NULL;
94
0
  struct tevent_immediate_list *im_next = NULL;
95
96
0
  for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
97
0
    im_next = im_entry->next;
98
0
    DLIST_REMOVE(tp->im_list, im_entry);
99
100
0
    tevent_schedule_immediate(im_entry->im,
101
0
          tp->dest_ev_ctx,
102
0
          im_entry->handler,
103
0
          im_entry->private_ptr);
104
105
    /* Move from pending list to free list. */
106
0
    DLIST_ADD(tp->tofree_im_list, im_entry);
107
0
  }
108
0
  if (tp->tofree_im_list != NULL) {
109
    /*
110
     * Once the current immediate events
111
     * are processed, we need to reschedule
112
     * ourselves to free them. This works
113
     * as tevent_schedule_immediate()
114
     * always adds events to the *END* of
115
     * the immediate events list.
116
     */
117
0
    tevent_schedule_immediate(tp->free_im,
118
0
          tp->dest_ev_ctx,
119
0
          free_list_handler,
120
0
          tp);
121
0
  }
122
0
}
123
124
static void pipe_read_handler(struct tevent_context *ev,
125
        struct tevent_fd *fde,
126
        uint16_t flags,
127
        void *private_ptr)
128
0
{
129
0
  struct tevent_thread_proxy *tp =
130
0
    talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
131
0
  ssize_t len = 64;
132
0
  int ret;
133
134
0
  ret = pthread_mutex_lock(&tp->mutex);
135
0
  if (ret != 0) {
136
0
    abort();
137
    /* Notreached. */
138
0
    return;
139
0
  }
140
141
  /*
142
   * Clear out all data in the pipe. We
143
   * don't really care if this returns -1.
144
   */
145
0
  while (len == 64) {
146
0
    char buf[64];
147
0
    len = read(tp->read_fd, buf, 64);
148
0
  };
149
150
0
  schedule_immediate_functions(tp);
151
152
0
  ret = pthread_mutex_unlock(&tp->mutex);
153
0
  if (ret != 0) {
154
0
    abort();
155
    /* Notreached. */
156
0
    return;
157
0
  }
158
0
}
159
160
static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
161
0
{
162
0
  int ret;
163
164
0
  ret = pthread_mutex_lock(&tp->mutex);
165
0
  if (ret != 0) {
166
0
    abort();
167
    /* Notreached. */
168
0
    return 0;
169
0
  }
170
171
0
  TALLOC_FREE(tp->pipe_read_fde);
172
173
0
  if (tp->read_fd != -1) {
174
0
    (void)close(tp->read_fd);
175
0
    tp->read_fd = -1;
176
0
  }
177
0
  if (tp->write_fd != -1) {
178
0
    (void)close(tp->write_fd);
179
0
    tp->write_fd = -1;
180
0
  }
181
182
  /* Hmmm. It's probably an error if we get here with
183
     any non-NULL immediate entries.. */
184
185
0
  free_im_list(&tp->im_list);
186
0
  free_im_list(&tp->tofree_im_list);
187
188
0
  TALLOC_FREE(tp->free_im);
189
190
0
  ret = pthread_mutex_unlock(&tp->mutex);
191
0
  if (ret != 0) {
192
0
    abort();
193
    /* Notreached. */
194
0
    return 0;
195
0
  }
196
197
0
  ret = pthread_mutex_destroy(&tp->mutex);
198
0
  if (ret != 0) {
199
0
    abort();
200
    /* Notreached. */
201
0
    return 0;
202
0
  }
203
204
0
  return 0;
205
0
}
206
207
/*
208
 * Create a struct that can be passed to other threads
209
 * to allow them to signal the struct tevent_context *
210
 * passed in.
211
 */
212
213
struct tevent_thread_proxy *tevent_thread_proxy_create(
214
    struct tevent_context *dest_ev_ctx)
215
0
{
216
0
  int ret;
217
0
  int pipefds[2];
218
0
  struct tevent_thread_proxy *tp;
219
220
0
  if (dest_ev_ctx->wrapper.glue != NULL) {
221
    /*
222
     * stacking of wrappers is not supported
223
     */
224
0
    tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
225
0
           TEVENT_DEBUG_FATAL,
226
0
           "%s() not allowed on a wrapper context\n",
227
0
           __func__);
228
0
    errno = EINVAL;
229
0
    return NULL;
230
0
  }
231
232
0
  tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
233
0
  if (tp == NULL) {
234
0
    return NULL;
235
0
  }
236
237
0
  ret = pthread_mutex_init(&tp->mutex, NULL);
238
0
  if (ret != 0) {
239
0
    goto fail;
240
0
  }
241
242
0
  tp->dest_ev_ctx = dest_ev_ctx;
243
0
  tp->read_fd = -1;
244
0
  tp->write_fd = -1;
245
246
0
  talloc_set_destructor(tp, tevent_thread_proxy_destructor);
247
248
0
  ret = pipe(pipefds);
249
0
  if (ret == -1) {
250
0
    goto fail;
251
0
  }
252
253
0
  tp->read_fd = pipefds[0];
254
0
  tp->write_fd = pipefds[1];
255
256
0
  ret = ev_set_blocking(pipefds[0], false);
257
0
  if (ret != 0) {
258
0
    goto fail;
259
0
  }
260
0
  ret = ev_set_blocking(pipefds[1], false);
261
0
  if (ret != 0) {
262
0
    goto fail;
263
0
  }
264
0
  if (!ev_set_close_on_exec(pipefds[0])) {
265
0
    goto fail;
266
0
  }
267
0
  if (!ev_set_close_on_exec(pipefds[1])) {
268
0
    goto fail;
269
0
  }
270
271
0
  tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
272
0
        tp,
273
0
        tp->read_fd,
274
0
        TEVENT_FD_READ,
275
0
        pipe_read_handler,
276
0
        tp);
277
0
  if (tp->pipe_read_fde == NULL) {
278
0
    goto fail;
279
0
  }
280
281
  /*
282
   * Create an immediate event to free
283
   * completed lists.
284
   */
285
0
  tp->free_im = tevent_create_immediate(tp);
286
0
  if (tp->free_im == NULL) {
287
0
    goto fail;
288
0
  }
289
290
0
  return tp;
291
292
0
  fail:
293
294
0
  TALLOC_FREE(tp);
295
0
  return NULL;
296
0
}
297
298
/*
299
 * This function schedules an immediate event to be called with argument
300
 * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
301
 * wait for activation to take place, this is simply fire-and-forget.
302
 *
303
 * pp_im must be a pointer to an immediate event talloced on
304
 * a context owned by the calling thread, or the NULL context.
305
 * Ownership of *pp_im will be transferred to the tevent library.
306
 *
307
 * pp_private can be null, or contents of *pp_private must be
308
 * talloc'ed memory on a context owned by the calling thread
309
 * or the NULL context. If non-null, ownership of *pp_private will
310
 * be transferred to the tevent library.
311
 *
312
 * If you want to return a message, have the destination use the
313
 * same function call to send back to the caller.
314
 */
315
316
317
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
318
          struct tevent_immediate **pp_im,
319
          tevent_immediate_handler_t handler,
320
          void *pp_private_data)
321
0
{
322
0
  struct tevent_immediate_list *im_entry;
323
0
  int ret;
324
0
  char c;
325
0
  ssize_t written;
326
327
0
  ret = pthread_mutex_lock(&tp->mutex);
328
0
  if (ret != 0) {
329
0
    abort();
330
    /* Notreached. */
331
0
    return;
332
0
  }
333
334
0
  if (tp->write_fd == -1) {
335
    /* In the process of being destroyed. Ignore. */
336
0
    goto end;
337
0
  }
338
339
  /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
340
0
  im_entry = talloc_zero(NULL, struct tevent_immediate_list);
341
0
  if (im_entry == NULL) {
342
0
    goto end;
343
0
  }
344
345
0
  im_entry->handler = handler;
346
0
  im_entry->im = talloc_move(im_entry, pp_im);
347
348
0
  if (pp_private_data != NULL) {
349
0
    void **pptr = (void **)pp_private_data;
350
0
    im_entry->private_ptr = talloc_move(im_entry, pptr);
351
0
  }
352
353
0
  DLIST_ADD(tp->im_list, im_entry);
354
355
  /* And notify the dest_ev_ctx to wake up. */
356
0
  c = '\0';
357
0
  do {
358
0
    written = write(tp->write_fd, &c, 1);
359
0
  } while (written == -1 && errno == EINTR);
360
361
0
  end:
362
363
0
  ret = pthread_mutex_unlock(&tp->mutex);
364
0
  if (ret != 0) {
365
0
    abort();
366
    /* Notreached. */
367
0
  }
368
0
}
369
#else
370
/* !HAVE_PTHREAD */
371
/*
 * Fallback for builds without HAVE_PTHREAD: no proxy can be created.
 * Always returns NULL with errno set to ENOSYS; dest_ev_ctx is not used.
 */
struct tevent_thread_proxy *tevent_thread_proxy_create(
    struct tevent_context *dest_ev_ctx)
{
  struct tevent_thread_proxy *const no_proxy = NULL;

  errno = ENOSYS;
  return no_proxy;
}
377
378
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
379
          struct tevent_immediate **pp_im,
380
          tevent_immediate_handler_t handler,
381
          void *pp_private_data)
382
{
383
  ;
384
}
385
#endif
386
387
static int tevent_threaded_context_destructor(
388
  struct tevent_threaded_context *tctx)
389
0
{
390
0
  struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
391
0
  int ret;
392
393
0
  if (main_ev != NULL) {
394
0
    DLIST_REMOVE(main_ev->threaded_contexts, tctx);
395
0
  }
396
397
  /*
398
   * We have to coordinate with _tevent_threaded_schedule_immediate's
399
   * unlock of the event_ctx_mutex. We're in the main thread here,
400
   * and we can be scheduled before the helper thread finalizes its
401
   * call _tevent_threaded_schedule_immediate. This means we would
402
   * pthreadpool_destroy a locked mutex, which is illegal.
403
   */
404
0
  ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
405
0
  if (ret != 0) {
406
0
    abort();
407
0
  }
408
409
0
  ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
410
0
  if (ret != 0) {
411
0
    abort();
412
0
  }
413
414
0
  ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
415
0
  if (ret != 0) {
416
0
    abort();
417
0
  }
418
419
0
  return 0;
420
0
}
421
422
struct tevent_threaded_context *tevent_threaded_context_create(
423
  TALLOC_CTX *mem_ctx, struct tevent_context *ev)
424
0
{
425
0
#ifdef HAVE_PTHREAD
426
0
  struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
427
0
  struct tevent_threaded_context *tctx;
428
0
  int ret;
429
430
0
  ret = tevent_common_wakeup_init(main_ev);
431
0
  if (ret != 0) {
432
0
    errno = ret;
433
0
    return NULL;
434
0
  }
435
436
0
  tctx = talloc(mem_ctx, struct tevent_threaded_context);
437
0
  if (tctx == NULL) {
438
0
    return NULL;
439
0
  }
440
0
  tctx->event_ctx = ev;
441
442
0
  ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
443
0
  if (ret != 0) {
444
0
    TALLOC_FREE(tctx);
445
0
    return NULL;
446
0
  }
447
448
0
  DLIST_ADD(main_ev->threaded_contexts, tctx);
449
0
  talloc_set_destructor(tctx, tevent_threaded_context_destructor);
450
451
0
  return tctx;
452
#else
453
  errno = ENOSYS;
454
  return NULL;
455
#endif
456
0
}
457
458
static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
459
0
{
460
0
  if (im->event_ctx != NULL) {
461
0
    abort();
462
0
  }
463
0
  return 0;
464
0
}
465
466
void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
467
           struct tevent_immediate *im,
468
           tevent_immediate_handler_t handler,
469
           void *private_data,
470
           const char *handler_name,
471
           const char *location)
472
0
{
473
0
#ifdef HAVE_PTHREAD
474
0
  const char *create_location = im->create_location;
475
0
  struct tevent_context *main_ev = NULL;
476
0
  struct tevent_wrapper_glue *glue = NULL;
477
0
  int ret, wakeup_fd;
478
479
0
  ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
480
0
  if (ret != 0) {
481
0
    abort();
482
0
  }
483
484
0
  if (tctx->event_ctx == NULL) {
485
    /*
486
     * Our event context is already gone.
487
     */
488
0
    ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
489
0
    if (ret != 0) {
490
0
      abort();
491
0
    }
492
0
    return;
493
0
  }
494
495
0
  glue = tctx->event_ctx->wrapper.glue;
496
497
0
  if ((im->event_ctx != NULL) || (handler == NULL)) {
498
0
    abort();
499
0
  }
500
0
  if (im->destroyed) {
501
0
    abort();
502
0
  }
503
0
  if (im->busy) {
504
0
    abort();
505
0
  }
506
507
0
  main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
508
509
0
  *im = (struct tevent_immediate) {
510
0
    .event_ctx    = tctx->event_ctx,
511
0
    .wrapper    = glue,
512
0
    .handler    = handler,
513
0
    .private_data   = private_data,
514
0
    .handler_name   = handler_name,
515
0
    .create_location  = create_location,
516
0
    .schedule_location  = location,
517
0
  };
518
519
  /*
520
   * Make sure the event won't be destroyed while
521
   * it's part of the ev->scheduled_immediates list.
522
   * _tevent_schedule_immediate() will reset the destructor
523
   * in tevent_common_threaded_activate_immediate().
524
   */
525
0
  talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);
526
527
0
  ret = pthread_mutex_lock(&main_ev->scheduled_mutex);
528
0
  if (ret != 0) {
529
0
    abort();
530
0
  }
531
532
0
  DLIST_ADD_END(main_ev->scheduled_immediates, im);
533
0
  wakeup_fd = main_ev->wakeup_fd;
534
535
0
  ret = pthread_mutex_unlock(&main_ev->scheduled_mutex);
536
0
  if (ret != 0) {
537
0
    abort();
538
0
  }
539
540
0
  ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
541
0
  if (ret != 0) {
542
0
    abort();
543
0
  }
544
545
  /*
546
   * We might want to wake up the main thread under the lock. We
547
   * had a slightly similar situation in pthreadpool, changed
548
   * with 1c4284c7395f23. This is not exactly the same, as the
549
   * wakeup is only a last-resort thing in case the main thread
550
   * is sleeping. Doing the wakeup under the lock can easily
551
   * lead to a contended mutex, which is much more expensive
552
   * than a noncontended one. So I'd opt for the lower footprint
553
   * initially. Maybe we have to change that later.
554
   */
555
0
  tevent_common_wakeup_fd(wakeup_fd);
556
#else
557
  /*
558
   * tevent_threaded_context_create() returned NULL with ENOSYS...
559
   */
560
  abort();
561
#endif
562
0
}
563
564
/*
 * Move every immediate event queued on ev->scheduled_immediates (by
 * _tevent_threaded_schedule_immediate()) into ev's regular immediate
 * handling via _tevent_schedule_immediate().
 *
 * ev->scheduled_mutex is held for the whole loop. The process is
 * aborted when a mutex operation fails or when built without
 * HAVE_PTHREAD.
 */
void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
  if (pthread_mutex_lock(&ev->scheduled_mutex) != 0) {
    abort();
  }

  for (;;) {
    struct tevent_immediate *im = ev->scheduled_immediates;
    tevent_immediate_handler_t handler;
    void *private_data;
    const char *handler_name;
    const char *schedule_location;

    if (im == NULL) {
      break;
    }

    /* Take what we need before the event is unlinked and reused. */
    handler = im->handler;
    private_data = im->private_data;
    handler_name = im->handler_name;
    schedule_location = im->schedule_location;

    DLIST_REMOVE(ev->scheduled_immediates, im);

    TEVENT_DEBUG(ev, TEVENT_DEBUG_TRACE,
           "Schedule immediate event \"%s\": %p from thread into main\n",
           handler_name, im);
    im->handler_name = NULL;
    _tevent_schedule_immediate(im,
             ev,
             handler,
             private_data,
             handler_name,
             schedule_location);
  }

  if (pthread_mutex_unlock(&ev->scheduled_mutex) != 0) {
    abort();
  }
#else
  /*
   * tevent_threaded_context_create() returned NULL with ENOSYS...
   */
  abort();
#endif
}