Coverage Report

Created: 2025-08-28 06:16

/src/unbound/util/tube.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * util/tube.c - pipe service
3
 *
4
 * Copyright (c) 2008, NLnet Labs. All rights reserved.
5
 *
6
 * This software is open source.
7
 * 
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 
12
 * Redistributions of source code must retain the above copyright notice,
13
 * this list of conditions and the following disclaimer.
14
 * 
15
 * Redistributions in binary form must reproduce the above copyright notice,
16
 * this list of conditions and the following disclaimer in the documentation
17
 * and/or other materials provided with the distribution.
18
 * 
19
 * Neither the name of the NLNET LABS nor the names of its contributors may
20
 * be used to endorse or promote products derived from this software without
21
 * specific prior written permission.
22
 * 
23
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
 */
35
36
/**
37
 * \file
38
 *
39
 * This file contains pipe service functions.
40
 */
41
#include "config.h"
42
#include "util/tube.h"
43
#include "util/log.h"
44
#include "util/net_help.h"
45
#include "util/netevent.h"
46
#include "util/fptr_wlist.h"
47
#include "util/ub_event.h"
48
#ifdef HAVE_POLL_H
49
#include <poll.h>
50
#endif
51
52
#ifndef USE_WINSOCK
53
/* on unix */
54
55
#ifndef HAVE_SOCKETPAIR
56
/** no socketpair() available, like on Minix 3.1.7, use pipe */
57
#define socketpair(f, t, p, sv) pipe(sv) 
58
#endif /* HAVE_SOCKETPAIR */
59
60
struct tube* tube_create(void)
61
0
{
62
0
  struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
63
0
  int sv[2];
64
0
  if(!tube) {
65
0
    int err = errno;
66
0
    log_err("tube_create: out of memory");
67
0
    errno = err;
68
0
    return NULL;
69
0
  }
70
0
  tube->sr = -1;
71
0
  tube->sw = -1;
72
0
  if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
73
0
    int err = errno;
74
0
    log_err("socketpair: %s", strerror(errno));
75
0
    free(tube);
76
0
    errno = err;
77
0
    return NULL;
78
0
  }
79
0
  tube->sr = sv[0];
80
0
  tube->sw = sv[1];
81
0
  if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
82
0
    int err = errno;
83
0
    log_err("tube: cannot set nonblocking");
84
0
    tube_delete(tube);
85
0
    errno = err;
86
0
    return NULL;
87
0
  }
88
0
  return tube;
89
0
}
90
91
void tube_delete(struct tube* tube)
92
0
{
93
0
  if(!tube) return;
94
0
  tube_remove_bg_listen(tube);
95
0
  tube_remove_bg_write(tube);
96
  /* close fds after deleting commpoints, to be sure.
97
   *            Also epoll does not like closing fd before event_del */
98
0
  tube_close_read(tube);
99
0
  tube_close_write(tube);
100
0
  free(tube);
101
0
}
102
103
void tube_close_read(struct tube* tube)
104
0
{
105
0
  if(tube->sr != -1) {
106
0
    close(tube->sr);
107
0
    tube->sr = -1;
108
0
  }
109
0
}
110
111
void tube_close_write(struct tube* tube)
112
0
{
113
0
  if(tube->sw != -1) {
114
0
    close(tube->sw);
115
0
    tube->sw = -1;
116
0
  }
117
0
}
118
119
void tube_remove_bg_listen(struct tube* tube)
120
0
{
121
0
  if(tube->listen_com) {
122
0
    comm_point_delete(tube->listen_com);
123
0
    tube->listen_com = NULL;
124
0
  }
125
0
  free(tube->cmd_msg);
126
0
  tube->cmd_msg = NULL;
127
0
}
128
129
void tube_remove_bg_write(struct tube* tube)
130
0
{
131
0
  if(tube->res_com) {
132
0
    comm_point_delete(tube->res_com);
133
0
    tube->res_com = NULL;
134
0
  }
135
0
  if(tube->res_list) {
136
0
    struct tube_res_list* np, *p = tube->res_list;
137
0
    tube->res_list = NULL;
138
0
    tube->res_last = NULL;
139
0
    while(p) {
140
0
      np = p->next;
141
0
      free(p->buf);
142
0
      free(p);
143
0
      p = np;
144
0
    }
145
0
  }
146
0
}
147
148
int
149
tube_handle_listen(struct comm_point* c, void* arg, int error,
150
        struct comm_reply* ATTR_UNUSED(reply_info))
