Coverage Report

Created: 2025-11-09 07:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rtpproxy/src/rtpp_stream.c
Line
Count
Source
1
/*
2
 * Copyright (c) 2004-2006 Maxim Sobolev <sobomax@FreeBSD.org>
3
 * Copyright (c) 2006-2015 Sippy Software, Inc., http://www.sippysoft.com
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions
8
 * are met:
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in the
13
 *    documentation and/or other materials provided with the distribution.
14
 *
15
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
 * SUCH DAMAGE.
26
 *
27
 */
28
29
#include <sys/types.h>
30
#include <sys/socket.h>
31
#include <netinet/in.h>
32
#include <netdb.h>
33
#include <pthread.h>
34
#include <stdatomic.h>
35
#include <stddef.h>
36
#include <stdlib.h>
37
#include <stdint.h>
38
#include <stdio.h>
39
#include <string.h>
40
41
#include "config.h"
42
43
#include "rtpp_ssrc.h"
44
#include "rtpa_stats.h"
45
#include "rtpp_defines.h"
46
#include "rtpp_log.h"
47
#include "rtpp_types.h"
48
#include "rtpp_log_obj.h"
49
#include "rtp.h"
50
#include "rtp_analyze.h"
51
#include "rtpp_analyzer.h"
52
#include "rtp_resizer.h"
53
#include "rtpp_time.h"
54
#include "rtpp_command.h"
55
#include "rtpp_command_args.h"
56
#include "rtpp_command_sub.h"
57
#include "rtpp_command_private.h"
58
#include "rtpp_genuid.h"
59
#include "rtp_info.h"
60
#include "rtp_packet.h"
61
#include "rtpp_mallocs.h"
62
#include "rtpp_codeptr.h"
63
#include "rtpp_refcnt.h"
64
#include "rtpp_network.h"
65
#include "rtpp_pcount.h"
66
#include "rtpp_pcnt_strm.h"
67
#include "rtpp_proc.h"
68
#include "rtpp_record.h"
69
#include "rtpp_stats.h"
70
#include "rtpp_stream.h"
71
#include "rtpp_stream_fin.h"
72
#include "rtpp_server.h"
73
#include "rtpp_socket.h"
74
#include "rtpp_weakref.h"
75
#include "rtpp_ttl.h"
76
#include "rtpp_pipe.h"
77
#include "rtpp_netaddr.h"
78
#include "rtpp_debug.h"
79
#include "rtpp_acct_pipe.h"
80
#include "rtpp_cfg.h"
81
#include "rtpp_proc_servers.h"
82
#include "advanced/pproc_manager.h"
83
#include "advanced/packet_processor.h"
84
#include "rtpp_command_stats.h"
85
#include "rtpp_modman.h"
86
#include "rtpp_session.h"
87
88
0
#define  SEQ_SYNC_IVAL   1.0    /* in seconds */
89
90
/*
 * Latching state kept per stream.
 *
 *  mode      - latch mode; code in this file special-cases RTPLM_FORCE_ON.
 *  latched   - non-zero once a remote source has been latched in.
 *  ssrc      - SSRC recorded from the packet that triggered the latch;
 *              ssrc.inited is cleared when no valid SSRC is available.
 *  seq       - RTP sequence number recorded together with the SSRC.
 *  last_sync - time value at which ssrc/seq were last recorded.
 */
struct rtpps_latch {
91
    enum rtpps_latch_mode mode;
92
    int latched;
93
    struct rtpp_ssrc ssrc;
94
    int seq;
95
    double last_sync;
96
};
97
98
/*
 * Handle on the player ("rtpp_server") attached to a stream.
 *
 *  uid   - UID of the player, RTPP_UID_NONE when no player is attached.
 *  inact - non-zero while the player exists but has not been started yet
 *          (set when the stream has no remote address or no socket).
 */
struct rtps {
99
    uint64_t uid;
100
    int inact;
101
};
102
103
/*
 * Private state of a stream. rtpp_stream_ctor() hands out &pub to callers.
 *
 * Layout notes:
 *  - pmod_data has to remain the last member: the constructor sizes the
 *    allocation as offsetof(pmod_data.adp) plus one slot per module.
 *  - The addresses pvt, pvt + 1 and pvt + 2 are used purely as unique keys
 *    when registering packet processors; they are never dereferenced as
 *    stream objects.
 *  - "lock" is taken by the public methods around accesses to rtps,
 *    latch_info, hld_stat and last_update.
 */
struct rtpp_stream_priv
104
{
105
    struct rtpp_stream pub;
106
    struct rtpp_proc_servers *proc_servers;
107
    struct rtpp_stats *rtpp_stats;
108
    pthread_mutex_t lock;
109
    /* Weak reference to the "rtpp_server" (player) */
110
    struct rtps rtps;
111
    /* Timestamp of the last session update */
112
    double last_update;
113
    /* Flag that indicates whether or not address supplied by client can't be trusted */
114
    int untrusted_addr;
115
    /* Save previous address when doing update */
116
    struct rtpp_netaddr *raddr_prev;
117
    /* Flag which tells if we are allowed to update address with RTP src IP */
118
    struct rtpps_latch latch_info;
119
    /* Structure to track hold requests */
120
    struct rtpp_acct_hold hld_stat;
121
    /* Descriptor */
122
    struct rtpp_socket *fd;
123
    /* Remote source address */
124
    struct rtpp_netaddr *rem_addr;
125
    int npkts_resizer_in_idx;
126
    /* Placeholder for per-module structures */
127
    struct pmod_data pmod_data;
128
};
129
130
/*
 * Forward declarations: the destructor, the implementations of the public
 * stream methods and the private helpers. Helpers whose names start with an
 * underscore take the private structure directly.
 */
static void rtpp_stream_dtor(struct rtpp_stream_priv *);
131
static int rtpp_stream_handle_play(struct rtpp_stream *,
132
  const struct r_stream_h_play_args *);
133
static void rtpp_stream_handle_noplay(struct rtpp_stream *);
134
static int rtpp_stream_isplayer_active(struct rtpp_stream *);
135
static void rtpp_stream_finish_playback(struct rtpp_stream *, uint64_t);
136
static const char *rtpp_stream_get_actor(struct rtpp_stream *);
137
static const char *rtpp_stream_get_proto(struct rtpp_stream *);
138
static int _rtpp_stream_latch(struct rtpp_stream_priv *, double,
139
  struct rtp_packet *);
140
static int rtpp_stream_latch(struct rtpp_stream *, struct rtp_packet *);
141
static void rtpp_stream_latch_setmode(struct rtpp_stream *, enum rtpps_latch_mode);
142
static enum rtpps_latch_mode rtpp_stream_latch_getmode(struct rtpp_stream *);
143
144
static int _rtpp_stream_check_latch_override(struct rtpp_stream_priv *,
145
  struct rtp_packet *, double);
146
static void __rtpp_stream_fill_addr(struct rtpp_stream_priv *,
147
  struct rtp_packet *);
148
static int rtpp_stream_guess_addr(struct rtpp_stream *,
149
  struct rtp_packet *);
150
static void rtpp_stream_prefill_addr(struct rtpp_stream *,
151
  struct sockaddr **, double);
