Coverage Report

Created: 2023-03-26 06:07

/src/unbound/util/tube.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * util/tube.c - pipe service
3
 *
4
 * Copyright (c) 2008, NLnet Labs. All rights reserved.
5
 *
6
 * This software is open source.
7
 * 
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 
12
 * Redistributions of source code must retain the above copyright notice,
13
 * this list of conditions and the following disclaimer.
14
 * 
15
 * Redistributions in binary form must reproduce the above copyright notice,
16
 * this list of conditions and the following disclaimer in the documentation
17
 * and/or other materials provided with the distribution.
18
 * 
19
 * Neither the name of the NLNET LABS nor the names of its contributors may
20
 * be used to endorse or promote products derived from this software without
21
 * specific prior written permission.
22
 * 
23
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
 */
35
36
/**
37
 * \file
38
 *
39
 * This file contains pipe service functions.
40
 */
41
#include "config.h"
42
#include "util/tube.h"
43
#include "util/log.h"
44
#include "util/net_help.h"
45
#include "util/netevent.h"
46
#include "util/fptr_wlist.h"
47
#include "util/ub_event.h"
48
#ifdef HAVE_POLL_H
49
#include <poll.h>
50
#endif
51
52
#ifndef USE_WINSOCK
53
/* on unix */
54
55
#ifndef HAVE_SOCKETPAIR
56
/** no socketpair() available, like on Minix 3.1.7, use pipe */
57
#define socketpair(f, t, p, sv) pipe(sv) 
58
#endif /* HAVE_SOCKETPAIR */
59
60
struct tube* tube_create(void)
61
0
{
62
0
  struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
63
0
  int sv[2];
64
0
  if(!tube) {
65
0
    int err = errno;
66
0
    log_err("tube_create: out of memory");
67
0
    errno = err;
68
0
    return NULL;
69
0
  }
70
0
  tube->sr = -1;
71
0
  tube->sw = -1;
72
0
  if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
73
0
    int err = errno;
74
0
    log_err("socketpair: %s", strerror(errno));
75
0
    free(tube);
76
0
    errno = err;
77
0
    return NULL;
78
0
  }
79
0
  tube->sr = sv[0];
80
0
  tube->sw = sv[1];
81
0
  if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
82
0
    int err = errno;
83
0
    log_err("tube: cannot set nonblocking");
84
0
    tube_delete(tube);
85
0
    errno = err;
86
0
    return NULL;
87
0
  }
88
0
  return tube;
89
0
}
90
91
/**
 * Tear down a tube: remove its background commpoints, close both
 * descriptors and release the structure.
 * @param tube: tube to delete; NULL is accepted and ignored.
 */
void tube_delete(struct tube* tube)
{
  if(tube != NULL) {
    tube_remove_bg_listen(tube);
    tube_remove_bg_write(tube);
    /* The descriptors are closed only after the commpoints are gone,
     * to be sure. Also epoll does not like an fd being closed before
     * its event_del. */
    tube_close_read(tube);
    tube_close_write(tube);
    free(tube);
  }
}
102
103
void tube_close_read(struct tube* tube)
104
0
{
105
0
  if(tube->sr != -1) {
106
0
    close(tube->sr);
107
0
    tube->sr = -1;
108
0
  }
109
0
}
110
111
void tube_close_write(struct tube* tube)
112
0
{
113
0
  if(tube->sw != -1) {
114
0
    close(tube->sw);
115
0
    tube->sw = -1;
116
0
  }
117
0
}
118
119
void tube_remove_bg_listen(struct tube* tube)
120
0
{
121
0
  if(tube->listen_com) {
122
0
    comm_point_delete(tube->listen_com);
123
0
    tube->listen_com = NULL;
124
0
  }
125
0
  free(tube->cmd_msg);
126
0
  tube->cmd_msg = NULL;
127
0
}
128
129
void tube_remove_bg_write(struct tube* tube)
130
0
{
131
0
  if(tube->res_com) {
132
0
    comm_point_delete(tube->res_com);
133
0
    tube->res_com = NULL;
134
0
  }
135
0
  if(tube->res_list) {
136
0
    struct tube_res_list* np, *p = tube->res_list;
137
0
    tube->res_list = NULL;
138
0
    tube->res_last = NULL;
139
0
    while(p) {
140
0
      np = p->next;
141
0
      free(p->buf);
142
0
      free(p);
143
0
      p = np;
144
0
    }
145
0
  }
146
0
}
147
148
int
149
tube_handle_listen(struct comm_point* c, void* arg, int error,
150
        struct comm_reply* ATTR_UNUSED(reply_info))
