Coverage Report

Created: 2023-05-19 06:16

/src/ntp-dev/libntp/work_thread.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * work_thread.c - threads implementation for blocking worker child.
3
 */
4
#include <config.h>
5
#include "ntp_workimpl.h"
6
7
#ifdef WORK_THREAD
8
9
#include <stdio.h>
10
#include <ctype.h>
11
#include <signal.h>
12
#ifndef SYS_WINNT
13
#include <pthread.h>
14
#endif
15
16
#include "ntp_stdlib.h"
17
#include "ntp_malloc.h"
18
#include "ntp_syslog.h"
19
#include "ntpd.h"
20
#include "ntp_io.h"
21
#include "ntp_assert.h"
22
#include "ntp_unixtime.h"
23
#include "timespecops.h"
24
#include "ntp_worker.h"
25
26
0
#define CHILD_EXIT_REQ  ((blocking_pipe_header *)(intptr_t)-1)
27
0
#define CHILD_GONE_RESP CHILD_EXIT_REQ
28
/* Queue size increments:
29
 * The request queue grows a bit faster than the response queue -- the
30
 * daemon can push requests and pull results faster on average than the
31
 * worker can process requests and push results...  If this really pays
32
 * off is debatable.
33
 */
34
0
#define WORKITEMS_ALLOC_INC 16
35
0
#define RESPONSES_ALLOC_INC 4
36
37
/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38
 * set the maximum to 256kB. If the minimum goes below the
39
 * system-defined minimum stack size, we have to adjust accordingly.
40
 */
41
#ifndef THREAD_MINSTACKSIZE
42
0
# define THREAD_MINSTACKSIZE  (64U * 1024)
43
#endif
44
#ifndef __sun
45
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
46
# undef THREAD_MINSTACKSIZE
47
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
48
#endif
49
#endif
50
51
#ifndef THREAD_MAXSTACKSIZE
52
0
# define THREAD_MAXSTACKSIZE  (256U * 1024)
53
#endif
54
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
55
# undef  THREAD_MAXSTACKSIZE
56
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
57
#endif
58
59
/* need a good integer to store a pointer... */
60
#ifndef UINTPTR_T
61
# if defined(UINTPTR_MAX)
62
#  define UINTPTR_T uintptr_t
63
# elif defined(UINT_PTR)
64
#  define UINTPTR_T UINT_PTR
65
# else
66
#  define UINTPTR_T size_t
67
# endif
68
#endif
69
70
71
#ifdef SYS_WINNT
72
73
# define thread_exit(c) _endthreadex(c)
74
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
75
u_int WINAPI  blocking_thread(void *);
76
static BOOL same_os_sema(const sem_ref obj, void * osobj);
77
78
#else
79
80
0
# define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c))
81
0
# define tickle_sem sem_post
82
void *    blocking_thread(void *);
83
static  void  block_thread_signals(sigset_t *);
84
85
#endif
86
87
#ifdef WORK_PIPE
88
addremove_io_fd_func    addremove_io_fd;
89
#else
90
addremove_io_semaphore_func addremove_io_semaphore;
91
#endif
92
93
static  void  start_blocking_thread(blocking_child *);
94
static  void  start_blocking_thread_internal(blocking_child *);
95
static  void  prepare_child_sems(blocking_child *);
96
static  int wait_for_sem(sem_ref, struct timespec *);
97
static  int ensure_workitems_empty_slot(blocking_child *);
98
static  int ensure_workresp_empty_slot(blocking_child *);
99
static  int queue_req_pointer(blocking_child *, blocking_pipe_header *);
100
static  void  cleanup_after_child(blocking_child *);
101
102
static sema_type worker_mmutex;
103
static sem_ref   worker_memlock;
104
105
/* --------------------------------------------------------------------
106
 * locking the global worker state table (and other global stuff)
107
 */
108
void
109
worker_global_lock(
110
  int inOrOut)
