Coverage Report

Created: 2026-02-26 06:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ntp-dev/libntp/work_thread.c
Line
Count
Source
1
/*
2
 * work_thread.c - threads implementation for blocking worker child.
3
 */
4
#include <config.h>
5
#include "ntp_workimpl.h"
6
7
#ifdef WORK_THREAD
8
9
#include <stdio.h>
10
#include <ctype.h>
11
#include <signal.h>
12
#ifndef SYS_WINNT
13
#include <pthread.h>
14
#endif
15
16
#include "ntp_stdlib.h"
17
#include "ntp_malloc.h"
18
#include "ntp_syslog.h"
19
#include "ntpd.h"
20
#include "ntp_io.h"
21
#include "ntp_assert.h"
22
#include "ntp_unixtime.h"
23
#include "timespecops.h"
24
#include "ntp_worker.h"
25
26
0
#define CHILD_EXIT_REQ  ((blocking_pipe_header *)(intptr_t)-1)
27
0
#define CHILD_GONE_RESP CHILD_EXIT_REQ
28
/* Queue size increments:
29
 * The request queue grows a bit faster than the response queue -- the
30
 * daemon can push requests and pull results faster on average than the
31
 * worker can process requests and push results...  If this really pays
32
 * off is debatable.
33
 */
34
0
#define WORKITEMS_ALLOC_INC 16
35
0
#define RESPONSES_ALLOC_INC 4
36
37
/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38
 * set the maximum to 256kB. If the minimum goes below the
39
 * system-defined minimum stack size, we have to adjust accordingly.
40
 */
41
#ifndef THREAD_MINSTACKSIZE
42
0
# define THREAD_MINSTACKSIZE  (64U * 1024)
43
#endif
44
45
#ifndef THREAD_MAXSTACKSIZE
46
0
# define THREAD_MAXSTACKSIZE  (256U * 1024)
47
#endif
48
49
/* need a good integer to store a pointer... */
50
#ifndef UINTPTR_T
51
# if defined(UINTPTR_MAX)
52
#  define UINTPTR_T uintptr_t
53
# elif defined(UINT_PTR)
54
#  define UINTPTR_T UINT_PTR
55
# else
56
#  define UINTPTR_T size_t
57
# endif
58
#endif
59
60
61
#ifdef SYS_WINNT
62
63
# define thread_exit(c) _endthreadex(c)
64
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
65
u_int WINAPI  blocking_thread(void *);
66
static BOOL same_os_sema(const sem_ref obj, void * osobj);
67
68
#else
69
70
0
# define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c))
71
0
# define tickle_sem sem_post
72
void *    blocking_thread(void *);
73
static  void  block_thread_signals(sigset_t *);
74
75
#endif
76
77
#ifdef WORK_PIPE
78
addremove_io_fd_func    addremove_io_fd;
79
#else
80
addremove_io_semaphore_func addremove_io_semaphore;
81
#endif
82
83
static  void  start_blocking_thread(blocking_child *);
84
static  void  start_blocking_thread_internal(blocking_child *);
85
static  void  prepare_child_sems(blocking_child *);
86
static  int wait_for_sem(sem_ref, struct timespec *);
87
static  int ensure_workitems_empty_slot(blocking_child *);
88
static  int ensure_workresp_empty_slot(blocking_child *);
89
static  int queue_req_pointer(blocking_child *, blocking_pipe_header *);
90
static  void  cleanup_after_child(blocking_child *);
91
92
static sema_type worker_mmutex;
93
static sem_ref   worker_memlock;
94
95
/* --------------------------------------------------------------------
96
 * locking the global worker state table (and other global stuff)
97
 */
98
void
99
worker_global_lock(
100
  int inOrOut)
101
0
{
102
0
  if (worker_memlock) {
103
0
    if (inOrOut)
104
0
      wait_for_sem(worker_memlock, NULL);
105
0
    else
106
0
      tickle_sem(worker_memlock);
107
0
  }
108
0
}
109
110
/* --------------------------------------------------------------------
 * Implementation isolation wrapper: terminate the calling worker
 * thread, handing 'exitcode' to the platform primitive selected by
 * the 'thread_exit' macro (_endthreadex() on Windows, pthread_exit()
 * elsewhere).
 */