151
0
{
152
0
  struct tube* tube = (struct tube*)arg;
153
0
  ssize_t r;
154
0
  if(error != NETEVENT_NOERROR) {
155
0
    fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
156
0
    (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
157
0
    return 0;
158
0
  }
159
160
0
  if(tube->cmd_read < sizeof(tube->cmd_len)) {
161
    /* complete reading the length of control msg */
162
0
    r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
163
0
      sizeof(tube->cmd_len) - tube->cmd_read);
164
0
    if(r==0) {
165
      /* error has happened or */
166
      /* parent closed pipe, must have exited somehow */
167
0
      fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
168
0
      (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 
169
0
        tube->listen_arg);
170
0
      return 0;
171
0
    }
172
0
    if(r==-1) {
173
0
      if(errno != EAGAIN && errno != EINTR) {
174
0
        log_err("rpipe error: %s", strerror(errno));
175
0
      }
176
      /* nothing to read now, try later */
177
0
      return 0;
178
0
    }
179
0
    tube->cmd_read += r;
180
0
    if(tube->cmd_read < sizeof(tube->cmd_len)) {
181
      /* not complete, try later */
182
0
      return 0;
183
0
    }
184
0
    tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
185
0
    if(!tube->cmd_msg) {
186
0
      log_err("malloc failure");
187
0
      tube->cmd_read = 0;
188
0
      return 0;
189
0
    }
190
0
  }
191
  /* cmd_len has been read, read remainder */
192
0
  r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
193
0
    tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
194
0
  if(r==0) {
195
    /* error has happened or */
196
    /* parent closed pipe, must have exited somehow */
197
0
    fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
198
0
    (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 
199
0
      tube->listen_arg);
200
0
    return 0;
201
0
  }
202
0
  if(r==-1) {
203
    /* nothing to read now, try later */
204
0
    if(errno != EAGAIN && errno != EINTR) {
205
0
      log_err("rpipe error: %s", strerror(errno));
206
0
    }
207
0
    return 0;
208
0
  }
209
0
  tube->cmd_read += r;
210
0
  if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
211
    /* not complete, try later */
212
0
    return 0;
213
0
  }
214
0
  tube->cmd_read = 0;
215
216
0
  fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
217
0
  (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 
218
0
    NETEVENT_NOERROR, tube->listen_arg);
219
    /* also frees the buf */
220
0
  tube->cmd_msg = NULL;
221
0
  return 0;
222
0
}
223
224
int
225
tube_handle_write(struct comm_point* c, void* arg, int error,
226
        struct comm_reply* ATTR_UNUSED(reply_info))
227
0
{
228
0
  struct tube* tube = (struct tube*)arg;
229
0
  struct tube_res_list* item = tube->res_list;
230
0
  ssize_t r;
231
0
  if(error != NETEVENT_NOERROR) {
232
0
    log_err("tube_handle_write net error %d", error);
233
0
    return 0;
234
0
  }
235
236
0
  if(!item) {
237
0
    comm_point_stop_listening(c);
238
0
    return 0;
239
0
  }
240
241
0
  if(tube->res_write < sizeof(item->len)) {
242
0
    r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
243
0
      sizeof(item->len) - tube->res_write);
244
0
    if(r == -1) {
245
0
      if(errno != EAGAIN && errno != EINTR) {
246
0
        log_err("wpipe error: %s", strerror(errno));
247
0
      }
248
0
      return 0; /* try again later */
249
0
    }
250
0
    if(r == 0) {
251
      /* error on pipe, must have exited somehow */
252
      /* cannot signal this to pipe user */
253
0
      return 0;
254
0
    }
255
0
    tube->res_write += r;
256
0
    if(tube->res_write < sizeof(item->len))
257
0
      return 0;
258
0
  }
259
0
  r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
260
0
    item->len - (tube->res_write - sizeof(item->len)));
261
0
  if(r == -1) {
262
0
    if(errno != EAGAIN && errno != EINTR) {
263
0
      log_err("wpipe error: %s", strerror(errno));
264
0
    }
265
0
    return 0; /* try again later */
266
0
  }
267
0
  if(r == 0) {
268
    /* error on pipe, must have exited somehow */
269
    /* cannot signal this to pipe user */
270
0
    return 0;
271
0
  }
