/src/rtpproxy/src/rtpp_timed.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2015 Sippy Software, Inc., http://www.sippysoft.com |
3 | | * All rights reserved. |
4 | | * |
5 | | * Redistribution and use in source and binary forms, with or without |
6 | | * modification, are permitted provided that the following conditions |
7 | | * are met: |
8 | | * 1. Redistributions of source code must retain the above copyright |
9 | | * notice, this list of conditions and the following disclaimer. |
10 | | * 2. Redistributions in binary form must reproduce the above copyright |
11 | | * notice, this list of conditions and the following disclaimer in the |
12 | | * documentation and/or other materials provided with the distribution. |
13 | | * |
14 | | * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND |
15 | | * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
16 | | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
17 | | * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE |
18 | | * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
19 | | * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
20 | | * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
21 | | * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
22 | | * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
23 | | * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
24 | | * SUCH DAMAGE. |
25 | | * |
26 | | */ |
27 | | |
28 | | #if defined(LINUX_XXX) && !defined(_GNU_SOURCE) |
29 | | #define _GNU_SOURCE /* pthread_setname_np() */ |
30 | | #endif |
31 | | |
32 | | #include <assert.h> |
33 | | #include <pthread.h> |
34 | | #include <signal.h> |
35 | | #include <stddef.h> |
36 | | #include <stdlib.h> |
37 | | #include <string.h> |
38 | | |
39 | | #include "config.h" |
40 | | |
41 | | #include "rtpp_types.h" |
42 | | #include "rtpp_mallocs.h" |
43 | | #include "rtpp_codeptr.h" |
44 | | #include "rtpp_refcnt.h" |
45 | | #include "rtpp_queue.h" |
46 | | #include "rtpp_wi.h" |
47 | | #include "rtpp_wi_data.h" |
48 | | #include "rtpp_wi_sgnl.h" |
49 | | #include "rtpp_time.h" |
50 | | #include "rtpp_timed.h" |
51 | | #include "rtpp_timed_task.h" |
52 | | #include "rtpp_timed_fin.h" |
53 | | #include "rtpp_timed_task_fin.h" |
54 | | |
55 | | #include "elperiodic.h" |
56 | | |
57 | | struct rtpp_timed_cf { |
58 | | struct rtpp_timed pub; |
59 | | struct rtpp_queue *q; |
60 | | struct rtpp_queue *cmd_q; |
61 | | double last_run; |
62 | | double period; |
63 | | pthread_t thread_id; |
64 | | struct rtpp_wi *sigterm; |
65 | | int wi_dsize; |
66 | | void *elp; |
67 | | int state; |
68 | | }; |
69 | | |
70 | 2 | #define RT_ST_RUNNING 0 |
71 | 2 | #define RT_ST_SHTDOWN 1 |
72 | | |
73 | | DEFINE_CB_STRUCT(rtpp_timed); |
74 | | DEFINE_CB_STRUCT(rtpp_timed_cancel); |
75 | | |
76 | | struct rtpp_timed_wi { |
77 | | struct rtpp_timed_task pub; |
78 | | rtpp_timed_cb_s cb; |
79 | | rtpp_timed_cancel_cb_s cancel_cb; |
80 | | struct rtpp_refcnt *callback_rcnt; |
81 | | double when; |
82 | | double offset; |
83 | | struct rtpp_timed_cf *timed_cf; |
84 | | }; |
85 | | |
86 | | static void rtpp_timed_destroy(struct rtpp_timed_cf *); |
87 | | static int rtpp_timed_schedule(struct rtpp_timed *, |
88 | | double offset, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t, void *); |
89 | | static struct rtpp_timed_task *rtpp_timed_schedule_rc(struct rtpp_timed *, |
90 | | double offset, struct rtpp_refcnt *, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t, |
91 | | void *); |
92 | | static void rtpp_timed_process(struct rtpp_timed_cf *, double); |
93 | | static int rtpp_timed_cancel(struct rtpp_timed_task *); |
94 | | static void rtpp_timed_shutdown(struct rtpp_timed *); |
95 | | |
96 | | DEFINE_SMETHODS(rtpp_timed, |
97 | | .schedule = &rtpp_timed_schedule, |
98 | | .schedule_rc = rtpp_timed_schedule_rc, |
99 | | .shutdown = &rtpp_timed_shutdown |
100 | | ); |
101 | | |
102 | | static void |
103 | | rtpp_timed_queue_run(void *argp) |
104 | 2 | { |
105 | 2 | struct rtpp_timed_cf *rtcp; |
106 | 2 | struct rtpp_wi *wi; |
107 | 2 | struct rtpp_timed_wi *wi_data; |
108 | 2 | int signum; |
109 | 2 | double ctime; |
110 | | |
111 | 2 | rtcp = (struct rtpp_timed_cf *)argp; |
112 | 716 | for (;;) { |
113 | 716 | if (rtpp_queue_get_length(rtcp->cmd_q) > 0) { |
114 | 2 | wi = rtpp_queue_get_item(rtcp->cmd_q, 0); |
115 | 2 | signum = rtpp_wi_sgnl_get_signum(wi); |
116 | 2 | RTPP_OBJ_DECREF(wi); |
117 | 2 | if (signum == SIGTERM) { |
118 | 2 | break; |
119 | 2 | } |
120 | 2 | } |
121 | 714 | ctime = getdtime(); |
122 | 714 | rtpp_timed_process(rtcp, ctime); |
123 | 714 | prdic_procrastinate(rtcp->elp); |
124 | 714 | } |
125 | | /* We are terminating, get rid of all requests */ |
126 | 4 | while (rtpp_queue_get_length(rtcp->q) > 0) { |
127 | 2 | wi = rtpp_queue_get_item(rtcp->q, 1); |
128 | 2 | wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize); |
129 | 2 | if (wi_data->cancel_cb.func != NULL) { |
130 | 0 | wi_data->cancel_cb.func(wi_data->cancel_cb.arg); |
131 | 0 | } |
132 | 2 | if (wi_data->callback_rcnt != NULL) { |
133 | 2 | RC_DECREF(wi_data->callback_rcnt); |
134 | 2 | } |
135 | 2 | RTPP_OBJ_DECREF(&(wi_data->pub)); |
136 | 2 | } |
137 | 2 | prdic_free(rtcp->elp); |
138 | 2 | } |
139 | | |
140 | | struct rtpp_timed * |
141 | | rtpp_timed_ctor(double run_period) |
142 | 2 | { |
143 | 2 | struct rtpp_timed_cf *rtcp; |
144 | | |
145 | 2 | rtcp = rtpp_rzmalloc(sizeof(struct rtpp_timed_cf), PVT_RCOFFS(rtcp)); |
146 | 2 | if (rtcp == NULL) { |
147 | 0 | goto e0; |
148 | 0 | } |
149 | 2 | rtcp->q = rtpp_queue_init(RTPQ_SMALL_CB_LEN, "rtpp_timed(requests)"); |
150 | 2 | if (rtcp->q == NULL) { |
151 | 0 | goto e1; |
152 | 0 | } |
153 | 2 | rtpp_queue_setqlen(rtcp->q, 0); |
154 | 2 | rtcp->cmd_q = rtpp_queue_init(RTPQ_TINY_CB_LEN, "rtpp_timed(commands)"); |
155 | 2 | if (rtcp->cmd_q == NULL) { |
156 | 0 | goto e2; |
157 | 0 | } |
158 | | /* |
159 | | * Pre-allocate sigterm, so that we don't have any malloc() in |
160 | | * the destructor. |
161 | | */ |
162 | 2 | rtcp->sigterm = rtpp_wi_malloc_sgnl(SIGTERM, NULL, 0); |
163 | 2 | if (rtcp->sigterm == NULL) { |
164 | 0 | goto e3; |
165 | 0 | } |
166 | 2 | rtcp->elp = prdic_init(1.0 / run_period, 0.0); |
167 | 2 | if (rtcp->elp == NULL) { |
168 | 0 | goto e4; |
169 | 0 | } |
170 | 2 | if (pthread_create(&rtcp->thread_id, NULL, |
171 | 2 | (void *(*)(void *))&rtpp_timed_queue_run, rtcp) != 0) { |
172 | 0 | goto e5; |
173 | 0 | } |
174 | 2 | #if HAVE_PTHREAD_SETNAME_NP |
175 | 2 | (void)pthread_setname_np(rtcp->thread_id, "rtpp_timed_queue"); |
176 | 2 | #endif |
177 | 2 | rtcp->last_run = getdtime(); |
178 | 2 | rtcp->period = run_period; |
179 | 2 | rtcp->wi_dsize = sizeof(struct rtpp_timed_wi) + rtpp_refcnt_osize; |
180 | 2 | PUBINST_FININIT(&rtcp->pub, rtcp, rtpp_timed_destroy); |
181 | 2 | return (&rtcp->pub); |
182 | 0 | e5: |
183 | 0 | prdic_free(rtcp->elp); |
184 | 0 | e4: |
185 | 0 | RTPP_OBJ_DECREF(rtcp->sigterm); |
186 | 0 | e3: |
187 | 0 | rtpp_queue_destroy(rtcp->cmd_q); |
188 | 0 | e2: |
189 | 0 | rtpp_queue_destroy(rtcp->q); |
190 | 0 | e1: |
191 | 0 | RTPP_OBJ_DECREF(&(rtcp->pub)); |
192 | 0 | e0: |
193 | 0 | return (NULL); |
194 | 0 | } |
195 | | |
196 | | static void |
197 | | rtpp_timed_shutdown(struct rtpp_timed *self) |
198 | 2 | { |
199 | 2 | struct rtpp_timed_cf *rtpp_timed_cf; |
200 | | |
201 | 2 | PUB2PVT(self, rtpp_timed_cf); |
202 | 2 | assert(rtpp_timed_cf->state == RT_ST_RUNNING); |
203 | 2 | rtpp_queue_put_item(rtpp_timed_cf->sigterm, rtpp_timed_cf->cmd_q); |
204 | 2 | pthread_join(rtpp_timed_cf->thread_id, NULL); |
205 | 2 | rtpp_timed_cf->state = RT_ST_SHTDOWN; |
206 | 2 | } |
207 | | |
208 | | static void |
209 | | rtpp_timed_destroy(struct rtpp_timed_cf *rtpp_timed_cf) |
210 | 2 | { |
211 | | |
212 | 2 | if (rtpp_timed_cf->state == RT_ST_RUNNING) { |
213 | 0 | rtpp_timed_shutdown(&rtpp_timed_cf->pub); |
214 | 0 | } |
215 | 2 | rtpp_timed_fin(&(rtpp_timed_cf->pub)); |
216 | 2 | rtpp_queue_destroy(rtpp_timed_cf->cmd_q); |
217 | 2 | rtpp_queue_destroy(rtpp_timed_cf->q); |
218 | 2 | } |
219 | | |
220 | | static struct rtpp_timed_task * |
221 | | rtpp_timed_schedule_base(struct rtpp_timed *pub, double offset, |
222 | | struct rtpp_refcnt *callback_rcnt, rtpp_timed_cb_t cb_func, |
223 | | rtpp_timed_cancel_cb_t cancel_cb_func, void *cb_func_arg, |
224 | | int support_cancel) |
225 | 2.55k | { |
226 | 2.55k | struct rtpp_wi *wi; |
227 | 2.55k | struct rtpp_timed_wi *wi_data; |
228 | 2.55k | struct rtpp_timed_cf *rtpp_timed_cf; |
229 | | |
230 | 2.55k | rtpp_timed_cf = (struct rtpp_timed_cf *)pub; |
231 | | |
232 | 2.55k | wi = rtpp_wi_malloc_udata((void **)&wi_data, rtpp_timed_cf->wi_dsize); |
233 | 2.55k | if (wi == NULL) { |
234 | 0 | return (NULL); |
235 | 0 | } |
236 | 2.55k | memset(wi_data, '\0', rtpp_timed_cf->wi_dsize); |
237 | 2.55k | wi_data->pub.rcnt = wi->rcnt; |
238 | 2.55k | wi_data->cb.func = cb_func; |
239 | 2.55k | wi_data->cb.arg = cb_func_arg; |
240 | 2.55k | wi_data->cancel_cb.func = cancel_cb_func; |
241 | 2.55k | wi_data->cancel_cb.arg = cb_func_arg; |
242 | 2.55k | wi_data->when = getdtime() + offset; |
243 | 2.55k | wi_data->offset = offset; |
244 | 2.55k | wi_data->callback_rcnt = callback_rcnt; |
245 | 2.55k | if (callback_rcnt != NULL) { |
246 | 2.55k | RC_INCREF(callback_rcnt); |
247 | 2.55k | } |
248 | | #if defined(RTPP_DEBUG) |
249 | | RTPP_OBJ_DTOR_ATTACH(wi, (rtpp_refcnt_dtor_t)&rtpp_timed_task_fin, |
250 | | &(wi_data->pub)); |
251 | | #endif |
252 | 2.55k | if (support_cancel != 0) { |
253 | 2.55k | wi_data->pub.cancel = &rtpp_timed_cancel; |
254 | 2.55k | wi_data->timed_cf = rtpp_timed_cf; |
255 | 2.55k | RTPP_OBJ_BORROW(wi, pub); |
256 | 2.55k | } |
257 | 2.55k | RTPP_OBJ_INCREF(wi); |
258 | 2.55k | rtpp_queue_put_item(wi, rtpp_timed_cf->q); |
259 | 2.55k | return (&(wi_data->pub)); |
260 | 2.55k | } |
261 | | |
262 | | static struct rtpp_timed_task * |
263 | | rtpp_timed_schedule_rc(struct rtpp_timed *pub, double offset, |
264 | | struct rtpp_refcnt *callback_rcnt, rtpp_timed_cb_t cb_func, |
265 | | rtpp_timed_cancel_cb_t cancel_cb_func, void *cb_func_arg) |
266 | 2.55k | { |
267 | 2.55k | struct rtpp_timed_task *tpub; |
268 | | |
269 | 2.55k | tpub = rtpp_timed_schedule_base(pub, offset, callback_rcnt, cb_func, |
270 | 2.55k | cancel_cb_func, cb_func_arg, 1); |
271 | 2.55k | if (tpub == NULL) { |
272 | 0 | return (NULL); |
273 | 0 | } |
274 | 2.55k | return (tpub); |
275 | 2.55k | } |
276 | | |
277 | | static int |
278 | | rtpp_timed_schedule(struct rtpp_timed *pub, double offset, |
279 | | rtpp_timed_cb_t cb_func, rtpp_timed_cancel_cb_t cancel_cb_func, |
280 | | void *cb_func_arg) |
281 | 0 | { |
282 | 0 | struct rtpp_timed_task *tpub; |
283 | |
|
284 | 0 | tpub = rtpp_timed_schedule_base(pub, offset, NULL, cb_func, cancel_cb_func, |
285 | 0 | cb_func_arg, 0); |
286 | 0 | if (tpub == NULL) { |
287 | 0 | return (-1); |
288 | 0 | } |
289 | 0 | RTPP_OBJ_DECREF(tpub); |
290 | 0 | return (0); |
291 | 0 | } |
292 | | |
293 | | struct rtpp_timed_istime_arg { |
294 | | double ctime; |
295 | | int wi_dsize; |
296 | | }; |
297 | | |
298 | | static int |
299 | | rtpp_timed_istime(struct rtpp_wi *wi, void *p) |
300 | 1.60k | { |
301 | 1.60k | struct rtpp_timed_istime_arg *ap; |
302 | 1.60k | struct rtpp_timed_wi *wi_data; |
303 | | |
304 | 1.60k | ap = (struct rtpp_timed_istime_arg *)p; |
305 | 1.60k | wi_data = rtpp_wi_data_get_ptr(wi, ap->wi_dsize, ap->wi_dsize); |
306 | 1.60k | if (wi_data->when <= ap->ctime) { |
307 | 9 | return (0); |
308 | 9 | } |
309 | 1.59k | return (1); |
310 | 1.60k | } |
311 | | |
312 | | static void |
313 | | rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime) |
314 | 714 | { |
315 | 714 | struct rtpp_wi *wi; |
316 | 714 | struct rtpp_timed_wi *wi_data; |
317 | 714 | struct rtpp_timed_istime_arg istime_arg; |
318 | 714 | enum rtpp_timed_cb_rvals cb_rval; |
319 | | |
320 | 714 | istime_arg.ctime = ctime; |
321 | 714 | istime_arg.wi_dsize = rtcp->wi_dsize; |
322 | 723 | for (;;) { |
323 | 723 | wi = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_istime, |
324 | 723 | &istime_arg); |
325 | 723 | if (wi == NULL) { |
326 | 714 | return; |
327 | 714 | } |
328 | 9 | wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize); |
329 | 9 | cb_rval = wi_data->cb.func(ctime, wi_data->cb.arg); |
330 | 9 | if (cb_rval == CB_MORE) { |
331 | 18 | while (wi_data->when <= ctime) { |
332 | | /* Make sure next run is in the future */ |
333 | 9 | wi_data->when += wi_data->offset; |
334 | 9 | } |
335 | 9 | rtpp_queue_put_item(wi, rtcp->q); |
336 | 9 | continue; |
337 | 9 | } |
338 | 0 | if (wi_data->callback_rcnt != NULL) { |
339 | 0 | RC_DECREF(wi_data->callback_rcnt); |
340 | 0 | } |
341 | 0 | RTPP_OBJ_DECREF(&(wi_data->pub)); |
342 | 0 | } |
343 | 714 | } |
344 | | |
345 | | struct rtpp_timed_match_wi_arg { |
346 | | int wi_dsize; |
347 | | struct rtpp_timed_wi *wi_data; |
348 | | }; |
349 | | |
350 | | static int |
351 | | rtpp_timed_match_wi(struct rtpp_wi *wia, void *p) |
352 | 14.7k | { |
353 | 14.7k | struct rtpp_timed_match_wi_arg *ap; |
354 | 14.7k | struct rtpp_timed_wi *wia_data; |
355 | | |
356 | 14.7k | ap = (struct rtpp_timed_match_wi_arg *)p; |
357 | 14.7k | wia_data = rtpp_wi_data_get_ptr(wia, ap->wi_dsize, ap->wi_dsize); |
358 | 14.7k | if (wia_data == ap->wi_data) { |
359 | 2.55k | return (0); |
360 | 2.55k | } |
361 | 12.1k | return (1); |
362 | 14.7k | } |
363 | | |
364 | | static int |
365 | | rtpp_timed_cancel(struct rtpp_timed_task *taskpub) |
366 | 2.55k | { |
367 | 2.55k | struct rtpp_wi *wim; |
368 | 2.55k | struct rtpp_timed_cf *rtcp; |
369 | 2.55k | struct rtpp_timed_match_wi_arg match_arg; |
370 | 2.55k | struct rtpp_timed_wi *wi_data; |
371 | | |
372 | 2.55k | PUB2PVT(taskpub, wi_data); |
373 | | |
374 | 2.55k | rtcp = wi_data->timed_cf; |
375 | 2.55k | match_arg.wi_dsize = rtcp->wi_dsize; |
376 | 2.55k | match_arg.wi_data = wi_data; |
377 | 2.55k | wim = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_match_wi, |
378 | 2.55k | &match_arg); |
379 | 2.55k | if (wim == NULL) { |
380 | 0 | return (0); |
381 | 0 | } |
382 | 2.55k | if (wi_data->cancel_cb.func != NULL) { |
383 | 0 | wi_data->cancel_cb.func(wi_data->cancel_cb.arg); |
384 | 0 | } |
385 | 2.55k | if (wi_data->callback_rcnt != NULL) { |
386 | 2.55k | RC_DECREF(wi_data->callback_rcnt); |
387 | 2.55k | } |
388 | 2.55k | RTPP_OBJ_DECREF(&(wi_data->pub)); |
389 | 2.55k | return (1); |
390 | 2.55k | } |