Coverage Report

Created: 2023-09-25 06:44

/src/rtpproxy/src/rtpp_command_async.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2010 Sippy Software, Inc., http://www.sippysoft.com
3
 * All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions
7
 * are met:
8
 * 1. Redistributions of source code must retain the above copyright
9
 *    notice, this list of conditions and the following disclaimer.
10
 * 2. Redistributions in binary form must reproduce the above copyright
11
 *    notice, this list of conditions and the following disclaimer in the
12
 *    documentation and/or other materials provided with the distribution.
13
 *
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
 * SUCH DAMAGE.
25
 *
26
 */
27
28
#if defined(LINUX_XXX) && !defined(_GNU_SOURCE)
29
#define _GNU_SOURCE /* pthread_setname_np() */
30
#endif
31
32
#include <sys/types.h>
33
#include <sys/socket.h>
34
#include <netinet/in.h>
35
#include <assert.h>
36
#include <errno.h>
37
#include <poll.h>
38
#include <pthread.h>
39
#include <signal.h>
40
#include <stddef.h>
41
#include <stdlib.h>
42
#include <string.h>
43
#include <unistd.h>
44
45
#include "config.h"
46
47
#include "rtpp_log.h"
48
#include "rtpp_cfg.h"
49
#include "rtpp_types.h"
50
#include "rtpp_refcnt.h"
51
#include "rtpp_log_obj.h"
52
#include "rtpp_time.h"
53
#include "rtpp_command.h"
54
#include "rtpp_command_async.h"
55
#include "rtpp_command_args.h"
56
#include "rtpp_command_sub.h"
57
#include "rtpp_command_private.h"
58
#include "rtpp_command_rcache.h"
59
#include "rtpp_command_stream.h"
60
#include "rtpp_network.h"
61
#include "rtpp_netio_async.h"
62
#include "rtpp_mallocs.h"
63
#include "rtpp_stats.h"
64
#include "rtpp_list.h"
65
#include "rtpp_controlfd.h"
66
#include "rtpp_locking.h"
67
#include "rtpp_util.h"
68
#include "rtpp_threads.h"
69
#include "rtpp_proc_async.h"
70
71
0
#define RTPC_MAX_CONNECTIONS 100
72
73
struct rtpp_cmd_pollset {
74
    struct pollfd *pfds;
75
    int pfds_used;
76
    struct rtpp_cmd_connection *rccs[RTPC_MAX_CONNECTIONS + 1];
77
    pthread_mutex_t pfds_mutex;
78
};
79
80
struct rtpp_cmd_accptset {
81
    struct pollfd *pfds;
82
    struct rtpp_ctrl_sock **csocks;
83
    int pfds_used;
84
};
85
86
struct rtpp_cmd_async_cf {
87
    struct rtpp_cmd_async pub;
88
    pthread_t thread_id;
89
    pthread_t acpt_thread_id;
90
    pthread_mutex_t cmd_mutex;
91
    int clock_tick;
92
    double tused;
93
    int tstate_queue;
94
    int tstate_acceptor;
95
    int acceptor_started;
96
    int overload;
97
#if 0
98
    struct recfilter average_load;
99
#endif
100
    struct rtpp_command_stats cstats;
101
    struct rtpp_cmd_pollset pset;
102
    int wakefds[2];
103
    struct rtpp_cmd_accptset aset;
104
    struct rtpp_cfg *cf_save;
105
    struct rtpp_cmd_rcache *rcache;
106
};
107
108
static double rtpp_command_async_get_aload(struct rtpp_cmd_async *);
109
static int rtpp_command_async_wakeup(struct rtpp_cmd_async *, int);
110
static void rtpp_command_async_reg_overload(struct rtpp_cmd_async *, int);
111
static int rtpp_command_async_chk_overload(struct rtpp_cmd_async *);
112
static void rtpp_command_async_dtor(struct rtpp_cmd_async *);
113
114
static void
115
init_cstats(struct rtpp_stats *sobj, struct rtpp_command_stats *csp)
116
2
{
117
118
2
    csp->ncmds_rcvd.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "ncmds_rcvd");
119
2
    csp->ncmds_rcvd_ndups.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "ncmds_rcvd_ndups");
120
2
    csp->ncmds_succd.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "ncmds_succd");
121
2
    csp->ncmds_errs.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "ncmds_errs");
122
2
    csp->ncmds_repld.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "ncmds_repld");
