Coverage Report

Created: 2025-08-26 06:38

/src/rtpproxy/src/rtpp_sessinfo.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2004-2006 Maxim Sobolev <sobomax@FreeBSD.org>
3
 * Copyright (c) 2006-2015 Sippy Software, Inc., http://www.sippysoft.com
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions
8
 * are met:
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in the
13
 *    documentation and/or other materials provided with the distribution.
14
 *
15
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
 * SUCH DAMAGE.
26
 *
27
 */
28
29
#include "config.h"
30
31
#include <sys/stat.h>
32
#include <assert.h>
33
#include <stddef.h>
34
#include <stdint.h>
35
#include <stdlib.h>
36
#include <string.h>
37
#include <poll.h>
38
#include <pthread.h>
39
#include <unistd.h>
40
41
#include "rtpp_types.h"
42
#include "rtpp_codeptr.h"
43
#include "rtpp_refcnt.h"
44
#include "rtpp_cfg.h"
45
#include "rtpp_sessinfo.h"
46
#include "rtpp_sessinfo_fin.h"
47
#include "rtpp_pipe.h"
48
#include "rtpp_stream.h"
49
#include "rtpp_session.h"
50
#include "rtpp_socket.h"
51
#include "rtpp_mallocs.h"
52
#include "rtpp_epoll.h"
53
#include "rtpp_debug.h"
54
55
/* Kinds of change that can be logged against a poll table. */
enum polltbl_hst_ops {
    HST_ADD,    /* a stream/socket pair is to be added to the table */
    HST_DEL,    /* the entry for a stream is to be dropped from the table */
    HST_UPD     /* the socket of an existing entry is to be replaced */
};
56
57
struct rtpp_polltbl_hst_ent {
58
   uint64_t stuid;
59
   enum polltbl_hst_ops op;
60
   struct rtpp_socket *skt;
61
};
62
63
/* One buffer of history entries together with its capacity. */
struct rtpp_polltbl_hst_part {
    int alen;                           /* Number of entries allocated */
    struct rtpp_polltbl_hst_ent *clog;  /* Storage for the entries */
};
67
68
struct rtpp_polltbl_hst {
69
   int ulen;  /* Number of entries used */
70
   int ilen;  /* Minimum number of entries to be allocated when need to extend */
71
   struct rtpp_polltbl_hst_part main;
72
   struct rtpp_polltbl_hst_part shadow;
73
   struct rtpp_weakref *streams_wrt;
74
   pthread_mutex_t lock;
75
};
76
77
struct rtpp_sessinfo_priv {
78
   struct rtpp_sessinfo pub;
79
   struct rtpp_polltbl_hst hst_rtp;
80
   struct rtpp_polltbl_hst hst_rtcp;
81
};
82
83
static int rtpp_sinfo_append(struct rtpp_sessinfo *, struct rtpp_session *,
84
  int, struct rtpp_socket **);
85
static void rtpp_sinfo_update(struct rtpp_sessinfo *, struct rtpp_session *,
86
  int, struct rtpp_socket **);
87
static void rtpp_sinfo_remove(struct rtpp_sessinfo *, struct rtpp_session *,
88
  int);
89
static int rtpp_sinfo_sync_polltbl(struct rtpp_sessinfo *, struct rtpp_polltbl *,
90
  int);
