Coverage Report

Created: 2025-07-12 06:36

/src/rtpproxy/src/rtpp_netio_async.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2014 Sippy Software, Inc., http://www.sippysoft.com
3
 * All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions
7
 * are met:
8
 * 1. Redistributions of source code must retain the above copyright
9
 *    notice, this list of conditions and the following disclaimer.
10
 * 2. Redistributions in binary form must reproduce the above copyright
11
 *    notice, this list of conditions and the following disclaimer in the
12
 *    documentation and/or other materials provided with the distribution.
13
 *
14
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
 * SUCH DAMAGE.
25
 *
26
 */
27
28
#if defined(LINUX_XXX) && !defined(_GNU_SOURCE)
29
#define _GNU_SOURCE /* pthread_setname_np() */
30
#endif
31
32
#include <sys/types.h>
33
#include <sys/socket.h>
34
#include <errno.h>
35
#include <pthread.h>
36
#include <sched.h>
37
#include <signal.h>
38
#include <stddef.h>
39
#include <stdint.h>
40
#include <stdlib.h>
41
#include <string.h>
42
43
#include "config.h"
44
45
#include "rtpp_types.h"
46
#include "rtpp_log.h"
47
#include "rtpp_cfg.h"
48
#include "rtpp_defines.h"
49
#include "rtp.h"
50
#include "rtpp_time.h"
51
#include "rtp_packet.h"
52
#include "rtpp_wi.h"
53
#include "rtpp_wi_pkt.h"
54
#include "rtpp_wi_sgnl.h"
55
#include "rtpp_wi_private.h"
56
#include "rtpp_codeptr.h"
57
#include "rtpp_refcnt.h"
58
#include "rtpp_log_obj.h"
59
#include "rtpp_queue.h"
60
#include "rtpp_network.h"
61
#include "rtpp_netio_async.h"
62
#include "rtpp_mallocs.h"
63
#include "rtpp_debug.h"
64
#ifdef RTPP_DEBUG_timers
65
#include "rtpp_time.h"
66
#include "rtpp_math.h"
67
#endif
68
69
/*
 * Per-sender-thread state: the queue the thread drains, plus the settings
 * that rtpp_netio_async_init() copies out of the global configuration.
 */