152
static void rtpp_stream_set_skt(struct rtpp_stream *, struct rtpp_socket *);
153
static struct rtpp_socket *rtpp_stream_get_skt(struct rtpp_stream *, HERETYPE);
154
static struct rtpp_socket *rtpp_stream_update_skt(struct rtpp_stream *,
155
  struct rtpp_socket *);
156
static int rtpp_stream_send_pkt(struct rtpp_stream *, struct sthread_args *,
157
  struct rtp_packet *);
158
static int rtpp_stream_send_pkt_to(struct rtpp_stream *, struct sthread_args *,
159
  struct rtp_packet *, struct rtpp_netaddr *);
160
static struct rtp_packet *_rtpp_stream_recv_pkt(struct rtpp_stream_priv *,
161
  const struct rtpp_timestamp *);
162
static int rtpp_stream_issendable(struct rtpp_stream *);
163
static int _rtpp_stream_islatched(struct rtpp_stream_priv *);
164
static void rtpp_stream_locklatch(struct rtpp_stream *);
165
static void rtpp_stream_reg_onhold(struct rtpp_stream *);
166
void rtpp_stream_get_stats(struct rtpp_stream *, struct rtpp_acct_hold *);
167
static struct rtp_packet *rtpp_stream_rx(struct rtpp_stream *,
168
  struct rtpp_weakref *, const struct rtpp_timestamp *, struct rtpp_proc_rstats *);
169
static struct rtpp_netaddr *rtpp_stream_get_rem_addr(struct rtpp_stream *, int);
170
static struct rtpp_stream *rtpp_stream_get_sender(struct rtpp_stream *,
171
  const struct rtpp_cfg *cfsp);
172
173
/*
 * Method table of the rtpp_stream class: each public method slot is bound
 * to its implementation in this file.
 * NOTE(review): DEFINE_SMETHODS is defined elsewhere; presumably it expands
 * to the static method-table object used by CALL_SMETHOD() - confirm.
 */
DEFINE_SMETHODS(rtpp_stream,
174
    .handle_play = &rtpp_stream_handle_play,
175
    .handle_noplay = &rtpp_stream_handle_noplay,
176
    .isplayer_active = &rtpp_stream_isplayer_active,
177
    .finish_playback = &rtpp_stream_finish_playback,
178
    .get_actor = &rtpp_stream_get_actor,
179
    .get_proto = &rtpp_stream_get_proto,
180
    .prefill_addr = &rtpp_stream_prefill_addr,
181
    .set_skt = &rtpp_stream_set_skt,
182
    .get_skt = &rtpp_stream_get_skt,
183
    .update_skt = &rtpp_stream_update_skt,
184
    .send_pkt = &rtpp_stream_send_pkt,
185
    .send_pkt_to = &rtpp_stream_send_pkt_to,
186
    .guess_addr = &rtpp_stream_guess_addr,
187
    .issendable = &rtpp_stream_issendable,
188
    .locklatch = &rtpp_stream_locklatch,
189
    .reg_onhold = &rtpp_stream_reg_onhold,
190
    .get_stats = &rtpp_stream_get_stats,
191
    .rx = &rtpp_stream_rx,
192
    .get_rem_addr = &rtpp_stream_get_rem_addr,
193
    .latch = &rtpp_stream_latch,
194
    .latch_setmode = &rtpp_stream_latch_setmode,
195
    .latch_getmode = &rtpp_stream_latch_getmode,
196
    .get_sender = &rtpp_stream_get_sender,
197
);
198
199
static struct pproc_act
200
analyze_rtp_packet(const struct pkt_proc_ctx *pktxp)
201
266k
{
202
266k
    struct rtpp_stream *stp_in = pktxp->strmp_in;
203
266k
    struct rtp_packet *packet = pktxp->pktp;
204
205
266k
    CALL_SMETHOD(stp_in->pcnt_strm, reg_pktin, packet);
206
266k
    if (CALL_SMETHOD(stp_in->analyzer, update, packet) == UPDATE_SSRC_CHG &&
207
266k
      CALL_SMETHOD(stp_in, latch_getmode) != RTPLM_FORCE_ON) {
208
12.5k
        CALL_SMETHOD(stp_in, latch, packet);
209
12.5k
    }
210
266k
    return (PPROC_ACT_NOP);
211
266k
}
212
213
static struct pproc_act
214
analyze_rtcp_packet(const struct pkt_proc_ctx *pktxp)
215
0
{
216
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
217
0
    struct rtp_packet *packet = pktxp->pktp;
218
219
0
    CALL_SMETHOD(stp_in->pcnt_strm, reg_pktin, packet);
220
0
    return (PPROC_ACT_NOP);
221
0
}
222
223
static struct pproc_act
224
resizer_injest(const struct pkt_proc_ctx *pktxp)
225
266k
{
226
266k
    struct rtpp_stream *stp_in = pktxp->strmp_in;
227
266k
    struct rtp_packet *packet = pktxp->pktp;
228
266k
    struct rtpp_stream_priv *pvt;
229
230
266k
    if (stp_in->resizer != NULL) {
231
0
        rtp_resizer_enqueue(stp_in->resizer, &packet, pktxp->rsp);
232
0
        if (packet == NULL) {
233
0
            if (pktxp->rsp != NULL) {
234
0
                pktxp->rsp->npkts_resizer_in.cnt++;
235
0
            } else {
236
0
                pvt = (struct rtpp_stream_priv *)pktxp->pproc->arg;
237
0
                CALL_SMETHOD(pvt->rtpp_stats, updatebyidx, pvt->npkts_resizer_in_idx, 1);
238
0
            }
239
0
        }
240
0
    }
241
266k
    return ((packet == NULL) ? PPROC_ACT_TAKE : PPROC_ACT_NOP);
242
266k
}
243
244
struct rtpp_stream *
245
rtpp_stream_ctor(const struct r_stream_ctor_args *ap)
246
250k
{
247
250k
    struct rtpp_stream_priv *pvt;
248
250k
    size_t alen;
249
250k
    const struct r_pipe_ctor_args *pap = ap->pipe_cap;
250
250k
    const struct rtpp_session_ctor_args *sap = pap->session_cap;
251
250k
    const struct rtpp_cfg *cfs = sap->cfs;
252
250k
    int nmodules = 0;
253
254
250k
#if ENABLE_MODULE_IF
255
250k
    nmodules  = cfs->modules_cf->count.total;
256
250k
#endif
257
258
250k
    alen = offsetof(struct rtpp_stream_priv, pmod_data.adp) + 
259
250k
      (nmodules * sizeof(pvt->pmod_data.adp[0]));
260
250k
    pvt = rtpp_rzmalloc(alen, PVT_RCOFFS(pvt));
261
250k
    if (pvt == NULL) {
262
0
        goto e0;
263
0
    }
264
250k
    if (pthread_mutex_init(&pvt->lock, NULL) != 0) {
265
0
        goto e1;
266
0
    }
267
250k
    pvt->pub.log = pap->log;
268
250k
    RTPP_OBJ_BORROW(&pvt->pub, pap->log);
269
250k
    RTPP_OBJ_DTOR_ATTACH(&pvt->pub, pthread_mutex_destroy, &pvt->lock);
270
250k
    pvt->pub.pproc_manager = CALL_SMETHOD(cfs->pproc_manager, clone);
271
250k
    if (pvt->pub.pproc_manager == NULL) {
272
0
        goto e1;
273
0
    }
274
250k
    RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->pub.pproc_manager);
