Coverage Report

Created: 2025-08-29 06:41

/src/rtpproxy/src/rtpp_timed.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2015 Sippy Software, Inc., http://www.sippysoft.com
3
 * All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions
7
 * are met:
8
 * 1. Redistributions of source code must retain the above copyright
9
 *    notice, this list of conditions and the following disclaimer.
10
 * 2. Redistributions in binary form must reproduce the above copyright
11
 *    notice, this list of conditions and the following disclaimer in the
12
 *    documentation and/or other materials provided with the distribution.
13
 *
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
 * SUCH DAMAGE.
25
 *
26
 */
27
28
#if defined(LINUX_XXX) && !defined(_GNU_SOURCE)
29
#define _GNU_SOURCE /* pthread_setname_np() */
30
#endif
31
32
#include <assert.h>
33
#include <pthread.h>
34
#include <signal.h>
35
#include <stddef.h>
36
#include <stdlib.h>
37
#include <string.h>
38
39
#include "config.h"
40
41
#include "rtpp_types.h"
42
#include "rtpp_mallocs.h"
43
#include "rtpp_codeptr.h"
44
#include "rtpp_refcnt.h"
45
#include "rtpp_queue.h"
46
#include "rtpp_wi.h"
47
#include "rtpp_wi_data.h"
48
#include "rtpp_wi_sgnl.h"
49
#include "rtpp_time.h"
50
#include "rtpp_timed.h"
51
#include "rtpp_timed_task.h"
52
#include "rtpp_timed_fin.h"
53
#include "rtpp_timed_task_fin.h"
54
55
#include "elperiodic.h"
56
57
struct rtpp_timed_cf {
58
    struct rtpp_timed pub;
59
    struct rtpp_queue *q;
60
    struct rtpp_queue *cmd_q;
61
    double last_run;
62
    double period;
63
    pthread_t thread_id;
64
    struct rtpp_wi *sigterm;
65
    int wi_dsize;
66
    void *elp;
67
    int state;
68
};
69
70
2
#define RT_ST_RUNNING 0
71
2
#define RT_ST_SHTDOWN 1
72
73
DEFINE_CB_STRUCT(rtpp_timed);
74
DEFINE_CB_STRUCT(rtpp_timed_cancel);
75
76
struct rtpp_timed_wi {
77
    struct rtpp_timed_task pub;
78
    rtpp_timed_cb_s cb;
79
    rtpp_timed_cancel_cb_s cancel_cb;
80
    struct rtpp_refcnt *callback_rcnt;
81
    double when;
82
    double offset;
83
    struct rtpp_timed_cf *timed_cf;
84
};
85
86
static void rtpp_timed_destroy(struct rtpp_timed_cf *);
87
static int rtpp_timed_schedule(struct rtpp_timed *,
88
  double offset, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t, void *);
89
static struct rtpp_timed_task *rtpp_timed_schedule_rc(struct rtpp_timed *,
90
  double offset, struct rtpp_refcnt *, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t,
91
  void *);
92
static void rtpp_timed_process(struct rtpp_timed_cf *, double);
93
static int rtpp_timed_cancel(struct rtpp_timed_task *);
94
static void rtpp_timed_shutdown(struct rtpp_timed *);
95
96
DEFINE_SMETHODS(rtpp_timed,
97
    .schedule = &rtpp_timed_schedule,
98
    .schedule_rc = rtpp_timed_schedule_rc,
99
    .shutdown = &rtpp_timed_shutdown
100
);
101
102
static void
103
rtpp_timed_queue_run(void *argp)
104
2
{
105
2
    struct rtpp_timed_cf *rtcp;
106
2
    struct rtpp_wi *wi;
107
2
    struct rtpp_timed_wi *wi_data;
108
2
    int signum;
109
2
    double ctime;
110
111
2
    rtcp = (struct rtpp_timed_cf *)argp;
112
716
    for (;;) {
113
716
        if (rtpp_queue_get_length(rtcp->cmd_q) > 0) {
114
2
            wi = rtpp_queue_get_item(rtcp->cmd_q, 0);
115
2
            signum = rtpp_wi_sgnl_get_signum(wi);
116
2
            RTPP_OBJ_DECREF(wi);
117
2
            if (signum == SIGTERM) {
118
2
                break;
119
2
            }
120
2
        }
121
714
        ctime = getdtime();
122
714
        rtpp_timed_process(rtcp, ctime);
123
714
        prdic_procrastinate(rtcp->elp);
124
714
    }
125
    /* We are terminating, get rid of all requests */
126
4
    while (rtpp_queue_get_length(rtcp->q) > 0) {
127
2
        wi = rtpp_queue_get_item(rtcp->q, 1);
128
2
        wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize);
129
2
        if (wi_data->cancel_cb.func != NULL) {
130
0
            wi_data->cancel_cb.func(wi_data->cancel_cb.arg);
131
0
        }
132
2
        if (wi_data->callback_rcnt != NULL) {
133
2
            RC_DECREF(wi_data->callback_rcnt);
134
2
        }
135
2
        RTPP_OBJ_DECREF(&(wi_data->pub));
136
2
    }