272
0
  tube->res_write += r;
273
0
  if(tube->res_write < sizeof(item->len) + item->len)
274
0
    return 0;
275
  /* done this result, remove it */
276
0
  free(item->buf);
277
0
  item->buf = NULL;
278
0
  tube->res_list = tube->res_list->next;
279
0
  free(item);
280
0
  if(!tube->res_list) {
281
0
    tube->res_last = NULL;
282
0
    comm_point_stop_listening(c);
283
0
  }
284
0
  tube->res_write = 0;
285
0
  return 0;
286
0
}
287
288
/**
 * Write one message (length field followed by len bytes of buf) to the
 * write descriptor of the tube.
 * With nonblock set, a first write of the length is attempted while the
 * descriptor is still nonblocking; if that write fails -1 is returned.
 * Otherwise the descriptor is switched to blocking mode, the rest of the
 * message is written, and the descriptor is switched back to nonblocking.
 * @param tube: the tube.
 * @param buf: message payload.
 * @param len: number of payload bytes.
 * @param nonblock: if true, do not wait when the first write fails.
 * @return 1 on success, 0 on failure, -1 if the nonblocking first write
 *   failed (nothing was written).
 */
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
        int nonblock)
{
  const int fd = tube->sw;
  ssize_t n = 0;
  ssize_t done;

  /* test */
  if(nonblock) {
    n = write(fd, &len, sizeof(len));
    if(n == -1) {
      /* transient conditions are not worth a log line */
      if(errno != EINTR && errno != EAGAIN)
        log_err("tube msg write failed: %s", strerror(errno));
      return -1; /* can still continue, perhaps */
    }
  }
  if(!fd_set_block(fd))
    return 0;

  /* write remainder of the length field */
  done = n;
  while(done != (ssize_t)sizeof(len)) {
    n = write(fd, ((char*)&len)+done, sizeof(len)-done);
    if(n == -1) {
      if(errno == EAGAIN)
        continue; /* temporarily unavail: try again*/
      log_err("tube msg write failed: %s", strerror(errno));
      (void)fd_set_nonblock(fd);
      return 0;
    }
    done += n;
  }

  /* write the payload */
  done = 0;
  while(done != (ssize_t)len) {
    n = write(fd, buf+done, len-done);
    if(n == -1) {
      if(errno == EAGAIN)
        continue; /* temporarily unavail: try again*/
      log_err("tube msg write failed: %s", strerror(errno));
      (void)fd_set_nonblock(fd);
      return 0;
    }
    done += n;
  }

  return fd_set_nonblock(fd) ? 1 : 0;
}
333
334
/**
 * Read one message (length field followed by that many bytes) from the
 * read descriptor of the tube.
 * With nonblock set, a first read of the length is attempted while the
 * descriptor is still nonblocking; if that read fails -1 is returned.
 * Otherwise the descriptor is switched to blocking mode, the rest of the
 * message is read, and the descriptor is switched back to nonblocking.
 * @param tube: the tube.
 * @param buf: on success set to a malloc'ed buffer with the payload, to
 *   be freed by the caller. On every other return it is set to NULL, so
 *   the caller never sees a pointer to memory that was freed here.
 * @param len: set to the payload length; 0 is stored before reading.
 * @param nonblock: if true, do not wait when nothing can be read.
 * @return 1 on success, 0 on failure or EOF, -1 if the nonblocking first
 *   read failed (nothing was read).
 */
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
        int nonblock)
{
  ssize_t r, d;
  int fd = tube->sr;

  /* test */
  *len = 0;
  /* same as the winsock variant: no stale pointer on failure paths */
  *buf = NULL;
  if(nonblock) {
    r = read(fd, len, sizeof(*len));
    if(r == -1) {
      if(errno==EINTR || errno==EAGAIN)
        return -1;
      log_err("tube msg read failed: %s", strerror(errno));
      return -1; /* we can still continue, perhaps */
    }
    if(r == 0) /* EOF */
      return 0;
  } else r = 0;
  if(!fd_set_block(fd))
    return 0;
  /* read remainder of the length field */
  d = r;
  while(d != (ssize_t)sizeof(*len)) {
    if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
      log_err("tube msg read failed: %s", strerror(errno));
      (void)fd_set_nonblock(fd);
      return 0;
    }
    if(r == 0) /* EOF */ {
      (void)fd_set_nonblock(fd);
      return 0;
    }
    d += r;
  }
  /* refuse absurd lengths before allocating */
  if (*len >= 65536*2) {
    log_err("tube msg length %u is too big", (unsigned)*len);
    (void)fd_set_nonblock(fd);
    return 0;
  }
  *buf = (uint8_t*)malloc(*len);
  if(!*buf) {
    log_err("tube read out of memory");
    (void)fd_set_nonblock(fd);
    return 0;
  }
  /* read the payload */
  d = 0;
  while(d < (ssize_t)*len) {
    if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
      log_err("tube msg read failed: %s", strerror(errno));
      (void)fd_set_nonblock(fd);
      free(*buf);
      *buf = NULL;
      return 0;
    }
    if(r == 0) { /* EOF */
      (void)fd_set_nonblock(fd);
      free(*buf);
      *buf = NULL;
      return 0;
    }
    d += r;
  }
  if(!fd_set_nonblock(fd)) {
    free(*buf);
    *buf = NULL;
    return 0;
  }
  return 1;
}
401
402
/**
 * Perform poll() on the fd, for POLLIN, POLLERR and POLLHUP.
 * @param fd: descriptor to poll.
 * @param t: timeout, converted to milliseconds; NULL waits without
 *   timeout.
 * @return 1 if poll reported an event, 0 on timeout or when poll
 *   returned -1.
 */