void
exit_worker(
	int	exitcode
	)
{
	/* expands to the platform call, see #define thread_exit */
	thread_exit(exitcode);
}
120
121
/* --------------------------------------------------------------------
122
 * sleep for a given time or until the wakup semaphore is tickled.
123
 */
124
int
125
worker_sleep(
126
  blocking_child *  c,
127
  time_t      seconds
128
  )
129
0
{
130
0
  struct timespec until;
131
0
  int   rc;
132
133
0
# ifdef HAVE_CLOCK_GETTIME
134
0
  if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
135
0
    msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
136
0
    return -1;
137
0
  }
138
# else
139
  if (0 != getclock(TIMEOFDAY, &until)) {
140
    msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
141
    return -1;
142
  }
143
# endif
144
0
  until.tv_sec += seconds;
145
0
  rc = wait_for_sem(c->wake_scheduled_sleep, &until);
146
0
  if (0 == rc)
147
0
    return -1;
148
0
  if (-1 == rc && ETIMEDOUT == errno)
149
0
    return 0;
150
0
  msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
151
0
  return -1;
152
0
}
153
154
155
/* --------------------------------------------------------------------
156
 * Wake up a worker that takes a nap.
157
 */
158
void
159
interrupt_worker_sleep(void)
160
0
{
161
0
  u_int     idx;
162
0
  blocking_child *  c;
163
164
0
  for (idx = 0; idx < blocking_children_alloc; idx++) {
165
0
    c = blocking_children[idx];
166
0
    if (NULL == c || NULL == c->wake_scheduled_sleep)
167
0
      continue;
168
0
    tickle_sem(c->wake_scheduled_sleep);
169
0
  }
170
0
}
171
172
/* --------------------------------------------------------------------
173
 * Make sure there is an empty slot at the head of the request
174
 * queue. Tell if the queue is currently empty.
175
 */
176
static int
177
ensure_workitems_empty_slot(
178
  blocking_child *c
179
  )
180
0
{
181
  /*
182
  ** !!! PRECONDITION: caller holds access lock!
183
  **
184
  ** This simply tries to increase the size of the buffer if it
185
  ** becomes full. The resize operation does *not* maintain the
186
  ** order of requests, but that should be irrelevant since the
187
  ** processing is considered asynchronous anyway.
188
  **
189
  ** Return if the buffer is currently empty.
190
  */
191
  
192
0
  static const size_t each =
193
0
      sizeof(blocking_children[0]->workitems[0]);
194
195
0
  size_t  new_alloc;
196
0
  size_t  slots_used;
197
0
  size_t  sidx;
198
199
0
  slots_used = c->head_workitem - c->tail_workitem;
200
0
  if (slots_used >= c->workitems_alloc) {
201
0
    new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
202
0
    c->workitems = erealloc(c->workitems, new_alloc * each);
203
0
    for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
204
0
        c->workitems[sidx] = NULL;
205
0
    c->tail_workitem   = 0;
206
0
    c->head_workitem   = c->workitems_alloc;
207
0
    c->workitems_alloc = new_alloc;
208
0
  }
209
0
  INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
210
0
  return (0 == slots_used);
211
0
}
212
213
/* --------------------------------------------------------------------
214
 * Make sure there is an empty slot at the head of the response
215
 * queue. Tell if the queue is currently empty.
216
 */
217
static int
218
ensure_workresp_empty_slot(
219
  blocking_child *c
220
  )