123
124
2
    csp->nsess_complete.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "nsess_complete");
125
2
    csp->nsess_created.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "nsess_created");
126
127
2
    csp->nplrs_created.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "nplrs_created");
128
2
    csp->nplrs_destroyed.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "nplrs_destroyed");
129
2
}
130
131
18
#define FLUSH_CSTAT(sobj, st)    { \
132
18
    if ((st).cnt > 0) { \
133
0
        CALL_SMETHOD(sobj, updatebyidx, (st).cnt_idx, (st).cnt); \
134
0
        (st).cnt = 0; \
135
0
    } \
136
18
}
137
138
static void
139
flush_cstats(struct rtpp_stats *sobj, struct rtpp_command_stats *csp)
140
2
{
141
142
2
    FLUSH_CSTAT(sobj, csp->ncmds_rcvd);
143
2
    FLUSH_CSTAT(sobj, csp->ncmds_rcvd_ndups);
144
2
    FLUSH_CSTAT(sobj, csp->ncmds_succd);
145
2
    FLUSH_CSTAT(sobj, csp->ncmds_errs);
146
2
    FLUSH_CSTAT(sobj, csp->ncmds_repld);
147
148
2
    FLUSH_CSTAT(sobj, csp->nsess_complete);
149
2
    FLUSH_CSTAT(sobj, csp->nsess_created);
150
151
2
    FLUSH_CSTAT(sobj, csp->nplrs_created);
152
2
    FLUSH_CSTAT(sobj, csp->nplrs_destroyed);
153
2
}
154
155
static int
156
accept_connection(const struct rtpp_cfg *cfsp, struct rtpp_ctrl_sock *rcsp,
157
  struct sockaddr *rap)
158
0
{
159
0
    int controlfd;
160
0
    socklen_t rlen;
161
162
0
    rlen = rtpp_csock_addrlen(rcsp);
163
0
    assert(rlen > 0);
164
0
    controlfd = accept(rcsp->controlfd_in, rap, &rlen);
165
0
    if (controlfd == -1) {
166
0
        if (errno != EWOULDBLOCK) {
167
0
            RTPP_ELOG(cfsp->glog, RTPP_LOG_ERR,
168
0
              "can't accept connection on control socket");
169
0
        }
170
0
        return (-1);
171
0
    }
172
0
    return (controlfd);
173
0
}
174
175
static int
176
process_commands(struct rtpp_ctrl_sock *csock, const struct rtpp_cfg *cfsp, int controlfd,
177
  const struct rtpp_timestamp *dtime, struct rtpp_command_stats *csp,
178
  struct rtpp_stats *rsc, struct rtpp_cmd_rcache *rcp)
179
0
{
180
0
    int i, rval;
181
0
    struct rtpp_command *cmd;
182
0
    int umode;
183
184
0
    umode = RTPP_CTRL_ISDG(csock);
185
0
    i = 0;
186
0
    do {
187
0
again:
188
0
        cmd = get_command(cfsp, csock, controlfd, &rval, dtime, csp, rcp);
189
0
        if (cmd == NULL) {
190
0
            switch (rval) {
191
0
            case GET_CMD_OK:
192
0
            case GET_CMD_ENOMEM:
193
                /*
194
                 * get_command() failed with error other than I/O error
195
                 * or something, there might be some good commands in
196
                 * the queue.
197
                 */
198
0
                goto again;
199
0
            case GET_CMD_EOF:
200
0
                goto out;
201
0
            }
202
0
            i = -1;
203
0
        } else {
204
0
            cmd->laddr = sstosa(&csock->bindaddr);
205
0
            if (cmd->cca.op == GET_STATS || cmd->cca.op == INFO) {
206
0
                flush_cstats(rsc, csp);
207
0
            }
208
0
            if (cmd->no_glock == 0) {
209
0
                pthread_mutex_lock(&(cfsp->locks->glob));
210
0
            }
211
0
            i = handle_command(cfsp, cmd);
212
0
            if (cmd->no_glock == 0) {
213
0
                pthread_mutex_unlock(&(cfsp->locks->glob));
214
0
            }
215
0
            free_command(cmd);
216
0
        }
217
0
    } while (i == 0 && umode != 0);
218
0
out:
219
0
    return (i);
220
0
}
221
222
static int
223
process_commands_stream(const struct rtpp_cfg *cfsp, struct rtpp_cmd_connection *rcc,
224
  const struct rtpp_timestamp *dtime, struct rtpp_command_stats *csp, struct rtpp_stats *rsc)