151
0
{
152
0
  struct tube* tube = (struct tube*)arg;
153
0
  ssize_t r;
154
0
  if(error != NETEVENT_NOERROR) {
155
0
    fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
156
0
    (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
157
0
    return 0;
158
0
  }
159
160
0
  if(tube->cmd_read < sizeof(tube->cmd_len)) {
161
    /* complete reading the length of control msg */
162
0
    r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
163
0
      sizeof(tube->cmd_len) - tube->cmd_read);
164
0
    if(r==0) {
165
      /* error has happened or */
166
      /* parent closed pipe, must have exited somehow */
167
0
      fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
168
0
      (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 
169
0
        tube->listen_arg);
170
0
      return 0;
171
0
    }
172
0
    if(r==-1) {
173
0
      if(errno != EAGAIN && errno != EINTR) {
174
0
        log_err("rpipe error: %s", strerror(errno));
175
0
      }
176
      /* nothing to read now, try later */
177
0
      return 0;
178
0
    }
179
0
    tube->cmd_read += r;
180
0
    if(tube->cmd_read < sizeof(tube->cmd_len)) {
181
      /* not complete, try later */
182
0
      return 0;
183
0
    }
184
0
    tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
185
0
    if(!tube->cmd_msg) {
186
0
      log_err("malloc failure");
187
0
      tube->cmd_read = 0;
188
0
      return 0;
189
0
    }
190
0
  }
191
  /* cmd_len has been read, read remainder */
192
0
  r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
193
0
    tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
194
0
  if(r==0) {
195
    /* error has happened or */
196
    /* parent closed pipe, must have exited somehow */
197
0
    fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
198
0
    (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 
199
0
      tube->listen_arg);
200
0
    return 0;
201
0
  }
202
0
  if(r==-1) {
203
    /* nothing to read now, try later */
204
0
    if(errno != EAGAIN && errno != EINTR) {
205
0
      log_err("rpipe error: %s", strerror(errno));
206
0
    }
207
0
    return 0;
208
0
  }
209
0
  tube->cmd_read += r;
210
0
  if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
211
    /* not complete, try later */
212
0
    return 0;
213
0
  }
214
0
  tube->cmd_read = 0;
215
216
0
  fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
217
0
  (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 
218
0
    NETEVENT_NOERROR, tube->listen_arg);
219
    /* also frees the buf */
220
0
  tube->cmd_msg = NULL;
221
0
  return 0;
222
0
}
223
224
int
225
tube_handle_write(struct comm_point* c, void* arg, int error,
226
        struct comm_reply* ATTR_UNUSED(reply_info))
227
0
{
228
0
  struct tube* tube = (struct tube*)arg;
229
0
  struct tube_res_list* item = tube->res_list;
230
0
  ssize_t r;
231
0
  if(error != NETEVENT_NOERROR) {
232
0
    log_err("tube_handle_write net error %d", error);
233
0
    return 0;
234
0
  }
235
236
0
  if(!item) {
237
0
    comm_point_stop_listening(c);
238
0
    return 0;
239
0
  }
240
241
0
  if(tube->res_write < sizeof(item->len)) {
242
0
    r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
243
0
      sizeof(item->len) - tube->res_write);
244
0
    if(r == -1) {
245
0
      if(errno != EAGAIN && errno != EINTR) {
246
0
        log_err("wpipe error: %s", strerror(errno));
247
0
      }
248
0
      return 0; /* try again later */
249
0
    }
250
0
    if(r == 0) {
251
      /* error on pipe, must have exited somehow */
252
      /* cannot signal this to pipe user */
253
0
      return 0;
254
0
    }
255
0
    tube->res_write += r;
256
0
    if(tube->res_write < sizeof(item->len))
257
0
      return 0;
258
0
  }
259
0
  r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
260
0
    item->len - (tube->res_write - sizeof(item->len)));
261
0
  if(r == -1) {
262
0
    if(errno != EAGAIN && errno != EINTR) {
263
0
      log_err("wpipe error: %s", strerror(errno));
264
0
    }
265
0
    return 0; /* try again later */
266
0
  }