137
2
    prdic_free(rtcp->elp);
138
2
}
139
140
struct rtpp_timed *
141
rtpp_timed_ctor(double run_period)
142
2
{
143
2
    struct rtpp_timed_cf *rtcp;
144
145
2
    rtcp = rtpp_rzmalloc(sizeof(struct rtpp_timed_cf), PVT_RCOFFS(rtcp));
146
2
    if (rtcp == NULL) {
147
0
        goto e0;
148
0
    }
149
2
    rtcp->q = rtpp_queue_init(RTPQ_SMALL_CB_LEN, "rtpp_timed(requests)");
150
2
    if (rtcp->q == NULL) {
151
0
        goto e1;
152
0
    }
153
2
    rtpp_queue_setqlen(rtcp->q, 0);
154
2
    rtcp->cmd_q = rtpp_queue_init(RTPQ_TINY_CB_LEN, "rtpp_timed(commands)");
155
2
    if (rtcp->cmd_q == NULL) {
156
0
        goto e2;
157
0
    }
158
    /*
159
     * Pre-allocate sigterm, so that we don't have any malloc() in
160
     * the destructor.
161
     */
162
2
    rtcp->sigterm = rtpp_wi_malloc_sgnl(SIGTERM, NULL, 0);
163
2
    if (rtcp->sigterm == NULL) {
164
0
        goto e3;
165
0
    }
166
2
    rtcp->elp = prdic_init(1.0 / run_period, 0.0);
167
2
    if (rtcp->elp == NULL) {
168
0
        goto e4;
169
0
    }
170
2
    if (pthread_create(&rtcp->thread_id, NULL,
171
2
      (void *(*)(void *))&rtpp_timed_queue_run, rtcp) != 0) {
172
0
        goto e5;
173
0
    }
174
2
#if HAVE_PTHREAD_SETNAME_NP
175
2
    (void)pthread_setname_np(rtcp->thread_id, "rtpp_timed_queue");
176
2
#endif
177
2
    rtcp->last_run = getdtime();
178
2
    rtcp->period = run_period;
179
2
    rtcp->wi_dsize = sizeof(struct rtpp_timed_wi) + rtpp_refcnt_osize;
180
2
    PUBINST_FININIT(&rtcp->pub, rtcp, rtpp_timed_destroy);
181
2
    return (&rtcp->pub);
182
0
e5:
183
0
    prdic_free(rtcp->elp);
184
0
e4:
185
0
    RTPP_OBJ_DECREF(rtcp->sigterm);
186
0
e3:
187
0
    rtpp_queue_destroy(rtcp->cmd_q);
188
0
e2:
189
0
    rtpp_queue_destroy(rtcp->q);
190
0
e1:
191
0
    RTPP_OBJ_DECREF(&(rtcp->pub));
192
0
e0:
193
0
    return (NULL);
194
0
}
195
196
static void
197
rtpp_timed_shutdown(struct rtpp_timed *self)
198
2
{
199
2
    struct rtpp_timed_cf *rtpp_timed_cf;
200
201
2
    PUB2PVT(self, rtpp_timed_cf);
202
2
    assert(rtpp_timed_cf->state == RT_ST_RUNNING);
203
2
    rtpp_queue_put_item(rtpp_timed_cf->sigterm, rtpp_timed_cf->cmd_q);
204
2
    pthread_join(rtpp_timed_cf->thread_id, NULL);
205
2
    rtpp_timed_cf->state = RT_ST_SHTDOWN;
206
2
}
207
208
static void
209
rtpp_timed_destroy(struct rtpp_timed_cf *rtpp_timed_cf)
210
2
{
211
212
2
    if (rtpp_timed_cf->state == RT_ST_RUNNING) {
213
0
        rtpp_timed_shutdown(&rtpp_timed_cf->pub);
214
0
    }
215
2
    rtpp_timed_fin(&(rtpp_timed_cf->pub));
216
2
    rtpp_queue_destroy(rtpp_timed_cf->cmd_q);
217
2
    rtpp_queue_destroy(rtpp_timed_cf->q);
218
2
}
219
220
static struct rtpp_timed_task *
221
rtpp_timed_schedule_base(struct rtpp_timed *pub, double offset,
222
  struct rtpp_refcnt *callback_rcnt, rtpp_timed_cb_t cb_func,
223
  rtpp_timed_cancel_cb_t cancel_cb_func, void *cb_func_arg,
224
  int support_cancel)