225
2
{
226
2
    int rval;
227
2
    struct rtpp_command *cmd;
228
229
2
    rval = rtpp_command_stream_doio(cfsp, rcc);
230
2
    if (rval <= 0) {
231
2
        return (-1);
232
2
    }
233
0
    do {
234
0
again:
235
0
        cmd = rtpp_command_stream_get(cfsp, rcc, &rval, dtime, csp);
236
0
        if (cmd == NULL) {
237
0
            switch (rval) {
238
0
            case GET_CMD_EAGAIN:
239
0
                return (0);
240
0
            case GET_CMD_OK:
241
0
            case GET_CMD_INVAL:
242
0
            case GET_CMD_ENOMEM:
243
0
                goto again;
244
0
            default:
245
0
                return (-1);
246
0
            }
247
0
        }
248
0
        cmd->laddr = sstosa(&rcc->csock->bindaddr);
249
0
        if (cmd->cca.op == GET_STATS || cmd->cca.op == INFO) {
250
0
            flush_cstats(rsc, csp);
251
0
        }
252
0
        if (cmd->no_glock == 0) {
253
0
            pthread_mutex_lock(&(cfsp->locks->glob));
254
0
        }
255
0
        rval = handle_command(cfsp, cmd);
256
0
        if (cmd->no_glock == 0) {
257
0
            pthread_mutex_unlock(&(cfsp->locks->glob));
258
0
        }
259
0
        free_command(cmd);
260
0
    } while (rval == 0);
261
0
    return (rval);
262
0
}
263
264
static struct rtpp_cmd_connection *
265
rtpp_cmd_connection_ctor(int controlfd_in, int controlfd_out,
266
  struct rtpp_ctrl_sock *csock, struct sockaddr *rap)
