Coverage Report

Created: 2025-10-10 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rtpproxy/src/advanced/pproc_manager.c
Line
Count
Source
1
/*
2
 * Copyright (c) 2019 Sippy Software, Inc., http://www.sippysoft.com
3
 * All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions
7
 * are met:
8
 * 1. Redistributions of source code must retain the above copyright
9
 *    notice, this list of conditions and the following disclaimer.
10
 * 2. Redistributions in binary form must reproduce the above copyright
11
 *    notice, this list of conditions and the following disclaimer in the
12
 *    documentation and/or other materials provided with the distribution.
13
 *
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
 * SUCH DAMAGE.
25
 *
26
 */
27
28
#include <pthread.h>
29
#include <assert.h>
30
#include <stddef.h>
31
#include <stdint.h>
32
#include <stdlib.h>
33
#include <string.h>
34
35
#include "rtpp_types.h"
36
#include "rtpp_debug.h"
37
#include "rtpp_mallocs.h"
38
#include "rtpp_codeptr.h"
39
#include "rtpp_refcnt.h"
40
#include "rtp_packet.h"
41
#include "rtpp_stream.h"
42
#include "rtpp_pcount.h"
43
#include "rtpp_stats.h"
44
#include "rtpp_proc.h"
45
#include "rtpp_codeptr.h"
46
47
#include "advanced/pproc_manager.h"
48
#include "advanced/packet_processor.h"
49
50
struct pproc_handler {
51
    enum pproc_order order;
52
    struct packet_processor_if ppif;
53
};
54
55
struct pproc_handlers {
56
    struct rtpp_refcnt *rcnt;
57
    int nprocs;
58
    struct pproc_handler pproc[0];
59
};
60
61
struct pproc_manager_pvt {
62
    struct pproc_manager pub;
63
    pthread_mutex_t lock;
64
    struct rtpp_stats *rtpp_stats;
65
    int npkts_discard_idx;
66
    struct pproc_handlers *handlers;
67
};
68
69
static int rtpp_pproc_mgr_register(struct pproc_manager *, enum pproc_order, const struct packet_processor_if *);
70
static enum pproc_action rtpp_pproc_mgr_handle(struct pproc_manager *, struct pkt_proc_ctx *);
71
static struct pproc_act rtpp_pproc_mgr_handleat(struct pproc_manager *, struct pkt_proc_ctx *,
72
  enum pproc_order) __attribute__((always_inline));
