/src/rtpproxy/src/advanced/pproc_manager.c
Line | Count | Source |
1 | | /* |
2 | | * Copyright (c) 2019 Sippy Software, Inc., http://www.sippysoft.com |
3 | | * All rights reserved. |
4 | | * |
5 | | * Redistribution and use in source and binary forms, with or without |
6 | | * modification, are permitted provided that the following conditions |
7 | | * are met: |
8 | | * 1. Redistributions of source code must retain the above copyright |
9 | | * notice, this list of conditions and the following disclaimer. |
10 | | * 2. Redistributions in binary form must reproduce the above copyright |
11 | | * notice, this list of conditions and the following disclaimer in the |
12 | | * documentation and/or other materials provided with the distribution. |
13 | | * |
14 | | * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND |
15 | | * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
16 | | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
17 | | * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE |
18 | | * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
19 | | * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
20 | | * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
21 | | * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
22 | | * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
23 | | * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
24 | | * SUCH DAMAGE. |
25 | | * |
26 | | */ |
27 | | |
28 | | #include <pthread.h> |
29 | | #include <assert.h> |
30 | | #include <stddef.h> |
31 | | #include <stdint.h> |
32 | | #include <stdlib.h> |
33 | | #include <string.h> |
34 | | |
35 | | #include "rtpp_types.h" |
36 | | #include "rtpp_debug.h" |
37 | | #include "rtpp_mallocs.h" |
38 | | #include "rtpp_codeptr.h" |
39 | | #include "rtpp_refcnt.h" |
40 | | #include "rtp_packet.h" |
41 | | #include "rtpp_stream.h" |
42 | | #include "rtpp_pcount.h" |
43 | | #include "rtpp_stats.h" |
44 | | #include "rtpp_proc.h" |
45 | | #include "rtpp_codeptr.h" |
46 | | |
47 | | #include "advanced/pproc_manager.h" |
48 | | #include "advanced/packet_processor.h" |
49 | | |
50 | | struct pproc_handler { |
51 | | enum pproc_order order; |
52 | | struct packet_processor_if ppif; |
53 | | }; |
54 | | |
55 | | struct pproc_handlers { |
56 | | struct rtpp_refcnt *rcnt; |
57 | | int nprocs; |
58 | | struct pproc_handler pproc[0]; |
59 | | }; |
60 | | |
61 | | struct pproc_manager_pvt { |
62 | | struct pproc_manager pub; |
63 | | pthread_mutex_t lock; |
64 | | struct rtpp_stats *rtpp_stats; |
65 | | int npkts_discard_idx; |
66 | | struct pproc_handlers *handlers; |
67 | | }; |
68 | | |
69 | | static int rtpp_pproc_mgr_register(struct pproc_manager *, enum pproc_order, const struct packet_processor_if *); |
70 | | static enum pproc_action rtpp_pproc_mgr_handle(struct pproc_manager *, struct pkt_proc_ctx *); |
71 | | static struct pproc_act rtpp_pproc_mgr_handleat(struct pproc_manager *, struct pkt_proc_ctx *, |
72 | | enum pproc_order) __attribute__((always_inline)); |
73 | | static int rtpp_pproc_mgr_lookup(struct pproc_manager *, void *, struct packet_processor_if *); |
74 | | static int rtpp_pproc_mgr_unregister(struct pproc_manager *, void *); |
75 | | static struct pproc_manager *rtpp_pproc_mgr_clone(struct pproc_manager *); |
76 | | void rtpp_pproc_mgr_reg_drop(struct pproc_manager *); |
77 | | |
78 | | DEFINE_SMETHODS(pproc_manager, |
79 | | .reg = &rtpp_pproc_mgr_register, |
80 | | .handle = &rtpp_pproc_mgr_handle, |
81 | | .handleat = &rtpp_pproc_mgr_handleat, |
82 | | .lookup = &rtpp_pproc_mgr_lookup, |
83 | | .unreg = &rtpp_pproc_mgr_unregister, |
84 | | .clone = &rtpp_pproc_mgr_clone, |
85 | | .reg_drop = &rtpp_pproc_mgr_reg_drop, |
86 | | ); |
87 | | |
88 | | static struct pproc_handlers * |
89 | | pproc_handlers_alloc(int nprocs) |
90 | 1.04M | { |
91 | 1.04M | struct pproc_handlers *hndlrs; |
92 | | |
93 | 1.04M | hndlrs = rtpp_rzmalloc(sizeof(*hndlrs) + sizeof(struct pproc_handler) * nprocs, |
94 | 1.04M | offsetof(struct pproc_handlers, rcnt)); |
95 | 1.04M | if (hndlrs == NULL) |
96 | 0 | return (NULL); |
97 | 1.04M | hndlrs->nprocs = nprocs; |
98 | 1.04M | return (hndlrs); |
99 | 1.04M | } |
100 | | |
101 | | static void |
102 | | rtpp_pproc_mgr_dtor(struct pproc_manager_pvt *pvt) |
103 | 239k | { |
104 | | |
105 | 239k | RTPP_OBJ_DECREF(pvt->handlers); |
106 | 239k | } |
107 | | |
108 | | struct pproc_manager * |
109 | | pproc_manager_ctor(struct rtpp_stats *rtpp_stats, int nprocs) |
110 | 239k | { |
111 | 239k | struct pproc_manager_pvt *pvt; |
112 | | |
113 | 239k | pvt = rtpp_rzmalloc(sizeof(*pvt), PVT_RCOFFS(pvt)); |
114 | 239k | if (pvt == NULL) |
115 | 0 | goto e0; |
116 | 239k | pvt->npkts_discard_idx = CALL_SMETHOD(rtpp_stats, getidxbyname, "npkts_discard"); |
117 | 239k | if (pvt->npkts_discard_idx < 0) |
118 | 0 | goto e1; |
119 | 239k | if (pthread_mutex_init(&pvt->lock, NULL) != 0) |
120 | 0 | goto e1; |
121 | 239k | RTPP_OBJ_DTOR_ATTACH(&(pvt->pub), pthread_mutex_destroy, &pvt->lock); |
122 | 239k | pvt->handlers = pproc_handlers_alloc(nprocs); |
123 | 239k | if (pvt->handlers == NULL) |
124 | 0 | goto e1; |
125 | 239k | RTPP_OBJ_BORROW(&(pvt->pub), rtpp_stats); |
126 | 239k | pvt->rtpp_stats = rtpp_stats; |
127 | 239k | PUBINST_FININIT(&pvt->pub, pvt, rtpp_pproc_mgr_dtor); |
128 | 239k | return (&(pvt->pub)); |
129 | 0 | e1: |
130 | 0 | RTPP_OBJ_DECREF(&(pvt->pub)); |
131 | 0 | e0: |
132 | 0 | return (NULL); |
133 | 0 | } |
134 | | |
135 | | static int |
136 | | rtpp_pproc_mgr_register(struct pproc_manager *pub, enum pproc_order pproc_order, |
137 | | const struct packet_processor_if *ip) |
138 | 433k | { |
139 | 433k | int i; |
140 | 433k | struct pproc_manager_pvt *pvt; |
141 | 433k | struct pproc_handlers *newh; |
142 | | |
143 | 433k | PUB2PVT(pub, pvt); |
144 | 433k | pthread_mutex_lock(&pvt->lock); |
145 | | |
146 | 433k | newh = pproc_handlers_alloc(pvt->handlers->nprocs + 1); |
147 | 433k | if (newh == NULL) { |
148 | 0 | pthread_mutex_unlock(&pvt->lock); |
149 | 0 | return (-1); |
150 | 0 | } |
151 | 537k | for (i = 0; i < pvt->handlers->nprocs; i++) |
152 | 537k | if (pvt->handlers->pproc[i].order > pproc_order) |
153 | 433k | break; |
154 | 433k | if (i > 0) |
155 | 42.5k | memcpy(&newh->pproc[0], &pvt->handlers->pproc[0], |
156 | 42.5k | sizeof(pvt->handlers->pproc[0]) * i); |
157 | 433k | if (i < pvt->handlers->nprocs) |
158 | 433k | memcpy(&newh->pproc[i + 1], &pvt->handlers->pproc[i], |
159 | 433k | sizeof(pvt->handlers->pproc[0]) * (pvt->handlers->nprocs - i)); |
160 | 433k | newh->pproc[i].order = pproc_order; |
161 | 433k | newh->pproc[i].ppif = *ip; |
162 | 2.01M | for (int j = 0; j < newh->nprocs; j++) { |
163 | 1.58M | ip = &newh->pproc[j].ppif; |
164 | 1.58M | if (ip->rcnt != NULL) |
165 | 97.6k | RTPP_OBJ_BORROW(newh, ip); |
166 | 1.58M | } |
167 | 433k | RTPP_OBJ_DECREF(pvt->handlers); |
168 | 433k | pvt->handlers = newh; |
169 | 433k | pthread_mutex_unlock(&pvt->lock); |
170 | 433k | return (0); |
171 | 433k | } |
172 | | |
173 | | static enum pproc_action |
174 | | rtpp_pproc_mgr_handle(struct pproc_manager *pub, struct pkt_proc_ctx *pktxp) |
175 | 0 | { |
176 | |
|
177 | 0 | return rtpp_pproc_mgr_handleat(pub, pktxp, _PPROC_ORD_EMPTY).a; |
178 | 0 | } |
179 | | |
180 | | static struct pproc_act |
181 | | rtpp_pproc_mgr_handleat(struct pproc_manager *pub, struct pkt_proc_ctx *pktxp, |
182 | | enum pproc_order startat) |
183 | 434k | { |
184 | 434k | int i; |
185 | 434k | struct pproc_manager_pvt *pvt; |
186 | 434k | enum pproc_action res = PPROC_ACT_NOP_v; |
187 | 434k | struct pproc_act lastres = PPROC_ACT(res); |
188 | 434k | const struct pproc_handlers *handlers; |
189 | 434k | static __thread int max_recursion = 16; |
190 | | |
191 | 434k | PUB2PVT(pub, pvt); |
192 | 434k | pthread_mutex_lock(&pvt->lock); |
193 | 434k | handlers = pvt->handlers; |
194 | 434k | RTPP_OBJ_INCREF(handlers); |
195 | 434k | pthread_mutex_unlock(&pvt->lock); |
196 | | |
197 | 434k | RTPP_DBGCODE() { |
198 | 0 | max_recursion--; |
199 | 0 | assert(max_recursion > 0); |
200 | 0 | } |
201 | | |
202 | 1.21M | for (i = 0; i < handlers->nprocs; i++) { |
203 | 1.21M | const struct packet_processor_if *ip = &handlers->pproc[i].ppif; |
204 | 1.21M | RTPP_DBG_ASSERT(handlers->pproc[i].order != _PPROC_ORD_EMPTY); |
205 | 1.21M | if (startat > _PPROC_ORD_EMPTY && handlers->pproc[i].order < startat) |
206 | 0 | continue; |
207 | 1.21M | if (i > 0) { |
208 | | /* Clean after use */ |
209 | 781k | pktxp->auxp = NULL; |
210 | 781k | } |
211 | 1.21M | pktxp->pproc = ip; |
212 | 1.21M | if (ip->taste != NULL && ip->taste(pktxp) == 0) |
213 | 0 | continue; |
214 | 1.21M | lastres = ip->enqueue(pktxp); |
215 | 1.21M | res |= lastres.a; |
216 | 1.21M | if (res & (PPROC_ACT_TAKE_v | PPROC_ACT_DROP_v)) |
217 | 434k | break; |
218 | 1.21M | } |
219 | 434k | RTPP_OBJ_DECREF(handlers); |
220 | 434k | if ((res & PPROC_ACT_TAKE_v) == 0 || (res & PPROC_ACT_DROP_v) != 0) { |
221 | 39.3k | RTPP_OBJ_DECREF(pktxp->pktp); |
222 | 39.3k | if ((pktxp->flags & PPROC_FLAG_LGEN) == 0) { |
223 | 39.3k | CALL_SMETHOD(pktxp->strmp_in->pcount, reg_drop, lastres.loc); |
224 | 39.3k | if (pktxp->rsp != NULL) |
225 | 39.3k | pktxp->rsp->npkts_discard.cnt++; |
226 | 0 | else |
227 | 39.3k | CALL_SMETHOD(pvt->rtpp_stats, updatebyidx, pvt->npkts_discard_idx, 1); |
228 | 39.3k | } |
229 | 39.3k | } |
230 | 434k | lastres.a = res; |
231 | 434k | RTPP_DBGCODE() { |
232 | 0 | max_recursion++; |
233 | 0 | } |
234 | 434k | return (lastres); |
235 | 434k | } |
236 | | |
237 | | int |
238 | | rtpp_pproc_mgr_lookup(struct pproc_manager *pub, void *key, struct packet_processor_if *rval) |
239 | 89.7k | { |
240 | 89.7k | struct pproc_manager_pvt *pvt; |
241 | | |
242 | 89.7k | PUB2PVT(pub, pvt); |
243 | 89.7k | pthread_mutex_lock(&pvt->lock); |
244 | 262k | for (int i = 0; i < pvt->handlers->nprocs; i++) { |
245 | 223k | const struct packet_processor_if *ip = &pvt->handlers->pproc[i].ppif; |
246 | 223k | RTPP_DBG_ASSERT(pvt->handlers->pproc[i].order != _PPROC_ORD_EMPTY); |
247 | 223k | if (ip->key == key) { |
248 | 51.0k | if (ip->rcnt != NULL) |
249 | 51.0k | RTPP_OBJ_INCREF(ip); |
250 | 51.0k | *rval = *ip; |
251 | 51.0k | pthread_mutex_unlock(&pvt->lock); |
252 | 51.0k | return (1); |
253 | 51.0k | } |
254 | 223k | } |
255 | 38.7k | pthread_mutex_unlock(&pvt->lock); |
256 | 38.7k | return (0); |
257 | 89.7k | } |
258 | | |
259 | | static int |
260 | | rtpp_pproc_mgr_unregister(struct pproc_manager *pub, void *key) |
261 | 371k | { |
262 | 371k | int i; |
263 | 371k | struct pproc_manager_pvt *pvt; |
264 | 371k | struct pproc_handlers *newh, *oldh; |
265 | | |
266 | 371k | PUB2PVT(pub, pvt); |
267 | 371k | pthread_mutex_lock(&pvt->lock); |
268 | 575k | for (i = 0; i < pvt->handlers->nprocs; i++) { |
269 | 575k | const struct packet_processor_if *ip = &pvt->handlers->pproc[i].ppif; |
270 | 575k | RTPP_DBG_ASSERT(pvt->handlers->pproc[i].order != _PPROC_ORD_EMPTY); |
271 | 575k | if (ip->key != key) |
272 | 203k | continue; |
273 | 371k | newh = pproc_handlers_alloc(pvt->handlers->nprocs - 1); |
274 | 371k | if (newh == NULL) { |
275 | 0 | pthread_mutex_unlock(&pvt->lock); |
276 | 0 | return (-1); |
277 | 0 | } |
278 | 371k | if (i > 0) |
279 | 150k | memcpy(&newh->pproc[0], &pvt->handlers->pproc[0], |
280 | 150k | sizeof(pvt->handlers->pproc[0]) * i); |
281 | 371k | if (i < pvt->handlers->nprocs - 1) |
282 | 371k | memcpy(&newh->pproc[i], &pvt->handlers->pproc[i + 1], |
283 | 371k | sizeof(pvt->handlers->pproc[0]) * (pvt->handlers->nprocs - i - 1)); |
284 | 1.37M | for (int j = 0; j < newh->nprocs; j++) { |
285 | 1.00M | ip = &newh->pproc[j].ppif; |
286 | 1.00M | if (ip->rcnt != NULL) |
287 | 121k | RTPP_OBJ_BORROW(newh, ip); |
288 | 1.00M | } |
289 | 371k | oldh = pvt->handlers; |
290 | 371k | pvt->handlers = newh; |
291 | 371k | pthread_mutex_unlock(&pvt->lock); |
292 | | /* DECREF might call a destructor chain, so it should be done out */ |
293 | | /* of the locked area! */ |
294 | 371k | RTPP_OBJ_DECREF(oldh); |
295 | 371k | return (0); |
296 | 371k | } |
297 | 0 | abort(); |
298 | 371k | } |
299 | | |
300 | | static struct pproc_manager * |
301 | | rtpp_pproc_mgr_clone(struct pproc_manager *pub) |
302 | 239k | { |
303 | 239k | struct pproc_manager *rval; |
304 | 239k | struct pproc_manager_pvt *pvt, *pvt_new; |
305 | 239k | int i; |
306 | | |
307 | 239k | PUB2PVT(pub, pvt); |
308 | 239k | pthread_mutex_lock(&pvt->lock); |
309 | 239k | rval = pproc_manager_ctor(pvt->rtpp_stats, pvt->handlers->nprocs); |
310 | 239k | if (rval == NULL) { |
311 | 0 | pthread_mutex_unlock(&pvt->lock); |
312 | 0 | return (NULL); |
313 | 0 | } |
314 | 239k | PUB2PVT(rval, pvt_new); |
315 | 239k | memcpy(pvt_new->handlers->pproc, pvt->handlers->pproc, |
316 | 239k | sizeof(pvt->handlers->pproc[0]) * pvt->handlers->nprocs); |
317 | 717k | for (i = 0; i < pvt->handlers->nprocs; i++) { |
318 | 478k | const struct packet_processor_if *ip = &pvt_new->handlers->pproc[i].ppif; |
319 | 478k | RTPP_DBG_ASSERT(pvt->handlers->pproc[i].order != _PPROC_ORD_EMPTY); |
320 | 478k | if (ip->rcnt != NULL) |
321 | 0 | RTPP_OBJ_BORROW(pvt->handlers, ip); |
322 | 478k | } |
323 | 239k | pthread_mutex_unlock(&pvt->lock); |
324 | 239k | return (rval); |
325 | 239k | } |
326 | | |
327 | | void |
328 | | rtpp_pproc_mgr_reg_drop(struct pproc_manager *pub) |
329 | 269k | { |
330 | 269k | struct pproc_manager_pvt *pvt; |
331 | | |
332 | 269k | PUB2PVT(pub, pvt); |
333 | 269k | CALL_SMETHOD(pvt->rtpp_stats, updatebyidx, pvt->npkts_discard_idx, 1); |
334 | 269k | } |