275
250k
    if (pap->pipe_type == PIPE_RTP) {
276
125k
        pvt->pub.analyzer = rtpp_analyzer_ctor(pap->log);
277
125k
        if (pvt->pub.analyzer == NULL) {
278
0
            goto e1;
279
0
        }
280
125k
        RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->pub.analyzer);
281
282
125k
        const struct packet_processor_if resize_packet_poi = {
283
125k
            .descr = "resize_packet",
284
125k
            .arg = (void *)pvt,
285
125k
            .key = (void *)(pvt + 1),
286
125k
            .enqueue = &resizer_injest
287
125k
        };
288
125k
        if (CALL_SMETHOD(pvt->pub.pproc_manager, reg, PPROC_ORD_RESIZE, &resize_packet_poi) < 0)
289
0
            goto e1;
290
125k
        pvt->npkts_resizer_in_idx = CALL_SMETHOD(cfs->rtpp_stats, getidxbyname,
291
125k
          "npkts_resizer_in");
292
125k
        if (pvt->npkts_resizer_in_idx == -1)
293
0
            goto e2;
294
125k
    }
295
296
250k
    const struct packet_processor_if analyze_packet_poi = {
297
250k
        .descr = "analyze_packet",
298
250k
        .arg = (void *)pvt->pub.analyzer,
299
250k
        .key = (void *)pvt,
300
250k
        .enqueue = (pap->pipe_type == PIPE_RTP) ? &analyze_rtp_packet : &analyze_rtcp_packet,
301
250k
    };
302
250k
    if (CALL_SMETHOD(pvt->pub.pproc_manager, reg, PPROC_ORD_ANALYZE, &analyze_packet_poi) < 0)
303
0
        goto e2;
304
305
250k
    pvt->pub.pcnt_strm = rtpp_pcnt_strm_ctor();
306
250k
    if (pvt->pub.pcnt_strm == NULL) {
307
0
        goto e3;
308
0
    }
309
250k
    RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->pub.pcnt_strm);
310
250k
    pvt->raddr_prev = rtpp_netaddr_ctor();
311
250k
    if (pvt->raddr_prev == NULL) {
312
0
        goto e3;
313
0
    }
314
250k
    RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->raddr_prev);
315
250k
    pvt->rem_addr = rtpp_netaddr_ctor();
316
250k
    if (pvt->rem_addr == NULL) {
317
0
        goto e3;
318
0
    }
319
250k
    RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->rem_addr);
320
250k
    pvt->proc_servers = cfs->proc_servers;
321
250k
    RTPP_OBJ_BORROW(&pvt->pub, cfs->proc_servers);
322
250k
    pvt->rtpp_stats = cfs->rtpp_stats;
323
250k
    pvt->pub.side = ap->side;
324
250k
    pvt->pub.pipe_type = pap->pipe_type;
325
326
250k
    pvt->pub.stuid = CALL_SMETHOD(cfs->guid, gen);
327
250k
    pvt->pub.seuid = pap->seuid;
328
1.25M
    for (unsigned int i = 0; i < nmodules; i++) {
329
1.00M
        atomic_init(&(pvt->pmod_data.adp[i]), NULL);
330
1.00M
    }
331
250k
    pvt->pmod_data.nmodules = nmodules;
332
250k
    pvt->pub.pmod_datap = &(pvt->pmod_data);
333
250k
    pvt->pub.laddr = sap->lia[ap->side];
334
250k
    PUBINST_FININIT(&pvt->pub, pvt, rtpp_stream_dtor);
335
250k
    return (&pvt->pub);
336
337
0
e3:
338
0
    CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt);
339
0
e2:
340
0
    if (pap->pipe_type == PIPE_RTP) {
341
0
        CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt + 1);
342
0
    }
343
0
e1:
344
0
    RTPP_OBJ_DECREF(&(pvt->pub));
345
0
e0:
346
0
    return (NULL);
347
0
}
348
349
static inline void
350
_s_rtps(struct rtpp_stream_priv *pvt, uint64_t rtps, int replace)
351
0
{
352
353
0
    RTPP_DBG_ASSERT(pvt->pub.pipe_type == PIPE_RTP);
354
0
    if (replace == 0) {
355
0
        RTPP_DBG_ASSERT(pvt->rtps.uid == RTPP_UID_NONE);
356
0
    }
357
0
    pvt->rtps.uid = rtps;
358
0
    if (CALL_SMETHOD(pvt->rem_addr, isempty) || pvt->fd == NULL) {
359
0
        pvt->rtps.inact = 1;
360
0
    }
361
0
}
362
363
static const char *
364
_rtpp_stream_get_actor(struct rtpp_stream_priv *pvt)
365
319k
{
366
367
319k
    return ((pvt->pub.side == RTPP_SSIDE_CALLER) ? "caller" : "callee");
368
319k
}
369
370
static const char *
371
_rtpp_stream_get_proto(struct rtpp_stream_priv *pvt)
372
12.5k
{
373
374
12.5k
    return (PP_NAME(pvt->pub.pipe_type));
375
12.5k
}
376
377
static void
378
rtpp_stream_dtor(struct rtpp_stream_priv *pvt)
379
250k
{
380
250k
    struct rtpp_stream *pub;
381
382
250k
    pub = &(pvt->pub);
383
250k
    rtpp_stream_fin(pub);
384
250k
    if (pub->analyzer != NULL) {
385
125k
         struct rtpa_stats rst;
386
125k
         char ssrc_buf[11];
387
125k
         const char *actor, *ssrc;
388
389
125k
         actor = _rtpp_stream_get_actor(pvt);
390
125k
         CALL_SMETHOD(pub->analyzer, get_stats, &rst);
391
125k
         if (rst.ssrc_changes != 0) {
392
4.60k
             snprintf(ssrc_buf, sizeof(ssrc_buf), SSRC_FMT, rst.last_ssrc.val);
393
4.60k
             ssrc = ssrc_buf;
394
120k
         } else {
395
120k
             ssrc = "NONE";
396
120k
         }
397
125k
         RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "RTP stream from %s: "
398
125k
           "SSRC=%s, ssrc_changes=%lu, psent=%lu, precvd=%lu, plost=%lu, pdups=%lu",
399
125k
           actor, ssrc, rst.ssrc_changes, rst.psent, rst.precvd,
400
125k
           rst.plost, rst.pdups);
401
125k
         if (rst.psent > 0) {
402
4.57k
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nsent", rst.psent);
403
4.57k
         }
404
125k
         if (rst.precvd > 0) {
405
4.60k
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nrcvd", rst.precvd);
406
4.60k
         }
407
125k
         if (rst.pdups > 0) {
408
912
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_ndups", rst.pdups);
409
912
         }
410
125k
         if (rst.plost > 0) {
411
2.20k
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nlost", rst.plost);
412
2.20k
         }
413
125k
         if (rst.pecount > 0) {
414
24.3k
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_perrs", rst.pecount);
415
24.3k
         }
416
125k
    }