struct sthread_args {
    /* Outgoing work items waiting to be handled by the sender thread. */
    struct rtpp_queue *out_q;
    /* Log handle; a reference is taken on it when the sender is set up. */
    struct rtpp_log *glog;
    /* Copy of cfsp->dmode; non-zero makes small packets go out twice. */
    int dmode;
#if RTPP_DEBUG_timers
    /* Filtered run-time/(run-time + sleep-time) ratio, debug builds only. */
    struct recfilter average_load;
#endif
    /* Pre-allocated signal item that is queued to make the thread exit. */
    struct rtpp_wi *sigterm;
};
78
79
45.0k
#define SEND_THREADS 1
80
81
struct rtpp_anetio_cf {
82
    pthread_t thread_id[SEND_THREADS];
83
    struct sthread_args args[SEND_THREADS];
84
};
85
86
0
#define RTPP_ANETIO_MAX_RETRY 3
87
32.4k
#define RTPP_ANETIO_BATCH_LEN (RTPQ_LARGE_CB_LEN / 8)
88
89
static void
90
rtpp_anetio_sthread(struct sthread_args *args)
91
4
{
92
4
    int n, nsend, i, send_errno, nretry;
93
4
    struct rtpp_wi *wi, *wis[RTPQ_LARGE_CB_LEN / 8];
94
4
    struct rtpp_wi_pvt *wipp;
95
#if RTPP_DEBUG_timers
96
    double tp[3], runtime, sleeptime;
97
    long run_n;
98
99
    runtime = sleeptime = 0.0;
100
    run_n = 0;
101
    tp[0] = getdtime();
102
#endif
103
32.4k
    for (;;) {
104
32.4k
        nsend = rtpp_queue_get_items(args->out_q, wis, RTPP_ANETIO_BATCH_LEN, 0);
105
#if RTPP_DEBUG_timers
106
        tp[1] = getdtime();
107
#endif
108
109
108k
        for (i = 0; i < nsend; i++) {
110
76.0k
      wi = wis[i];
111
76.0k
            PUB2PVT(wi, wipp);
112
76.0k
            if (wi->wi_type == RTPP_WI_TYPE_SGNL) {
113
4
                RTPP_OBJ_DECREF(wi);
114
4
                goto out;
115
4
            }
116
76.0k
            nretry = 0;
117
76.0k
            do {
118
76.0k
                n = sendto(wipp->sock, wipp->msg, wipp->msg_len, wipp->flags,
119
76.0k
                  wipp->sendto, wipp->tolen);
120
76.0k
                send_errno = (n < 0) ? errno : 0;
121
#if RTPP_DEBUG_netio >= 1
122
                if (wipp->debug != 0) {
123
                    char daddr[MAX_AP_STRBUF];
124
125
                    addrport2char_r(wipp->sendto, daddr, sizeof(daddr), ':');
126
                    if (n < 0) {
127
                        RTPP_ELOG(wipp->log, RTPP_LOG_DBUG,
128
                          "sendto(%d, %p, %lld, %d, %p (%s), %d) = %d",
129
                          wipp->sock, wipp->msg, (long long)wipp->msg_len, wipp->flags,
130
                          wipp->sendto, daddr, wipp->tolen, n);
131
                    } else if (n < wipp->msg_len) {
132
                        RTPP_LOG(wipp->log, RTPP_LOG_DBUG,
133
                          "sendto(%d, %p, %lld, %d, %p (%s), %d) = %d: short write",
134
                          wipp->sock, wipp->msg, (long long)wipp->msg_len, wipp->flags,
135
                          wipp->sendto, daddr, wipp->tolen, n);
136
#if RTPP_DEBUG_netio >= 2
137
                    } else {
138
                        RTPP_LOG(wipp->log, RTPP_LOG_DBUG,
139
                          "sendto(%d, %p, %d, %d, %p (%s), %d) = %d",
140
                          wipp->sock, wipp->msg, wipp->msg_len, wipp->flags, wipp->sendto, daddr,
141
                          wipp->tolen, n);
142
#endif
143
                    }
144
                }
145
#endif
146
76.0k
                if (n >= 0) {
147
76.0k
                    wipp->nsend--;
148
76.0k
                } else {
149
                    /* "EPERM" is Linux thing, yield and retry */
150
18
                    if ((send_errno == EPERM || send_errno == ENOBUFS)
151
18
                      && nretry < RTPP_ANETIO_MAX_RETRY) {
152
0
                        sched_yield();
153
0
                        nretry++;
154
18
                    } else {
155
18
                        break;
156
18
                    }
157
18
                }
158
76.0k
            } while (wipp->nsend > 0);
159
76.0k
            RTPP_OBJ_DECREF(wi);
160
76.0k
        }
161
#if RTPP_DEBUG_timers
162
        sleeptime += tp[1] - tp[0];
163
        tp[0] = getdtime();
164
        runtime += tp[0] - tp[1];
165
        if ((run_n % 10000) == 0) {
166
            RTPP_LOG(args->glog, RTPP_LOG_DBUG, "rtpp_anetio_sthread(%p): run %ld aload = %f filtered = %f", \
167
              args, run_n, runtime / (runtime + sleeptime), args->average_load.lastval);
168
        }
169
        if (runtime + sleeptime > 1.0) {
170
            recfilter_apply(&args->average_load, runtime / (runtime + sleeptime));
171
            runtime = sleeptime = 0.0;
172
        }
173
        run_n += 1;
174
#endif
175
32.4k
    }
176
4
out:
177
4
    return;
178
4
}
179
180
/*
 * Debug helper for the sendto-style entry points. When built with
 * RTPP_DEBUG_netio >= 1 it flags the work item for per-send logging and
 * attaches the log handle of the first sender, taking a reference on it.
 * With RTPP_DEBUG_netio >= 2 it additionally logs the parameters stored in
 * the item. In builds without RTPP_DEBUG_netio this is a no-op.
 */
static void
rtpp_anetio_sendto_debug(struct rtpp_anetio_cf *netio_cf, struct rtpp_wi *wi)
{
#if RTPP_DEBUG_netio >= 1
    struct rtpp_wi_pvt *wipp;

    PUB2PVT(wi, wipp);
    wipp->debug = 1;
    wipp->log = netio_cf->args[0].glog;
    RTPP_OBJ_INCREF(wipp->log);
#if RTPP_DEBUG_netio >= 2
    /*
     * The caller's original arguments are not visible here, so report the
     * copies kept in the work item instead.
     */
    RTPP_LOG(netio_cf->args[0].glog, RTPP_LOG_DBUG, "malloc(%d, %p, %lld, %d, %p, %d) = %p",
      wipp->sock, wipp->msg, (long long)wipp->msg_len, wipp->flags, wipp->sendto,
      wipp->tolen, wi);
    RTPP_LOG(netio_cf->args[0].glog, RTPP_LOG_DBUG, "sendto(%d, %p, %lld, %d, %p, %d)",
      wipp->sock, wipp->msg, (long long)wipp->msg_len, wipp->flags, wipp->sendto,
      wipp->tolen);
#endif
#endif
}
197
198
int
199
rtpp_anetio_sendto(struct rtpp_anetio_cf *netio_cf, int sock, const void *msg, \
200
  size_t msg_len, int flags, const struct sockaddr *sendto, socklen_t tolen)