225
2.55k
{
226
2.55k
    struct rtpp_wi *wi;
227
2.55k
    struct rtpp_timed_wi *wi_data;
228
2.55k
    struct rtpp_timed_cf *rtpp_timed_cf;
229
230
2.55k
    rtpp_timed_cf = (struct rtpp_timed_cf *)pub;
231
    
232
2.55k
    wi = rtpp_wi_malloc_udata((void **)&wi_data, rtpp_timed_cf->wi_dsize);
233
2.55k
    if (wi == NULL) {
234
0
        return (NULL);
235
0
    }
236
2.55k
    memset(wi_data, '\0', rtpp_timed_cf->wi_dsize);
237
2.55k
    wi_data->pub.rcnt = wi->rcnt;
238
2.55k
    wi_data->cb.func = cb_func;
239
2.55k
    wi_data->cb.arg = cb_func_arg;
240
2.55k
    wi_data->cancel_cb.func = cancel_cb_func;
241
2.55k
    wi_data->cancel_cb.arg = cb_func_arg;
242
2.55k
    wi_data->when = getdtime() + offset;
243
2.55k
    wi_data->offset = offset;
244
2.55k
    wi_data->callback_rcnt = callback_rcnt;
245
2.55k
    if (callback_rcnt != NULL) {
246
2.55k
        RC_INCREF(callback_rcnt);
247
2.55k
    }
248
#if defined(RTPP_DEBUG)
249
    RTPP_OBJ_DTOR_ATTACH(wi, (rtpp_refcnt_dtor_t)&rtpp_timed_task_fin,
250
      &(wi_data->pub));
251
#endif
252
2.55k
    if (support_cancel != 0) {
253
2.55k
        wi_data->pub.cancel = &rtpp_timed_cancel;
254
2.55k
        wi_data->timed_cf = rtpp_timed_cf;
255
2.55k
        RTPP_OBJ_BORROW(wi, pub);
256
2.55k
    }
257
2.55k
    RTPP_OBJ_INCREF(wi);
258
2.55k
    rtpp_queue_put_item(wi, rtpp_timed_cf->q);
259
2.55k
    return (&(wi_data->pub));
260
2.55k
}
261
262
static struct rtpp_timed_task *
263
rtpp_timed_schedule_rc(struct rtpp_timed *pub, double offset,
264
  struct rtpp_refcnt *callback_rcnt, rtpp_timed_cb_t cb_func,
265
  rtpp_timed_cancel_cb_t cancel_cb_func, void *cb_func_arg)
266
2.55k
{
267
2.55k
    struct rtpp_timed_task *tpub;
268
269
2.55k
    tpub = rtpp_timed_schedule_base(pub, offset, callback_rcnt, cb_func,
270
2.55k
      cancel_cb_func, cb_func_arg, 1);
271
2.55k
    if (tpub == NULL) {
272
0
        return (NULL);
273
0
    }
274
2.55k
    return (tpub);
275
2.55k
}
276
277
static int
278
rtpp_timed_schedule(struct rtpp_timed *pub, double offset,
279
  rtpp_timed_cb_t cb_func, rtpp_timed_cancel_cb_t cancel_cb_func,
280
  void *cb_func_arg)