417
250k
    if (pvt->fd != NULL)
418
250k
        RTPP_OBJ_DECREF(pvt->fd);
419
250k
    if (pub->codecs != NULL)
420
489
        free(pub->codecs);
421
250k
    if (pvt->rtps.uid != RTPP_UID_NONE)
422
250k
        CALL_SMETHOD(pvt->proc_servers, unreg, pvt->rtps.uid);
423
250k
    if (pub->resizer != NULL)
424
792
        rtp_resizer_free(pvt->rtpp_stats, pub->resizer);
425
250k
    if (pub->rrc != NULL)
426
250k
        RTPP_OBJ_DECREF(pub->rrc);
427
1.25M
    for (unsigned int i = 0; i < pvt->pmod_data.nmodules; i++) {
428
1.00M
        struct rtpp_refcnt *mdata_rcnt;
429
1.00M
        mdata_rcnt = atomic_load(&(pvt->pmod_data.adp[i]));
430
1.00M
        if (mdata_rcnt != NULL) {
431
0
            RC_DECREF(mdata_rcnt);
432
0
        }
433
1.00M
    }
434
250k
    if (pub->ttl != NULL)
435
250k
        RTPP_OBJ_DECREF(pub->ttl);
436
250k
    if (pvt->pub.pipe_type == PIPE_RTP) {
437
125k
        CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt + 1);
438
125k
    }
439
250k
    CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt);
440
250k
}
441
442
static void
443
player_predestroy_cb(struct rtpp_stats *rtpp_stats)
444
0
{
445
446
0
    CALL_SMETHOD(rtpp_stats, updatebyname, "nplrs_destroyed", 1);
447
0
}
448
449
static struct pproc_act
450
drop_packets(const struct pkt_proc_ctx *pktxp)
451
0
{
452
453
0
    return (PPROC_ACT_DROP);
454
0
}
455
456
static int
457
rtpp_stream_handle_play(struct rtpp_stream *self,
458
  const struct r_stream_h_play_args *ap)
459
714
{
460
714
    struct rtpp_stream_priv *pvt;
461
714
    int n;
462
714
    char *cp;
463
714
    struct rtpp_server *rsrv;
464
714
    uint16_t seq;
465
714
    uint32_t ssrc;
466
714
    const char *plerror, *codecs = ap->codecs;
467
714
    struct rtpp_server_ctor_args sca = {.name = ap->pname, .loop = ap->playcount,
468
714
      .ptime = ap->ptime, .guid = ap->guid};
469
470
714
    PUB2PVT(self, pvt);
471
472
714
    const struct packet_processor_if drop_on_pa_poi = {
473
714
        .descr = "drop_packets(player_active)",
474
714
        .arg = (void *)pvt,
475
714
        .key = (void *)(pvt + 2),
476
714
        .enqueue = &drop_packets
477
714
    };
478
479
714
    pthread_mutex_lock(&pvt->lock);
480
714
    plerror = "reason unknown";
481
1.25k
    while (*codecs != '\0') {
482
845
        n = strtol(codecs, &cp, 10);
483
845
        if (cp == codecs) {
484
309
            plerror = "invalid codecs";
485
309
            goto e0;
486
309
        }
487
536
        codecs = cp;
488
536
        if (*codecs != '\0')
489
146
            codecs++;
490
536
        sca.codec = n;
491
536
        rsrv = rtpp_server_ctor(&sca);
492
536
        if (rsrv == NULL) {
493
536
            RTPP_LOG(pvt->pub.log, RTPP_LOG_DBUG, "rtpp_server_ctor(\"%s\", %d, %d) failed",
494
536
              ap->pname, n, ap->playcount);
495
536
            plerror = "rtpp_server_ctor() failed";
496
536
            if (sca.result == RTPP_SERV_NOENT)
497
536
                continue;
498
0
            goto e0;
499
536
        }
500
0
        rsrv->stuid = self->stuid;
501
0
        ssrc = CALL_SMETHOD(rsrv, get_ssrc);
502
0
        seq = CALL_SMETHOD(rsrv, get_seq);
503
0
        _s_rtps(pvt, rsrv->sruid, 0);
504
0
        int regres = CALL_SMETHOD(self->pproc_manager->reverse, reg, PPROC_ORD_PLAY,
505
0
          &drop_on_pa_poi);
506
0
        if (regres < 0) {
507
0
            plerror = "pproc_manager->reg() method failed";
508
0
            goto e1;
509
0
        }
510
0
        if (pvt->rtps.inact == 0) {
511
0
            CALL_SMETHOD(rsrv, start, ap->cmd->dtime->mono);
512
0
        }
513
0
        if (CALL_SMETHOD(pvt->proc_servers, reg, rsrv, pvt->rtps.inact) != 0) {
514
0
            plerror = "proc_servers->reg() method failed";
515
0
            goto e2;
516
0
        }
517
0
        pthread_mutex_unlock(&pvt->lock);
518
0
        rtpp_command_get_stats(ap->cmd)->nplrs_created.cnt++;
519
0
        RTPP_OBJ_DTOR_ATTACH(rsrv, (rtpp_refcnt_dtor_t)player_predestroy_cb,
520
0
          pvt->rtpp_stats);
521
0
        RTPP_OBJ_DECREF(rsrv);
522
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
523
0
          "%d times playing prompt %s codec %d: SSRC=" SSRC_FMT ", seq=%u",
524
0
          ap->playcount, ap->pname, n, ssrc, seq);
525
0
        return 0;
526
0
    }
527
405
    goto e0;
528
405
e2:
529
0
    CALL_SMETHOD(self->pproc_manager->reverse, unreg, drop_on_pa_poi.key);
530
0
e1:
531
0
    RTPP_OBJ_DECREF(rsrv);
532
714
e0:
533
714
    pthread_mutex_unlock(&pvt->lock);
534
714
    RTPP_LOG(pvt->pub.log, RTPP_LOG_ERR, "can't create player: %s", plerror);
535
714
    return -1;
536
0
}
537
538
static void
539
rtpp_stream_handle_noplay(struct rtpp_stream *self)
540
1.07k
{
541
1.07k
    struct rtpp_stream_priv *pvt;
542
1.07k
    uint64_t ruid;
543
1.07k
    int stopped;
544
545
1.07k
    stopped = 0;
546
1.07k
    PUB2PVT(self, pvt);
547
1.07k
    pthread_mutex_lock(&pvt->lock);
548
1.07k
    ruid = pvt->rtps.uid;
549
1.07k
    pthread_mutex_unlock(&pvt->lock);
550
1.07k
    if (ruid == RTPP_UID_NONE)
551
1.07k
        return;
552
0
    if (CALL_SMETHOD(pvt->proc_servers, unreg, ruid) == 0) {
553
0
        pthread_mutex_lock(&pvt->lock);
554
0
        CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2);
555
0
        if (pvt->rtps.uid == ruid) {
556
0
            pvt->rtps.uid = RTPP_UID_NONE;
557
0
            pvt->rtps.inact = 0;
558
0
            stopped = 1;
559
0
        }
560
0
        pthread_mutex_unlock(&pvt->lock);
561
0
    }