91
static void rtpp_sessinfo_dtor(struct rtpp_sessinfo_priv *);
92
93
DEFINE_SMETHODS(rtpp_sessinfo,
94
    .append = &rtpp_sinfo_append,
95
    .update = &rtpp_sinfo_update,
96
    .remove = &rtpp_sinfo_remove,
97
    .sync_polltbl = &rtpp_sinfo_sync_polltbl,
98
);
99
100
static int
101
rtpp_polltbl_hst_alloc(struct rtpp_polltbl_hst *hp, int alen)
102
4
{
103
104
4
    hp->main.clog = rtpp_zmalloc(sizeof(struct rtpp_polltbl_hst_ent) * alen);
105
4
    if (hp->main.clog == NULL) {
106
0
        goto e0;
107
0
    }
108
4
    hp->shadow.clog = rtpp_zmalloc(sizeof(struct rtpp_polltbl_hst_ent) * alen);
109
4
    if (hp->shadow.clog == NULL) {
110
0
        goto e1;
111
0
    }
112
4
    if (pthread_mutex_init(&hp->lock, NULL) != 0)
113
0
        goto e2;
114
4
    hp->main.alen = hp->shadow.alen = hp->ilen = alen;
115
4
    return (0);
116
0
e2:
117
0
    free(hp->shadow.clog);
118
0
e1:
119
0
    free(hp->main.clog);
120
0
e0:
121
0
    return (-1);
122
0
}
123
124
static void
125
rtpp_polltbl_hst_dtor(struct rtpp_polltbl_hst *hp)
126
4
{
127
4
    int i;
128
4
    struct rtpp_polltbl_hst_ent *hep;
129
130
4
    for (i = 0; i < hp->ulen; i++) {
131
0
        hep = hp->main.clog + i;
132
0
        if (hep->skt != NULL) {
133
0
            RTPP_OBJ_DECREF(hep->skt);
134
0
        }
135
0
    }
136
4
    if (hp->main.alen > 0) {
137
4
        free(hp->shadow.clog);
138
4
        free(hp->main.clog);
139
4
        pthread_mutex_destroy(&hp->lock);
140
4
    }
141
4
}
142
143
static int
144
rtpp_polltbl_hst_extend(struct rtpp_polltbl_hst *hp)
145
46
{
146
46
    struct rtpp_polltbl_hst_ent *clog_new;
147
46
    size_t alen = sizeof(struct rtpp_polltbl_hst_ent) *
148
46
      (hp->main.alen + hp->ilen);
149
150
46
    clog_new = realloc(hp->main.clog, alen);
151
46
    if (clog_new == NULL) {
152
0
        return (-1);
153
0
    }
154
46
    hp->main.clog = clog_new;
155
46
    hp->main.alen += hp->ilen;
156
46
    return (0);
157
46
}
158
159
static void
160
rtpp_polltbl_hst_record(struct rtpp_polltbl_hst *hp, enum polltbl_hst_ops op,
161
  uint64_t stuid, struct rtpp_socket *skt)
162
175k
{
163
175k
    struct rtpp_polltbl_hst_ent *hpe;
164
165
175k
    hpe = hp->main.clog + hp->ulen;
166
175k
    hpe->op = op;
167
175k
    hpe->stuid = stuid;
168
175k
    hpe->skt = skt;
169
175k
    hp->ulen += 1;
170
175k
    if (skt != NULL) {
171
93.3k
        RTPP_OBJ_INCREF(skt);
172
93.3k
    }
173
175k
}
174
175
struct rtpp_sessinfo *
176
rtpp_sessinfo_ctor(const struct rtpp_cfg *cfsp)
177
2
{
178
2
    struct rtpp_sessinfo *sessinfo;
179
2
    struct rtpp_sessinfo_priv *pvt;
180
181
2
    pvt = rtpp_rzmalloc(sizeof(struct rtpp_sessinfo_priv), PVT_RCOFFS(pvt));
182
2
    if (pvt == NULL) {
183
0
        return (NULL);
184
0
    }
185
2
    sessinfo = &(pvt->pub);
186
2
    if (rtpp_polltbl_hst_alloc(&pvt->hst_rtp, 10) != 0) {
187
0
        goto e6;
188
0
    }
189
2
    if (rtpp_polltbl_hst_alloc(&pvt->hst_rtcp, 10) != 0) {
190
0
        goto e7;
191
0
    }
192
2
    pvt->hst_rtp.streams_wrt = cfsp->rtp_streams_wrt;
193
2
    pvt->hst_rtcp.streams_wrt = cfsp->rtcp_streams_wrt;
194
195
2
    PUBINST_FININIT(&pvt->pub, pvt, rtpp_sessinfo_dtor);
196
2
    return (sessinfo);
197
198
0
e7:
199
0
    rtpp_polltbl_hst_dtor(&pvt->hst_rtp);
200
0
e6:
201
0
    RTPP_OBJ_DECREF(&(pvt->pub));
202
0
    return (NULL);
203
0
}
204
205
static void
206
rtpp_sessinfo_dtor(struct rtpp_sessinfo_priv *pvt)
207
2
{
208
209
2
    rtpp_sessinfo_fin(&(pvt->pub));
210
2
    rtpp_polltbl_hst_dtor(&pvt->hst_rtp);
211
2
    rtpp_polltbl_hst_dtor(&pvt->hst_rtcp);
212
2
}
213
214
static int
215
rtpp_sinfo_append(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
216
  int index, struct rtpp_socket **new_fds)