111
0
{
112
0
  if (worker_memlock) {
113
0
    if (inOrOut)
114
0
      wait_for_sem(worker_memlock, NULL);
115
0
    else
116
0
      tickle_sem(worker_memlock);
117
0
  }
118
0
}
119
120
/* --------------------------------------------------------------------
 * Implementation isolation wrapper: hands 'exitcode' to the
 * platform-specific thread_exit() macro.
 */
void
exit_worker(
  int exitcode
  )
{
  /* thread_exit is a macro, see its #define for the platform call */
  thread_exit(exitcode);
}
130
131
/* --------------------------------------------------------------------
132
 * sleep for a given time or until the wakup semaphore is tickled.
133
 */
134
int
135
worker_sleep(
136
  blocking_child *  c,
137
  time_t      seconds
138
  )
139
0
{
140
0
  struct timespec until;
141
0
  int   rc;
142
143
0
# ifdef HAVE_CLOCK_GETTIME
144
0
  if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
145
0
    msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
146
0
    return -1;
147
0
  }
148
# else
149
  if (0 != getclock(TIMEOFDAY, &until)) {
150
    msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
151
    return -1;
152
  }
153
# endif
154
0
  until.tv_sec += seconds;
155
0
  rc = wait_for_sem(c->wake_scheduled_sleep, &until);
156
0
  if (0 == rc)
157
0
    return -1;
158
0
  if (-1 == rc && ETIMEDOUT == errno)
159
0
    return 0;
160
0
  msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
161
0
  return -1;
162
0
}
163
164
165
/* --------------------------------------------------------------------
166
 * Wake up a worker that takes a nap.
167
 */
168
void
169
interrupt_worker_sleep(void)
170
0
{
171
0
  u_int     idx;
172
0
  blocking_child *  c;
173
174
0
  for (idx = 0; idx < blocking_children_alloc; idx++) {
175
0
    c = blocking_children[idx];
176
0
    if (NULL == c || NULL == c->wake_scheduled_sleep)
177
0
      continue;
178
0
    tickle_sem(c->wake_scheduled_sleep);
179
0
  }
180
0
}
181
182
/* --------------------------------------------------------------------
183
 * Make sure there is an empty slot at the head of the request
184
 * queue. Tell if the queue is currently empty.
185
 */
186
static int
187
ensure_workitems_empty_slot(
188
  blocking_child *c
189
  )
190
0
{
191
  /*
192
  ** !!! PRECONDITION: caller holds access lock!
193
  **
194
  ** This simply tries to increase the size of the buffer if it
195
  ** becomes full. The resize operation does *not* maintain the
196
  ** order of requests, but that should be irrelevant since the
197
  ** processing is considered asynchronous anyway.
198
  **
199
  ** Return if the buffer is currently empty.
200
  */
201
  
202
0
  static const size_t each =
203
0
      sizeof(blocking_children[0]->workitems[0]);
204
205
0
  size_t  new_alloc;
206
0
  size_t  slots_used;
207
0
  size_t  sidx;
208
209
0
  slots_used = c->head_workitem - c->tail_workitem;
210
0
  if (slots_used >= c->workitems_alloc) {
211
0
    new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
212
0
    c->workitems = erealloc(c->workitems, new_alloc * each);
213
0
    for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
214
0
        c->workitems[sidx] = NULL;
215
0
    c->tail_workitem   = 0;
216
0
    c->head_workitem   = c->workitems_alloc;
217
0
    c->workitems_alloc = new_alloc;
218
0
  }
219
0
  INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
220
0
  return (0 == slots_used);
221
0
}
222
223
/* --------------------------------------------------------------------
224
 * Make sure there is an empty slot at the head of the response
225
 * queue. Tell if the queue is currently empty.
226
 */
227
static int
228
ensure_workresp_empty_slot(
229
  blocking_child *c
230
  )