562
0
    if (stopped != 0) {
563
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
564
0
          "stopping player at port %d", self->port);
565
0
    }
566
0
}
567
568
static int
569
rtpp_stream_isplayer_active(struct rtpp_stream *self)
570
0
{
571
0
    struct rtpp_stream_priv *pvt;
572
0
    int rval;
573
574
0
    PUB2PVT(self, pvt);
575
0
    pthread_mutex_lock(&pvt->lock);
576
0
    rval = (pvt->rtps.uid != RTPP_UID_NONE) ? 1 : 0;
577
0
    pthread_mutex_unlock(&pvt->lock);
578
0
    return (rval);
579
0
}
580
581
static void
582
rtpp_stream_finish_playback(struct rtpp_stream *self, uint64_t sruid)
583
0
{
584
0
    struct rtpp_stream_priv *pvt;
585
586
0
    PUB2PVT(self, pvt);
587
0
    pthread_mutex_lock(&pvt->lock);
588
0
    if (pvt->rtps.uid != RTPP_UID_NONE && pvt->rtps.uid == sruid) {
589
0
        CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2);
590
0
        _s_rtps(pvt, RTPP_UID_NONE, 1);
591
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
592
0
          "player at port %d has finished", self->port);
593
0
    }
594
0
    pthread_mutex_unlock(&pvt->lock);
595
0
}
596
597
/* Public method: "caller" or "callee", depending on the stream's side. */
static const char *
rtpp_stream_get_actor(struct rtpp_stream *self)
{
    struct rtpp_stream_priv *pvt;
    const char *actor;

    PUB2PVT(self, pvt);
    actor = _rtpp_stream_get_actor(pvt);
    return (actor);
}
605
606
static const char *
607
rtpp_stream_get_proto(struct rtpp_stream *self)
608
84.6k
{
609
610
84.6k
    return (PP_NAME(self->pipe_type));
611
84.6k
}
612
613
static int
614
_rtpp_stream_latch(struct rtpp_stream_priv *pvt, double dtime,
615
  struct rtp_packet *packet)
616
12.5k
{
617
12.5k
    const char *actor, *ptype, *ssrc, *seq, *relatch;
618
12.5k
    char ssrc_buf[11], seq_buf[6];
619
12.5k
    char saddr[MAX_AP_STRBUF];
620
12.5k
    int newlatch;
621
622
12.5k
    if (pvt->latch_info.mode == RTPLM_FORCE_ON) {
623
0
        __rtpp_stream_fill_addr(pvt, packet);
624
0
        return (0);
625
0
    }
626
627
12.5k
    if (pvt->last_update != 0 &&
628
0
      dtime - pvt->last_update < UPDATE_WINDOW) {
629
0
        return (0);
630
0
    }
631
632
12.5k
    actor = _rtpp_stream_get_actor(pvt);
633
12.5k
    ptype = _rtpp_stream_get_proto(pvt);
634
635
12.5k
    if (pvt->pub.pipe_type == PIPE_RTP) {
636
12.5k
        if (rtp_packet_parse(packet) == RTP_PARSER_OK) {
637
12.5k
            pvt->latch_info.ssrc.val = packet->parsed->ssrc;
638
12.5k
            pvt->latch_info.ssrc.inited = 1;
639
12.5k
            pvt->latch_info.seq = packet->parsed->seq;
640
12.5k
            pvt->latch_info.last_sync = dtime;
641
12.5k
            snprintf(ssrc_buf, sizeof(ssrc_buf), SSRC_FMT, packet->parsed->ssrc);
642
12.5k
            snprintf(seq_buf, sizeof(seq_buf), "%u", packet->parsed->seq);
643
12.5k
            ssrc = ssrc_buf;
644
12.5k
            seq = seq_buf;
645
12.5k
        } else {
646
0
            pvt->latch_info.ssrc.val = 0;
647
0
            pvt->latch_info.ssrc.inited = 0;
648
0
            ssrc = seq = "INVALID";
649
0
        }
650
12.5k
    } else {
651
0
        pvt->latch_info.ssrc.inited = 0;
652
0
        ssrc = seq = "UNKNOWN";
653
0
    }
654
655
12.5k
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
656
12.5k
    newlatch = SSRC_IS_BAD(&pvt->latch_info.ssrc) ? 0 : 1;
657
12.5k
    if (pvt->latch_info.latched == 0) {
658
4.09k
        relatch = (newlatch != 0) ? "latched in" : "not latched (bad SSRC)";
659
8.45k
    } else {
660
8.45k
        relatch = "re-latched";
661
8.45k
    }
662
#if !defined(FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION)
663
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
664
      "%s's address %s: %s (%s), SSRC=%s, Seq=%s", actor, relatch,
665
      saddr, ptype, ssrc, seq);
666
#endif
667
12.5k
    pvt->latch_info.latched = newlatch;
668
12.5k
    return (1);
669
12.5k
}
670
671
static int
672
rtpp_stream_latch(struct rtpp_stream *self, struct rtp_packet *packet)
673
12.5k
{
674
12.5k
    struct rtpp_stream_priv *pvt;
675
676
12.5k
    PUB2PVT(self, pvt);
677
12.5k
    pthread_mutex_lock(&pvt->lock);
678
12.5k
    int rval = _rtpp_stream_latch(pvt, packet->rtime.mono, packet);
679
12.5k
    pthread_mutex_unlock(&pvt->lock);
680
12.5k
    return (rval);
681
12.5k
}
682
683
static void
684
rtpp_stream_latch_setmode(struct rtpp_stream *self, enum rtpps_latch_mode mode)
685
11.5k
{
686
11.5k
    struct rtpp_stream_priv *pvt;
687
688
11.5k
    PUB2PVT(self, pvt);
689
11.5k
    pthread_mutex_lock(&pvt->lock);
690
11.5k
    pvt->latch_info.mode = mode;
691
11.5k
    pthread_mutex_unlock(&pvt->lock);
692
11.5k
}
693
694
static enum rtpps_latch_mode
695
rtpp_stream_latch_getmode(struct rtpp_stream *self)
696
12.5k
{
697
12.5k
    struct rtpp_stream_priv *pvt;
698
699
12.5k
    PUB2PVT(self, pvt);
700
12.5k
    pthread_mutex_lock(&pvt->lock);
701
12.5k
    enum rtpps_latch_mode mode = pvt->latch_info.mode;
702
12.5k
    pthread_mutex_unlock(&pvt->lock);
703
12.5k
    return (mode);
704
12.5k
}
705
706
static void
707
_rtpp_stream_latch_sync(struct rtpp_stream_priv *pvt, double dtime,
708
  struct rtp_packet *packet)
709
0
{
710
0
    struct rtpps_latch *lip;
711
712
0
    lip = &pvt->latch_info;
713
0
    if (pvt->pub.pipe_type != PIPE_RTP || lip->ssrc.inited == 0)
714
0
        return;
715
0
    if (dtime - lip->last_sync < SEQ_SYNC_IVAL)
716
0
        return;
717
0
    if (rtp_packet_parse(packet) != RTP_PARSER_OK)
718
0
        return;
719
0
    if (lip->ssrc.val != packet->parsed->ssrc)
720
0
        return;
721
0
    lip->seq = packet->parsed->seq;
722
0
    lip->last_sync = dtime;
723
0
}
724
725
static int
726
_rtpp_stream_check_latch_override(struct rtpp_stream_priv *pvt,
727
  struct rtp_packet *packet, double dtime)
