/src/rtpproxy/src/rtpp_proc_async.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2014 Sippy Software, Inc., http://www.sippysoft.com |
3 | | * All rights reserved. |
4 | | * |
5 | | * Redistribution and use in source and binary forms, with or without |
6 | | * modification, are permitted provided that the following conditions |
7 | | * are met: |
8 | | * 1. Redistributions of source code must retain the above copyright |
9 | | * notice, this list of conditions and the following disclaimer. |
10 | | * 2. Redistributions in binary form must reproduce the above copyright |
11 | | * notice, this list of conditions and the following disclaimer in the |
12 | | * documentation and/or other materials provided with the distribution. |
13 | | * |
14 | | * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND |
15 | | * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
16 | | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
17 | | * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE |
18 | | * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
19 | | * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
20 | | * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
21 | | * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
22 | | * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
23 | | * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
24 | | * SUCH DAMAGE. |
25 | | * |
26 | | */ |
27 | | |
28 | | #if defined(LINUX_XXX) && !defined(_GNU_SOURCE) |
29 | | #define _GNU_SOURCE /* pthread_setname_np() */ |
30 | | #endif |
31 | | |
32 | | #include <sys/types.h> |
33 | | #include <sys/socket.h> |
34 | | #include <netinet/in.h> |
35 | | #include <assert.h> |
36 | | #include <errno.h> |
37 | | #include <pthread.h> |
38 | | #include <stdio.h> |
39 | | #include <stdatomic.h> |
40 | | #include <stddef.h> |
41 | | #include <string.h> |
42 | | #include <stdlib.h> |
43 | | #include <unistd.h> |
44 | | |
45 | | #include "config.h" |
46 | | |
47 | | #include "rtpp_log.h" |
48 | | #include "rtpp_cfg.h" |
49 | | #include "rtpp_defines.h" |
50 | | #include "rtpp_types.h" |
51 | | #include "rtpp_weakref.h" |
52 | | #include "rtpp_log_obj.h" |
53 | | #include "rtpp_command_async.h" |
54 | | #include "rtpp_debug.h" |
55 | | #include "rtpp_netio_async.h" |
56 | | #include "rtpp_proc.h" |
57 | | #include "rtpp_proc_async.h" |
58 | | #include "rtpp_proc_wakeup.h" |
59 | | #include "rtpp_mallocs.h" |
60 | | #include "rtpp_sessinfo.h" |
61 | | #include "rtpp_stats.h" |
62 | | #include "rtpp_time.h" |
63 | | #include "rtpp_pipe.h" |
64 | | #include "rtpp_epoll.h" |
65 | | #include "rtpp_refcnt.h" |
66 | | #include "rtpp_debug.h" |
67 | | #include "rtpp_stream.h" |
68 | | #include "rtpp_record.h" |
69 | | #include "rtpp_pcount.h" |
70 | | #include "rtp.h" |
71 | | #include "rtp_packet.h" |
72 | | #include "rtpp_ttl.h" |
73 | | #include "rtpp_threads.h" |
74 | | #include "advanced/pproc_manager.h" |
75 | | #include "advanced/packet_processor.h" |
76 | | |
77 | | struct rtpp_proc_async_cf; |
78 | | |
79 | | struct rtpp_proc_thread_cf { |
80 | | pthread_t thread_id; |
81 | | atomic_int tstate; |
82 | | int pipe_type; |
83 | | struct rtpp_polltbl ptbl; |
84 | | const struct rtpp_proc_async_cf *proc_cf; |
85 | | struct rtpp_proc_rstats rstats; |
86 | | struct epoll_event *events; |
87 | | int events_alloc; |
88 | | }; |
89 | | |
90 | | struct rtpp_proc_async_cf { |
91 | | struct rtpp_proc_async pub; |
92 | | const struct rtpp_cfg *cf_save; |
93 | | struct rtpp_proc_thread_cf rtp_thread; |
94 | | struct rtpp_proc_thread_cf rtcp_thread; |
95 | | struct rtpp_proc_wakeup *wakeup_cf; |
96 | | int npkts_relayed_idx; |
97 | | }; |
98 | | |
99 | | static void rtpp_proc_async_dtor(struct rtpp_proc_async *); |
100 | | static int rtpp_proc_async_nudge(struct rtpp_proc_async *); |
101 | | |
102 | | static void |
103 | | flush_rstats(struct rtpp_stats *sobj, struct rtpp_proc_rstats *rsp) |
104 | 0 | { |
105 | |
|
106 | 0 | FLUSH_STAT(sobj, rsp->npkts_rcvd); |
107 | 0 | FLUSH_STAT(sobj, rsp->npkts_relayed); |
108 | 0 | FLUSH_STAT(sobj, rsp->npkts_resizer_in); |
109 | 0 | FLUSH_STAT(sobj, rsp->npkts_resizer_out); |
110 | 0 | FLUSH_STAT(sobj, rsp->npkts_resizer_discard); |
111 | 0 | FLUSH_STAT(sobj, rsp->npkts_discard); |
112 | 0 | } |
113 | | |
114 | | static void |
115 | | init_rstats(struct rtpp_stats *sobj, struct rtpp_proc_rstats *rsp) |
116 | 4 | { |
117 | | |
118 | 4 | rsp->npkts_rcvd.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_rcvd"); |
119 | 4 | rsp->npkts_relayed.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_relayed"); |
120 | 4 | rsp->npkts_resizer_in.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_resizer_in"); |
121 | 4 | rsp->npkts_resizer_out.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_resizer_out"); |
122 | 4 | rsp->npkts_resizer_discard.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_resizer_discard"); |
123 | 4 | rsp->npkts_discard.cnt_idx = CALL_SMETHOD(sobj, getidxbyname, "npkts_discard"); |
124 | 4 | } |
125 | | |
126 | | static void |
127 | | rtpp_proc_async_run(void *arg) |
128 | 4 | { |
129 | 4 | const struct rtpp_cfg *cfsp; |
130 | 4 | int ndrain; |
131 | 4 | int nready; |
132 | 4 | struct rtpp_proc_thread_cf *tcp; |
133 | 4 | const struct rtpp_proc_async_cf *proc_cf; |
134 | 4 | long long last_ctick; |
135 | 4 | struct sthread_args *sender; |
136 | 4 | struct rtpp_proc_rstats *rstats; |
137 | 4 | struct rtpp_stats *stats_cf; |
138 | 4 | int tstate; |
139 | 4 | struct rtpp_timestamp rtime; |
140 | | |
141 | 4 | tcp = (struct rtpp_proc_thread_cf *)arg; |
142 | 4 | proc_cf = tcp->proc_cf; |
143 | 4 | cfsp = proc_cf->cf_save; |
144 | 4 | stats_cf = cfsp->rtpp_stats; |
145 | 4 | rstats = &tcp->rstats; |
146 | | |
147 | 4 | memset(&rtime, '\0', sizeof(rtime)); |
148 | | |
149 | 4 | RTPP_DBGCODE(netio) { |
150 | 0 | last_ctick = 0; |
151 | 0 | } |
152 | | |
153 | 4 | for (;;) { |
154 | 4 | tstate = atomic_load(&tcp->tstate); |
155 | 4 | CALL_SMETHOD(cfsp->sessinfo, sync_polltbl, &tcp->ptbl, tcp->pipe_type); |
156 | 4 | if (tstate == TSTATE_CEASE) { |
157 | 0 | break; |
158 | 0 | } |
159 | | |
160 | 4 | ndrain = 1; |
161 | | |
162 | 4 | nready = 0; |
163 | 4 | RTPP_DBGCODE(netio > 1) { |
164 | 0 | RTPP_LOG(cfsp->glog, RTPP_LOG_DBUG, "run %lld " \ |
165 | 0 | "polling for %d %s file descriptors", \ |
166 | 0 | last_ctick, tcp->ptbl.curlen, PP_NAME(tcp->pipe_type)); |
167 | 0 | } |
168 | 4 | nready = rtpp_epoll_wait(tcp->ptbl.epfd, tcp->events, tcp->events_alloc, -1); |
169 | 4 | RTPP_DBGCODE(netio) { |
170 | 0 | RTPP_DBGCODE(netio > 1 || nready > 0) { |
171 | 0 | RTPP_LOG(cfsp->glog, RTPP_LOG_DBUG, "run %lld " \ |
172 | 0 | "polling for %d %s file descriptors: %d descriptors are ready", \ |
173 | 0 | last_ctick, tcp->ptbl.curlen, PP_NAME(tcp->pipe_type), nready); |
174 | 0 | } |
175 | 0 | } |
176 | 4 | if (nready < 0 && errno == EINTR) { |
177 | 0 | continue; |
178 | 0 | } |
179 | 4 | if (nready == 0) |
180 | 0 | goto next; |
181 | | |
182 | 4 | rtpp_timestamp_get(&rtime); |
183 | 4 | RTPP_DBG_ASSERT(rtime.wall > 0 && rtime.mono > 0); |
184 | | |
185 | 4 | sender = rtpp_anetio_pick_sender(proc_cf->pub.netio); |
186 | 4 | process_rtp_only(cfsp, &tcp->ptbl, &rtime, ndrain, sender, rstats, |
187 | 4 | tcp->events, nready); |
188 | | |
189 | 4 | rtpp_anetio_pump_q(sender); |
190 | 4 | flush_rstats(stats_cf, rstats); |
191 | | |
192 | 4 | if (nready == tcp->events_alloc) { |
193 | 0 | struct epoll_event *tep; |
194 | |
|
195 | 0 | tep = realloc(tcp->events, sizeof(tcp->events[0]) * tcp->events_alloc * 2); |
196 | 0 | if (tep != NULL) { |
197 | 0 | tcp->events = tep; |
198 | 0 | tcp->events_alloc *= 2; |
199 | 0 | } |
200 | 0 | } |
201 | | |
202 | 4 | next: |
203 | 0 | RTPP_DBGCODE(netio) { |
204 | 0 | last_ctick++; |
205 | 0 | } |
206 | 0 | } |
207 | 0 | rtpp_polltbl_free(&tcp->ptbl); |
208 | 0 | } |
209 | | |
/*
 * Give the thread "thread_id" the name "rtpp_proc: " followed by pname.
 * Compiles to a no-op when HAVE_PTHREAD_SETNAME_NP is not set. The result
 * of pthread_setname_np() is deliberately ignored.
 */