267
0
  if(r == 0) {
268
    /* error on pipe, must have exited somehow */
269
    /* cannot signal this to pipe user */
270
0
    return 0;
271
0
  }
272
0
  tube->res_write += r;
273
0
  if(tube->res_write < sizeof(item->len) + item->len)
274
0
    return 0;
275
  /* done this result, remove it */
276
0
  free(item->buf);
277
0
  item->buf = NULL;
278
0
  tube->res_list = tube->res_list->next;
279
0
  free(item);
280
0
  if(!tube->res_list) {
281
0
    tube->res_last = NULL;
282
0
    comm_point_stop_listening(c);
283
0
  }
284
0
  tube->res_write = 0;
285
0
  return 0;
286
0
}
287
288
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 
289
        int nonblock)
290
0
{
291
0
  ssize_t r, d;
292
0
  int fd = tube->sw;
293
294
  /* test */
295
0
  if(nonblock) {
296
0
    r = write(fd, &len, sizeof(len));
297
0
    if(r == -1) {
298
0
      if(errno==EINTR || errno==EAGAIN)
299
0
        return -1;
300
0
      log_err("tube msg write failed: %s", strerror(errno));
301
0
      return -1; /* can still continue, perhaps */
302
0
    }
303
0
  } else r = 0;
304
0
  if(!fd_set_block(fd))
305
0
    return 0;
306
  /* write remainder */
307
0
  d = r;
308
0
  while(d != (ssize_t)sizeof(len)) {
309
0
    if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
310
0
      if(errno == EAGAIN)
311
0
        continue; /* temporarily unavail: try again*/
312
0
      log_err("tube msg write failed: %s", strerror(errno));
313
0
      (void)fd_set_nonblock(fd);
314
0
      return 0;
315
0
    }
316
0
    d += r;
317
0
  }
318
0
  d = 0;
319
0
  while(d != (ssize_t)len) {
320
0
    if((r=write(fd, buf+d, len-d)) == -1) {
321
0
      if(errno == EAGAIN)
322
0
        continue; /* temporarily unavail: try again*/
323
0
      log_err("tube msg write failed: %s", strerror(errno));
324
0
      (void)fd_set_nonblock(fd);
325
0
      return 0;
326
0
    }
327
0
    d += r;
328
0
  }
329
0
  if(!fd_set_nonblock(fd))
330
0
    return 0;
331
0
  return 1;
332
0
}
333
334
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 
335
        int nonblock)
336
0
{
337
0
  ssize_t r, d;
338
0
  int fd = tube->sr;
339
340
  /* test */
341
0
  *len = 0;
342
0
  if(nonblock) {
343
0
    r = read(fd, len, sizeof(*len));
344
0
    if(r == -1) {
345
0
      if(errno==EINTR || errno==EAGAIN)
346
0
        return -1;
347
0
      log_err("tube msg read failed: %s", strerror(errno));
348
0
      return -1; /* we can still continue, perhaps */
349
0
    }
350
0
    if(r == 0) /* EOF */
351
0
      return 0;
352
0
  } else r = 0;
353
0
  if(!fd_set_block(fd))
354
0
    return 0;
355
  /* read remainder */
356
0
  d = r;
357
0
  while(d != (ssize_t)sizeof(*len)) {
358
0
    if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
359
0
      log_err("tube msg read failed: %s", strerror(errno));
360
0
      (void)fd_set_nonblock(fd);
361
0
      return 0;
362
0
    }
363
0
    if(r == 0) /* EOF */ {
364
0
      (void)fd_set_nonblock(fd);
365
0
      return 0;
366
0
    }
367
0
    d += r;
368
0
  }
369
0
  if (*len >= 65536*2) {
370
0
    log_err("tube msg length %u is too big", (unsigned)*len);
371
0
    (void)fd_set_nonblock(fd);
372
0
    return 0;
373
0
  }
374
0
  *buf = (uint8_t*)malloc(*len);
375
0
  if(!*buf) {
376
0
    log_err("tube read out of memory");
377
0
    (void)fd_set_nonblock(fd);
378
0
    return 0;
379
0
  }
380
0
  d = 0;