221
0
{
222
  /*
223
  ** !!! PRECONDITION: caller holds access lock!
224
  **
225
  ** Works like the companion function above.
226
  */
227
  
228
0
  static const size_t each =
229
0
      sizeof(blocking_children[0]->responses[0]);
230
231
0
  size_t  new_alloc;
232
0
  size_t  slots_used;
233
0
  size_t  sidx;
234
235
0
  slots_used = c->head_response - c->tail_response;
236
0
  if (slots_used >= c->responses_alloc) {
237
0
    new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
238
0
    c->responses = erealloc(c->responses, new_alloc * each);
239
0
    for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
240
0
        c->responses[sidx] = NULL;
241
0
    c->tail_response   = 0;
242
0
    c->head_response   = c->responses_alloc;
243
0
    c->responses_alloc = new_alloc;
244
0
  }
245
0
  INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
246
0
  return (0 == slots_used);
247
0
}
248
249
250
/* --------------------------------------------------------------------
251
 * queue_req_pointer() - append a work item or idle exit request to
252
 *       blocking_workitems[]. Employ proper locking.
253
 */
254
static int
255
queue_req_pointer(
256
  blocking_child  * c,
257
  blocking_pipe_header *  hdr
258
  )
259
0
{
260
0
  size_t qhead;
261
  
262
  /* >>>> ACCESS LOCKING STARTS >>>> */
263
0
  wait_for_sem(c->accesslock, NULL);
264
0
  ensure_workitems_empty_slot(c);
265
0
  qhead = c->head_workitem;
266
0
  c->workitems[qhead % c->workitems_alloc] = hdr;
267
0
  c->head_workitem = 1 + qhead;
268
0
  tickle_sem(c->accesslock);
269
  /* <<<< ACCESS LOCKING ENDS <<<< */
270
271
  /* queue consumer wake-up notification */
272
0
  tickle_sem(c->workitems_pending);
273
274
0
  return 0;
275
0
}
276
277
/* --------------------------------------------------------------------
278
 * API function to make sure a worker is running, a proper private copy
279
 * of the data is made, the data eneterd into the queue and the worker
280
 * is signalled.
281
 */
282
int
283
send_blocking_req_internal(
284
  blocking_child *  c,
285
  blocking_pipe_header *  hdr,
286
  void *      data
287
  )
288
0
{
289
0
  blocking_pipe_header *  threadcopy;
290
0
  size_t      payload_octets;
291
292
0
  REQUIRE(hdr != NULL);
293
0
  REQUIRE(data != NULL);
294
0
  DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
295
296
0
  if (hdr->octets <= sizeof(*hdr))
297
0
    return 1; /* failure */
298
0
  payload_octets = hdr->octets - sizeof(*hdr);
299
300
0
  if (NULL == c->thread_ref)
301
0
    start_blocking_thread(c);
302
0
  threadcopy = emalloc(hdr->octets);
303
0
  memcpy(threadcopy, hdr, sizeof(*hdr));
304
0
  memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
305
306
0
  return queue_req_pointer(c, threadcopy);
307
0
}
308
309
/* --------------------------------------------------------------------
310
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
311
 * structure and dequeue an item.
312
 */
313
blocking_pipe_header *
314
receive_blocking_req_internal(
315
  blocking_child *  c
316
  )
317
0
{
318
0
  blocking_pipe_header *  req;
319
0
  size_t      qhead, qtail;
320
321
0
  req = NULL;
322
0
  do {
323
    /* wait for tickle from the producer side */
324
0
    wait_for_sem(c->workitems_pending, NULL);
325
326
    /* >>>> ACCESS LOCKING STARTS >>>> */
327
0
    wait_for_sem(c->accesslock, NULL);
328
0
    qhead = c->head_workitem;
329
0
    do {
330
0
      qtail = c->tail_workitem;
331
0
      if (qhead == qtail)
332
0
        break;
333
0
      c->tail_workitem = qtail + 1;
334
0
      qtail %= c->workitems_alloc;
335
0
      req = c->workitems[qtail];
336
0
      c->workitems[qtail] = NULL;
337
0
    } while (NULL == req);
338
0
    tickle_sem(c->accesslock);
339
    /* <<<< ACCESS LOCKING ENDS <<<< */
340
341
0
  } while (NULL == req);
342
343
0
  INSIST(NULL != req);
344
0
  if (CHILD_EXIT_REQ == req) { /* idled out */
345
0
    send_blocking_resp_internal(c, CHILD_GONE_RESP);
346
0
    req = NULL;
347
0
  }
348
349
0
  return req;
350
0
}
351
352
/* --------------------------------------------------------------------
353
 * Push a response into the return queue and eventually tickle the
354
 * receiver.
355
 */