217
41.2k
{
218
41.2k
    struct rtpp_sessinfo_priv *pvt;
219
41.2k
    struct rtpp_stream *rtp, *rtcp;
220
221
41.2k
    PUB2PVT(sessinfo, pvt);
222
41.2k
    pthread_mutex_lock(&pvt->hst_rtp.lock);
223
41.2k
    if (pvt->hst_rtp.ulen == pvt->hst_rtp.main.alen) {
224
10
        if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
225
0
            goto e0;
226
0
        }
227
10
    }
228
41.2k
    pthread_mutex_lock(&pvt->hst_rtcp.lock);
229
41.2k
    if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.main.alen) {
230
6
        if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
231
0
            goto e1;
232
0
        }
233
6
    }
234
41.2k
    rtp = sp->rtp->stream[index];
235
41.2k
    CALL_SMETHOD(rtp, set_skt, new_fds[0]);
236
41.2k
    rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_ADD, rtp->stuid, new_fds[0]);
237
41.2k
    pthread_mutex_unlock(&pvt->hst_rtp.lock);
238
239
41.2k
    rtcp = sp->rtcp->stream[index];
240
41.2k
    CALL_SMETHOD(rtcp, set_skt, new_fds[1]);
241
41.2k
    rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_ADD, rtcp->stuid, new_fds[1]);
242
41.2k
    pthread_mutex_unlock(&pvt->hst_rtcp.lock);
243
244
41.2k
    return (0);
245
0
e1:
246
0
    pthread_mutex_unlock(&pvt->hst_rtcp.lock);
247
0
e0:
248
0
    pthread_mutex_unlock(&pvt->hst_rtp.lock);
249
0
    return (-1);
250
0
}
251
252
static int
253
find_polltbl_idx(struct rtpp_polltbl *ptp, uint64_t stuid)
254
93.3k
{
255
93.3k
    int i, j = -1;
256
257
533k
    for (i = 0; i < ptp->curlen; i++) {
258
533k
        if (ptp->mds[i].stuid != stuid)
259
440k
            continue;
260
92.7k
        RTPP_DBGCODE() {
261
0
            assert(j == -1);
262
0
            j = i;
263
92.7k
        } else {
264
92.7k
            return (i);
265
92.7k
        }
266
92.7k
    }
267
564
    return (j);
268
93.3k
}
269
270
static void
271
rtpp_sinfo_update(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
272
  int index, struct rtpp_socket **new_fds)
273
5.46k
{
274
5.46k
    struct rtpp_sessinfo_priv *pvt;
275
5.46k
    struct rtpp_stream *rtp, *rtcp;
276
5.46k
    struct rtpp_socket *old_fd;
277
278
5.46k
    PUB2PVT(sessinfo, pvt);
279
280
5.46k
    pthread_mutex_lock(&pvt->hst_rtp.lock);
281
5.46k
    if (pvt->hst_rtp.ulen == pvt->hst_rtp.main.alen) {
282
0
        if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
283
0
            goto e0;
284
0
        }
285
0
    }
286
5.46k
    pthread_mutex_lock(&pvt->hst_rtcp.lock);
287
5.46k
    if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.main.alen) {
288
0
        if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
289
0
            goto e1;
290
0
        }
291
0
    }
292
5.46k
    rtp = sp->rtp->stream[index];
293
5.46k
    old_fd = CALL_SMETHOD(rtp, update_skt, new_fds[0]);
294
5.46k
    if (old_fd != NULL) {
295
5.46k
        rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_UPD, rtp->stuid, new_fds[0]);
296
5.46k
        pthread_mutex_unlock(&pvt->hst_rtp.lock);
297
5.46k
        RTPP_OBJ_DECREF(old_fd);
298
5.46k
    } else {
299
0
        rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_ADD, rtp->stuid, new_fds[0]);
300
0
        pthread_mutex_unlock(&pvt->hst_rtp.lock);
301
0
    }
302
5.46k
    rtcp = sp->rtcp->stream[index];
303
5.46k
    old_fd = CALL_SMETHOD(rtcp, update_skt, new_fds[1]);
304
5.46k
    if (old_fd != NULL) {
305
5.46k
        rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_UPD, rtcp->stuid, new_fds[1]);
306
5.46k
        pthread_mutex_unlock(&pvt->hst_rtcp.lock);
307
5.46k
        RTPP_OBJ_DECREF(old_fd);
308
5.46k
    } else {
309
0
        rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_ADD, rtcp->stuid, new_fds[1]);
310
0
        pthread_mutex_unlock(&pvt->hst_rtcp.lock);