728
0
{
729
0
    const char *actor;
730
0
    char saddr[MAX_AP_STRBUF];
731
732
0
    if (pvt->pub.pipe_type == PIPE_RTCP || pvt->latch_info.ssrc.inited == 0)
733
0
        return (0);
734
0
    if (pvt->latch_info.mode == RTPLM_FORCE_ON)
735
0
        return (0);
736
0
    if (rtp_packet_parse(packet) != RTP_PARSER_OK)
737
0
        return (0);
738
0
    if (pvt->last_update == 0 || dtime - pvt->last_update > UPDATE_WINDOW) {
739
0
        if (packet->parsed->ssrc != pvt->latch_info.ssrc.val)
740
0
            return (0);
741
0
        if (SEQ_DIST(pvt->latch_info.seq, packet->parsed->seq) > 536)
742
0
            return (0);
743
0
    }
744
745
0
    actor = _rtpp_stream_get_actor(pvt);
746
747
0
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
748
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
749
0
      "%s's address re-latched: %s (%s), SSRC=" SSRC_FMT ", Seq=%u->%u", actor,
750
0
      saddr, "RTP", pvt->latch_info.ssrc.val, pvt->latch_info.seq,
751
0
      packet->parsed->seq);
752
753
0
    pvt->latch_info.seq = packet->parsed->seq;
754
0
    pvt->latch_info.last_sync = packet->rtime.mono;
755
0
    return (1);
756
0
}
757
758
static void
759
_rtpp_stream_plr_start(struct rtpp_stream_priv *pvt, double dtime)
760
0
{
761
762
0
    RTPP_DBG_ASSERT(pvt->rtps.inact != 0);
763
0
    CALL_SMETHOD(pvt->proc_servers, plr_start, pvt->rtps.uid, dtime);
764
0
    pvt->rtps.inact = 0;
765
0
}
766
767
static void
768
__rtpp_stream_fill_addr(struct rtpp_stream_priv *pvt, struct rtp_packet *packet)
769
0
{
770
0
    const char *actor, *ptype;
771
0
    char saddr[MAX_AP_STRBUF];
772
773
0
    pvt->untrusted_addr = 1;
774
0
    CALL_SMETHOD(pvt->rem_addr, set, sstosa(&packet->raddr), packet->rlen);
775
0
    if (CALL_SMETHOD(pvt->raddr_prev, isempty) ||
776
0
      CALL_SMETHOD(pvt->raddr_prev, cmp, sstosa(&packet->raddr), packet->rlen) != 0) {
777
0
        pvt->latch_info.latched = 1;
778
0
    }
779
0
    if (pvt->rtps.inact != 0 && pvt->fd != NULL) {
780
0
        _rtpp_stream_plr_start(pvt, packet->rtime.mono);
781
0
    }
782
783
0
    actor = _rtpp_stream_get_actor(pvt);
784
0
    ptype = _rtpp_stream_get_proto(pvt);
785
0
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
786
0
    const char *wice = (pvt->latch_info.mode == RTPLM_FORCE_ON) ? " (with ICE)" : "";
787
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
788
0
      "%s's address filled in%s: %s (%s)", actor, wice, saddr, ptype);
789
0
    return;
790
0
}
791
792
static int
793
rtpp_stream_guess_addr(struct rtpp_stream *self,
794
  struct rtp_packet *packet)
795
0
{
796
0
    int rport;
797
0
    const char *actor, *ptype;
798
0
    struct rtpp_stream_priv *pvt;
799
0
    struct sockaddr_storage ta;
800
801
0
    RTPP_DBG_ASSERT(self->pipe_type == PIPE_RTCP);
802
0
    PUB2PVT(self, pvt);
803
804
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty) &&
805
0
      CALL_SMETHOD(pvt->rem_addr, cmphost, sstosa(&packet->raddr))) {
806
0
        return (0);
807
0
    }
808
#if 0
809
    if (self->addr == NULL) {
810
        self->addr = malloc(packet->rlen);
811
        if (self->addr == NULL) {
812
            return (-1);
813
        }
814
    }
815
#endif
816
0
    actor = rtpp_stream_get_actor(self);
817
0
    ptype = rtpp_stream_get_proto(self);
818
0
    rport = ntohs(satosin(&packet->raddr)->sin_port);
819
0
    if (IS_LAST_PORT(rport)) {
820
0
        return (-1);
821
0
    }
822
823
0
    memcpy(&ta, &packet->raddr, packet->rlen);
824
0
    setport(sstosa(&ta), rport + 1);
825
826
0
    CALL_SMETHOD(pvt->rem_addr, set, sstosa(&ta), packet->rlen);
827
    /* Use guessed value as the only true one for asymmetric clients */
828
0
    pvt->latch_info.latched = self->asymmetric;
829
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "guessing %s port "
830
0
      "for %s to be %d", ptype, actor, rport + 1);
831
832
0
    return (0);
833
0
}
834
835
static void
836
rtpp_stream_prefill_addr(struct rtpp_stream *self, struct sockaddr **iapp,
837
  double dtime)
838
84.3k
{
839
84.3k
    struct rtpp_stream_priv *pvt;
840
84.3k
    char saddr[MAX_AP_STRBUF];
841
84.3k
    const char *actor, *ptype;
842
843
84.3k
    PUB2PVT(self, pvt);
844
845
84.3k
    actor = rtpp_stream_get_actor(self);
846
84.3k
    ptype = rtpp_stream_get_proto(self);
847
84.3k
    pthread_mutex_lock(&pvt->lock);
848
84.3k
    if (pvt->hld_stat.status != 0) {
849
62
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "taking %s's %s stream off-hold",
850
62
            actor, ptype);
851
62
        pvt->hld_stat.status = 0;
852
62
    }
853
854
84.3k
    if (!CALL_SMETHOD(pvt->rem_addr, isempty))
855
2.88k
        pvt->last_update = dtime;
856
857
    /*
858
     * Unless the address provided by client historically
859
     * cannot be trusted and address is different from one
860
     * that we recorded update it.
861
     */
862
84.3k
    if (pvt->untrusted_addr != 0) {
863
0
        pthread_mutex_unlock(&pvt->lock);
864
0
        return;
865
0
    }
866
84.3k
    if (!CALL_SMETHOD(pvt->rem_addr, isempty) && CALL_SMETHOD(pvt->rem_addr,
867
84.3k
      isaddrseq, *iapp)) {
868
1.18k
        pthread_mutex_unlock(&pvt->lock);
869
1.18k
        return;
870
1.18k
    }
871
872
83.2k
    addrport2char_r(*iapp, saddr, sizeof(saddr), ':');
873
83.2k
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "pre-filling %s's %s address "
874
83.2k
      "with %s", actor, ptype, saddr);