static int
pollit(int fd, struct timeval* t)
{
  struct pollfd pfd;
  int timeout_ms = -1; /* negative: poll blocks indefinitely */
  int nready;

  memset(&pfd, 0, sizeof(pfd));
  pfd.fd = fd;
  pfd.events = POLLIN | POLLERR | POLLHUP;
#ifndef S_SPLINT_S
  if(t)
    timeout_ms = t->tv_sec*1000 + t->tv_usec/1000;
#endif

  nready = poll(&pfd, 1, timeout_ms);
  return (nready != -1 && nready != 0) ? 1 : 0;
}
425
426
int tube_poll(struct tube* tube)
427
0
{
428
0
  struct timeval t;
429
0
  memset(&t, 0, sizeof(t));
430
0
  return pollit(tube->sr, &t);
431
0
}
432
433
int tube_wait(struct tube* tube)
434
0
{
435
0
  return pollit(tube->sr, NULL);
436
0
}
437
438
int tube_wait_timeout(struct tube* tube, int msec)
439
0
{
440
0
  int ret = 0;
441
442
0
  while(1) {
443
0
    struct pollfd fds;
444
0
    memset(&fds, 0, sizeof(fds));
445
446
0
    fds.fd = tube->sr;
447
0
    fds.events = POLLIN | POLLERR | POLLHUP;
448
0
    ret = poll(&fds, 1, msec);
449
450
0
    if(ret == -1) {
451
0
      if(errno == EAGAIN || errno == EINTR)
452
0
        continue;
453
0
      return -1;
454
0
    }
455
0
    break;
456
0
  }
457
458
0
  if(ret != 0)
459
0
    return 1;
460
0
  return 0;
461
0
}
462
463
int tube_read_fd(struct tube* tube)
464
0
{
465
0
  return tube->sr;
466
0
}
467
468
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
469
        tube_callback_type* cb, void* arg)
470
0
{
471
0
  tube->listen_cb = cb;
472
0
  tube->listen_arg = arg;
473
0
  if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 
474
0
    0, tube_handle_listen, tube))) {
475
0
    int err = errno;
476
0
    log_err("tube_setup_bg_l: commpoint creation failed");
477
0
    errno = err;
478
0
    return 0;
479
0
  }
480
0
  return 1;
481
0
}
482
483
int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
484
0
{
485
0
  if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 
486
0
    1, tube_handle_write, tube))) {
487
0
    int err = errno;
488
0
    log_err("tube_setup_bg_w: commpoint creation failed");
489
0
    errno = err;
490
0
    return 0;
491
0
  }
492
0
  return 1;
493
0
}
494
495
int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
496
0
{
497
0
  struct tube_res_list* item;
498
0
  if(!tube || !tube->res_com) return 0;
499
0
  item = (struct tube_res_list*)malloc(sizeof(*item));
500
0
  if(!item) {
501
0
    free(msg);
502
0
    log_err("out of memory for async answer");
503
0
    return 0;
504
0
  }
505
0
  item->buf = msg;
506
0
  item->len = len;
507
0
  item->next = NULL;
508
  /* add at back of list, since the first one may be partially written */
509
0
  if(tube->res_last)
510
0
    tube->res_last->next = item;
511
0
  else    tube->res_list = item;
512
0
  tube->res_last = item;
513
0
  if(tube->res_list == tube->res_last) {
514
    /* first added item, start the write process */
515
0
    comm_point_start_listening(tube->res_com, -1, -1);
516
0
  }
517
0
  return 1;
518
0
}
519
520
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 
521
  void* ATTR_UNUSED(arg))