311
0
    }
312
5.46k
    return;
313
0
e1:
314
0
    pthread_mutex_unlock(&pvt->hst_rtcp.lock);
315
0
e0:
316
0
    pthread_mutex_unlock(&pvt->hst_rtp.lock);
317
0
}
318
319
static void
320
rtpp_sinfo_remove(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
321
  int index)
322
76.2k
{
323
76.2k
    struct rtpp_sessinfo_priv *pvt;
324
76.2k
    struct rtpp_stream *rtp, *rtcp;
325
76.2k
    struct rtpp_socket *fd_rtp, *fd_rtcp;;
326
327
76.2k
    PUB2PVT(sessinfo, pvt);
328
329
76.2k
    rtp = sp->rtp->stream[index];
330
76.2k
    rtcp = sp->rtcp->stream[index];
331
76.2k
    fd_rtp = CALL_SMETHOD(rtp, get_skt, HEREVAL);
332
76.2k
    fd_rtcp = CALL_SMETHOD(rtcp, get_skt, HEREVAL);
333
76.2k
    if (fd_rtp != NULL) {
334
41.2k
        pthread_mutex_lock(&pvt->hst_rtp.lock);
335
41.2k
        if (pvt->hst_rtp.ulen == pvt->hst_rtp.main.alen) {
336
13
            if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
337
0
                goto e0;
338
0
            }
339
13
        }
340
41.2k
    }
341
76.2k
    if (fd_rtcp != NULL) {
342
41.2k
        pthread_mutex_lock(&pvt->hst_rtcp.lock);
343
41.2k
        if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.main.alen) {
344
17
            if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
345
0
                goto e1;
346
0
            }
347
17
        }
348
41.2k
    }
349
76.2k
    if (fd_rtp != NULL) {
350
41.2k
        rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_DEL, rtp->stuid, NULL);
351
41.2k
        pthread_mutex_unlock(&pvt->hst_rtp.lock);
352
41.2k
    }
353
76.2k
    if (fd_rtcp != NULL) {
354
41.2k
        rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_DEL, rtcp->stuid, NULL);
355
41.2k
        pthread_mutex_unlock(&pvt->hst_rtcp.lock);
356
41.2k
    }
357
76.2k
    if (fd_rtp != NULL)
358
76.2k
        RTPP_OBJ_DECREF(fd_rtp);
359
76.2k
    if (fd_rtcp != NULL)
360
76.2k
        RTPP_OBJ_DECREF(fd_rtcp);
361
76.2k
    return;
362
0
e1:
363
0
    pthread_mutex_unlock(&pvt->hst_rtcp.lock);
364
0
e0:
365
0
    if (fd_rtp != NULL)
366
0
        pthread_mutex_unlock(&pvt->hst_rtp.lock);
367
0
}
368
369
void
370
rtpp_polltbl_free(struct rtpp_polltbl *ptbl)
371
4
{
372
4
    int i;
373
374
4
    if (ptbl->aloclen != 0) {
375
2
        for (i = 0; i < ptbl->curlen; i++) {
376
0
            int fd = CALL_SMETHOD(ptbl->mds[i].skt, getfd);
377
0
            rtpp_epoll_ctl(ptbl->epfd, EPOLL_CTL_DEL, fd, NULL);
378
0
            RTPP_OBJ_DECREF(ptbl->mds[i].skt);
379
0
        }
380
2
        free(ptbl->mds);
381
2
    }
382
4
    close(ptbl->wakefd[0]);
383
4
    close(ptbl->epfd);
384
4
}
385
386
static int
387
rtpp_sinfo_sync_polltbl(struct rtpp_sessinfo *sessinfo,
388
  struct rtpp_polltbl *ptbl, int pipe_type)