201
0
{
202
0
    struct rtpp_wi *wi;
203
204
0
    wi = rtpp_wi_malloc(sock, msg, msg_len, flags, sendto, tolen);
205
0
    if (wi == NULL) {
206
0
        return (-1);
207
0
    }
208
0
    rtpp_anetio_sendto_debug(netio_cf, wi);
209
0
    rtpp_queue_put_item(wi, netio_cf->args[0].out_q);
210
0
    return (0);
211
0
}
212
213
int
214
rtpp_anetio_sendto_na(struct rtpp_anetio_cf *netio_cf, int sock, const void *msg, \
215
  size_t msg_len, int flags, const struct sockaddr *sendto, socklen_t tolen, \
216
  struct rtpp_refcnt *data_rcnt)
217
0
{
218
0
    struct rtpp_wi *wi;
219
220
0
    wi = rtpp_wi_malloc_na(sock, msg, msg_len, flags, sendto, tolen, data_rcnt);
221
0
    if (wi == NULL) {
222
0
        return (-1);
223
0
    }
224
0
    rtpp_anetio_sendto_debug(netio_cf, wi);
225
0
    rtpp_queue_put_item(wi, netio_cf->args[0].out_q);
226
0
    return (0);
227
0
}
228
229
void
230
rtpp_anetio_pump(struct rtpp_anetio_cf *netio_cf)
231
4
{
232
233
4
    rtpp_queue_pump(netio_cf->args[0].out_q);
234
4
}
235
236
void
237
rtpp_anetio_pump_q(struct sthread_args *sender)
238
10.7k
{
239
240
10.7k
    rtpp_queue_pump(sender->out_q);
241
10.7k
}
242
243
int
244
rtpp_anetio_send_pkt_na(struct sthread_args *sender, int sock, \
245
  struct rtpp_netaddr *sendto, struct rtp_packet *pkt,
246
  struct rtpp_refcnt *sock_rcnt, struct rtpp_log *plog)