522
0
{
523
0
  log_assert(0);
524
0
}
525
526
#else /* USE_WINSOCK */
527
/* on windows */
528
529
530
struct tube* tube_create(void)
531
{
532
  /* windows does not have forks like unix, so we only support
533
   * threads on windows. And thus the pipe need only connect
534
   * threads. We use a mutex and a list of datagrams. */
535
  struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
536
  if(!tube) {
537
    int err = errno;
538
    log_err("tube_create: out of memory");
539
    errno = err;
540
    return NULL;
541
  }
542
  tube->event = WSACreateEvent();
543
  if(tube->event == WSA_INVALID_EVENT) {
544
    free(tube);
545
    log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
546
    return NULL;
547
  }
548
  if(!WSAResetEvent(tube->event)) {
549
    log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
550
  }
551
  lock_basic_init(&tube->res_lock);
552
  verbose(VERB_ALGO, "tube created");
553
  return tube;
554
}
555
556
void tube_delete(struct tube* tube)
557
{
558
  if(!tube) return;
559
  tube_remove_bg_listen(tube);
560
  tube_remove_bg_write(tube);
561
  tube_close_read(tube);
562
  tube_close_write(tube);
563
  if(!WSACloseEvent(tube->event))
564
    log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
565
  lock_basic_destroy(&tube->res_lock);
566
  verbose(VERB_ALGO, "tube deleted");
567
  free(tube);
568
}
569
570
void tube_close_read(struct tube* ATTR_UNUSED(tube))
571
{
572
  verbose(VERB_ALGO, "tube close_read");
573
}
574
575
void tube_close_write(struct tube* ATTR_UNUSED(tube))
576
{
577
  verbose(VERB_ALGO, "tube close_write");
578
  /* wake up waiting reader with an empty queue */
579
  if(!WSASetEvent(tube->event)) {
580
    log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
581
  }
582
}
583
584
void tube_remove_bg_listen(struct tube* tube)
585
{
586
  verbose(VERB_ALGO, "tube remove_bg_listen");
587
  ub_winsock_unregister_wsaevent(tube->ev_listen);
588
}
589
590
void tube_remove_bg_write(struct tube* tube)
591
{
592
  verbose(VERB_ALGO, "tube remove_bg_write");
593
  if(tube->res_list) {
594
    struct tube_res_list* np, *p = tube->res_list;
595
    tube->res_list = NULL;
596
    tube->res_last = NULL;
597
    while(p) {
598
      np = p->next;
599
      free(p->buf);
600
      free(p);
601
      p = np;
602
    }
603
  }
604
}
605
606
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 
607
        int ATTR_UNUSED(nonblock))
608
{
609
  uint8_t* a;
610
  verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
611
  a = (uint8_t*)memdup(buf, len);
612
  if(!a) {
613
    log_err("out of memory in tube_write_msg");
614
    return 0;
615
  }
616
  /* always nonblocking, this pipe cannot get full */
617
  return tube_queue_item(tube, a, len);
618
}
619
620
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 
621
        int nonblock)