381
0
  while(d < (ssize_t)*len) {
382
0
    if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
383
0
      log_err("tube msg read failed: %s", strerror(errno));
384
0
      (void)fd_set_nonblock(fd);
385
0
      free(*buf);
386
0
      return 0;
387
0
    }
388
0
    if(r == 0) { /* EOF */
389
0
      (void)fd_set_nonblock(fd);
390
0
      free(*buf);
391
0
      return 0;
392
0
    }
393
0
    d += r;
394
0
  }
395
0
  if(!fd_set_nonblock(fd)) {
396
0
    free(*buf);
397
0
    return 0;
398
0
  }
399
0
  return 1;
400
0
}
401
402
/** perform poll() on the fd */
403
static int
404
pollit(int fd, struct timeval* t)
405
0
{
406
0
  struct pollfd fds;
407
0
  int pret;
408
0
  int msec = -1;
409
0
  memset(&fds, 0, sizeof(fds));
410
0
  fds.fd = fd;
411
0
  fds.events = POLLIN | POLLERR | POLLHUP;
412
0
#ifndef S_SPLINT_S
413
0
  if(t)
414
0
    msec = t->tv_sec*1000 + t->tv_usec/1000;
415
0
#endif
416
417
0
  pret = poll(&fds, 1, msec);
418
419
0
  if(pret == -1)
420
0
    return 0;
421
0
  if(pret != 0)
422
0
    return 1;
423
0
  return 0;
424
0
}
425
426
int tube_poll(struct tube* tube)
427
0
{
428
0
  struct timeval t;
429
0
  memset(&t, 0, sizeof(t));
430
0
  return pollit(tube->sr, &t);
431
0
}
432
433
int tube_wait(struct tube* tube)
434
0
{
435
0
  return pollit(tube->sr, NULL);
436
0
}
437
438
int tube_wait_timeout(struct tube* tube, int msec)
439
0
{
440
0
  int ret = 0;
441
442
0
  while(1) {
443
0
    struct pollfd fds;
444
0
    memset(&fds, 0, sizeof(fds));
445
446
0
    fds.fd = tube->sr;
447
0
    fds.events = POLLIN | POLLERR | POLLHUP;
448
0
    ret = poll(&fds, 1, msec);
449
450
0
    if(ret == -1) {
451
0
      if(errno == EAGAIN || errno == EINTR)
452
0
        continue;
453
0
      return -1;
454
0
    }
455
0
    break;
456
0
  }
457
458
0
  if(ret != 0)
459
0
    return 1;
460
0
  return 0;
461
0
}
462
463
int tube_read_fd(struct tube* tube)
464
0
{
465
0
  return tube->sr;
466
0
}
467
468
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
469
        tube_callback_type* cb, void* arg)
470
0
{
471
0
  tube->listen_cb = cb;
472
0
  tube->listen_arg = arg;
473
0
  if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 
474
0
    0, tube_handle_listen, tube))) {
475
0
    int err = errno;
476
0
    log_err("tube_setup_bg_l: commpoint creation failed");
477
0
    errno = err;
478
0
    return 0;
479
0
  }
480
0
  return 1;
481
0
}
482
483
int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
484
0
{
485
0
  if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 
486
0
    1, tube_handle_write, tube))) {
487
0
    int err = errno;
488
0
    log_err("tube_setup_bg_w: commpoint creation failed");
489
0
    errno = err;
490
0
    return 0;
491
0
  }
492
0
  return 1;
493
0
}
494
495
int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
496
0
{
497
0
  struct tube_res_list* item;
498
0
  if(!tube || !tube->res_com) return 0;
499
0
  item = (struct tube_res_list*)malloc(sizeof(*item));
500
0
  if(!item) {
501
0
    free(msg);
502
0
    log_err("out of memory for async answer");
503
0
    return 0;
504
0
  }
505
0
  item->buf = msg;
506
0
  item->len = len;
507
0
  item->next = NULL;
508
  /* add at back of list, since the first one may be partially written */
509
0
  if(tube->res_last)
510
0
    tube->res_last->next = item;
511
0
  else    tube->res_list = item;
512
0
  tube->res_last = item;
513
0
  if(tube->res_list == tube->res_last) {
514
    /* first added item, start the write process */
515
0
    comm_point_start_listening(tube->res_com, -1, -1);
516
0
  }
517
0
  return 1;
518
0
}
519
520
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 
521
  void* ATTR_UNUSED(arg))