356
int
357
send_blocking_resp_internal(
358
  blocking_child *  c,
359
  blocking_pipe_header *  resp
360
  )
361
0
{
362
0
  size_t  qhead;
363
0
  int empty;
364
  
365
  /* >>>> ACCESS LOCKING STARTS >>>> */
366
0
  wait_for_sem(c->accesslock, NULL);
367
0
  empty = ensure_workresp_empty_slot(c);
368
0
  qhead = c->head_response;
369
0
  c->responses[qhead % c->responses_alloc] = resp;
370
0
  c->head_response = 1 + qhead;
371
0
  tickle_sem(c->accesslock);
372
  /* <<<< ACCESS LOCKING ENDS <<<< */
373
374
  /* queue consumer wake-up notification */
375
0
  if (empty)
376
0
  {
377
0
#     ifdef WORK_PIPE
378
0
    if (1 != write(c->resp_write_pipe, "", 1))
379
0
      msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
380
0
        " failed to notify main thread!",
381
0
        (BLOCKING_GETNAMEINFO == resp->rtype)
382
0
            ? "name"
383
0
            : "addr"
384
0
        );
385
#     else
386
    tickle_sem(c->responses_pending);
387
#     endif
388
0
  }
389
0
  return 0;
390
0
}
391
392
393
#ifndef WORK_PIPE
394
395
/* --------------------------------------------------------------------
396
 * Check if a (Windows-)handle to a semaphore is actually the same we
397
 * are using inside the sema wrapper.
398
 */
399
static BOOL
400
same_os_sema(
401
  const sem_ref obj,
402
  void*   osh
403
  )
404
{
405
  return obj && osh && (obj->shnd == (HANDLE)osh);
406
}
407
408
/* --------------------------------------------------------------------
409
 * Find the shared context that associates to an OS handle and make sure
410
 * the data is dequeued and processed.
411
 */
412
void
413
handle_blocking_resp_sem(
414
  void *  context
415
  )
416
{
417
  blocking_child *  c;
418
  u_int     idx;
419
420
  c = NULL;
421
  for (idx = 0; idx < blocking_children_alloc; idx++) {
422
    c = blocking_children[idx];
423
    if (c != NULL &&
424
      c->thread_ref != NULL &&
425
      same_os_sema(c->responses_pending, context))
426
      break;
427
  }
428
  if (idx < blocking_children_alloc)
429
    process_blocking_resp(c);
430
}
431
#endif  /* !WORK_PIPE */
432
433
/* --------------------------------------------------------------------
434
 * Fetch the next response from the return queue. In case of signalling
435
 * via pipe, make sure the pipe is flushed, too.
436
 */
437
blocking_pipe_header *
438
receive_blocking_resp_internal(
439
  blocking_child *  c
440
  )
441
0
{
442
0
  blocking_pipe_header *  removed;
443
0
  size_t      qhead, qtail, slot;
444
445
0
#ifdef WORK_PIPE
446
0
  int     rc;
447
0
  char      scratch[32];
448
449
0
  do
450
0
    rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
451
0
  while (-1 == rc && EINTR == errno);
452
0
#endif
453
454
  /* >>>> ACCESS LOCKING STARTS >>>> */
455
0
  wait_for_sem(c->accesslock, NULL);
456
0
  qhead = c->head_response;
457
0
  qtail = c->tail_response;
458
0
  for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
459
0
    slot = qtail % c->responses_alloc;
460
0
    removed = c->responses[slot];
461
0
    c->responses[slot] = NULL;
462
0
  }
463
0
  c->tail_response = qtail;
464
0
  tickle_sem(c->accesslock);
465
  /* <<<< ACCESS LOCKING ENDS <<<< */