231
0
{
232
  /*
233
  ** !!! PRECONDITION: caller holds access lock!
234
  **
235
  ** Works like the companion function above.
236
  */
237
  
238
0
  static const size_t each =
239
0
      sizeof(blocking_children[0]->responses[0]);
240
241
0
  size_t  new_alloc;
242
0
  size_t  slots_used;
243
0
  size_t  sidx;
244
245
0
  slots_used = c->head_response - c->tail_response;
246
0
  if (slots_used >= c->responses_alloc) {
247
0
    new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
248
0
    c->responses = erealloc(c->responses, new_alloc * each);
249
0
    for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
250
0
        c->responses[sidx] = NULL;
251
0
    c->tail_response   = 0;
252
0
    c->head_response   = c->responses_alloc;
253
0
    c->responses_alloc = new_alloc;
254
0
  }
255
0
  INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
256
0
  return (0 == slots_used);
257
0
}
258
259
260
/* --------------------------------------------------------------------
261
 * queue_req_pointer() - append a work item or idle exit request to
262
 *       blocking_workitems[]. Employ proper locking.
263
 */
264
static int
265
queue_req_pointer(
266
  blocking_child  * c,
267
  blocking_pipe_header *  hdr
268
  )
269
0
{
270
0
  size_t qhead;
271
  
272
  /* >>>> ACCESS LOCKING STARTS >>>> */
273
0
  wait_for_sem(c->accesslock, NULL);
274
0
  ensure_workitems_empty_slot(c);
275
0
  qhead = c->head_workitem;
276
0
  c->workitems[qhead % c->workitems_alloc] = hdr;
277
0
  c->head_workitem = 1 + qhead;
278
0
  tickle_sem(c->accesslock);
279
  /* <<<< ACCESS LOCKING ENDS <<<< */
280
281
  /* queue consumer wake-up notification */
282
0
  tickle_sem(c->workitems_pending);
283
284
0
  return 0;
285
0
}
286
287
/* --------------------------------------------------------------------
288
 * API function to make sure a worker is running, a proper private copy
289
 * of the data is made, the data eneterd into the queue and the worker
290
 * is signalled.
291
 */
292
int
293
send_blocking_req_internal(
294
  blocking_child *  c,
295
  blocking_pipe_header *  hdr,
296
  void *      data
297
  )
298
0
{
299
0
  blocking_pipe_header *  threadcopy;
300
0
  size_t      payload_octets;
301
302
0
  REQUIRE(hdr != NULL);
303
0
  REQUIRE(data != NULL);
304
0
  DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
305
306
0
  if (hdr->octets <= sizeof(*hdr))
307
0
    return 1; /* failure */
308
0
  payload_octets = hdr->octets - sizeof(*hdr);
309
310
0
  if (NULL == c->thread_ref)
311
0
    start_blocking_thread(c);
312
0
  threadcopy = emalloc(hdr->octets);
313
0
  memcpy(threadcopy, hdr, sizeof(*hdr));
314
0
  memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
315
316
0
  return queue_req_pointer(c, threadcopy);
317
0
}
318
319
/* --------------------------------------------------------------------
320
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
321
 * structure and dequeue an item.
322
 */
323
blocking_pipe_header *
324
receive_blocking_req_internal(
325
  blocking_child *  c
326
  )
327
0
{
328
0
  blocking_pipe_header *  req;
329
0
  size_t      qhead, qtail;
330
331
0
  req = NULL;
332
0
  do {
333
    /* wait for tickle from the producer side */
334
0
    wait_for_sem(c->workitems_pending, NULL);
335
336
    /* >>>> ACCESS LOCKING STARTS >>>> */
337
0
    wait_for_sem(c->accesslock, NULL);
338
0
    qhead = c->head_workitem;
339
0
    do {
340
0
      qtail = c->tail_workitem;
341
0
      if (qhead == qtail)
342
0
        break;
343
0
      c->tail_workitem = qtail + 1;
344
0
      qtail %= c->workitems_alloc;
345
0
      req = c->workitems[qtail];
346
0
      c->workitems[qtail] = NULL;
347
0
    } while (NULL == req);
348
0
    tickle_sem(c->accesslock);
349
    /* <<<< ACCESS LOCKING ENDS <<<< */
350
351
0
  } while (NULL == req);
352
353
0
  INSIST(NULL != req);
354
0
  if (CHILD_EXIT_REQ == req) { /* idled out */
355
0
    send_blocking_resp_internal(c, CHILD_GONE_RESP);
356
0
    req = NULL;
357
0
  }
358
359
0
  return req;
360
0
}
361
362
/* --------------------------------------------------------------------
363
 * Push a response into the return queue and eventually tickle the
364
 * receiver.
365
 */