522
0
{
523
0
  log_assert(0);
524
0
}
525
526
#else /* USE_WINSOCK */
527
/* on windows */
528
529
530
struct tube* tube_create(void)
531
{
532
  /* windows does not have forks like unix, so we only support
533
   * threads on windows. And thus the pipe need only connect
534
   * threads. We use a mutex and a list of datagrams. */
535
  struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
536
  if(!tube) {
537
    int err = errno;
538
    log_err("tube_create: out of memory");
539
    errno = err;
540
    return NULL;
541
  }
542
  tube->event = WSACreateEvent();
543
  if(tube->event == WSA_INVALID_EVENT) {
544
    free(tube);
545
    log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
546
    return NULL;
547
  }
548
  if(!WSAResetEvent(tube->event)) {
549
    log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
550
  }
551
  lock_basic_init(&tube->res_lock);
552
  verbose(VERB_ALGO, "tube created");
553
  return tube;
554
}
555
556
void tube_delete(struct tube* tube)
557
{
558
  if(!tube) return;
559
  tube_remove_bg_listen(tube);
560
  tube_remove_bg_write(tube);
561
  tube_close_read(tube);
562
  tube_close_write(tube);
563
  if(!WSACloseEvent(tube->event))
564
    log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
565
  lock_basic_destroy(&tube->res_lock);
566
  verbose(VERB_ALGO, "tube deleted");
567
  free(tube);
568
}
569
570
void tube_close_read(struct tube* ATTR_UNUSED(tube))
571
{
572
  verbose(VERB_ALGO, "tube close_read");
573
}
574
575
void tube_close_write(struct tube* ATTR_UNUSED(tube))
576
{
577
  verbose(VERB_ALGO, "tube close_write");
578
  /* wake up waiting reader with an empty queue */
579
  if(!WSASetEvent(tube->event)) {
580
    log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
581
  }
582
}
583
584
void tube_remove_bg_listen(struct tube* tube)
585
{
586
  verbose(VERB_ALGO, "tube remove_bg_listen");
587
  if (tube->ev_listen != NULL) {
588
    ub_winsock_unregister_wsaevent(tube->ev_listen);
589
    tube->ev_listen = NULL;
590
  }
591
}
592
593
void tube_remove_bg_write(struct tube* tube)
594
{
595
  verbose(VERB_ALGO, "tube remove_bg_write");
596
  if(tube->res_list) {
597
    struct tube_res_list* np, *p = tube->res_list;
598
    tube->res_list = NULL;
599
    tube->res_last = NULL;
600
    while(p) {
601
      np = p->next;
602
      free(p->buf);
603
      free(p);
604
      p = np;
605
    }
606
  }
607
}
608
609
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 
610
        int ATTR_UNUSED(nonblock))
611
{
612
  uint8_t* a;
613
  verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
614
  a = (uint8_t*)memdup(buf, len);
615
  if(!a) {
616
    log_err("out of memory in tube_write_msg");
617
    return 0;
618
  }
619
  /* always nonblocking, this pipe cannot get full */
620
  return tube_queue_item(tube, a, len);
621
}
622
623
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 
624
        int nonblock)