466
467
0
  if (NULL != removed) {
468
0
    DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
469
0
           BLOCKING_RESP_MAGIC == removed->magic_sig);
470
0
  }
471
0
  if (CHILD_GONE_RESP == removed) {
472
0
    cleanup_after_child(c);
473
0
    removed = NULL;
474
0
  }
475
476
0
  return removed;
477
0
}
478
479
/* --------------------------------------------------------------------
480
 * Light up a new worker.
481
 */
482
static void
483
start_blocking_thread(
484
  blocking_child *  c
485
  )
486
0
{
487
488
0
  DEBUG_INSIST(!c->reusable);
489
490
0
  prepare_child_sems(c);
491
0
  start_blocking_thread_internal(c);
492
0
}
493
494
/* --------------------------------------------------------------------
495
 * Create a worker thread. There are several differences between POSIX
496
 * and Windows, of course -- most notably the Windows thread is a
497
 * detached thread, and we keep the handle around until we want to get
498
 * rid of the thread. The notification scheme also differs: Windows
499
 * makes use of semaphores in both directions, POSIX uses a pipe for
500
 * integration with 'select()' or alike.
501
 */
502
static void
503
start_blocking_thread_internal(
504
  blocking_child *  c
505
  )
506
#ifdef SYS_WINNT
507
{
508
  BOOL  resumed;
509
510
  c->thread_ref = NULL;
511
  (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
512
  c->thr_table[0].thnd =
513
    (HANDLE)_beginthreadex(
514
      NULL,
515
      0,
516
      &blocking_thread,
517
      c,
518
      CREATE_SUSPENDED,
519
      NULL);
520
521
  if (NULL == c->thr_table[0].thnd) {
522
    msyslog(LOG_ERR, "start blocking thread failed: %m");
523
    exit(-1);
524
  }
525
  /* remember the thread priority is only within the process class */
526
  if (!SetThreadPriority(c->thr_table[0].thnd,
527
             THREAD_PRIORITY_BELOW_NORMAL)) {
528
    msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
529
  }
530
  if (NULL != pSetThreadDescription) {
531
    (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
532
  }
533
  resumed = ResumeThread(c->thr_table[0].thnd);
534
  DEBUG_INSIST(resumed);
535
  c->thread_ref = &c->thr_table[0];
536
}
537
#else /* pthreads start_blocking_thread_internal() follows */
538
0
{
539
# ifdef NEED_PTHREAD_INIT
540
  static int  pthread_init_called;
541
# endif
542
0
  pthread_attr_t  thr_attr;
543
0
  int   rc;
544
0
  int   pipe_ends[2]; /* read then write */
545
0
  int   is_pipe;
546
0
  int   flags;
547
0
  size_t    ostacksize;
548
0
  size_t    nstacksize;
549
0
  sigset_t  saved_sig_mask;
550
551
0
  c->thread_ref = NULL;
552
553
# ifdef NEED_PTHREAD_INIT
554
  /*
555
   * from lib/isc/unix/app.c:
556
   * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
557
   */
558
  if (!pthread_init_called) {
559
    pthread_init();
560
    pthread_init_called = TRUE;
561
  }
562
# endif
563
564
0
  rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
565
0
  if (0 != rc) {
566
0
    msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
567
0
    exit(1);
568
0
  }
569
0
  c->resp_read_pipe = move_fd(pipe_ends[0]);
570
0
  c->resp_write_pipe = move_fd(pipe_ends[1]);
571
0
  c->ispipe = is_pipe;
572
0
  flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
573
0
  if (-1 == flags) {
574
0
    msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
575
0
    exit(1);
576
0
  }
577
0
  rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
578
0
  if (-1 == rc) {
579
0
    msyslog(LOG_ERR,
580
0
      "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
581
0
    exit(1);
582
0
  }
583
0
  (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
584
0
  pthread_attr_init(&thr_attr);
585
0
  pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
586
0
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
587
0
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
588
0
  rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
589
0
  if (0 != rc) {
590
0
    msyslog(LOG_ERR,
591
0
      "start_blocking_thread: pthread_attr_getstacksize() -> %s",
592
0
      strerror(rc));
593
0
  } else {
594
0
    nstacksize = ostacksize;
595
    /* order is important here: first clamp on upper limit,
596
     * and the PTHREAD min stack size is ultimate override!
597
     */ 
598
0
    if (nstacksize > THREAD_MAXSTACKSIZE)
599
0
      nstacksize = THREAD_MAXSTACKSIZE;
600
#            ifdef PTHREAD_STACK_MAX
601
    if (nstacksize > PTHREAD_STACK_MAX)
602
      nstacksize = PTHREAD_STACK_MAX;
603
#            endif
604
605
    /* now clamp on lower stack limit. */
606
0
    if (nstacksize < THREAD_MINSTACKSIZE)
607
0
      nstacksize = THREAD_MINSTACKSIZE;
608
0
#            ifdef PTHREAD_STACK_MIN
609
0
    if (nstacksize < PTHREAD_STACK_MIN)
610
0
      nstacksize = PTHREAD_STACK_MIN;
611
0
#            endif
612
613
0
    if (nstacksize != ostacksize)
614
0
      rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
615
0
    if (0 != rc)
616
0
      msyslog(LOG_ERR,
617
0
        "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
618
0
        (u_long)ostacksize, (u_long)nstacksize,
619
0
        strerror(rc));
620
0
  }
621
#else
622
  UNUSED_ARG(nstacksize);
623
  UNUSED_ARG(ostacksize);
624
#endif
625
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
626
  pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
627
#endif
628
0
  c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
629
0
  block_thread_signals(&saved_sig_mask);
630
0
  rc = pthread_create(&c->thr_table[0], &thr_attr,
631
0
          &blocking_thread, c);
632
0
  pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
633
0
  pthread_attr_destroy(&thr_attr);
634
0
  if (0 != rc) {
635
0
    msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
636
0
      strerror(rc));
637
0
    exit(1);
638
0
  }
639
0
  c->thread_ref = &c->thr_table[0];
640
0
}
641
#endif
642
643
/* --------------------------------------------------------------------
644
 * block_thread_signals()
645
 *
646
 * Temporarily block signals used by ntpd main thread, so that signal
647
 * mask inherited by child threads leaves them blocked.  Returns prior
648
 * active signal mask via pmask, to be restored by the main thread
649
 * after pthread_create().
650
 */
651
#ifndef SYS_WINNT
652
void
653
block_thread_signals(
654
  sigset_t *  pmask
655
  )
656
0
{
657
0
  sigset_t  block;
658
659
0
  sigemptyset(&block);
660
# ifdef HAVE_SIGNALED_IO
661
#  ifdef SIGIO
662
  sigaddset(&block, SIGIO);
663
#  endif
664
#  ifdef SIGPOLL
665
  sigaddset(&block, SIGPOLL);
666
#  endif
667
# endif /* HAVE_SIGNALED_IO */
668
0
  sigaddset(&block, SIGALRM);
669
0
  sigaddset(&block, MOREDEBUGSIG);
670
0
  sigaddset(&block, LESSDEBUGSIG);
671
0
# ifdef SIGDIE1
672
0
  sigaddset(&block, SIGDIE1);
673
0
# endif
674
0
# ifdef SIGDIE2
675
0
  sigaddset(&block, SIGDIE2);
676
0
# endif
677
0
# ifdef SIGDIE3
678
0
  sigaddset(&block, SIGDIE3);
679
0
# endif
680
0
# ifdef SIGDIE4
681
0
  sigaddset(&block, SIGDIE4);
682
0
# endif
683
0
# ifdef SIGBUS
684
0
  sigaddset(&block, SIGBUS);
685
0
# endif
686
0
  sigemptyset(pmask);
687
0
  pthread_sigmask(SIG_BLOCK, &block, pmask);
688
0
}
689
#endif  /* !SYS_WINNT */
690
691
692
/* --------------------------------------------------------------------
693
 * Create & destroy semaphores. This is sufficiently different between
694
 * POSIX and Windows to warrant wrapper functions and close enough to
695
 * use the concept of synchronization via semaphore for all platforms.
696
 */
697
static sem_ref
698
create_sema(
699
  sema_type*  semptr,
700
  u_int   inival,
701
  u_int   maxval)
702
0
{
703
#ifdef SYS_WINNT
704
  
705
  long svini, svmax;
706
  if (NULL != semptr) {
707
    svini = (inival < LONG_MAX)
708
        ? (long)inival : LONG_MAX;
709
    svmax = (maxval < LONG_MAX && maxval > 0)
710
        ? (long)maxval : LONG_MAX;
711
    semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
712
    if (NULL == semptr->shnd)
713
      semptr = NULL;
714
  }
715
  
716
#else
717
  
718
0
  (void)maxval;
719
0
  if (semptr && sem_init(semptr, FALSE, inival))
720
0
    semptr = NULL;
721
  
722
0
#endif
723
724
0
  return semptr;
725
0
}
726
727
/* ------------------------------------------------------------------ */
728
static sem_ref
729
delete_sema(
730
  sem_ref obj)
731
0
{
732
    
733
#   ifdef SYS_WINNT
734
    
735
  if (obj) {
736
    if (obj->shnd)
737
      CloseHandle(obj->shnd);
738
    obj->shnd = NULL;
739
  }
740
  
741
#   else
742
    
743
0
  if (obj)
744
0
    sem_destroy(obj);
745
    
746
0
#   endif
747
748
0
  return NULL;
749
0
}
750
751
/* --------------------------------------------------------------------
752
 * prepare_child_sems()
753
 *
754
 * create sync & access semaphores
755
 *
756
 * All semaphores are cleared, only the access semaphore has 1 unit.
757
 * Childs wait on 'workitems_pending', then grabs 'sema_access'
758
 * and dequeues jobs. When done, 'sema_access' is given one unit back.
759
 *
760
 * The producer grabs 'sema_access', manages the queue, restores
761
 * 'sema_access' and puts one unit into 'workitems_pending'.
762
 *
763
 * The story goes the same for the response queue.
764
 */
765
static void
766
prepare_child_sems(
767
  blocking_child *c
768
  )
769
0
{
770
0
  if (NULL == worker_memlock)
771
0
    worker_memlock = create_sema(&worker_mmutex, 1, 1);
772
  
773
0
  c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
774
0
  c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
775
0
  c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
776
#   ifndef WORK_PIPE
777
  c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
778
#   endif
779
0
}
780
781
/* --------------------------------------------------------------------
782
 * wait for semaphore. Where the wait can be interrupted, it will
783
 * internally resume -- When this function returns, there is either no
784
 * semaphore at all, a timeout occurred, or the caller could
785
 * successfully take a token from the semaphore.
786
 *
787
 * For untimed wait, not checking the result of this function at all is
788
 * definitely an option.
789
 */
790
static int
791
wait_for_sem(
792
  sem_ref     sem,
793
  struct timespec * timeout   /* wall-clock */
794
  )
795
#ifdef SYS_WINNT
796
{
797
  struct timespec now;
798
  struct timespec delta;
799
  DWORD   msec;
800
  DWORD   rc;
801
802
  if (!(sem && sem->shnd)) {
803
    errno = EINVAL;
804
    return -1;
805
  }
806
  
807
  if (NULL == timeout) {
808
    msec = INFINITE;
809
  } else {
810
    getclock(TIMEOFDAY, &now);
811
    delta = sub_tspec(*timeout, now);
812
    if (delta.tv_sec < 0) {
813
      msec = 0;
814
    } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
815
      msec = INFINITE;
816
    } else {
817
      msec = 1000 * (DWORD)delta.tv_sec;
818
      msec += delta.tv_nsec / (1000 * 1000);
819
    }
820
  }
821
  rc = WaitForSingleObject(sem->shnd, msec);
822
  if (WAIT_OBJECT_0 == rc)
823
    return 0;
824
  if (WAIT_TIMEOUT == rc) {
825
    errno = ETIMEDOUT;
826
    return -1;
827
  }
828
  msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
829
  errno = EFAULT;
830
  return -1;
831
}
832
#else /* pthreads wait_for_sem() follows */
833
0
{
834
0
  int rc = -1;
835
836
0
  if (sem) do {
837
0
      if (NULL == timeout)
838
0
        rc = sem_wait(sem);
839
0
      else
840
0
        rc = sem_timedwait(sem, timeout);
841
0
    } while (rc == -1 && errno == EINTR);
842
0
  else
843
0
    errno = EINVAL;
844
    
845
0
  return rc;
846
0
}
847
#endif
848
849
/* --------------------------------------------------------------------
850
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
851
 * calling conventions under Windows and POSIX-defined signature
852
 * otherwise.
853
 */
854
#ifdef SYS_WINNT
855
u_int WINAPI
856
#else
857
void *
858
#endif
859
blocking_thread(
860
  void *  ThreadArg
861
  )
862
0
{
863
0
  blocking_child *c;
864
865
0
  c = ThreadArg;
866
0
  exit_worker(blocking_child_common(c));
867
868
  /* NOTREACHED */
869
0
  return 0;
870
0
}
871
872
/* --------------------------------------------------------------------
873
 * req_child_exit() runs in the parent.
874
 *
875
 * This function is called from from the idle timer, too, and possibly
876
 * without a thread being there any longer. Since we have folded up our
877
 * tent in that case and all the semaphores are already gone, we simply
878
 * ignore this request in this case.
879
 *
880
 * Since the existence of the semaphores is controlled exclusively by
881
 * the parent, there's no risk of data race here.
882
 */
883
int
884
req_child_exit(
885
  blocking_child *c
886
  )
887
0
{
888
0
  return (c->accesslock)
889
0
      ? queue_req_pointer(c, CHILD_EXIT_REQ)
890
0
      : 0;
891
0
}
892
893
/* --------------------------------------------------------------------
894
 * cleanup_after_child() runs in parent.
895
 */
896
static void
897
cleanup_after_child(
898
  blocking_child *  c
899
  )
900
0
{
901
0
  DEBUG_INSIST(!c->reusable);
902
  
903
#   ifdef SYS_WINNT
904
  /* The thread was not created in detached state, so we better
905
   * clean up.
906
   */
907
  if (c->thread_ref && c->thread_ref->thnd) {
908
    WaitForSingleObject(c->thread_ref->thnd, INFINITE);
909
    INSIST(CloseHandle(c->thread_ref->thnd));
910
    c->thread_ref->thnd = NULL;
911
  }
912
#   endif
913
0
  c->thread_ref = NULL;
914
915
  /* remove semaphores and (if signalling vi IO) pipes */
916
  
917
0
  c->accesslock           = delete_sema(c->accesslock);
918
0
  c->workitems_pending    = delete_sema(c->workitems_pending);
919
0
  c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
920
921
0
#   ifdef WORK_PIPE
922
0
  DEBUG_INSIST(-1 != c->resp_read_pipe);
923
0
  DEBUG_INSIST(-1 != c->resp_write_pipe);
924
0
  (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
925
0
  close(c->resp_write_pipe);
926
0
  close(c->resp_read_pipe);
927
0
  c->resp_write_pipe = -1;
928
0
  c->resp_read_pipe = -1;
929
#   else
930
  DEBUG_INSIST(NULL != c->responses_pending);
931
  (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
932
  c->responses_pending = delete_sema(c->responses_pending);
933
#   endif
934
935
  /* Is it necessary to check if there are pending requests and
936
   * responses? If so, and if there are, what to do with them?
937
   */
938
  
939
  /* re-init buffer index sequencers */
940
0
  c->head_workitem = 0;
941
0
  c->tail_workitem = 0;
942
0
  c->head_response = 0;
943
0
  c->tail_response = 0;
944
945
0
  c->reusable = TRUE;
946
0
}
947
948
949
#else /* !WORK_THREAD follows */
950
char work_thread_nonempty_compilation_unit;
951
#endif