389
5.41k
{
390
5.41k
    struct rtpp_sessinfo_priv *pvt;
391
5.41k
    struct rtpp_polltbl_mdata *mds;
392
5.41k
    struct rtpp_polltbl_hst *hp;
393
5.41k
    struct rtpp_polltbl_hst_ent *clog;
394
5.41k
    int i, ulen;
395
396
5.41k
    PUB2PVT(sessinfo, pvt);
397
398
5.41k
    hp = (pipe_type == PIPE_RTP) ? &pvt->hst_rtp : &pvt->hst_rtcp;
399
400
5.41k
    pthread_mutex_lock(&hp->lock);
401
5.41k
    if (hp->ulen == 0) {
402
12
        pthread_mutex_unlock(&hp->lock);
403
12
        return (0);
404
12
    }
405
406
5.40k
    if (hp->ulen > ptbl->aloclen - ptbl->curlen) {
407
74
        int alen = hp->ulen + ptbl->curlen;
408
409
74
        mds = realloc(ptbl->mds, (alen * sizeof(struct rtpp_polltbl_mdata)));
410
74
        if (mds == NULL) {
411
0
            goto e0;
412
0
        }
413
74
        ptbl->mds = mds;
414
74
        ptbl->aloclen = alen;
415
74
    }
416
417
5.40k
    struct rtpp_polltbl_hst_part hpp = hp->main;
418
5.40k
    hp->main = hp->shadow;
419
5.40k
    hp->shadow = hpp;
420
5.40k
    clog = hpp.clog;
421
5.40k
    ulen = hp->ulen;
422
5.40k
    hp->ulen = 0;
423
5.40k
    ptbl->streams_wrt = hp->streams_wrt;
424
5.40k
    pthread_mutex_unlock(&hp->lock);
425
426
181k
    for (i = 0; i < ulen; i++) {
427
175k
        struct rtpp_polltbl_hst_ent *hep;
428
175k
        int session_index, movelen;
429
175k
        struct epoll_event event;
430
431
175k
        hep = clog + i;
432
175k
        switch (hep->op) {
433
82.3k
        case HST_ADD:
434
#ifdef RTPP_DEBUG
435
            assert(find_polltbl_idx(ptbl, hep->stuid) < 0);
436
#endif
437
82.3k
            session_index = ptbl->curlen;
438
82.3k
            event.events = EPOLLIN;
439
82.3k
            event.data.ptr = hep->skt;
440
82.3k
            rtpp_epoll_ctl(ptbl->epfd, EPOLL_CTL_ADD, CALL_SMETHOD(hep->skt, getfd), &event);
441
82.3k
            ptbl->mds[session_index].stuid = hep->stuid;
442
82.3k
            ptbl->mds[session_index].skt = hep->skt;
443
82.3k
            ptbl->curlen++;
444
82.3k
            break;
445
446
82.4k
        case HST_DEL:
447
82.4k
            session_index = find_polltbl_idx(ptbl, hep->stuid);
448
82.4k
            assert(session_index > -1);
449
82.4k
            rtpp_epoll_ctl(ptbl->epfd, EPOLL_CTL_DEL, CALL_SMETHOD(ptbl->mds[session_index].skt, getfd), NULL);
450
82.4k
            RTPP_OBJ_DECREF(ptbl->mds[session_index].skt);
451
82.4k
            movelen = (ptbl->curlen - session_index - 1);
452
82.4k
            if (movelen > 0) {
453
54.7k
                memmove(&ptbl->mds[session_index], &ptbl->mds[session_index + 1],
454
54.7k
                  movelen * sizeof(ptbl->mds[0]));
455
54.7k
            }
456
82.4k
            ptbl->curlen--;
457
82.4k
            break;
458
459
10.9k
        case HST_UPD:
460
10.9k
            session_index = find_polltbl_idx(ptbl, hep->stuid);
461
10.9k
            assert(session_index > -1);
462
10.9k
            rtpp_epoll_ctl(ptbl->epfd, EPOLL_CTL_DEL, CALL_SMETHOD(ptbl->mds[session_index].skt, getfd), NULL);
463
10.9k
            RTPP_OBJ_DECREF(ptbl->mds[session_index].skt);
464
10.9k
            event.events = EPOLLIN;
465
10.9k
            event.data.ptr = hep->skt;
466
10.9k
            rtpp_epoll_ctl(ptbl->epfd, EPOLL_CTL_ADD, CALL_SMETHOD(hep->skt, getfd), &event);
467
10.9k
            ptbl->mds[session_index].skt = hep->skt;
468
10.9k
            break;
469
175k
        }
470
175k
    }
471
5.41k
    ptbl->revision += ulen;
472
473
5.41k
    return (1);
474
0
e0:
475
0
    for (i = 0; i < hp->ulen; i++) {
476
0
        struct rtpp_polltbl_hst_ent *hep;
477
478
0
        hep = hp->main.clog + i;
479
0
        if (hep->skt != NULL) {
480
0
            RTPP_OBJ_DECREF(hep->skt);
481
0
        }
482
0
    }
483
0
    hp->ulen = 0;
484
0
    pthread_mutex_unlock(&hp->lock);
485
0
    return (-1);
486
5.40k
}