366
int
367
send_blocking_resp_internal(
368
  blocking_child *  c,
369
  blocking_pipe_header *  resp
370
  )
371
0
{
372
0
  size_t  qhead;
373
0
  int empty;
374
  
375
  /* >>>> ACCESS LOCKING STARTS >>>> */
376
0
  wait_for_sem(c->accesslock, NULL);
377
0
  empty = ensure_workresp_empty_slot(c);
378
0
  qhead = c->head_response;
379
0
  c->responses[qhead % c->responses_alloc] = resp;
380
0
  c->head_response = 1 + qhead;
381
0
  tickle_sem(c->accesslock);
382
  /* <<<< ACCESS LOCKING ENDS <<<< */
383
384
  /* queue consumer wake-up notification */
385
0
  if (empty)
386
0
  {
387
0
#     ifdef WORK_PIPE
388
0
    if (1 != write(c->resp_write_pipe, "", 1))
389
0
      msyslog(LOG_WARNING, "async resolver: %s",
390
0
        "failed to notify main thread!");
391
#     else
392
    tickle_sem(c->responses_pending);
393
#     endif
394
0
  }
395
0
  return 0;
396
0
}
397
398
399
#ifndef WORK_PIPE
400
401
/* --------------------------------------------------------------------
402
 * Check if a (Windows-)hanndle to a semaphore is actually the same we
403
 * are using inside the sema wrapper.
404
 */
405
static BOOL
406
same_os_sema(
407
  const sem_ref obj,
408
  void*   osh
409
  )
410
{
411
  return obj && osh && (obj->shnd == (HANDLE)osh);
412
}
413
414
/* --------------------------------------------------------------------
415
 * Find the shared context that associates to an OS handle and make sure
416
 * the data is dequeued and processed.
417
 */
418
void
419
handle_blocking_resp_sem(
420
  void *  context
421
  )
422
{
423
  blocking_child *  c;
424
  u_int     idx;
425
426
  c = NULL;
427
  for (idx = 0; idx < blocking_children_alloc; idx++) {
428
    c = blocking_children[idx];
429
    if (c != NULL &&
430
      c->thread_ref != NULL &&
431
      same_os_sema(c->responses_pending, context))
432
      break;
433
  }
434
  if (idx < blocking_children_alloc)
435
    process_blocking_resp(c);
436
}
437
#endif  /* !WORK_PIPE */
438
439
/* --------------------------------------------------------------------
440
 * Fetch the next response from the return queue. In case of signalling
441
 * via pipe, make sure the pipe is flushed, too.
442
 */
443
blocking_pipe_header *
444
receive_blocking_resp_internal(
445
  blocking_child *  c
446
  )
447
0
{
448
0
  blocking_pipe_header *  removed;
449
0
  size_t      qhead, qtail, slot;
450
451
0
#ifdef WORK_PIPE
452
0
  int     rc;
453
0
  char      scratch[32];
454
455
0
  do
456
0
    rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
457
0
  while (-1 == rc && EINTR == errno);
458
0
#endif
459
460
  /* >>>> ACCESS LOCKING STARTS >>>> */
461
0
  wait_for_sem(c->accesslock, NULL);
462
0
  qhead = c->head_response;
463
0
  qtail = c->tail_response;
464
0
  for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
465
0
    slot = qtail % c->responses_alloc;
466
0
    removed = c->responses[slot];
467
0
    c->responses[slot] = NULL;
468
0
  }