247
76.0k
{
248
76.0k
    struct rtpp_wi *wi;
249
76.0k
    int nsend;
250
251
76.0k
    if (sender->dmode != 0 && pkt->size < LBR_THRS) {
252
0
        nsend = 2;
253
76.0k
    } else {
254
76.0k
        nsend = 1;
255
76.0k
    }
256
257
76.0k
    wi = rtpp_wi_malloc_pkt_na(sock, pkt, sendto, nsend, sock_rcnt);
258
    /*
259
     * rtpp_wi_malloc_pkt_na() consumes pkt and returns wi, so no need to
260
     * call rtp_packet_free() here.
261
     */
262
#if RTPP_DEBUG_netio >= 2
263
    struct rtpp_wi_pvt *wipp;
264
    PUB2PVT(wi, wipp);
265
    wipp->debug = 1;
266
    if (plog == NULL) {
267
        plog = sender->glog;
268
    }
269
    RTPP_OBJ_INCREF(plog);
270
    wipp->log = plog;
271
    RTPP_LOG(plog, RTPP_LOG_DBUG, "send_pkt(%d, %p, %d, %d, %p, %d)",
272
      wipp->sock, wipp->msg, wipp->msg_len, wipp->flags, wipp->sendto, wipp->tolen);
273
#endif
274
76.0k
    rtpp_queue_put_item(wi, sender->out_q);
275
76.0k
    return (0);
276
76.0k
}
277
278
struct sthread_args *
279
rtpp_anetio_pick_sender(struct rtpp_anetio_cf *netio_cf)
280
86.8k
{
281
86.8k
    int min_len, i, l;
282
86.8k
    struct sthread_args *sender;
283
284
86.8k
    sender = &netio_cf->args[0];
285
86.8k
    min_len = rtpp_queue_get_length(sender->out_q);
286
86.8k
    if (min_len == 0) {
287
41.8k
        return (sender);
288
41.8k
    }
289
45.0k
    for (i = 1; i < SEND_THREADS; i++) {
290
0
        l = rtpp_queue_get_length(netio_cf->args[i].out_q);
291
0
        if (l < min_len) {
292
0
            sender = &netio_cf->args[i];
293
0
            min_len = l;
294
0
        }
295
0
    }
296
45.0k
    return (sender);
297
86.8k
}
298
299
struct rtpp_anetio_cf *
300
rtpp_netio_async_init(const struct rtpp_cfg *cfsp, int qlen)
301
4
{
302
4
    struct rtpp_anetio_cf *netio_cf;
303
4
    int i, ri;
304
305
4
    netio_cf = rtpp_zmalloc(sizeof(*netio_cf));
306
4
    if (netio_cf == NULL)
307
0
        return (NULL);
308
309
8
    for (i = 0; i < SEND_THREADS; i++) {
310
4
        netio_cf->args[i].out_q = rtpp_queue_init(RTPQ_LARGE_CB_LEN, "RTPP->NET%.2d", i);
311
4
        if (netio_cf->args[i].out_q == NULL) {
312
0
            for (ri = i - 1; ri >= 0; ri--) {
313
0
                rtpp_queue_destroy(netio_cf->args[ri].out_q);
314
0
                RTPP_OBJ_DECREF(netio_cf->args[ri].glog);
315
0
            }
316
0
            goto e0;
317
0
        }
318
4
        rtpp_queue_setqlen(netio_cf->args[i].out_q, qlen);
319
4
        RTPP_OBJ_INCREF(cfsp->glog);
320
4
        netio_cf->args[i].glog = cfsp->glog;
321
4
        netio_cf->args[i].dmode = cfsp->dmode;
322
#if RTPP_DEBUG_timers
323
        recfilter_init(&netio_cf->args[i].average_load, 0.9, 0.0, 0);
324
#endif
325
4
    }
326
327
8
    for (i = 0; i < SEND_THREADS; i++) {
328
4
        netio_cf->args[i].sigterm = rtpp_wi_malloc_sgnl(SIGTERM, NULL, 0);
329
4
        if (netio_cf->args[i].sigterm == NULL) {
330
0
            for (ri = i - 1; ri >= 0; ri--) {
331
0
                RTPP_OBJ_DECREF(netio_cf->args[ri].sigterm);
332
0
            }
333
0
            goto e1;
334
0
        }
335
4
    }
336
337
8
    for (i = 0; i < SEND_THREADS; i++) {
338
4
        if (pthread_create(&(netio_cf->thread_id[i]), NULL, (void *(*)(void *))&rtpp_anetio_sthread, &netio_cf->args[i]) != 0) {
339
0
             for (ri = i - 1; ri >= 0; ri--) {
340
0
                 rtpp_queue_put_item(netio_cf->args[ri].sigterm, netio_cf->args[ri].out_q);
341
0
                 pthread_join(netio_cf->thread_id[ri], NULL);
342
0
             }
343
0
             for (ri = i; ri < SEND_THREADS; ri++) {
344
0
                 RTPP_OBJ_DECREF(netio_cf->args[ri].sigterm);
345
0
             }
346
0
             goto e1;
347
0
        }
348
4
#if HAVE_PTHREAD_SETNAME_NP
349
4
        (void)pthread_setname_np(netio_cf->thread_id[i], "rtpp_anetio_sender");
350
4
#endif
351
4
    }
352
353
4
    return (netio_cf);
354
355
#if 0
356
e2:
357
    for (i = 0; i < SEND_THREADS; i++) {
358
        RTPP_OBJ_DECREF(netio_cf->args[i].sigterm);
359
    }
360
#endif
361
0
e1:
362
0
    for (i = 0; i < SEND_THREADS; i++) {
363
0
        rtpp_queue_destroy(netio_cf->args[i].out_q);
364
0
        RTPP_OBJ_DECREF(netio_cf->args[i].glog);
365
0
    }
366
0
e0:
367
0
    free(netio_cf);
368
0
    return (NULL);
369
0
}
370
371
void
372
rtpp_netio_async_destroy(struct rtpp_anetio_cf *netio_cf)
373
4
{
374
4
    int i;
375
376
8
    for (i = 0; i < SEND_THREADS; i++) {
377
4
        rtpp_queue_put_item(netio_cf->args[i].sigterm, netio_cf->args[i].out_q);
378
4
    }
379
8
    for (i = 0; i < SEND_THREADS; i++) {
380
4
        pthread_join(netio_cf->thread_id[i], NULL);
381
4
        rtpp_queue_destroy(netio_cf->args[i].out_q);
382
4
        RTPP_OBJ_DECREF(netio_cf->args[i].glog);
383
4
    }
384
4
    free(netio_cf);
385
4
}