void
rtpp_proc_async_setprocname(pthread_t thread_id, const char *pname)
{
#if HAVE_PTHREAD_SETNAME_NP
    const char prefix[] = "rtpp_proc: ";
    const size_t prefix_len = sizeof(prefix) - 1;
    const size_t pname_len = strlen(pname);
    char *full_name;

    /* Room for prefix, pname and the terminating NUL. */
    full_name = alloca(prefix_len + pname_len + 1);
    if (full_name == NULL)
        return;
    memcpy(full_name, prefix, prefix_len);
    /* Copies pname together with its terminating NUL. */
    memcpy(full_name + prefix_len, pname, pname_len + 1);
    (void)pthread_setname_np(thread_id, full_name);
#endif
}
222 | | |
223 | | static int |
224 | | rtpp_proc_async_thread_init(const struct rtpp_cfg *cfsp, const struct rtpp_proc_async_cf *proc_cf, |
225 | | struct rtpp_proc_thread_cf *tcp, int pipe_type) |
226 | 4 | { |
227 | 4 | struct epoll_event epevent; |
228 | | |
229 | 4 | tcp->ptbl.epfd = rtpp_epoll_create(); |
230 | 4 | if (tcp->ptbl.epfd < 0) |
231 | 0 | goto e0; |
232 | 4 | if (socketpair(PF_LOCAL, SOCK_STREAM, 0, tcp->ptbl.wakefd) != 0) |
233 | 0 | goto e1; |
234 | 4 | epevent.events = EPOLLIN; |
235 | 4 | epevent.data.ptr = NULL; |
236 | 4 | if (rtpp_epoll_ctl(tcp->ptbl.epfd, EPOLL_CTL_ADD, tcp->ptbl.wakefd[0], &epevent) != 0) |
237 | 0 | goto e2; |
238 | | |
239 | 4 | tcp->proc_cf = proc_cf; |
240 | 4 | tcp->pipe_type = pipe_type; |
241 | | |
242 | 4 | init_rstats(cfsp->rtpp_stats, &tcp->rstats); |
243 | | |
244 | 4 | tcp->events_alloc = 16; |
245 | 4 | tcp->events = rtpp_zmalloc(sizeof(tcp->events[0]) * tcp->events_alloc); |
246 | 4 | if (tcp->events == NULL) |
247 | 0 | goto e2; |
248 | | |
249 | 4 | if (pthread_create(&tcp->thread_id, NULL, (void *(*)(void *))&rtpp_proc_async_run, tcp) != 0) { |
250 | 0 | goto e3; |
251 | 0 | } |
252 | 4 | rtpp_proc_async_setprocname(tcp->thread_id, PP_NAME(pipe_type)); |
253 | 4 | return (0); |
254 | | |
255 | 0 | e3: |
256 | 0 | free(tcp->events); |
257 | 0 | e2: |
258 | 0 | close(tcp->ptbl.wakefd[0]); |
259 | 0 | close(tcp->ptbl.wakefd[1]); |
260 | 0 | e1: |
261 | 0 | close(tcp->ptbl.epfd); |
262 | 0 | e0: |
263 | 0 | return (-1); |
264 | 0 | } |
265 | | |
266 | | static void |
267 | | rtpp_proc_async_thread_destroy(struct rtpp_proc_thread_cf *tcp) |
268 | 0 | { |
269 | 0 | int tstate = atomic_load(&tcp->tstate); |
270 | |
|
271 | 0 | assert(tstate == TSTATE_RUN); |
272 | 0 | close(tcp->ptbl.wakefd[1]); |
273 | 0 | atomic_store(&tcp->tstate, TSTATE_CEASE); |
274 | 0 | pthread_join(tcp->thread_id, NULL); |
275 | 0 | free(tcp->events); |
276 | 0 | } |
277 | | |
278 | | static enum pproc_action |
279 | | relay_packet(const struct pkt_proc_ctx *pktxp) |
280 | 0 | { |
281 | 0 | struct rtpp_stream *stp_out = pktxp->strmp_out; |
282 | 0 | struct rtpp_stream *stp_in = pktxp->strmp_in; |
283 | 0 | struct rtp_packet *packet = pktxp->pktp; |
284 | |
|
285 | 0 | CALL_SMETHOD(stp_in->ttl, reset); |
286 | 0 | if (stp_out == NULL) { |
287 | 0 | return PPROC_ACT_DROP; |
288 | 0 | } |
289 | | |
290 | | /* |
291 | | * Check that we have some address to which packet is to be |
292 | | * sent out, drop otherwise. |
293 | | */ |
294 | 0 | if (!CALL_SMETHOD(stp_out, issendable)) { |
295 | 0 | return PPROC_ACT_DROP; |
296 | 0 | } |
297 | 0 | CALL_SMETHOD(stp_out, send_pkt, packet->sender, packet); |
298 | 0 | if ((pktxp->flags & PPROC_FLAG_LGEN) == 0) { |
299 | 0 | CALL_SMETHOD(stp_in->pcount, reg_reld); |
300 | 0 | if (pktxp->rsp != NULL) { |
301 | 0 | pktxp->rsp->npkts_relayed.cnt++; |
302 | 0 | } else { |
303 | 0 | struct rtpp_proc_async_cf *proc_cf = pktxp->pproc->arg; |
304 | 0 | CALL_SMETHOD(proc_cf->cf_save->rtpp_stats, updatebyidx, |
305 | 0 | proc_cf->npkts_relayed_idx, 1); |
306 | 0 | } |
307 | 0 | } |
308 | 0 | return PPROC_ACT_TAKE; |
309 | 0 | } |
310 | | |
311 | | static enum pproc_action |
312 | | record_packet(const struct pkt_proc_ctx *pktxp) |
313 | 0 | { |
314 | 0 | struct rtpp_stream *stp_out = pktxp->strmp_out; |
315 | 0 | struct rtpp_stream *stp_in = pktxp->strmp_in; |
316 | |
|
317 | 0 | if (stp_in->rrc != NULL && stp_out != NULL) { |
318 | 0 | if (!CALL_SMETHOD(stp_out, isplayer_active)) { |
319 | 0 | CALL_SMETHOD(stp_in->rrc, pktwrite, pktxp); |
320 | 0 | } |
321 | 0 | } |
322 | 0 | return PPROC_ACT_NOP; |
323 | 0 | } |
324 | | |
325 | | struct rtpp_proc_async * |
326 | | rtpp_proc_async_ctor(const struct rtpp_cfg *cfsp) |
327 | 2 | { |
328 | 2 | struct rtpp_proc_async_cf *proc_cf; |
329 | | |
330 | 2 | proc_cf = rtpp_zmalloc(sizeof(*proc_cf)); |
331 | 2 | if (proc_cf == NULL) |
332 | 0 | return (NULL); |
333 | | |
334 | 2 | proc_cf->npkts_relayed_idx = CALL_SMETHOD(cfsp->rtpp_stats, getidxbyname, "npkts_relayed"); |
335 | 2 | if (proc_cf->npkts_relayed_idx < 0) |
336 | 0 | goto e0; |
337 | | |
338 | 2 | proc_cf->pub.netio = rtpp_netio_async_init(cfsp, 1); |
339 | 2 | if (proc_cf->pub.netio == NULL) { |
340 | 0 | goto e0; |
341 | 0 | } |
342 | | |
343 | 2 | proc_cf->cf_save = cfsp; |
344 | | |
345 | 2 | const struct packet_processor_if relay_packet_poi = { |
346 | 2 | .descr = "relay_packet", |
347 | 2 | .arg = (void *)proc_cf, |
348 | 2 | .key = (void *)&relay_packet, |
349 | 2 | .enqueue = &relay_packet |
350 | 2 | }; |
351 | 2 | if (CALL_SMETHOD(cfsp->pproc_manager, reg, PPROC_ORD_RELAY, &relay_packet_poi) < 0) |
352 | 0 | goto e1; |
353 | | |
354 | 2 | const struct packet_processor_if record_packet_poi = { |
355 | 2 | .descr = "record_packet", |
356 | 2 | .arg = (void *)proc_cf, |
357 | 2 | .key = (void *)&record_packet, |
358 | 2 | .enqueue = &record_packet |
359 | 2 | }; |
360 | 2 | if (CALL_SMETHOD(cfsp->pproc_manager, reg, PPROC_ORD_WITNESS, &record_packet_poi) < 0) |
361 | 0 | goto e2; |
362 | | |
363 | 2 | if (rtpp_proc_async_thread_init(cfsp, proc_cf, &proc_cf->rtp_thread, PIPE_RTP) != 0) { |
364 | 0 | goto e3; |
365 | 0 | } |
366 | | |
367 | 2 | if (rtpp_proc_async_thread_init(cfsp, proc_cf, &proc_cf->rtcp_thread, PIPE_RTCP) != 0) { |
368 | 0 | goto e4; |
369 | 0 | } |
370 | | |
371 | 2 | proc_cf->wakeup_cf = rtpp_proc_wakeup_ctor(proc_cf->rtp_thread.ptbl.wakefd[1], |
372 | 2 | proc_cf->rtcp_thread.ptbl.wakefd[1]); |
373 | 2 | if (proc_cf->wakeup_cf == NULL) |
374 | 0 | goto e5; |
375 | | |
376 | 2 | RTPP_OBJ_INCREF(cfsp->rtpp_stats); |
377 | 2 | RTPP_OBJ_INCREF(cfsp->pproc_manager); |
378 | | |
379 | 2 | proc_cf->pub.dtor = &rtpp_proc_async_dtor; |
380 | 2 | proc_cf->pub.nudge = &rtpp_proc_async_nudge; |
381 | 2 | return (&proc_cf->pub); |
382 | 0 | e5: |
383 | 0 | rtpp_proc_async_thread_destroy(&proc_cf->rtcp_thread); |
384 | 0 | e4: |
385 | 0 | rtpp_proc_async_thread_destroy(&proc_cf->rtp_thread); |
386 | 0 | e3: |
387 | 0 | CALL_SMETHOD(cfsp->pproc_manager, unreg, record_packet_poi.key); |
388 | 0 | e2: |
389 | 0 | CALL_SMETHOD(cfsp->pproc_manager, unreg, relay_packet_poi.key); |
390 | 0 | e1: |
391 | 0 | rtpp_netio_async_destroy(proc_cf->pub.netio); |
392 | 0 | e0: |
393 | 0 | free(proc_cf); |
394 | 0 | return (NULL); |
395 | 0 | } |
396 | | |
397 | | static void |
398 | | rtpp_proc_async_dtor(struct rtpp_proc_async *pub) |
399 | 0 | { |
400 | 0 | struct rtpp_proc_async_cf *proc_cf; |
401 | |
|
402 | 0 | PUB2PVT(pub, proc_cf); |
403 | 0 | RTPP_OBJ_DECREF(proc_cf->wakeup_cf); |
404 | 0 | CALL_SMETHOD(proc_cf->cf_save->pproc_manager, unreg, record_packet); |
405 | 0 | CALL_SMETHOD(proc_cf->cf_save->pproc_manager, unreg, relay_packet); |
406 | 0 | rtpp_proc_async_thread_destroy(&proc_cf->rtcp_thread); |
407 | 0 | rtpp_proc_async_thread_destroy(&proc_cf->rtp_thread); |
408 | 0 | rtpp_netio_async_destroy(proc_cf->pub.netio); |
409 | 0 | RTPP_OBJ_DECREF(proc_cf->cf_save->rtpp_stats); |
410 | 0 | RTPP_OBJ_DECREF(proc_cf->cf_save->pproc_manager); |
411 | 0 | free(proc_cf); |
412 | 0 | } |
413 | | |
414 | | static int |
415 | | rtpp_proc_async_nudge(struct rtpp_proc_async *pub) |
416 | 0 | { |
417 | 0 | struct rtpp_proc_async_cf *proc_cf; |
418 | 0 | int nres; |
419 | |
|
420 | 0 | PUB2PVT(pub, proc_cf); |
421 | 0 | nres = CALL_SMETHOD(proc_cf->wakeup_cf, nudge); |
422 | 0 | return (nres); |
423 | 0 | } |