469
0
  c->tail_response = qtail;
470
0
  tickle_sem(c->accesslock);
471
  /* <<<< ACCESS LOCKING ENDS <<<< */
472
473
0
  if (NULL != removed) {
474
0
    DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
475
0
           BLOCKING_RESP_MAGIC == removed->magic_sig);
476
0
  }
477
0
  if (CHILD_GONE_RESP == removed) {
478
0
    cleanup_after_child(c);
479
0
    removed = NULL;
480
0
  }
481
482
0
  return removed;
483
0
}
484
485
/* --------------------------------------------------------------------
486
 * Light up a new worker.
487
 */
488
static void
489
start_blocking_thread(
490
  blocking_child *  c
491
  )
492
0
{
493
494
0
  DEBUG_INSIST(!c->reusable);
495
496
0
  prepare_child_sems(c);
497
0
  start_blocking_thread_internal(c);
498
0
}
499
500
/* --------------------------------------------------------------------
501
 * Create a worker thread. There are several differences between POSIX
502
 * and Windows, of course -- most notably the Windows thread is no
503
 * detached thread, and we keep the handle around until we want to get
504
 * rid of the thread. The notification scheme also differs: Windows
505
 * makes use of semaphores in both directions, POSIX uses a pipe for
506
 * integration with 'select()' or alike.
507
 */
508
static void
509
start_blocking_thread_internal(
510
  blocking_child *  c
511
  )