622
{
623
  struct tube_res_list* item = NULL;
624
  verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
625
  *buf = NULL;
626
  if(!tube_poll(tube)) {
627
    verbose(VERB_ALGO, "tube read_msg nodata");
628
    /* nothing ready right now, wait if we want to */
629
    if(nonblock)
630
      return -1; /* would block waiting for items */
631
    if(!tube_wait(tube))
632
      return 0;
633
  }
634
  lock_basic_lock(&tube->res_lock);
635
  if(tube->res_list) {
636
    item = tube->res_list;
637
    tube->res_list = item->next;
638
    if(tube->res_last == item) {
639
      /* the list is now empty */
640
      tube->res_last = NULL;
641
      verbose(VERB_ALGO, "tube read_msg lastdata");
642
      if(!WSAResetEvent(tube->event)) {
643
        log_err("WSAResetEvent: %s", 
644
          wsa_strerror(WSAGetLastError()));
645
      }
646
    }
647
  }
648
  lock_basic_unlock(&tube->res_lock);
649
  if(!item)
650
    return 0; /* would block waiting for items */
651
  *buf = item->buf;
652
  *len = item->len;
653
  free(item);
654
  verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
655
  return 1;
656
}
657
658
int tube_poll(struct tube* tube)
659
{
660
  struct tube_res_list* item = NULL;
661
  lock_basic_lock(&tube->res_lock);
662
  item = tube->res_list;
663
  lock_basic_unlock(&tube->res_lock);
664
  if(item)
665
    return 1;
666
  return 0;
667
}
668
669
int tube_wait(struct tube* tube)
670
{
671
  /* block on eventhandle */
672
  DWORD res = WSAWaitForMultipleEvents(
673
    1 /* one event in array */, 
674
    &tube->event /* the event to wait for, our pipe signal */, 
675
    0 /* wait for all events is false */, 
676
    WSA_INFINITE /* wait, no timeout */,
677
    0 /* we are not alertable for IO completion routines */
678
    );
679
  if(res == WSA_WAIT_TIMEOUT) {
680
    return 0;
681
  }
682
  if(res == WSA_WAIT_IO_COMPLETION) {
683
    /* a bit unexpected, since we were not alertable */
684
    return 0;
685
  }
686
  return 1;
687
}
688
689
int tube_wait_timeout(struct tube* tube, int msec)
690
{
691
  /* block on eventhandle */
692
  DWORD res = WSAWaitForMultipleEvents(
693
    1 /* one event in array */,
694
    &tube->event /* the event to wait for, our pipe signal */,
695
    0 /* wait for all events is false */,
696
    msec /* wait for timeout */,
697
    0 /* we are not alertable for IO completion routines */
698
    );
699
  if(res == WSA_WAIT_TIMEOUT) {
700
    return 0;
701
  }
702
  if(res == WSA_WAIT_IO_COMPLETION) {
703
    /* a bit unexpected, since we were not alertable */
704
    return -1;
705
  }
706
  return 1;
707
}
708
709
/**
 * Get the read descriptor of the tube. The winsock tube has none.
 * @return always -1.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
  /* there is no descriptor to hand out on Windows */
  return -1;
}
714
715
int
716
tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 
717
  int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
718
{
719
  log_assert(0);
720
  return 0;
721
}
722
723
int
724
tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 
725
  int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
726
{
727
  log_assert(0);
728
  return 0;
729
}
730
731
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
732
        tube_callback_type* cb, void* arg)
733
{
734
  tube->listen_cb = cb;
735
  tube->listen_arg = arg;
736
  if(!comm_base_internal(base))
737
    return 1; /* ignore when no comm base - testing */
738
  tube->ev_listen = ub_winsock_register_wsaevent(
739
      comm_base_internal(base), tube->event, &tube_handle_signal, tube);
740
  return tube->ev_listen ? 1 : 0;
741
}
742
743
int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 
744
  struct comm_base* ATTR_UNUSED(base))
745
{
746
  /* the queue item routine performs the signaling */
747
  return 1;
748
}
749
750
int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
751
{
752
  struct tube_res_list* item;
753
  if(!tube) return 0;
754
  item = (struct tube_res_list*)malloc(sizeof(*item));
755
  verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
756
  if(!item) {
757
    free(msg);
758
    log_err("out of memory for async answer");
759
    return 0;
760
  }
761
  item->buf = msg;
762
  item->len = len;
763
  item->next = NULL;
764
  lock_basic_lock(&tube->res_lock);
765
  /* add at back of list, since the first one may be partially written */
766
  if(tube->res_last)
767
    tube->res_last->next = item;
768
  else    tube->res_list = item;
769
  tube->res_last = item;
770
  /* signal the eventhandle */
771
  if(!WSASetEvent(tube->event)) {
772
    log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
773
  }
774
  lock_basic_unlock(&tube->res_lock);
775
  return 1;
776
}
777
778
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 
779
  void* arg)
780
{
781
  struct tube* tube = (struct tube*)arg;
782
  uint8_t* buf;
783
  uint32_t len = 0;
784
  verbose(VERB_ALGO, "tube handle_signal");
785
  while(tube_poll(tube)) {
786
    if(tube_read_msg(tube, &buf, &len, 1)) {
787
      fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
788
      (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 
789
        tube->listen_arg);
790
    }
791
  }
792
}
793
794
#endif /* USE_WINSOCK */