267
2
{
268
2
    struct rtpp_cmd_connection *rcc;
269
270
2
    rcc = rtpp_zmalloc(sizeof(struct rtpp_cmd_connection));
271
2
    if (rcc == NULL) {
272
0
        return (NULL);
273
0
    }
274
2
    rcc->controlfd_in = controlfd_in;
275
2
    rcc->controlfd_out = controlfd_out;
276
2
    rcc->csock = csock;
277
2
    if (rap != NULL && RTPP_CTRL_ISUNIX(csock) == 0) {
278
0
        rcc->rlen = SA_LEN(rap);
279
0
        memcpy(&rcc->raddr, rap, rcc->rlen);
280
0
    }
281
2
    return (rcc);
282
2
}
283
284
void
285
rtpp_cmd_connection_dtor(struct rtpp_cmd_connection *rcc)
286
2
{
287
288
2
    if (rcc->controlfd_in != rcc->csock->controlfd_in) {
289
0
        close(rcc->controlfd_in);
290
0
        if (rcc->controlfd_out != rcc->controlfd_in) {
291
0
            close(rcc->controlfd_out);
292
0
        }
293
0
    }
294
2
    free(rcc);
295
2
}
296
297
static void
298
rtpp_cmd_acceptor_run(void *arg)
299
0
{
300
0
    struct rtpp_cmd_async_cf *cmd_cf;
301
0
    struct pollfd *tp;
302
0
    struct rtpp_cmd_pollset *psp;
303
0
    struct rtpp_cmd_accptset *asp;
304
0
    struct rtpp_cmd_connection *rcc;
305
0
    int nready, controlfd, i, tstate;
306
0
    struct sockaddr_storage raddr;
307
308
0
    cmd_cf = (struct rtpp_cmd_async_cf *)arg;
309
0
    psp = &cmd_cf->pset;
310
0
    asp = &cmd_cf->aset;
311
312
0
    for (;;) {
313
#ifndef LINUX_XXX
314
        /*
315
         * On most decent OSes but Linux close()ing file descriptor in
316
         * parent thread would wake up poll most of the time, however
317
         * once in a while it wouldn't. This is why we don't use INFTIM
318
         * below.
319
         */
320
        nready = poll(asp->pfds, asp->pfds_used, 1000);
321
#else
322
0
  nready = poll(asp->pfds, asp->pfds_used, 100);
323
0
#endif
324
0
        pthread_mutex_lock(&cmd_cf->cmd_mutex);
325
0
        tstate = cmd_cf->tstate_acceptor;
326
0
        pthread_mutex_unlock(&cmd_cf->cmd_mutex);
327
0
        if (tstate == TSTATE_CEASE) {
328
0
            break;
329
0
        }
330
0
        if (nready <= 0)
331
0
            continue;
332
0
        for (i = 0; i < asp->pfds_used; i++) {
333
0
            if ((asp->pfds[i].revents & POLLIN) == 0) {
334
0
                continue;
335
0
            }
336
0
            controlfd = accept_connection(CONST(cmd_cf->cf_save), asp->csocks[i],
337
0
              sstosa(&raddr));
338
0
            if (controlfd < 0) {
339
0
                continue;
340
0
            }
341
0
            if (rtpp_command_async_wakeup(&cmd_cf->pub, 1) < 0) {
342
                /*
343
     * We cannot proceed if proc cannot be waked,
344
     * the pfds_mutex might remain locked forever.
345
     */
346
0
                close(controlfd);
347
0
                continue;
348
0
            }
349
0
            pthread_mutex_lock(&psp->pfds_mutex);
350
0
            pthread_mutex_unlock(&cmd_cf->cmd_mutex);
351
0
            if (psp->pfds_used >= RTPC_MAX_CONNECTIONS) {
352
0
                pthread_mutex_unlock(&psp->pfds_mutex);
353
0
                close(controlfd);
354
0
                continue;
355
0
            }
356
0
            tp = realloc(psp->pfds, sizeof(struct pollfd) * (psp->pfds_used + 1));
357
0
            if (tp == NULL) {
358
0
                pthread_mutex_unlock(&psp->pfds_mutex);
359
0
                close(controlfd); /* Yeah, sorry, please try later */
360
0
                continue;
361
0
            }
362
0
            psp->pfds = tp;
363
0
            rcc = rtpp_cmd_connection_ctor(controlfd, controlfd, asp->csocks[i],
364
0
              sstosa(&raddr));
365
0
            if (rcc == NULL) {
366
0
                pthread_mutex_unlock(&psp->pfds_mutex);
367
0
                close(controlfd); /* Yeah, sorry, please try later */
368
0
                continue;
369
0
            }
370
0
            psp->pfds[psp->pfds_used].fd = controlfd;
371
0
            psp->pfds[psp->pfds_used].events = POLLIN | POLLERR | POLLHUP;
372
0
            psp->pfds[psp->pfds_used].revents = 0;
373
0
            psp->rccs[psp->pfds_used] = rcc;
374
0
            psp->pfds_used++;
375
0
            pthread_mutex_unlock(&psp->pfds_mutex);
376
0
            rtpp_command_async_wakeup(&cmd_cf->pub, 0);
377
0
        }
378
0
    }
379
0
}
380
381
static void
382
rtpp_cmd_queue_run(void *arg)
383
2
{
384
2
    struct rtpp_cmd_async_cf *cmd_cf;
385
2
    struct rtpp_cmd_pollset *psp;
386
2
    int i, nready, rval;
387
2
    struct rtpp_timestamp sptime;
388
2
    struct rtpp_command_stats *csp;
389
2
    struct rtpp_stats *rtpp_stats_cf;
390
391
2
    cmd_cf = (struct rtpp_cmd_async_cf *)arg;
392
2
    rtpp_stats_cf = cmd_cf->cf_save->rtpp_stats;
393
2
    csp = &cmd_cf->cstats;
394
395
2
    psp = &cmd_cf->pset;
396
397
4
    for (;;) {
398
4
        pthread_mutex_lock(&cmd_cf->cmd_mutex);
399
4
        if (cmd_cf->tstate_queue != TSTATE_RUN) {
400
0
            pthread_mutex_unlock(&cmd_cf->cmd_mutex);
401
0
            break;
402
0
        }
403
4
        pthread_mutex_unlock(&cmd_cf->cmd_mutex);
404
4
        pthread_mutex_lock(&psp->pfds_mutex);
405
4
        nready = poll(psp->pfds, psp->pfds_used, INFTIM);
406
4
        if (nready == 0) {
407
0
            pthread_mutex_unlock(&psp->pfds_mutex);
408
0
            continue;
409
0
        }
410
4
        if (nready < 0 && errno == EINTR) {
411
0
            pthread_mutex_unlock(&psp->pfds_mutex);
412
0
            continue;
413
0
        }
414
4
        if (nready > 0) {
415
6
            for (i = 0; i < psp->pfds_used; i++) {
416
4
                if (i == 0) {
417
2
                    unsigned int dummy;
418
2
                    if (psp->pfds[i].revents & POLLIN) {
419
0
                        read(psp->pfds[i].fd, &dummy, sizeof(dummy));
420
0
                    }
421
2
                    continue;
422
2
                }
423
2
again:
424
2
                if ((psp->pfds[i].revents & (POLLERR | POLLHUP)) != 0) {
425
0
                    if (RTPP_CTRL_ACCEPTABLE(psp->rccs[i]->csock)) {
426
0
                        goto closefd;
427
0
                    }
428
0
                    if (psp->rccs[i]->csock->type == RTPC_STDIO && (psp->pfds[i].revents & POLLIN) == 0) {
429
0
                        goto closefd;
430
0
                    }
431
0
                }
432
2
                if ((psp->pfds[i].revents & POLLIN) == 0) {
433
0
                    continue;
434
0
                }
435
2
                rtpp_timestamp_get(&sptime);
436
2
                if (RTPP_CTRL_ISSTREAM(psp->rccs[i]->csock)) {
437
2
                    rval = process_commands_stream(CONST(cmd_cf->cf_save), psp->rccs[i], &sptime, csp, rtpp_stats_cf);
438
2
                } else {
439
0
                    rval = process_commands(psp->rccs[i]->csock, CONST(cmd_cf->cf_save), psp->pfds[i].fd,
440
0
                      &sptime, csp, rtpp_stats_cf, cmd_cf->rcache);
441
0
                }
442
                /*
443
                 * Shut down non-datagram sockets that got I/O error
444
                 * and also all non-continuous UNIX sockets are recycled
445
                 * after each use.
446
                 */
447
2
                if (!RTPP_CTRL_ISDG(psp->rccs[i]->csock) && (rval == -1 || !RTPP_CTRL_ISSTREAM(psp->rccs[i]->csock))) {
448
2
closefd:
449
2
                    if (psp->rccs[i]->csock->type == RTPC_STDIO && psp->rccs[i]->csock->exit_on_close != 0) {
450
2
                        cmd_cf->cf_save->slowshutdown = 1;
451
2
                    }
452
2
                    rtpp_cmd_connection_dtor(psp->rccs[i]);
453
2
                    psp->pfds_used--;
454
2
                    if (psp->pfds_used > 0 && i < psp->pfds_used) {
455
0
                        memmove(&psp->pfds[i], &psp->pfds[i + 1],
456
0
                          (psp->pfds_used - i) * sizeof(struct pollfd));
457
0
                        memmove(&psp->rccs[i], &psp->rccs[i + 1],
458
0
                          (psp->pfds_used - i) * sizeof(struct rtpp_ctrl_connection *));
459
0
                        goto again;
460
0
                    }
461
2
                }
462
2
            }
463
2
        }
464
4
        pthread_mutex_unlock(&psp->pfds_mutex);
465
4
        if (nready > 0) {
466
2
            rtpp_anetio_pump(cmd_cf->cf_save->rtpp_proc_cf->netio);
467
2
        }
468
4
        flush_cstats(rtpp_stats_cf, csp);
469
4
    }
470
2
}
471
472
static double
473
rtpp_command_async_get_aload(struct rtpp_cmd_async *pub)
474
17
{
475
#if 0
476
    double aload;
477
    struct rtpp_cmd_async_cf *cmd_cf;
478
479
    PUB2PVT(pub, cmd_cf);
480
481
    pthread_mutex_lock(&cmd_cf->cmd_mutex);
482
    aload = cmd_cf->average_load.lastval;
483
    pthread_mutex_unlock(&cmd_cf->cmd_mutex);
484
485
    return (aload);
486
#else
487
17
    return (0);
488
17
#endif
489
17
}
490
491
static int
492
rtpp_command_async_wakeup(struct rtpp_cmd_async *pub, int keep_locked)
493
0
{
494
0
    int old_clock;
495
0
    struct rtpp_cmd_async_cf *cmd_cf;
496
497
0
    PUB2PVT(pub, cmd_cf);
498
499
0
    pthread_mutex_lock(&cmd_cf->cmd_mutex);
500
0
    old_clock = cmd_cf->clock_tick;
501
0
    cmd_cf->clock_tick++;
502
0
    if (!keep_locked)
503
0
        pthread_mutex_unlock(&cmd_cf->cmd_mutex);
504
505
    /* notify worker thread */
506
0
    if (write(cmd_cf->wakefds[0], &old_clock, sizeof(old_clock + 1)) <= 0) {
507
0
  if (keep_locked)
508
0
            pthread_mutex_unlock(&cmd_cf->cmd_mutex);
509
0
        return (-1);
510
0
    }
511
512
0
    return (old_clock);
513
0
}
514
515
static void
516
rtpp_command_async_reg_overload(struct rtpp_cmd_async *pub, int overload)
517
0
{
518
0
    struct rtpp_cmd_async_cf *cmd_cf;
519
520
0
    PUB2PVT(pub, cmd_cf);
521
522
0
    pthread_mutex_lock(&cmd_cf->cmd_mutex);
523
0
    cmd_cf->overload = overload;
524
0
    pthread_mutex_unlock(&cmd_cf->cmd_mutex);
525
0
}
526
527
static int
528
rtpp_command_async_chk_overload(struct rtpp_cmd_async *pub)
529
0
{
530
0
    struct rtpp_cmd_async_cf *cmd_cf;
531
0
    int rval;
532
533
0
    PUB2PVT(pub, cmd_cf);
534
535
0
    pthread_mutex_lock(&cmd_cf->cmd_mutex);
536
0
    rval = cmd_cf->overload;
537
0
    pthread_mutex_unlock(&cmd_cf->cmd_mutex);
538
0
    return (rval);
539
0
}
540
541
static int
542
init_pollset(const struct rtpp_cfg *cfsp, struct rtpp_cmd_pollset *psp, int wakefd)
543
2
{
544
2
    struct rtpp_ctrl_sock *ctrl_sock;
545
2
    int pfds_used, i;
546
547
2
    ctrl_sock = RTPP_LIST_HEAD(cfsp->ctrl_socks);
548
4
    for (pfds_used = 1; ctrl_sock != NULL; ctrl_sock = RTPP_ITER_NEXT(ctrl_sock)) {
549
2
        if (RTPP_CTRL_ACCEPTABLE(ctrl_sock))
550
0
            continue;
551
2
        pfds_used++;
552
2
    }
553
2
    psp->pfds = malloc(sizeof(struct pollfd) * pfds_used);
554
2
    if (psp->pfds == NULL) {
555
0
        return (-1);
556
0
    }
557
2
    if (pthread_mutex_init(&psp->pfds_mutex, NULL) != 0) {
558
0
        goto e1;
559
0
    }
560
2
    psp->pfds_used = pfds_used;
561
2
    if (psp->pfds_used == 0) {
562
0
        return (0);
563
0
    }
564
2
    psp->pfds[0].fd = wakefd;
565
2
    psp->pfds[0].events = POLLIN;
566
2
    psp->pfds[0].revents = 0;
567
2
    psp->rccs[0] = NULL;
568
2
    ctrl_sock = RTPP_LIST_HEAD(cfsp->ctrl_socks);
569
4
    for (i = 1; ctrl_sock != NULL; ctrl_sock = RTPP_ITER_NEXT(ctrl_sock)) {
570
2
        if (RTPP_CTRL_ACCEPTABLE(ctrl_sock))
571
0
            continue;
572
2
        psp->pfds[i].fd = ctrl_sock->controlfd_in;
573
2
        psp->pfds[i].events = POLLIN;
574
2
        psp->pfds[i].revents = 0;
575
2
        psp->rccs[i] = rtpp_cmd_connection_ctor(ctrl_sock->controlfd_in, 
576
2
          ctrl_sock->controlfd_out, ctrl_sock, NULL);
577
2
        if (psp->rccs[i] == NULL) {
578
0
            int j;
579
580
0
            for (j = i - 1; j >= 0; j --)
581
0
                rtpp_cmd_connection_dtor(psp->rccs[j]);
582
0
            goto e1;
583
0
        }
584
2
        i++;
585
2
    }
586
2
    if (i == 2 && RTPP_CTRL_ISSTREAM(psp->rccs[1]->csock)) {
587
2
        psp->rccs[1]->csock->exit_on_close = 1;
588
2
    }
589
2
    return (0);
590
0
e1:
591
0
    free(psp->pfds);
592
0
    return (-1);
593
2
}
594
595
static void
596
free_pollset(struct rtpp_cmd_pollset *psp)
597
0
{
598
0
    int i;
599
600
0
    for (i = 1; i < psp->pfds_used; i ++) {
601
0
        rtpp_cmd_connection_dtor(psp->rccs[i]);
602
0
    }
603
0
    free(psp->pfds);
604
0
}
605
606
static int
607
init_accptset(const struct rtpp_cfg *cfsp, struct rtpp_cmd_accptset *asp)
608
2
{
609
2
    int i, pfds_used;
610
2
    struct rtpp_ctrl_sock *ctrl_sock;
611
612
2
    pfds_used = 0;
613
2
    ctrl_sock = RTPP_LIST_HEAD(cfsp->ctrl_socks);
614
4
    for (pfds_used = 0; ctrl_sock != NULL; ctrl_sock = RTPP_ITER_NEXT(ctrl_sock)) {
615
2
        if (RTPP_CTRL_ACCEPTABLE(ctrl_sock) == 0)
616
2
            continue;
617
0
        pfds_used++;
618
0
    }
619
2
    if (pfds_used == 0) {
620
2
        return (0);
621
2
    }
622
623
0
    asp->pfds = malloc(sizeof(struct pollfd) * pfds_used);
624
0
    if (asp->pfds == NULL) {
625
0
        return (-1);
626
0
    }
627
0
    asp->pfds_used = pfds_used;
628
0
    asp->csocks = malloc(sizeof(struct rtpp_ctrl_sock) * pfds_used);
629
0
    if (asp->csocks == NULL) {
630
0
        free(asp->pfds);
631
0
        return (-1);
632
0
    }
633
0
    ctrl_sock = RTPP_LIST_HEAD(cfsp->ctrl_socks);
634
0
    for (i = 0; i < asp->pfds_used; ctrl_sock = RTPP_ITER_NEXT(ctrl_sock)) {
635
0
        if (RTPP_CTRL_ACCEPTABLE(ctrl_sock) == 0)
636
0
            continue;
637
0
        asp->pfds[i].fd = ctrl_sock->controlfd_in;
638
0
        asp->pfds[i].events = POLLIN;
639
0
        asp->pfds[i].revents = 0;
640
0
        asp->csocks[i] = ctrl_sock;
641
0
        i++;
642
0
    }
643
0
    return (pfds_used);
644
0
}
645
646
static void
647
free_accptset(struct rtpp_cmd_accptset *asp)
648
0
{
649
0
    if (asp->pfds_used > 0) {
650
0
        free(asp->csocks);
651
0
        free(asp->pfds);
652
0
    }
653
0
}
654
655
struct rtpp_cmd_async *
656
rtpp_command_async_ctor(struct rtpp_cfg *cfsp)
657
2
{
658
2
    struct rtpp_cmd_async_cf *cmd_cf;
659
2
    int need_acptr, i;
660
661
2
    cmd_cf = rtpp_zmalloc(sizeof(*cmd_cf));
662
2
    if (cmd_cf == NULL)
663
0
        goto e0;
664
665
2
    if (socketpair(PF_LOCAL, SOCK_STREAM, 0, cmd_cf->wakefds) != 0)
666
0
        goto e1;
667
668
2
    if (init_pollset(cfsp, &cmd_cf->pset, cmd_cf->wakefds[1]) == -1) {
669
0
        goto e2;
670
0
    }
671
2
    need_acptr = init_accptset(cfsp, &cmd_cf->aset);
672
2
    if (need_acptr == -1) {
673
0
        goto e3;
674
0
    }
675
676
2
    init_cstats(cfsp->rtpp_stats, &cmd_cf->cstats);
677
678
2
    if (pthread_mutex_init(&cmd_cf->cmd_mutex, NULL) != 0) {
679
0
        goto e4;
680
0
    }
681
2
    assert(cfsp->rtpp_timed_cf != NULL);
682
2
    cmd_cf->rcache = rtpp_cmd_rcache_ctor(cfsp->rtpp_timed_cf,
683
2
      32.0 + 3.0);
684
2
    if (cmd_cf->rcache == NULL) {
685
0
        goto e5;
686
0
    }
687
688
#if 0
689
    recfilter_init(&cmd_cf->average_load, 0.999, 0.0, 1);
690
#endif
691
692
2
    cmd_cf->cf_save = cfsp;
693
2
    if (need_acptr != 0) {
694
0
        if (pthread_create(&cmd_cf->acpt_thread_id, NULL,
695
0
          (void *(*)(void *))&rtpp_cmd_acceptor_run, cmd_cf) != 0) {
696
0
            goto e6;
697
0
        }
698
0
        cmd_cf->acceptor_started = 1;
699
#if HAVE_PTHREAD_SETNAME_NP
700
        (void)pthread_setname_np(cmd_cf->acpt_thread_id, "rtpp_cmd_acceptor");
701
#endif
702
0
    }
703
2
    if (pthread_create(&cmd_cf->thread_id, NULL,
704
2
      (void *(*)(void *))&rtpp_cmd_queue_run, cmd_cf) != 0) {
705
0
        goto e7;
706
0
    }
707
#if HAVE_PTHREAD_SETNAME_NP
708
    (void)pthread_setname_np(cmd_cf->thread_id, "rtpp_cmd_queue");
709
#endif
710
2
    cmd_cf->pub.dtor = &rtpp_command_async_dtor;
711
2
    cmd_cf->pub.wakeup = &rtpp_command_async_wakeup;
712
2
    cmd_cf->pub.get_aload = &rtpp_command_async_get_aload;
713
2
    cmd_cf->pub.reg_overload = &rtpp_command_async_reg_overload;
714
2
    cmd_cf->pub.chk_overload = &rtpp_command_async_chk_overload;
715
2
    return (&cmd_cf->pub);
716
717
0
e7:
718
0
    if (cmd_cf->acceptor_started != 0) {
719
0
        pthread_mutex_lock(&cmd_cf->cmd_mutex);
720
0
        cmd_cf->tstate_acceptor = TSTATE_CEASE;
721
0
        pthread_mutex_unlock(&cmd_cf->cmd_mutex);
722
0
        for (i = 0; i < cmd_cf->aset.pfds_used; i ++) {
723
0
            close(cmd_cf->aset.pfds[i].fd);
724
0
        }
725
0
        pthread_join(cmd_cf->acpt_thread_id, NULL);
726
0
    }
727
0
e6:
728
0
    CALL_METHOD(cmd_cf->rcache, shutdown);
729
0
    RTPP_OBJ_DECREF(cmd_cf->rcache);
730
0
e5:
731
0
    pthread_mutex_destroy(&cmd_cf->cmd_mutex);
732
0
e4:
733
0
    free_accptset(&cmd_cf->aset);
734
0
e3:
735
0
    free_pollset(&cmd_cf->pset);
736
0
e2:
737
0
    for (int k = 0; k < 2; k++)
738
0
        close(cmd_cf->wakefds[k]);
739
0
e1:
740
0
    free(cmd_cf);
741
0
e0:
742
0
    return (NULL);
743
0
}
744
745
static void
746
rtpp_command_async_dtor(struct rtpp_cmd_async *pub)
747
0
{
748
0
    struct rtpp_cmd_async_cf *cmd_cf;
749
0
    int i;
750
751
0
    PUB2PVT(pub, cmd_cf);
752
753
0
    pthread_mutex_lock(&cmd_cf->cmd_mutex);
754
0
    cmd_cf->tstate_queue = TSTATE_CEASE;
755
    /* nudge acceptor thread */
756
0
    if (cmd_cf->acceptor_started != 0) {
757
0
        cmd_cf->tstate_acceptor = TSTATE_CEASE;
758
0
        for (i = 0; i < cmd_cf->aset.pfds_used; i ++) {
759
0
            close(cmd_cf->aset.pfds[i].fd);
760
0
        }
761
0
    }
762
0
    pthread_mutex_unlock(&cmd_cf->cmd_mutex);
763
    /* notify worker thread */
764
0
    if (rtpp_command_async_wakeup(pub, 0) < 0)
765
0
        pthread_kill(cmd_cf->thread_id, SIGKILL);
766
0
    pthread_join(cmd_cf->thread_id, NULL);        
767
0
    if (cmd_cf->acceptor_started != 0) {
768
0
        pthread_join(cmd_cf->acpt_thread_id, NULL);
769
0
    }
770
0
    CALL_METHOD(cmd_cf->rcache, shutdown);
771
0
    RTPP_OBJ_DECREF(cmd_cf->rcache);
772
0
    pthread_mutex_destroy(&cmd_cf->cmd_mutex);
773
0
    free_pollset(&cmd_cf->pset);
774
0
    free_accptset(&cmd_cf->aset);
775
0
    for (int k = 0; k < 2; k++)
776
0
        close(cmd_cf->wakefds[k]);
777
0
    free(cmd_cf);
778
0
}