Coverage Report

Created: 2023-09-25 06:44

/src/rtpproxy/src/rtpp_proc_async.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2014 Sippy Software, Inc., http://www.sippysoft.com
3
 * All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions
7
 * are met:
8
 * 1. Redistributions of source code must retain the above copyright
9
 *    notice, this list of conditions and the following disclaimer.
10
 * 2. Redistributions in binary form must reproduce the above copyright
11
 *    notice, this list of conditions and the following disclaimer in the
12
 *    documentation and/or other materials provided with the distribution.
13
 *
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
 * SUCH DAMAGE.
25
 *
26
 */
27
28
#if defined(LINUX_XXX) && !defined(_GNU_SOURCE)
29
#define _GNU_SOURCE /* pthread_setname_np() */
30
#endif
31
32
#include <sys/types.h>
33
#include <sys/socket.h>
34
#include <netinet/in.h>
35
#include <assert.h>
36
#include <errno.h>
37
#include <pthread.h>
38
#include <stdio.h>
39
#include <stdatomic.h>
40
#include <stddef.h>
41
#include <string.h>
42
#include <stdlib.h>
43
#include <unistd.h>
44
45
#include "config.h"
46
47
#include "rtpp_log.h"
48
#include "rtpp_cfg.h"
49
#include "rtpp_defines.h"
50
#include "rtpp_types.h"
51
#include "rtpp_weakref.h"
52
#include "rtpp_log_obj.h"
53
#include "rtpp_command_async.h"
54
#include "rtpp_debug.h"
55
#include "rtpp_netio_async.h"
56
#include "rtpp_proc.h"
57
#include "rtpp_proc_async.h"
58
#include "rtpp_proc_wakeup.h"
59
#include "rtpp_mallocs.h"
60
#include "rtpp_sessinfo.h"
61
#include "rtpp_stats.h"
62
#include "rtpp_time.h"
63
#include "rtpp_pipe.h"
64
#include "rtpp_epoll.h"
65
#include "rtpp_refcnt.h"
66
#include "rtpp_debug.h"
67
#include "rtpp_stream.h"
68
#include "rtpp_record.h"
69
#include "rtpp_pcount.h"
70
#include "rtp.h"
71
#include "rtp_packet.h"
72
#include "rtpp_ttl.h"
73
#include "rtpp_threads.h"
74
#include "advanced/pproc_manager.h"
75
#include "advanced/packet_processor.h"
76
77
struct rtpp_proc_async_cf;
78
79
struct rtpp_proc_thread_cf {
80
    pthread_t thread_id;
81
    atomic_int tstate;
82
    int pipe_type;
83
    struct rtpp_polltbl ptbl;
84
    const struct rtpp_proc_async_cf *proc_cf;
85
    struct rtpp_proc_rstats rstats;
86
    struct epoll_event *events;
87
    int events_alloc;
88
};
89
90
struct rtpp_proc_async_cf {
91
    struct rtpp_proc_async pub;
92
    const struct rtpp_cfg *cf_save;
93
    struct rtpp_proc_thread_cf rtp_thread;
94
    struct rtpp_proc_thread_cf rtcp_thread;
95
    struct rtpp_proc_wakeup *wakeup_cf;
96
    int npkts_relayed_idx;
97
};
98
99
static void rtpp_proc_async_dtor(struct rtpp_proc_async *);
100
static int rtpp_proc_async_nudge(struct rtpp_proc_async *);
101
102
static void
103
flush_rstats(struct rtpp_stats *sobj, struct rtpp_proc_rstats *rsp)
104
0
{
105
106
0
    FLUSH_STAT(sobj, rsp->npkts_rcvd);
107
0
    FLUSH_STAT(sobj, rsp->npkts_relayed);
108
0
    FLUSH_STAT(sobj, rsp->npkts_resizer_in);
109
0
    FLUSH_STAT(sobj, rsp->npkts_resizer_out);
110
0
    FLUSH_STAT(sobj, rsp->npkts_resizer_discard);
111
0
    FLUSH_STAT(sobj, rsp->npkts_discard);
112
0
}
113
114
static void
115
init_rstats(struct rtpp_stats *sobj, struct rtpp_proc_rstats *rsp)
116
4
{
117
118
4
    rsp->npkts_rcvd.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_rcvd");
119
4
    rsp->npkts_relayed.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_relayed");
120
4
    rsp->npkts_resizer_in.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_resizer_in");
121
4
    rsp->npkts_resizer_out.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_resizer_out");
122
4
    rsp->npkts_resizer_discard.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_resizer_discard");
123
4
    rsp->npkts_discard.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_discard");
124
4
}
125
126
static void
127
rtpp_proc_async_run(void *arg)
128
4
{
129
4
    const struct rtpp_cfg *cfsp;
130
4
    int ndrain;
131
4
    int nready;
132
4
    struct rtpp_proc_thread_cf *tcp;
133
4
    const struct rtpp_proc_async_cf *proc_cf;
134
4
    long long last_ctick;
135
4
    struct sthread_args *sender;
136
4
    struct rtpp_proc_rstats *rstats;
137
4
    struct rtpp_stats *stats_cf;
138
4
    int tstate;
139
4
    struct rtpp_timestamp rtime;
140
141
4
    tcp = (struct rtpp_proc_thread_cf *)arg;
142
4
    proc_cf = tcp->proc_cf;
143
4
    cfsp = proc_cf->cf_save;
144
4
    stats_cf = cfsp->rtpp_stats;
145
4
    rstats = &tcp->rstats;
146
147
4
    memset(&rtime, '\0', sizeof(rtime));
148
149
4
    RTPP_DBGCODE(netio) {
150
0
        last_ctick = 0;
151
0
    }
152
153
4
    for (;;) {
154
4
        tstate = atomic_load(&tcp->tstate);
155
4
        CALL_SMETHOD(cfsp->sessinfo, sync_polltbl, &tcp->ptbl, tcp->pipe_type);
156
4
        if (tstate == TSTATE_CEASE) {
157
0
            break;
158
0
        }
159
160
4
        ndrain = 1;
161
162
4
        nready = 0;
163
4
        RTPP_DBGCODE(netio > 1) {
164
0
            RTPP_LOG(cfsp->glog, RTPP_LOG_DBUG, "run %lld " \
165
0
              "polling for %d %s file descriptors", \
166
0
              last_ctick, tcp->ptbl.curlen, PP_NAME(tcp->pipe_type));
167
0
        }
168
4
        nready = rtpp_epoll_wait(tcp->ptbl.epfd, tcp->events, tcp->events_alloc, -1);
169
4
        RTPP_DBGCODE(netio) {
170
0
            RTPP_DBGCODE(netio > 1 || nready > 0) {
171
0
                RTPP_LOG(cfsp->glog, RTPP_LOG_DBUG, "run %lld " \
172
0
                  "polling for %d %s file descriptors: %d descriptors are ready", \
173
0
                  last_ctick, tcp->ptbl.curlen, PP_NAME(tcp->pipe_type), nready);
174
0
            }
175
0
        }
176
4
        if (nready < 0 && errno == EINTR) {
177
0
            continue;
178
0
        }
179
4
        if (nready == 0)
180
0
            goto next;
181
182
4
        rtpp_timestamp_get(&rtime);
183
4
        RTPP_DBG_ASSERT(rtime.wall > 0 && rtime.mono > 0);
184
185
4
        sender = rtpp_anetio_pick_sender(proc_cf->pub.netio);
186
4
        process_rtp_only(cfsp, &tcp->ptbl, &rtime, ndrain, sender, rstats,
187
4
          tcp->events, nready);
188
189
4
        rtpp_anetio_pump_q(sender);
190
4
        flush_rstats(stats_cf, rstats);
191
192
4
        if (nready == tcp->events_alloc) {
193
0
            struct epoll_event *tep;
194
195
0
            tep = realloc(tcp->events, sizeof(tcp->events[0]) * tcp->events_alloc * 2);
196
0
            if (tep != NULL) {
197
0
                tcp->events = tep;
198
0
                tcp->events_alloc *= 2;
199
0
            }
200
0
        }
201
202
4
next:
203
0
        RTPP_DBGCODE(netio) {
204
0
            last_ctick++;
205
0
        }
206
0
    }
207
0
    rtpp_polltbl_free(&tcp->ptbl);
208
0
}
209
210
/*
 * Give the thread thread_id the name "rtpp_proc: <pname>".
 *
 * Only has an effect when HAVE_PTHREAD_SETNAME_NP is set; otherwise it is a
 * no-op. The name is built in a fixed-size buffer, so an over-long pname is
 * truncated rather than sizing a stack allocation from the caller's string.
 * The result of pthread_setname_np() is deliberately ignored: naming the
 * thread is best-effort.
 */