625
{
626
  struct tube_res_list* item = NULL;
627
  verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
628
  *buf = NULL;
629
  if(!tube_poll(tube)) {
630
    verbose(VERB_ALGO, "tube read_msg nodata");
631
    /* nothing ready right now, wait if we want to */
632
    if(nonblock)
633
      return -1; /* would block waiting for items */
634
    if(!tube_wait(tube))
635
      return 0;
636
  }
637
  lock_basic_lock(&tube->res_lock);
638
  if(tube->res_list) {
639
    item = tube->res_list;
640
    tube->res_list = item->next;
641
    if(tube->res_last == item) {
642
      /* the list is now empty */
643
      tube->res_last = NULL;
644
      verbose(VERB_ALGO, "tube read_msg lastdata");
645
      if(!WSAResetEvent(tube->event)) {
646
        log_err("WSAResetEvent: %s", 
647
          wsa_strerror(WSAGetLastError()));
648
      }
649
    }
650
  }
651
  lock_basic_unlock(&tube->res_lock);
652
  if(!item)
653
    return 0; /* would block waiting for items */
654
  *buf = item->buf;
655
  *len = item->len;
656
  free(item);
657
  verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
658
  return 1;
659
}
660
661
int tube_poll(struct tube* tube)
662
{
663
  struct tube_res_list* item = NULL;
664
  lock_basic_lock(&tube->res_lock);
665
  item = tube->res_list;
666
  lock_basic_unlock(&tube->res_lock);
667
  if(item)
668
    return 1;
669
  return 0;
670
}
671
672
int tube_wait(struct tube* tube)
673
{
674
  /* block on eventhandle */
675
  DWORD res = WSAWaitForMultipleEvents(
676
    1 /* one event in array */, 
677
    &tube->event /* the event to wait for, our pipe signal */, 
678
    0 /* wait for all events is false */, 
679
    WSA_INFINITE /* wait, no timeout */,
680
    0 /* we are not alertable for IO completion routines */
681
    );
682
  if(res == WSA_WAIT_TIMEOUT) {
683
    return 0;
684
  }
685
  if(res == WSA_WAIT_IO_COMPLETION) {
686
    /* a bit unexpected, since we were not alertable */
687
    return 0;
688
  }
689
  return 1;
690
}
691
692
int tube_wait_timeout(struct tube* tube, int msec)
693
{
694
  /* block on eventhandle */
695
  DWORD res = WSAWaitForMultipleEvents(
696
    1 /* one event in array */,
697
    &tube->event /* the event to wait for, our pipe signal */,
698
    0 /* wait for all events is false */,
699
    msec /* wait for timeout */,
700
    0 /* we are not alertable for IO completion routines */
701
    );
702
  if(res == WSA_WAIT_TIMEOUT) {
703
    return 0;
704
  }
705
  if(res == WSA_WAIT_IO_COMPLETION) {
706
    /* a bit unexpected, since we were not alertable */
707
    return -1;
708
  }
709
  return 1;
710
}
711
712
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
713
{
714
  /* nothing sensible on Windows */
715
  return -1;
716
}
717
718
int
719
tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 
720
  int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
721
{
722
  log_assert(0);
723
  return 0;
724
}
725
726
int
727
tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 
728
  int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
729
{
730
  log_assert(0);
731
  return 0;
732
}
733
734
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
735
        tube_callback_type* cb, void* arg)
736
{
737
  tube->listen_cb = cb;
738
  tube->listen_arg = arg;
739
  if(!comm_base_internal(base))
740
    return 1; /* ignore when no comm base - testing */
741
  tube->ev_listen = ub_winsock_register_wsaevent(
742
      comm_base_internal(base), tube->event, &tube_handle_signal, tube);
743
  return tube->ev_listen ? 1 : 0;
744
}
745
746
int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 
747
  struct comm_base* ATTR_UNUSED(base))
748
{
749
  /* the queue item routine performs the signaling */
750
  return 1;
751
}
752
753
int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
754
{
755
  struct tube_res_list* item;
756
  if(!tube) return 0;
757
  item = (struct tube_res_list*)malloc(sizeof(*item));
758
  verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
759
  if(!item) {
760
    free(msg);
761
    log_err("out of memory for async answer");
762
    return 0;
763
  }
764
  item->buf = msg;
765
  item->len = len;
766
  item->next = NULL;
767
  lock_basic_lock(&tube->res_lock);
768
  /* add at back of list, since the first one may be partially written */
769
  if(tube->res_last)
770
    tube->res_last->next = item;
771
  else    tube->res_list = item;
772
  tube->res_last = item;
773
  /* signal the eventhandle */
774
  if(!WSASetEvent(tube->event)) {
775
    log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
776
  }
777
  lock_basic_unlock(&tube->res_lock);
778
  return 1;
779
}
780
781
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 
782
  void* arg)
783
{
784
  struct tube* tube = (struct tube*)arg;
785
  uint8_t* buf;
786
  uint32_t len = 0;
787
  verbose(VERB_ALGO, "tube handle_signal");
788
  while(tube_poll(tube)) {
789
    if(tube_read_msg(tube, &buf, &len, 1)) {
790
      fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
791
      (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 
792
        tube->listen_arg);
793
    }
794
  }
795
}
796
797
#endif /* USE_WINSOCK */