512
#ifdef SYS_WINNT
513
{
514
  BOOL  resumed;
515
516
  c->thread_ref = NULL;
517
  (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
518
  c->thr_table[0].thnd =
519
    (HANDLE)_beginthreadex(
520
      NULL,
521
      0,
522
      &blocking_thread,
523
      c,
524
      CREATE_SUSPENDED,
525
      NULL);
526
527
  if (NULL == c->thr_table[0].thnd) {
528
    msyslog(LOG_ERR, "start blocking thread failed: %m");
529
    exit(-1);
530
  }
531
  /* remember the thread priority is only within the process class */
532
  if (!SetThreadPriority(c->thr_table[0].thnd,
533
             THREAD_PRIORITY_BELOW_NORMAL))
534
    msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
535
536
  resumed = ResumeThread(c->thr_table[0].thnd);
537
  DEBUG_INSIST(resumed);
538
  c->thread_ref = &c->thr_table[0];
539
}
540
#else /* pthreads start_blocking_thread_internal() follows */
541
0
{
542
# ifdef NEED_PTHREAD_INIT
543
  static int  pthread_init_called;
544
# endif
545
0
  pthread_attr_t  thr_attr;
546
0
  int   rc;
547
0
  int   pipe_ends[2]; /* read then write */
548
0
  int   is_pipe;
549
0
  int   flags;
550
0
  size_t    ostacksize;
551
0
  size_t    nstacksize;
552
0
  sigset_t  saved_sig_mask;
553
554
0
  c->thread_ref = NULL;
555
556
# ifdef NEED_PTHREAD_INIT
557
  /*
558
   * from lib/isc/unix/app.c:
559
   * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
560
   */
561
  if (!pthread_init_called) {
562
    pthread_init();
563
    pthread_init_called = TRUE;
564
  }
565
# endif
566
567
0
  rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
568
0
  if (0 != rc) {
569
0
    msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
570
0
    exit(1);
571
0
  }
572
0
  c->resp_read_pipe = move_fd(pipe_ends[0]);
573
0
  c->resp_write_pipe = move_fd(pipe_ends[1]);
574
0
  c->ispipe = is_pipe;
575
0
  flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
576
0
  if (-1 == flags) {
577
0
    msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
578
0
    exit(1);
579
0
  }
580
0
  rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
581
0
  if (-1 == rc) {
582
0
    msyslog(LOG_ERR,
583
0
      "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
584
0
    exit(1);
585
0
  }
586
0
  (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
587
0
  pthread_attr_init(&thr_attr);
588
0
  pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
589
0
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
590
0
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
591
0
  rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
592
0
  if (0 != rc) {
593
0
    msyslog(LOG_ERR,
594
0
      "start_blocking_thread: pthread_attr_getstacksize() -> %s",
595
0
      strerror(rc));
596
0
  } else {
597
0
    if (ostacksize < THREAD_MINSTACKSIZE)
598
0
      nstacksize = THREAD_MINSTACKSIZE;
599
0
    else if (ostacksize > THREAD_MAXSTACKSIZE)
600
0
      nstacksize = THREAD_MAXSTACKSIZE;
601
0
    else
602
0
      nstacksize = ostacksize;
603
0
    if (nstacksize != ostacksize)
604
0
      rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
605
0
    if (0 != rc)
606
0
      msyslog(LOG_ERR,
607
0
        "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
608
0
        (u_long)ostacksize, (u_long)nstacksize,
609
0
        strerror(rc));
610
0
  }
611
#else
612
  UNUSED_ARG(nstacksize);
613
  UNUSED_ARG(ostacksize);
614
#endif
615
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
616
  pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
617
#endif
618
0
  c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
619
0
  block_thread_signals(&saved_sig_mask);
620
0
  rc = pthread_create(&c->thr_table[0], &thr_attr,
621
0
          &blocking_thread, c);
622
0
  pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
623
0
  pthread_attr_destroy(&thr_attr);
624
0
  if (0 != rc) {
625
0
    msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
626
0
      strerror(rc));
627
0
    exit(1);
628
0
  }
629
0
  c->thread_ref = &c->thr_table[0];
630
0
}
631
#endif
632
633
/* --------------------------------------------------------------------
634
 * block_thread_signals()
635
 *
636
 * Temporarily block signals used by ntpd main thread, so that signal
637
 * mask inherited by child threads leaves them blocked.  Returns prior
638
 * active signal mask via pmask, to be restored by the main thread
639
 * after pthread_create().
640
 */
641
#ifndef SYS_WINNT
642
void
643
block_thread_signals(
644
  sigset_t *  pmask
645
  )
646
0
{
647
0
  sigset_t  block;
648
649
0
  sigemptyset(&block);
650
# ifdef HAVE_SIGNALED_IO
651
#  ifdef SIGIO
652
  sigaddset(&block, SIGIO);
653
#  endif
654
#  ifdef SIGPOLL
655
  sigaddset(&block, SIGPOLL);
656
#  endif
657
# endif /* HAVE_SIGNALED_IO */
658
0
  sigaddset(&block, SIGALRM);
659
0
  sigaddset(&block, MOREDEBUGSIG);
660
0
  sigaddset(&block, LESSDEBUGSIG);
661
0
# ifdef SIGDIE1
662
0
  sigaddset(&block, SIGDIE1);
663
0
# endif
664
0
# ifdef SIGDIE2
665
0
  sigaddset(&block, SIGDIE2);
666
0
# endif
667
0
# ifdef SIGDIE3
668
0
  sigaddset(&block, SIGDIE3);
669
0
# endif
670
0
# ifdef SIGDIE4
671
0
  sigaddset(&block, SIGDIE4);
672
0
# endif
673
0
# ifdef SIGBUS
674
0
  sigaddset(&block, SIGBUS);
675
0
# endif
676
0
  sigemptyset(pmask);
677
0
  pthread_sigmask(SIG_BLOCK, &block, pmask);
678
0
}
679
#endif  /* !SYS_WINNT */
680
681
682
/* --------------------------------------------------------------------
683
 * Create & destroy semaphores. This is sufficiently different between
684
 * POSIX and Windows to warrant wrapper functions and close enough to
685
 * use the concept of synchronization via semaphore for all platforms.
686
 */
687
static sem_ref
688
create_sema(
689
  sema_type*  semptr,
690
  u_int   inival,
691
  u_int   maxval)
692
0
{
693
#ifdef SYS_WINNT
694
  
695
  long svini, svmax;
696
  if (NULL != semptr) {
697
    svini = (inival < LONG_MAX)
698
        ? (long)inival : LONG_MAX;
699
    svmax = (maxval < LONG_MAX && maxval > 0)
700
        ? (long)maxval : LONG_MAX;
701
    semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
702
    if (NULL == semptr->shnd)
703
      semptr = NULL;
704
  }
705
  
706
#else
707
  
708
0
  (void)maxval;
709
0
  if (semptr && sem_init(semptr, FALSE, inival))
710
0
    semptr = NULL;
711
  
712
0
#endif
713
714
0
  return semptr;
715
0
}
716
717
/* ------------------------------------------------------------------ */
718
static sem_ref
719
delete_sema(
720
  sem_ref obj)
721
0
{
722
    
723
#   ifdef SYS_WINNT
724
    
725
  if (obj) {
726
    if (obj->shnd)
727
      CloseHandle(obj->shnd);
728
    obj->shnd = NULL;
729
  }
730
  
731
#   else
732
    
733
0
  if (obj)
734
0
    sem_destroy(obj);
735
    
736
0
#   endif
737
738
0
  return NULL;
739
0
}
740
741
/* --------------------------------------------------------------------
742
 * prepare_child_sems()
743
 *
744
 * create sync & access semaphores
745
 *
746
 * All semaphores are cleared, only the access semaphore has 1 unit.
747
 * Childs wait on 'workitems_pending', then grabs 'sema_access'
748
 * and dequeues jobs. When done, 'sema_access' is given one unit back.
749
 *
750
 * The producer grabs 'sema_access', manages the queue, restores
751
 * 'sema_access' and puts one unit into 'workitems_pending'.
752
 *
753
 * The story goes the same for the response queue.
754
 */
755
static void
756
prepare_child_sems(
757
  blocking_child *c
758
  )
759
0
{
760
0
  if (NULL == worker_memlock)
761
0
    worker_memlock = create_sema(&worker_mmutex, 1, 1);
762
  
763
0
  c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
764
0
  c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
765
0
  c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
766
#   ifndef WORK_PIPE
767
  c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
768
#   endif
769
0
}
770
771
/* --------------------------------------------------------------------
772
 * wait for semaphore. Where the wait can be interrupted, it will
773
 * internally resume -- When this function returns, there is either no
774
 * semaphore at all, a timeout occurred, or the caller could
775
 * successfully take a token from the semaphore.
776
 *
777
 * For untimed wait, not checking the result of this function at all is
778
 * definitely an option.
779
 */
780
static int
781
wait_for_sem(
782
  sem_ref     sem,
783
  struct timespec * timeout   /* wall-clock */
784
  )
785
#ifdef SYS_WINNT
786
{
787
  struct timespec now;
788
  struct timespec delta;
789
  DWORD   msec;
790
  DWORD   rc;
791
792
  if (!(sem && sem->shnd)) {
793
    errno = EINVAL;
794
    return -1;
795
  }
796
  
797
  if (NULL == timeout) {
798
    msec = INFINITE;
799
  } else {
800
    getclock(TIMEOFDAY, &now);
801
    delta = sub_tspec(*timeout, now);
802
    if (delta.tv_sec < 0) {
803
      msec = 0;
804
    } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
805
      msec = INFINITE;
806
    } else {
807
      msec = 1000 * (DWORD)delta.tv_sec;
808
      msec += delta.tv_nsec / (1000 * 1000);
809
    }
810
  }
811
  rc = WaitForSingleObject(sem->shnd, msec);
812
  if (WAIT_OBJECT_0 == rc)
813
    return 0;
814
  if (WAIT_TIMEOUT == rc) {
815
    errno = ETIMEDOUT;
816
    return -1;
817
  }
818
  msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
819
  errno = EFAULT;
820
  return -1;
821
}
822
#else /* pthreads wait_for_sem() follows */
823
0
{
824
0
  int rc = -1;
825
826
0
  if (sem) do {
827
0
      if (NULL == timeout)
828
0
        rc = sem_wait(sem);
829
0
      else
830
0
        rc = sem_timedwait(sem, timeout);
831
0
    } while (rc == -1 && errno == EINTR);
832
0
  else
833
0
    errno = EINVAL;
834
    
835
0
  return rc;
836
0
}
837
#endif
838
839
/* --------------------------------------------------------------------
840
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
841
 * calling conventions under Windows and POSIX-defined signature
842
 * otherwise.
843
 */
844
#ifdef SYS_WINNT
845
u_int WINAPI
846
#else
847
void *
848
#endif
849
blocking_thread(
850
  void *  ThreadArg
851
  )
852
0
{
853
0
  blocking_child *c;
854
855
0
  c = ThreadArg;
856
0
  exit_worker(blocking_child_common(c));
857
858
  /* NOTREACHED */
859
0
  return 0;
860
0
}
861
862
/* --------------------------------------------------------------------
863
 * req_child_exit() runs in the parent.
864
 *
865
 * This function is called from from the idle timer, too, and possibly
866
 * without a thread being there any longer. Since we have folded up our
867
 * tent in that case and all the semaphores are already gone, we simply
868
 * ignore this request in this case.
869
 *
870
 * Since the existence of the semaphores is controlled exclusively by
871
 * the parent, there's no risk of data race here.
872
 */
873
int
874
req_child_exit(
875
  blocking_child *c
876
  )
877
0
{
878
0
  return (c->accesslock)
879
0
      ? queue_req_pointer(c, CHILD_EXIT_REQ)
880
0
      : 0;
881
0
}
882
883
/* --------------------------------------------------------------------
884
 * cleanup_after_child() runs in parent.
885
 */
886
static void
887
cleanup_after_child(
888
  blocking_child *  c
889
  )
890
0
{
891
0
  DEBUG_INSIST(!c->reusable);
892
  
893
#   ifdef SYS_WINNT
894
  /* The thread was not created in detached state, so we better
895
   * clean up.
896
   */
897
  if (c->thread_ref && c->thread_ref->thnd) {
898
    WaitForSingleObject(c->thread_ref->thnd, INFINITE);
899
    INSIST(CloseHandle(c->thread_ref->thnd));
900
    c->thread_ref->thnd = NULL;
901
  }
902
#   endif
903
0
  c->thread_ref = NULL;
904
905
  /* remove semaphores and (if signalling vi IO) pipes */
906
  
907
0
  c->accesslock           = delete_sema(c->accesslock);
908
0
  c->workitems_pending    = delete_sema(c->workitems_pending);
909
0
  c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
910
911
0
#   ifdef WORK_PIPE
912
0
  DEBUG_INSIST(-1 != c->resp_read_pipe);
913
0
  DEBUG_INSIST(-1 != c->resp_write_pipe);
914
0
  (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
915
0
  close(c->resp_write_pipe);
916
0
  close(c->resp_read_pipe);
917
0
  c->resp_write_pipe = -1;
918
0
  c->resp_read_pipe = -1;
919
#   else
920
  DEBUG_INSIST(NULL != c->responses_pending);
921
  (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
922
  c->responses_pending = delete_sema(c->responses_pending);
923
#   endif
924
925
  /* Is it necessary to check if there are pending requests and
926
   * responses? If so, and if there are, what to do with them?
927
   */
928
  
929
  /* re-init buffer index sequencers */
930
0
  c->head_workitem = 0;
931
0
  c->tail_workitem = 0;
932
0
  c->head_response = 0;
933
0
  c->tail_response = 0;
934
935
0
  c->reusable = TRUE;
936
0
}
937
938
939
#else /* !WORK_THREAD follows */
940
char work_thread_nonempty_compilation_unit;
941
#endif