73
static int rtpp_pproc_mgr_lookup(struct pproc_manager *, void *, struct packet_processor_if *);
74
static int rtpp_pproc_mgr_unregister(struct pproc_manager *, void *);
75
static struct pproc_manager *rtpp_pproc_mgr_clone(struct pproc_manager *);
76
void rtpp_pproc_mgr_reg_drop(struct pproc_manager *);
77
78
DEFINE_SMETHODS(pproc_manager,
79
    .reg = &rtpp_pproc_mgr_register,
80
    .handle = &rtpp_pproc_mgr_handle,
81
    .handleat = &rtpp_pproc_mgr_handleat,
82
    .lookup = &rtpp_pproc_mgr_lookup,
83
    .unreg = &rtpp_pproc_mgr_unregister,
84
    .clone = &rtpp_pproc_mgr_clone,
85
    .reg_drop = &rtpp_pproc_mgr_reg_drop,
86
);
87
88
static struct pproc_handlers *
89
pproc_handlers_alloc(int nprocs)
90
1.04M
{
91
1.04M
    struct pproc_handlers *hndlrs;
92
93
1.04M
    hndlrs = rtpp_rzmalloc(sizeof(*hndlrs) + sizeof(struct pproc_handler) * nprocs,
94
1.04M
      offsetof(struct pproc_handlers, rcnt));
95
1.04M
    if (hndlrs == NULL)
96
0
        return (NULL);
97
1.04M
    hndlrs->nprocs = nprocs;
98
1.04M
    return (hndlrs);
99
1.04M
}
100
101
static void
102
rtpp_pproc_mgr_dtor(struct pproc_manager_pvt *pvt)
103
239k
{
104
105
239k
    RTPP_OBJ_DECREF(pvt->handlers);
106
239k
}
107
108
struct pproc_manager *
109
pproc_manager_ctor(struct rtpp_stats *rtpp_stats, int nprocs)
110
239k
{
111
239k
    struct pproc_manager_pvt *pvt;
112
113
239k
    pvt = rtpp_rzmalloc(sizeof(*pvt), PVT_RCOFFS(pvt));
114
239k
    if (pvt == NULL)
115
0
        goto e0;
116
239k
    pvt->npkts_discard_idx = CALL_SMETHOD(rtpp_stats, getidxbyname, "npkts_discard");
117
239k
    if (pvt->npkts_discard_idx < 0)
118
0
        goto e1;
119
239k
    if (pthread_mutex_init(&pvt->lock, NULL) != 0)
120
0
        goto e1;
121
239k
    RTPP_OBJ_DTOR_ATTACH(&(pvt->pub), pthread_mutex_destroy, &pvt->lock);
122
239k
    pvt->handlers = pproc_handlers_alloc(nprocs);
123
239k
    if (pvt->handlers == NULL)
124
0
        goto e1;
125
239k
    RTPP_OBJ_BORROW(&(pvt->pub), rtpp_stats);
126
239k
    pvt->rtpp_stats = rtpp_stats;
127
239k
    PUBINST_FININIT(&pvt->pub, pvt, rtpp_pproc_mgr_dtor);
128
239k
    return (&(pvt->pub));
129
0
e1:
130
0
    RTPP_OBJ_DECREF(&(pvt->pub));
131
0
e0:
132
0
    return (NULL);
133
0
}
134
135
static int
136
rtpp_pproc_mgr_register(struct pproc_manager *pub, enum pproc_order pproc_order,
137
  const struct packet_processor_if *ip)
138
433k
{
139
433k
    int i;
140
433k
    struct pproc_manager_pvt *pvt;
141
433k
    struct pproc_handlers *newh;
142
143
433k
    PUB2PVT(pub, pvt);
144
433k
    pthread_mutex_lock(&pvt->lock);
145
146
433k
    newh = pproc_handlers_alloc(pvt->handlers->nprocs + 1);
147
433k
    if (newh == NULL) {
148
0
        pthread_mutex_unlock(&pvt->lock);
149
0
        return (-1);
150
0
    }
151
537k
    for (i = 0; i < pvt->handlers->nprocs; i++)
152
537k
        if (pvt->handlers->pproc[i].order > pproc_order)
153
433k
            break;
154
433k
    if (i > 0)
155
42.5k
        memcpy(&newh->pproc[0], &pvt->handlers->pproc[0],
156
42.5k
          sizeof(pvt->handlers->pproc[0]) * i);
157
433k
    if (i < pvt->handlers->nprocs)
158
433k
        memcpy(&newh->pproc[i + 1], &pvt->handlers->pproc[i],
159
433k
          sizeof(pvt->handlers->pproc[0]) * (pvt->handlers->nprocs - i));
160
433k
    newh->pproc[i].order = pproc_order;
161
433k
    newh->pproc[i].ppif = *ip;
162
2.01M
    for (int j = 0; j < newh->nprocs; j++) {
163
1.58M
        ip = &newh->pproc[j].ppif;
164
1.58M
        if (ip->rcnt != NULL)
165
97.6k
            RTPP_OBJ_BORROW(newh, ip);
166
1.58M
    }
167
433k
    RTPP_OBJ_DECREF(pvt->handlers);
168
433k
    pvt->handlers = newh;
169
433k
    pthread_mutex_unlock(&pvt->lock);
170
433k
    return (0);
171
433k
}
172
173
static enum pproc_action
174
rtpp_pproc_mgr_handle(struct pproc_manager *pub, struct pkt_proc_ctx *pktxp)
175
0
{
176
177
0
    return rtpp_pproc_mgr_handleat(pub, pktxp, _PPROC_ORD_EMPTY).a;
178
0
}
179
180
static struct pproc_act
181
rtpp_pproc_mgr_handleat(struct pproc_manager *pub, struct pkt_proc_ctx *pktxp,
182
  enum pproc_order startat)