875
83.2k
    if (!CALL_SMETHOD(pvt->rem_addr, isempty)) {
876
1.69k
        if (pvt->latch_info.latched != 0) {
877
190
            CALL_SMETHOD(pvt->raddr_prev, copy, pvt->rem_addr);
878
190
        }
879
1.69k
    }
880
83.2k
    CALL_SMETHOD(pvt->rem_addr, set, *iapp, SA_LEN(*iapp));
881
83.2k
    if (pvt->rtps.inact != 0 && pvt->fd != NULL) {
882
0
        _rtpp_stream_plr_start(pvt, dtime);
883
0
    }
884
83.2k
    pthread_mutex_unlock(&pvt->lock);
885
83.2k
}
886
887
static void rtpp_stream_reg_onhold(struct rtpp_stream *self)
888
930
{
889
930
    struct rtpp_stream_priv *pvt;
890
930
    const char *actor, *ptype;
891
892
930
    PUB2PVT(self, pvt);
893
930
    pthread_mutex_lock(&pvt->lock);
894
930
    if (pvt->hld_stat.status == 0) {
895
304
        actor = rtpp_stream_get_actor(self);
896
304
        ptype = rtpp_stream_get_proto(self);
897
304
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "putting %s's %s stream on hold",
898
304
           actor, ptype);
899
304
        pvt->hld_stat.status = 1;
900
304
    }
901
930
    pvt->hld_stat.cnt++;
902
930
    pthread_mutex_unlock(&pvt->lock);
903
930
}
904
905
static void
906
rtpp_stream_set_skt(struct rtpp_stream *self, struct rtpp_socket *new_skt)
907
169k
{
908
169k
    struct rtpp_stream_priv *pvt;
909
910
169k
    PUB2PVT(self, pvt);
911
169k
    pthread_mutex_lock(&pvt->lock);
912
169k
    if (new_skt == NULL) {
913
0
        RTPP_DBG_ASSERT(pvt->fd != NULL);
914
0
        RTPP_OBJ_DECREF(pvt->fd);
915
0
        pvt->fd = NULL;
916
0
        pthread_mutex_unlock(&pvt->lock);
917
0
        return;
918
0
    }
919
169k
    RTPP_DBG_ASSERT(pvt->fd == NULL);
920
169k
    CALL_SMETHOD(new_skt, set_stuid, self->stuid);
921
169k
    pvt->fd = new_skt;
922
169k
    RTPP_OBJ_INCREF(pvt->fd);
923
169k
    if (pvt->rtps.inact != 0 && !CALL_SMETHOD(pvt->rem_addr, isempty)) {
924
0
        _rtpp_stream_plr_start(pvt, getdtime());
925
0
    }
926
169k
    pthread_mutex_unlock(&pvt->lock);
927
169k
}
928
929
static struct rtpp_socket *
930
rtpp_stream_get_skt(struct rtpp_stream *self, HERETYPE mlp)
931
294k
{
932
294k
    struct rtpp_stream_priv *pvt;
933
294k
    struct rtpp_socket *rval;
934
935
294k
    PUB2PVT(self, pvt);
936
294k
    pthread_mutex_lock(&pvt->lock);
937
294k
    if (pvt->fd == NULL) {
938
103k
        pthread_mutex_unlock(&pvt->lock);
939
103k
        return (NULL);
940
103k
    }
941
190k
    RTPP_OBJ_INCREF(pvt->fd);
942
190k
    rval = pvt->fd;
943
190k
    pthread_mutex_unlock(&pvt->lock);
944
190k
    return (rval);
945
294k
}
946
947
static struct rtpp_socket *
948
rtpp_stream_update_skt(struct rtpp_stream *self, struct rtpp_socket *new_skt)
949
10.6k
{
950
10.6k
    struct rtpp_socket *old_skt;
951
10.6k
    struct rtpp_stream_priv *pvt;
952
953
10.6k
    RTPP_DBG_ASSERT(new_skt != NULL);
954
10.6k
    PUB2PVT(self, pvt);
955
10.6k
    pthread_mutex_lock(&pvt->lock);
956
10.6k
    old_skt = pvt->fd;
957
10.6k
    CALL_SMETHOD(new_skt, set_stuid, self->stuid);
958
10.6k
    pvt->fd = new_skt;
959
10.6k
    RTPP_OBJ_INCREF(pvt->fd);
960
10.6k
    if (pvt->rtps.inact != 0 && !CALL_SMETHOD(pvt->rem_addr, isempty)) {
961
0
        _rtpp_stream_plr_start(pvt, getdtime());
962
0
    }
963
10.6k
    pthread_mutex_unlock(&pvt->lock);
964
10.6k
    return (old_skt);
965
10.6k
}
966
967
static int
968
rtpp_stream_send_pkt(struct rtpp_stream *self, struct sthread_args *sap,
969
  struct rtp_packet *pkt)
970
99.5k
{
971
99.5k
    struct rtpp_stream_priv *pvt;
972
99.5k
    int rval;
973
974
99.5k
    PUB2PVT(self, pvt);
975
99.5k
    pthread_mutex_lock(&pvt->lock);
976
99.5k
    rval = CALL_SMETHOD(pvt->fd, send_pkt_na, sap, pvt->rem_addr, pkt,
977
99.5k
      self->log);
978
99.5k
    pthread_mutex_unlock(&pvt->lock);
979
99.5k
    return (rval);
980
99.5k
}
981
982
static int
983
rtpp_stream_send_pkt_to(struct rtpp_stream *self, struct sthread_args *sap,
984
  struct rtp_packet *pkt, struct rtpp_netaddr *rem_addr)
985
2
{
986
2
    struct rtpp_stream_priv *pvt;
987
2
    int rval;
988
989
2
    PUB2PVT(self, pvt);
990
2
    pthread_mutex_lock(&pvt->lock);
991
2
    rval = CALL_SMETHOD(pvt->fd, send_pkt_na, sap, rem_addr, pkt,
992
2
      self->log);
993
2
    pthread_mutex_unlock(&pvt->lock);
994
2
    return (rval);
995
2
}
996
997
static struct rtp_packet *
998
_rtpp_stream_recv_pkt(struct rtpp_stream_priv *pvt,
999
  const struct rtpp_timestamp *dtime)