281
0
{
282
0
    struct rtpp_timed_task *tpub;
283
284
0
    tpub = rtpp_timed_schedule_base(pub, offset, NULL, cb_func, cancel_cb_func,
285
0
      cb_func_arg, 0);
286
0
    if (tpub == NULL) {
287
0
        return (-1);
288
0
    }
289
0
    RTPP_OBJ_DECREF(tpub);
290
0
    return (0);
291
0
}
292
293
/* Context handed to rtpp_timed_istime() through the queue matcher. */
struct rtpp_timed_istime_arg {
    /* Time against which each task's deadline is compared. */
    double ctime;
    /* Size used to locate the task payload inside a work item. */
    int wi_dsize;
};
297
298
static int
299
rtpp_timed_istime(struct rtpp_wi *wi, void *p)
300
1.60k
{
301
1.60k
    struct rtpp_timed_istime_arg *ap;
302
1.60k
    struct rtpp_timed_wi *wi_data;
303
304
1.60k
    ap = (struct rtpp_timed_istime_arg *)p;
305
1.60k
    wi_data = rtpp_wi_data_get_ptr(wi, ap->wi_dsize, ap->wi_dsize);
306
1.60k
    if (wi_data->when <= ap->ctime) {
307
9
       return (0);
308
9
    }
309
1.59k
    return (1);
310
1.60k
}
311
312
static void
313
rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime)
314
714
{
315
714
    struct rtpp_wi *wi;
316
714
    struct rtpp_timed_wi *wi_data;
317
714
    struct rtpp_timed_istime_arg istime_arg;
318
714
    enum rtpp_timed_cb_rvals cb_rval;
319
320
714
    istime_arg.ctime = ctime;
321
714
    istime_arg.wi_dsize = rtcp->wi_dsize;
322
723
    for (;;) {
323
723
        wi = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_istime,
324
723
          &istime_arg);
325
723
        if (wi == NULL) {
326
714
            return;
327
714
        }
328
9
        wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize);
329
9
        cb_rval = wi_data->cb.func(ctime, wi_data->cb.arg);
330
9
        if (cb_rval == CB_MORE) {
331
18
            while (wi_data->when <= ctime) {
332
                /* Make sure next run is in the future */
333
9
                wi_data->when += wi_data->offset;
334
9
            }
335
9
            rtpp_queue_put_item(wi, rtcp->q);
336
9
            continue;
337
9
        }
338
0
        if (wi_data->callback_rcnt != NULL) {
339
0
            RC_DECREF(wi_data->callback_rcnt);
340
0
        }
341
0
        RTPP_OBJ_DECREF(&(wi_data->pub));
342
0
    }
343
714
}
344
345
/* Context handed to rtpp_timed_match_wi() through the queue matcher. */
struct rtpp_timed_match_wi_arg {
    /* Size used to locate the task payload inside a work item. */
    int wi_dsize;
    /* Payload of the task being searched for. */
    struct rtpp_timed_wi *wi_data;
};
349
350
static int
351
rtpp_timed_match_wi(struct rtpp_wi *wia, void *p)
352
14.7k
{
353
14.7k
    struct rtpp_timed_match_wi_arg *ap;
354
14.7k
    struct rtpp_timed_wi *wia_data;
355
356
14.7k
    ap = (struct rtpp_timed_match_wi_arg *)p;
357
14.7k
    wia_data = rtpp_wi_data_get_ptr(wia, ap->wi_dsize, ap->wi_dsize);
358
14.7k
    if (wia_data == ap->wi_data) {
359
2.55k
        return (0);
360
2.55k
    }
361
12.1k
    return (1);
362
14.7k
}
363
364
static int
365
rtpp_timed_cancel(struct rtpp_timed_task *taskpub)
366
2.55k
{
367
2.55k
    struct rtpp_wi *wim;
368
2.55k
    struct rtpp_timed_cf *rtcp;
369
2.55k
    struct rtpp_timed_match_wi_arg match_arg;
370
2.55k
    struct rtpp_timed_wi *wi_data;
371
372
2.55k
    PUB2PVT(taskpub, wi_data);
373
374
2.55k
    rtcp = wi_data->timed_cf;
375
2.55k
    match_arg.wi_dsize = rtcp->wi_dsize;
376
2.55k
    match_arg.wi_data = wi_data;
377
2.55k
    wim = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_match_wi,
378
2.55k
      &match_arg);
379
2.55k
    if (wim == NULL) {
380
0
        return (0);
381
0
    }
382
2.55k
    if (wi_data->cancel_cb.func != NULL) {
383
0
        wi_data->cancel_cb.func(wi_data->cancel_cb.arg);
384
0
    }
385
2.55k
    if (wi_data->callback_rcnt != NULL) {
386
2.55k
        RC_DECREF(wi_data->callback_rcnt);
387
2.55k
    }
388
2.55k
    RTPP_OBJ_DECREF(&(wi_data->pub));
389
2.55k
    return (1);
390
2.55k
}