void
rtpp_proc_async_setprocname(pthread_t thread_id, const char *pname)
{
#if HAVE_PTHREAD_SETNAME_NP
    char ptrname[64];

    (void)snprintf(ptrname, sizeof(ptrname), "rtpp_proc: %s", pname);
    (void)pthread_setname_np(thread_id, ptrname);
#else
    (void)thread_id;
    (void)pname;
#endif
}
222
223
static int
224
rtpp_proc_async_thread_init(const struct rtpp_cfg *cfsp, const struct rtpp_proc_async_cf *proc_cf,
225
  struct rtpp_proc_thread_cf *tcp, int pipe_type)
226
4
{
227
4
    struct epoll_event epevent;
228
229
4
    tcp->ptbl.epfd = rtpp_epoll_create();
230
4
    if (tcp->ptbl.epfd < 0)
231
0
        goto e0;
232
4
    if (socketpair(PF_LOCAL, SOCK_STREAM, 0, tcp->ptbl.wakefd) != 0)
233
0
        goto e1;
234
4
    epevent.events = EPOLLIN;
235
4
    epevent.data.ptr = NULL;
236
4
    if (rtpp_epoll_ctl(tcp->ptbl.epfd, EPOLL_CTL_ADD, tcp->ptbl.wakefd[0], &epevent) != 0)
237
0
        goto e2;
238
239
4
    tcp->proc_cf = proc_cf;
240
4
    tcp->pipe_type = pipe_type;
241
242
4
    init_rstats(cfsp->rtpp_stats, &tcp->rstats);
243
244
4
    tcp->events_alloc = 16;
245
4
    tcp->events = rtpp_zmalloc(sizeof(tcp->events[0]) * tcp->events_alloc);
246
4
    if (tcp->events == NULL)
247
0
        goto e2;
248
249
4
    if (pthread_create(&tcp->thread_id, NULL, (void *(*)(void *))&rtpp_proc_async_run, tcp) != 0) {
250
0
        goto e3;
251
0
    }
252
4
    rtpp_proc_async_setprocname(tcp->thread_id, PP_NAME(pipe_type));
253
4
    return (0);
254
255
0
e3:
256
0
    free(tcp->events);
257
0
e2:
258
0
    close(tcp->ptbl.wakefd[0]);
259
0
    close(tcp->ptbl.wakefd[1]);
260
0
e1:
261
0
    close(tcp->ptbl.epfd);
262
0
e0:
263
0
    return (-1);
264
0
}
265
266
static void
267
rtpp_proc_async_thread_destroy(struct rtpp_proc_thread_cf *tcp)
268
0
{
269
0
    int tstate = atomic_load(&tcp->tstate);
270
271
0
    assert(tstate == TSTATE_RUN);
272
0
    close(tcp->ptbl.wakefd[1]);
273
0
    atomic_store(&tcp->tstate, TSTATE_CEASE);
274
0
    pthread_join(tcp->thread_id, NULL);
275
0
    free(tcp->events);
276
0
}
277
278
static enum pproc_action
279
relay_packet(const struct pkt_proc_ctx *pktxp)
280
0
{
281
0
    struct rtpp_stream *stp_out = pktxp->strmp_out;
282
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
283
0
    struct rtp_packet *packet = pktxp->pktp;
284
285
0
    CALL_SMETHOD(stp_in->ttl, reset);
286
0
    if (stp_out == NULL) {
287
0
        return PPROC_ACT_DROP;
288
0
    }
289
290
    /*
291
     * Check that we have some address to which packet is to be
292
     * sent out, drop otherwise.
293
     */
294
0
    if (!CALL_SMETHOD(stp_out, issendable)) {
295
0
        return PPROC_ACT_DROP;
296
0
    }
297
0
    CALL_SMETHOD(stp_out, send_pkt, packet->sender, packet);
298
0
    if ((pktxp->flags & PPROC_FLAG_LGEN) == 0) {
299
0
        CALL_SMETHOD(stp_in->pcount, reg_reld);
300
0
        if (pktxp->rsp != NULL) {
301
0
            pktxp->rsp->npkts_relayed.cnt++;
302
0
        } else {
303
0
            struct rtpp_proc_async_cf *proc_cf = pktxp->pproc->arg;
304
0
            CALL_SMETHOD(proc_cf->cf_save->rtpp_stats, updatebyidx,
305
0
              proc_cf->npkts_relayed_idx, 1);
306
0
        }
307
0
    }
308
0
    return PPROC_ACT_TAKE;
309
0
}
310
311
static enum pproc_action
312
record_packet(const struct pkt_proc_ctx *pktxp)
313
0
{
314
0
    struct rtpp_stream *stp_out = pktxp->strmp_out;
315
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
316
317
0
    if (stp_in->rrc != NULL && stp_out != NULL) {
318
0
        if (!CALL_SMETHOD(stp_out, isplayer_active)) {
319
0
            CALL_SMETHOD(stp_in->rrc, pktwrite, pktxp);
320
0
        }
321
0
    }
322
0
    return PPROC_ACT_NOP;
323
0
}
324
325
struct rtpp_proc_async *
326
rtpp_proc_async_ctor(const struct rtpp_cfg *cfsp)
327
2
{
328
2
    struct rtpp_proc_async_cf *proc_cf;
329
330
2
    proc_cf = rtpp_zmalloc(sizeof(*proc_cf));
331
2
    if (proc_cf == NULL)
332
0
        return (NULL);
333
334
2
    proc_cf->npkts_relayed_idx = CALL_SMETHOD(cfsp->rtpp_stats, getidxbyname, "npkts_relayed");
335
2
    if (proc_cf->npkts_relayed_idx < 0)
336
0
        goto e0;
337
338
2
    proc_cf->pub.netio = rtpp_netio_async_init(cfsp, 1);
339
2
    if (proc_cf->pub.netio == NULL) {
340
0
        goto e0;
341
0
    }
342
343
2
    proc_cf->cf_save = cfsp;
344
345
2
    const struct packet_processor_if relay_packet_poi = {
346
2
        .descr = "relay_packet",
347
2
        .arg = (void *)proc_cf,
348
2
        .key = (void *)&relay_packet,
349
2
        .enqueue = &relay_packet
350
2
    };
351
2
    if (CALL_SMETHOD(cfsp->pproc_manager, reg, PPROC_ORD_RELAY, &relay_packet_poi) < 0)
352
0
        goto e1;
353
354
2
    const struct packet_processor_if record_packet_poi = {
355
2
        .descr = "record_packet",
356
2
        .arg = (void *)proc_cf,
357
2
        .key = (void *)&record_packet,
358
2
        .enqueue = &record_packet
359
2
    };
360
2
    if (CALL_SMETHOD(cfsp->pproc_manager, reg, PPROC_ORD_WITNESS, &record_packet_poi) < 0)
361
0
        goto e2;
362
363
2
    if (rtpp_proc_async_thread_init(cfsp, proc_cf, &proc_cf->rtp_thread, PIPE_RTP) != 0) {
364
0
        goto e3;
365
0
    }
366
367
2
    if (rtpp_proc_async_thread_init(cfsp, proc_cf, &proc_cf->rtcp_thread, PIPE_RTCP) != 0) {
368
0
        goto e4;
369
0
    }
370
371
2
    proc_cf->wakeup_cf = rtpp_proc_wakeup_ctor(proc_cf->rtp_thread.ptbl.wakefd[1],
372
2
      proc_cf->rtcp_thread.ptbl.wakefd[1]);
373
2
    if (proc_cf->wakeup_cf == NULL)
374
0
        goto e5;
375
376
2
    RTPP_OBJ_INCREF(cfsp->rtpp_stats);
377
2
    RTPP_OBJ_INCREF(cfsp->pproc_manager);
378
379
2
    proc_cf->pub.dtor = &rtpp_proc_async_dtor;
380
2
    proc_cf->pub.nudge = &rtpp_proc_async_nudge;
381
2
    return (&proc_cf->pub);
382
0
e5:
383
0
    rtpp_proc_async_thread_destroy(&proc_cf->rtcp_thread);
384
0
e4:
385
0
    rtpp_proc_async_thread_destroy(&proc_cf->rtp_thread);
386
0
e3:
387
0
    CALL_SMETHOD(cfsp->pproc_manager, unreg, record_packet_poi.key);
388
0
e2:
389
0
    CALL_SMETHOD(cfsp->pproc_manager, unreg, relay_packet_poi.key);
390
0
e1:
391
0
    rtpp_netio_async_destroy(proc_cf->pub.netio);
392
0
e0:
393
0
    free(proc_cf);
394
0
    return (NULL);
395
0
}
396
397
static void
398
rtpp_proc_async_dtor(struct rtpp_proc_async *pub)
399
0
{
400
0
    struct rtpp_proc_async_cf *proc_cf;
401
402
0
    PUB2PVT(pub, proc_cf);
403
0
    RTPP_OBJ_DECREF(proc_cf->wakeup_cf);
404
0
    CALL_SMETHOD(proc_cf->cf_save->pproc_manager, unreg, record_packet);
405
0
    CALL_SMETHOD(proc_cf->cf_save->pproc_manager, unreg, relay_packet);
406
0
    rtpp_proc_async_thread_destroy(&proc_cf->rtcp_thread);
407
0
    rtpp_proc_async_thread_destroy(&proc_cf->rtp_thread);
408
0
    rtpp_netio_async_destroy(proc_cf->pub.netio);
409
0
    RTPP_OBJ_DECREF(proc_cf->cf_save->rtpp_stats);
410
0
    RTPP_OBJ_DECREF(proc_cf->cf_save->pproc_manager);
411
0
    free(proc_cf);
412
0
}
413
414
static int
415
rtpp_proc_async_nudge(struct rtpp_proc_async *pub)
416
0
{
417
0
    struct rtpp_proc_async_cf *proc_cf;
418
0
    int nres;
419
420
0
    PUB2PVT(pub, proc_cf);
421
0
    nres = CALL_SMETHOD(proc_cf->wakeup_cf, nudge);
422
0
    return (nres);
423
0
}