183
434k
{
184
434k
    int i;
185
434k
    struct pproc_manager_pvt *pvt;
186
434k
    enum pproc_action res = PPROC_ACT_NOP_v;
187
434k
    struct pproc_act lastres = PPROC_ACT(res);
188
434k
    const struct pproc_handlers *handlers;
189
434k
    static __thread int max_recursion = 16;
190
191
434k
    PUB2PVT(pub, pvt);
192
434k
    pthread_mutex_lock(&pvt->lock);
193
434k
    handlers = pvt->handlers;
194
434k
    RTPP_OBJ_INCREF(handlers);
195
434k
    pthread_mutex_unlock(&pvt->lock);
196
197
434k
    RTPP_DBGCODE() {
198
0
        max_recursion--;
199
0
        assert(max_recursion > 0);
200
0
    }
201
202
1.21M
    for (i = 0; i < handlers->nprocs; i++) {
203
1.21M
        const struct packet_processor_if *ip = &handlers->pproc[i].ppif;
204
1.21M
        RTPP_DBG_ASSERT(handlers->pproc[i].order != _PPROC_ORD_EMPTY);
205
1.21M
        if (startat > _PPROC_ORD_EMPTY && handlers->pproc[i].order < startat)
206
0
            continue;
207
1.21M
        if (i > 0) {
208
            /* Clean after use */
209
781k
            pktxp->auxp = NULL;
210
781k
        }
211
1.21M
        pktxp->pproc = ip;
212
1.21M
        if (ip->taste != NULL && ip->taste(pktxp) == 0)
213
0
            continue;
214
1.21M
        lastres = ip->enqueue(pktxp);
215
1.21M
        res |= lastres.a;
216
1.21M
        if (res & (PPROC_ACT_TAKE_v | PPROC_ACT_DROP_v))
217
434k
            break;
218
1.21M
    }
219
434k
    RTPP_OBJ_DECREF(handlers);
220
434k
    if ((res & PPROC_ACT_TAKE_v) == 0 || (res & PPROC_ACT_DROP_v) != 0) {
221
39.3k
        RTPP_OBJ_DECREF(pktxp->pktp);
222
39.3k
        if ((pktxp->flags & PPROC_FLAG_LGEN) == 0) {
223
39.3k
            CALL_SMETHOD(pktxp->strmp_in->pcount, reg_drop, lastres.loc);
224
39.3k
            if (pktxp->rsp != NULL)
225
39.3k
                pktxp->rsp->npkts_discard.cnt++;
226
0
            else
227
39.3k
                CALL_SMETHOD(pvt->rtpp_stats, updatebyidx, pvt->npkts_discard_idx, 1);
228
39.3k
        }
229
39.3k
    }
230
434k
    lastres.a = res;
231
434k
    RTPP_DBGCODE() {
232
0
        max_recursion++;
233
0
    }
234
434k
    return (lastres);
235
434k
}
236
237
int
238
rtpp_pproc_mgr_lookup(struct pproc_manager *pub, void *key, struct packet_processor_if *rval)
239
89.7k
{
240
89.7k
    struct pproc_manager_pvt *pvt;
241
242
89.7k
    PUB2PVT(pub, pvt);
243
89.7k
    pthread_mutex_lock(&pvt->lock);
244
262k
    for (int i = 0; i < pvt->handlers->nprocs; i++) {
245
223k
        const struct packet_processor_if *ip = &pvt->handlers->pproc[i].ppif;
246
223k
        RTPP_DBG_ASSERT(pvt->handlers->pproc[i].order != _PPROC_ORD_EMPTY);
247
223k
        if (ip->key == key) {
248
51.0k
            if (ip->rcnt != NULL)
249
51.0k
                RTPP_OBJ_INCREF(ip);
250
51.0k
            *rval = *ip;
251
51.0k
            pthread_mutex_unlock(&pvt->lock);
252
51.0k
            return (1);
253
51.0k
        }
254
223k
    }
255
38.7k
    pthread_mutex_unlock(&pvt->lock);
256
38.7k
    return (0);
257
89.7k
}
258
259
static int
260
rtpp_pproc_mgr_unregister(struct pproc_manager *pub, void *key)
261
371k
{
262
371k
    int i;
263
371k
    struct pproc_manager_pvt *pvt;
264
371k
    struct pproc_handlers *newh, *oldh;
265
266
371k
    PUB2PVT(pub, pvt);
267
371k
    pthread_mutex_lock(&pvt->lock);
268
575k
    for (i = 0; i < pvt->handlers->nprocs; i++) {
269
575k
        const struct packet_processor_if *ip = &pvt->handlers->pproc[i].ppif;
270
575k
        RTPP_DBG_ASSERT(pvt->handlers->pproc[i].order != _PPROC_ORD_EMPTY);
271
575k
        if (ip->key != key)
272
203k
            continue;
273
371k
        newh = pproc_handlers_alloc(pvt->handlers->nprocs - 1);
274
371k
        if (newh == NULL) {
275
0
            pthread_mutex_unlock(&pvt->lock);
276
0
            return (-1);
277
0
        }
278
371k
        if (i > 0)
279
150k
            memcpy(&newh->pproc[0], &pvt->handlers->pproc[0],
280
150k
              sizeof(pvt->handlers->pproc[0]) * i);
281
371k
        if (i < pvt->handlers->nprocs - 1)
282
371k
            memcpy(&newh->pproc[i], &pvt->handlers->pproc[i + 1],
283
371k
              sizeof(pvt->handlers->pproc[0]) * (pvt->handlers->nprocs - i - 1));
284
1.37M
        for (int j = 0; j < newh->nprocs; j++) {
285
1.00M
            ip = &newh->pproc[j].ppif;
286
1.00M
            if (ip->rcnt != NULL)
287
121k
                RTPP_OBJ_BORROW(newh, ip);
288
1.00M
        }
289
371k
        oldh = pvt->handlers;
290
371k
        pvt->handlers = newh;
291
371k
        pthread_mutex_unlock(&pvt->lock);
292
        /* DECREF might call a destructor chain, so it should be done out */
293
        /* of the locked area! */
294
371k
        RTPP_OBJ_DECREF(oldh);
295
371k
        return (0);
296
371k
    }
297
0
    abort();
298
371k
}
299
300
static struct pproc_manager *
301
rtpp_pproc_mgr_clone(struct pproc_manager *pub)
302
239k
{
303
239k
    struct pproc_manager *rval;
304
239k
    struct pproc_manager_pvt *pvt, *pvt_new;
305
239k
    int i;
306
307
239k
    PUB2PVT(pub, pvt);
308
239k
    pthread_mutex_lock(&pvt->lock);
309
239k
    rval = pproc_manager_ctor(pvt->rtpp_stats, pvt->handlers->nprocs);
310
239k
    if (rval == NULL) {
311
0
        pthread_mutex_unlock(&pvt->lock);
312
0
        return (NULL);
313
0
    }
314
239k
    PUB2PVT(rval, pvt_new);
315
239k
    memcpy(pvt_new->handlers->pproc, pvt->handlers->pproc,
316
239k
      sizeof(pvt->handlers->pproc[0]) * pvt->handlers->nprocs);
317
717k
    for (i = 0; i < pvt->handlers->nprocs; i++) {
318
478k
        const struct packet_processor_if *ip = &pvt_new->handlers->pproc[i].ppif;
319
478k
        RTPP_DBG_ASSERT(pvt->handlers->pproc[i].order != _PPROC_ORD_EMPTY);
320
478k
        if (ip->rcnt != NULL)
321
0
            RTPP_OBJ_BORROW(pvt->handlers, ip);
322
478k
    }
323
239k
    pthread_mutex_unlock(&pvt->lock);
324
239k
    return (rval);
325
239k
}
326
327
void
328
rtpp_pproc_mgr_reg_drop(struct pproc_manager *pub)
329
269k
{
330
269k
    struct pproc_manager_pvt *pvt;
331
332
269k
    PUB2PVT(pub, pvt);
333
269k
    CALL_SMETHOD(pvt->rtpp_stats, updatebyidx, pvt->npkts_discard_idx, 1);
334
269k
}