1000
0
{
1001
0
    struct rtp_packet *pkt;
1002
1003
0
    pkt = CALL_SMETHOD(pvt->fd, rtp_recv, dtime, pvt->pub.laddr, pvt->pub.port);
1004
0
    return (pkt);
1005
0
}
1006
1007
static int
1008
rtpp_stream_issendable(struct rtpp_stream *self)
1009
101k
{
1010
101k
    struct rtpp_stream_priv *pvt;
1011
1012
101k
    PUB2PVT(self, pvt);
1013
101k
    pthread_mutex_lock(&pvt->lock);
1014
101k
    if (CALL_SMETHOD(pvt->rem_addr, isempty)) {
1015
1.89k
        pthread_mutex_unlock(&pvt->lock);
1016
1.89k
        return (0);
1017
1.89k
    }
1018
99.5k
    if (pvt->fd == NULL) {
1019
0
        pthread_mutex_unlock(&pvt->lock);
1020
0
        return (0);
1021
0
    }
1022
99.5k
    pthread_mutex_unlock(&pvt->lock);
1023
99.5k
    return (1);
1024
99.5k
}
1025
1026
static int
1027
_rtpp_stream_islatched(struct rtpp_stream_priv *pvt)
1028
0
{
1029
0
    int rval;
1030
1031
0
    rval = pvt->latch_info.latched;
1032
0
    return (rval);
1033
0
}
1034
1035
static void
1036
rtpp_stream_locklatch(struct rtpp_stream *self)
1037
624
{
1038
624
    struct rtpp_stream_priv *pvt;
1039
1040
624
    PUB2PVT(self, pvt);
1041
624
    pthread_mutex_lock(&pvt->lock);
1042
624
    pvt->latch_info.latched = 1;
1043
624
    pthread_mutex_unlock(&pvt->lock);
1044
624
}
1045
1046
void
1047
rtpp_stream_get_stats(struct rtpp_stream *self, struct rtpp_acct_hold *ahp)
1048
250k
{
1049
250k
    struct rtpp_stream_priv *pvt;
1050
1051
250k
    PUB2PVT(self, pvt);
1052
250k
    pthread_mutex_lock(&pvt->lock);
1053
250k
    *ahp = pvt->hld_stat;
1054
250k
    pthread_mutex_unlock(&pvt->lock);
1055
250k
}
1056
1057
static int
1058
_rtpp_stream_fill_addr(struct rtpp_stream_priv *pvt,
1059
  struct rtpp_weakref *rtcps_wrt, struct rtp_packet *packet)
1060
0
{
1061
0
    struct rtpp_stream *stp_rtcp;
1062
0
    int rval;
1063
1064
0
    __rtpp_stream_fill_addr(pvt, packet);
1065
0
    if (pvt->pub.stuid_rtcp == RTPP_UID_NONE) {
1066
0
        return (0);
1067
0
    }
1068
0
    stp_rtcp = CALL_SMETHOD(rtcps_wrt, get_by_idx,
1069
0
      pvt->pub.stuid_rtcp);
1070
0
    if (stp_rtcp == NULL) {
1071
0
        return (0);
1072
0
    }
1073
0
    rval = CALL_SMETHOD(stp_rtcp, guess_addr, packet);
1074
0
    RTPP_OBJ_DECREF(stp_rtcp);
1075
0
    return (rval);
1076
0
}
1077
1078
static struct rtp_packet *
1079
rtpp_stream_rx(struct rtpp_stream *self, struct rtpp_weakref *rtcps_wrt,
1080
  const struct rtpp_timestamp *dtime, struct rtpp_proc_rstats *rsp)
1081
0
{
1082
0
    struct rtp_packet *packet = NULL;
1083
0
    struct rtpp_stream_priv *pvt;
1084
1085
0
    PUB2PVT(self, pvt);
1086
0
    pthread_mutex_lock(&pvt->lock);
1087
0
    packet = _rtpp_stream_recv_pkt(pvt, dtime);
1088
0
    if (packet == NULL) {
1089
        /* Move on to the next session */
1090
0
        pthread_mutex_unlock(&pvt->lock);
1091
0
        return (NULL);
1092
0
    }
1093
0
    rsp->npkts_rcvd.cnt++;
1094
1095
0
    if (pvt->latch_info.mode == RTPLM_FORCE_OFF)
1096
0
        goto nolatch;
1097
1098
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty)) {
1099
        /* Check that the packet is authentic, drop if it isn't */
1100
0
        if (self->asymmetric == 0) {
1101
0
            if (CALL_SMETHOD(pvt->rem_addr, cmp, sstosa(&packet->raddr),
1102
0
              packet->rlen) != 0) {
1103
0
                if (_rtpp_stream_islatched(pvt) && \
1104
0
                  _rtpp_stream_check_latch_override(pvt, packet, dtime->mono) == 0) {
1105
                    /*
1106
                     * Continue, since there could be good packets in
1107
                     * queue.
1108
                     */
1109
0
                    CALL_SMETHOD(self->pcount, reg_ignr);
1110
0
                    goto discard_and_continue;
1111
0
                } else if (!_rtpp_stream_islatched(pvt)) {
1112
0
                    _rtpp_stream_latch(pvt, dtime->mono, packet);
1113
0
                }
1114
                /* Signal that an address has to be updated */
1115
0
                _rtpp_stream_fill_addr(pvt, rtcps_wrt, packet);
1116
0
            } else if (!_rtpp_stream_islatched(pvt)) {
1117
0
                _rtpp_stream_latch(pvt, dtime->mono, packet);
1118
0
            }
1119
0
        } else {
1120
            /*
1121
             * For asymmetric clients don't check
1122
             * source port since it may be different.
1123
             */
1124
0
            if (!CALL_SMETHOD(pvt->rem_addr, cmphost, sstosa(&packet->raddr))) {
1125
                /*
1126
                 * Continue, since there could be good packets in
1127
                 * queue.
1128
                 */
1129
0
                CALL_SMETHOD(self->pcount, reg_ignr);
1130
0
                goto discard_and_continue;
1131
0
            }
1132
0
        }
1133
0
    } else {
1134
        /* Update address recorded in the session */
1135
0
        _rtpp_stream_fill_addr(pvt, rtcps_wrt, packet);
1136
0
    }
1137
0
    _rtpp_stream_latch_sync(pvt, dtime->mono, packet);
1138
0
nolatch:
1139
0
    pthread_mutex_unlock(&pvt->lock);
1140
0
    return (packet);
1141
1142
0
discard_and_continue:
1143
0
    pthread_mutex_unlock(&pvt->lock);
1144
0
    RTPP_OBJ_DECREF(packet);
1145
0
    rsp->npkts_discard.cnt++;
1146
0
    return (RTPP_S_RX_DCONT);
1147
0
}
1148
1149
static struct rtpp_netaddr *
1150
rtpp_stream_get_rem_addr(struct rtpp_stream *self, int retempty)
1151
250k
{
1152
250k
    struct rtpp_stream_priv *pvt;
1153
250k
    struct rtpp_netaddr *rval;
1154
1155
250k
    PUB2PVT(self, pvt);
1156
250k
    pthread_mutex_lock(&pvt->lock);
1157
250k
    if (retempty == 0 && CALL_SMETHOD(pvt->rem_addr, isempty)) {
1158
0
        pthread_mutex_unlock(&pvt->lock);
1159
0
        return (NULL);
1160
0
    }
1161
250k
    rval = pvt->rem_addr;
1162
250k
    RTPP_OBJ_INCREF(rval);
1163
250k
    pthread_mutex_unlock(&pvt->lock);
1164
250k
    return (rval);
1165
250k
}
1166
1167
static struct rtpp_stream *
1168
rtpp_stream_get_sender(struct rtpp_stream *self, const struct rtpp_cfg *cfsp)
1169
0
{
1170
0
    if (self->pipe_type == PIPE_RTP) {
1171
0
       return (CALL_SMETHOD(cfsp->rtp_streams_wrt, get_by_idx,
1172
0
         self->stuid_sendr));
1173
0
    }
1174
0
    return (CALL_SMETHOD(cfsp->rtcp_streams_wrt, get_by_idx,
1175
0
      self->stuid_sendr));
1176
0
}