Coverage Report

Created: 2025-07-12 06:36

/src/rtpproxy/src/rtpp_stream.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2004-2006 Maxim Sobolev <sobomax@FreeBSD.org>
3
 * Copyright (c) 2006-2015 Sippy Software, Inc., http://www.sippysoft.com
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions
8
 * are met:
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in the
13
 *    documentation and/or other materials provided with the distribution.
14
 *
15
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
 * SUCH DAMAGE.
26
 *
27
 */
28
29
#include <sys/types.h>
30
#include <sys/socket.h>
31
#include <netinet/in.h>
32
#include <netdb.h>
33
#include <pthread.h>
34
#include <stdatomic.h>
35
#include <stddef.h>
36
#include <stdlib.h>
37
#include <stdint.h>
38
#include <stdio.h>
39
#include <string.h>
40
41
#include "config.h"
42
43
#include "rtpp_ssrc.h"
44
#include "rtpa_stats.h"
45
#include "rtpp_defines.h"
46
#include "rtpp_log.h"
47
#include "rtpp_types.h"
48
#include "rtpp_log_obj.h"
49
#include "rtp.h"
50
#include "rtp_analyze.h"
51
#include "rtpp_analyzer.h"
52
#include "rtp_resizer.h"
53
#include "rtpp_time.h"
54
#include "rtpp_command.h"
55
#include "rtpp_command_args.h"
56
#include "rtpp_command_sub.h"
57
#include "rtpp_command_private.h"
58
#include "rtpp_genuid.h"
59
#include "rtp_info.h"
60
#include "rtp_packet.h"
61
#include "rtpp_mallocs.h"
62
#include "rtpp_codeptr.h"
63
#include "rtpp_refcnt.h"
64
#include "rtpp_network.h"
65
#include "rtpp_pcount.h"
66
#include "rtpp_pcnt_strm.h"
67
#include "rtpp_proc.h"
68
#include "rtpp_record.h"
69
#include "rtpp_stats.h"
70
#include "rtpp_stream.h"
71
#include "rtpp_stream_fin.h"
72
#include "rtpp_server.h"
73
#include "rtpp_socket.h"
74
#include "rtpp_weakref.h"
75
#include "rtpp_ttl.h"
76
#include "rtpp_pipe.h"
77
#include "rtpp_netaddr.h"
78
#include "rtpp_debug.h"
79
#include "rtpp_acct_pipe.h"
80
#include "rtpp_cfg.h"
81
#include "rtpp_proc_servers.h"
82
#include "advanced/pproc_manager.h"
83
#include "advanced/packet_processor.h"
84
#include "rtpp_command_stats.h"
85
86
0
#define  SEQ_SYNC_IVAL   1.0    /* in seconds */
87
88
/*
 * Address-latching state of a stream.
 *
 *  mode      - latching policy; RTPLM_FORCE_ON makes _rtpp_stream_latch()
 *              fill in the remote address straight from the packet.
 *  latched   - non-zero once an address has been latched.
 *  ssrc      - SSRC recorded from the packet the stream latched on;
 *              ssrc.inited is 0 when no valid SSRC is known.
 *  seq       - RTP sequence number recorded together with that SSRC.
 *  last_sync - time (same clock as the dtime arguments in this file) at
 *              which seq was last refreshed; refreshes are rate-limited by
 *              SEQ_SYNC_IVAL.
 */
struct rtpps_latch {
89
    enum rtpps_latch_mode mode;
90
    int latched;
91
    struct rtpp_ssrc ssrc;
92
    int seq;
93
    double last_sync;
94
};
95
96
/*
 * Reference to the player ("rtpp_server") attached to a stream.
 *
 *  uid   - uid of the attached player, or RTPP_UID_NONE when there is none.
 *  inact - set to 1 by _s_rtps() when the stream has no remote address or
 *          no socket; cleared by _rtpp_stream_plr_start().
 */
struct rtps {
97
    uint64_t uid;
98
    int inact;
99
};
100
101
/*
 * Private part of a stream object.  The public part ("pub") is the first
 * member.  "lock" is the mutex the methods in this file take around access
 * to rtps, latch_info, hld_stat and fd.  "pmod_data" is the last member and
 * the constructor sizes the allocation so that pmod_data.adp[] has one slot
 * per module.
 */
struct rtpp_stream_priv
102
{
103
    struct rtpp_stream pub;
104
    struct rtpp_proc_servers *proc_servers;
105
    struct rtpp_stats *rtpp_stats;
106
    pthread_mutex_t lock;
107
    /* Weak reference to the "rtpp_server" (player) */
108
    struct rtps rtps;
109
    /* Timestamp of the last session update */
110
    double last_update;
111
    /* Flag that indicates whether or not address supplied by client can't be trusted */
112
    int untrusted_addr;
113
    /* Save previous address when doing update */
114
    struct rtpp_netaddr *raddr_prev;
115
    /* Flag which tells if we are allowed to update address with RTP src IP */
116
    struct rtpps_latch latch_info;
117
    /* Structure to track hold requests */
118
    struct rtpp_acct_hold hld_stat;
119
    /* Descriptor */
120
    struct rtpp_socket *fd;
121
    /* Remote source address */
122
    struct rtpp_netaddr *rem_addr;
123
    int npkts_resizer_in_idx;
124
    /* Placeholder for per-module structures */
125
    struct pmod_data pmod_data;
126
};
127
128
/*
 * Forward declarations of the stream methods and internal helpers defined
 * in this file.
 *
 * NOTE(review): rtpp_stream_get_stats() is the only prototype here that is
 * not declared static -- confirm whether external linkage is intended.
 */
static void rtpp_stream_dtor(struct rtpp_stream_priv *);
129
static int rtpp_stream_handle_play(struct rtpp_stream *,
130
  const struct r_stream_h_play_args *);
131
static void rtpp_stream_handle_noplay(struct rtpp_stream *);
132
static int rtpp_stream_isplayer_active(struct rtpp_stream *);
133
static void rtpp_stream_finish_playback(struct rtpp_stream *, uint64_t);
134
static const char *rtpp_stream_get_actor(struct rtpp_stream *);
135
static const char *rtpp_stream_get_proto(struct rtpp_stream *);
136
static int _rtpp_stream_latch(struct rtpp_stream_priv *, double,
137
  struct rtp_packet *);
138
static int rtpp_stream_latch(struct rtpp_stream *, struct rtp_packet *);
139
static void rtpp_stream_latch_setmode(struct rtpp_stream *, enum rtpps_latch_mode);
140
static enum rtpps_latch_mode rtpp_stream_latch_getmode(struct rtpp_stream *);
141
142
static int _rtpp_stream_check_latch_override(struct rtpp_stream_priv *,
143
  struct rtp_packet *, double);
144
static void __rtpp_stream_fill_addr(struct rtpp_stream_priv *,
145
  struct rtp_packet *);
146
static int rtpp_stream_guess_addr(struct rtpp_stream *,
147
  struct rtp_packet *);
148
static void rtpp_stream_prefill_addr(struct rtpp_stream *,
149
  struct sockaddr **, double);
150
static void rtpp_stream_set_skt(struct rtpp_stream *, struct rtpp_socket *);
151
static struct rtpp_socket *rtpp_stream_get_skt(struct rtpp_stream *, HERETYPE);
152
static struct rtpp_socket *rtpp_stream_update_skt(struct rtpp_stream *,
153
  struct rtpp_socket *);
154
static int rtpp_stream_send_pkt(struct rtpp_stream *, struct sthread_args *,
155
  struct rtp_packet *);
156
static int rtpp_stream_send_pkt_to(struct rtpp_stream *, struct sthread_args *,
157
  struct rtp_packet *, struct rtpp_netaddr *);
158
static struct rtp_packet *_rtpp_stream_recv_pkt(struct rtpp_stream_priv *,
159
  const struct rtpp_timestamp *);
160
static int rtpp_stream_issendable(struct rtpp_stream *);
161
static int _rtpp_stream_islatched(struct rtpp_stream_priv *);
162
static void rtpp_stream_locklatch(struct rtpp_stream *);
163
static void rtpp_stream_reg_onhold(struct rtpp_stream *);
164
void rtpp_stream_get_stats(struct rtpp_stream *, struct rtpp_acct_hold *);
165
static struct rtp_packet *rtpp_stream_rx(struct rtpp_stream *,
166
  struct rtpp_weakref *, const struct rtpp_timestamp *, struct rtpp_proc_rstats *);
167
static struct rtpp_netaddr *rtpp_stream_get_rem_addr(struct rtpp_stream *, int);
168
static struct rtpp_stream *rtpp_stream_get_sender(struct rtpp_stream *,
169
  const struct rtpp_cfg *cfsp);
170
171
/*
 * Method table of the rtpp_stream class: binds each method slot to its
 * implementation in this file.  These are the slots reached through
 * CALL_SMETHOD(stream, <name>, ...).
 * NOTE(review): DEFINE_SMETHODS is defined outside this file; presumably it
 * emits the static method-table object -- confirm against its definition.
 */
DEFINE_SMETHODS(rtpp_stream,
172
    .handle_play = &rtpp_stream_handle_play,
173
    .handle_noplay = &rtpp_stream_handle_noplay,
174
    .isplayer_active = &rtpp_stream_isplayer_active,
175
    .finish_playback = &rtpp_stream_finish_playback,
176
    .get_actor = &rtpp_stream_get_actor,
177
    .get_proto = &rtpp_stream_get_proto,
178
    .prefill_addr = &rtpp_stream_prefill_addr,
179
    .set_skt = &rtpp_stream_set_skt,
180
    .get_skt = &rtpp_stream_get_skt,
181
    .update_skt = &rtpp_stream_update_skt,
182
    .send_pkt = &rtpp_stream_send_pkt,
183
    .send_pkt_to = &rtpp_stream_send_pkt_to,
184
    .guess_addr = &rtpp_stream_guess_addr,
185
    .issendable = &rtpp_stream_issendable,
186
    .locklatch = &rtpp_stream_locklatch,
187
    .reg_onhold = &rtpp_stream_reg_onhold,
188
    .get_stats = &rtpp_stream_get_stats,
189
    .rx = &rtpp_stream_rx,
190
    .get_rem_addr = &rtpp_stream_get_rem_addr,
191
    .latch = &rtpp_stream_latch,
192
    .latch_setmode = &rtpp_stream_latch_setmode,
193
    .latch_getmode = &rtpp_stream_latch_getmode,
194
    .get_sender = &rtpp_stream_get_sender,
195
);
196
197
static struct pproc_act
198
analyze_rtp_packet(const struct pkt_proc_ctx *pktxp)
199
0
{
200
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
201
0
    struct rtp_packet *packet = pktxp->pktp;
202
203
0
    CALL_SMETHOD(stp_in->pcnt_strm, reg_pktin, packet);
204
0
    if (CALL_SMETHOD(stp_in->analyzer, update, packet) == UPDATE_SSRC_CHG &&
205
0
      CALL_SMETHOD(stp_in, latch_getmode) != RTPLM_FORCE_ON) {
206
0
        CALL_SMETHOD(stp_in, latch, packet);
207
0
    }
208
0
    return (PPROC_ACT_NOP);
209
0
}
210
211
static struct pproc_act
212
analyze_rtcp_packet(const struct pkt_proc_ctx *pktxp)
213
0
{
214
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
215
0
    struct rtp_packet *packet = pktxp->pktp;
216
217
0
    CALL_SMETHOD(stp_in->pcnt_strm, reg_pktin, packet);
218
0
    return (PPROC_ACT_NOP);
219
0
}
220
221
static struct pproc_act
222
resizer_injest(const struct pkt_proc_ctx *pktxp)
223
0
{
224
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
225
0
    struct rtp_packet *packet = pktxp->pktp;
226
0
    struct rtpp_stream_priv *pvt;
227
228
0
    if (stp_in->resizer != NULL) {
229
0
        rtp_resizer_enqueue(stp_in->resizer, &packet, pktxp->rsp);
230
0
        if (packet == NULL) {
231
0
            if (pktxp->rsp != NULL) {
232
0
                pktxp->rsp->npkts_resizer_in.cnt++;
233
0
            } else {
234
0
                pvt = (struct rtpp_stream_priv *)pktxp->pproc->arg;
235
0
                CALL_SMETHOD(pvt->rtpp_stats, updatebyidx, pvt->npkts_resizer_in_idx, 1);
236
0
            }
237
0
        }
238
0
    }
239
0
    return ((packet == NULL) ? PPROC_ACT_TAKE : PPROC_ACT_NOP);
240
0
}
241
242
struct rtpp_stream *
243
rtpp_stream_ctor(const struct r_stream_ctor_args *ap)
244
0
{
245
0
    struct rtpp_stream_priv *pvt;
246
0
    size_t alen;
247
248
0
    alen = offsetof(struct rtpp_stream_priv, pmod_data.adp) + 
249
0
      (ap->nmodules * sizeof(pvt->pmod_data.adp[0]));
250
0
    pvt = rtpp_rzmalloc(alen, PVT_RCOFFS(pvt));
251
0
    if (pvt == NULL) {
252
0
        goto e0;
253
0
    }
254
0
    if (pthread_mutex_init(&pvt->lock, NULL) != 0) {
255
0
        goto e1;
256
0
    }
257
0
    pvt->pub.log = ap->log;
258
0
    RTPP_OBJ_BORROW(&pvt->pub, ap->log);
259
0
    RTPP_OBJ_DTOR_ATTACH(&pvt->pub, pthread_mutex_destroy, &pvt->lock);
260
0
    pvt->pub.pproc_manager = CALL_SMETHOD(ap->pproc_manager, clone);
261
0
    if (pvt->pub.pproc_manager == NULL) {
262
0
        goto e1;
263
0
    }
264
0
    RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->pub.pproc_manager);
265
0
    if (ap->pipe_type == PIPE_RTP) {
266
0
        pvt->pub.analyzer = rtpp_analyzer_ctor(ap->log);
267
0
        if (pvt->pub.analyzer == NULL) {
268
0
            goto e1;
269
0
        }
270
0
        RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->pub.analyzer);
271
272
0
        const struct packet_processor_if resize_packet_poi = {
273
0
            .descr = "resize_packet",
274
0
            .arg = (void *)pvt,
275
0
            .key = (void *)(pvt + 1),
276
0
            .enqueue = &resizer_injest
277
0
        };
278
0
        if (CALL_SMETHOD(pvt->pub.pproc_manager, reg, PPROC_ORD_RESIZE, &resize_packet_poi) < 0)
279
0
            goto e1;
280
0
        pvt->npkts_resizer_in_idx = CALL_SMETHOD(ap->rtpp_stats, getidxbyname,
281
0
          "npkts_resizer_in");
282
0
        if (pvt->npkts_resizer_in_idx == -1)
283
0
            goto e2;
284
0
    }
285
286
0
    const struct packet_processor_if analyze_packet_poi = {
287
0
        .descr = "analyze_packet",
288
0
        .arg = (void *)pvt->pub.analyzer,
289
0
        .key = (void *)pvt,
290
0
        .enqueue = (ap->pipe_type == PIPE_RTP) ? &analyze_rtp_packet : &analyze_rtcp_packet,
291
0
    };
292
0
    if (CALL_SMETHOD(pvt->pub.pproc_manager, reg, PPROC_ORD_ANALYZE, &analyze_packet_poi) < 0)
293
0
        goto e2;
294
295
0
    pvt->pub.pcnt_strm = rtpp_pcnt_strm_ctor();
296
0
    if (pvt->pub.pcnt_strm == NULL) {
297
0
        goto e3;
298
0
    }
299
0
    RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->pub.pcnt_strm);
300
0
    pvt->raddr_prev = rtpp_netaddr_ctor();
301
0
    if (pvt->raddr_prev == NULL) {
302
0
        goto e3;
303
0
    }
304
0
    RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->raddr_prev);
305
0
    pvt->rem_addr = rtpp_netaddr_ctor();
306
0
    if (pvt->rem_addr == NULL) {
307
0
        goto e3;
308
0
    }
309
0
    RTPP_OBJ_DTOR_ATTACH_OBJ(&pvt->pub, pvt->rem_addr);
310
0
    pvt->proc_servers = ap->proc_servers;
311
0
    RTPP_OBJ_BORROW(&pvt->pub, ap->proc_servers);
312
0
    pvt->rtpp_stats = ap->rtpp_stats;
313
0
    pvt->pub.side = ap->side;
314
0
    pvt->pub.pipe_type = ap->pipe_type;
315
316
0
    pvt->pub.stuid = CALL_SMETHOD(ap->guid, gen);
317
0
    pvt->pub.seuid = ap->seuid;
318
0
    for (unsigned int i = 0; i < ap->nmodules; i++) {
319
0
        atomic_init(&(pvt->pmod_data.adp[i]), NULL);
320
0
    }
321
0
    pvt->pmod_data.nmodules = ap->nmodules;
322
0
    pvt->pub.pmod_datap = &(pvt->pmod_data);
323
0
    PUBINST_FININIT(&pvt->pub, pvt, rtpp_stream_dtor);
324
0
    return (&pvt->pub);
325
326
0
e3:
327
0
    CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt);
328
0
e2:
329
0
    if (ap->pipe_type == PIPE_RTP) {
330
0
        CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt + 1);
331
0
    }
332
0
e1:
333
0
    RTPP_OBJ_DECREF(&(pvt->pub));
334
0
e0:
335
0
    return (NULL);
336
0
}
337
338
static inline void
339
_s_rtps(struct rtpp_stream_priv *pvt, uint64_t rtps, int replace)
340
0
{
341
342
0
    RTPP_DBG_ASSERT(pvt->pub.pipe_type == PIPE_RTP);
343
0
    if (replace == 0) {
344
0
        RTPP_DBG_ASSERT(pvt->rtps.uid == RTPP_UID_NONE);
345
0
    }
346
0
    pvt->rtps.uid = rtps;
347
0
    if (CALL_SMETHOD(pvt->rem_addr, isempty) || pvt->fd == NULL) {
348
0
        pvt->rtps.inact = 1;
349
0
    }
350
0
}
351
352
static const char *
353
_rtpp_stream_get_actor(struct rtpp_stream_priv *pvt)
354
0
{
355
356
0
    return ((pvt->pub.side == RTPP_SSIDE_CALLER) ? "caller" : "callee");
357
0
}
358
359
static const char *
360
_rtpp_stream_get_proto(struct rtpp_stream_priv *pvt)
361
0
{
362
363
0
    return (PP_NAME(pvt->pub.pipe_type));
364
0
}
365
366
static void
367
rtpp_stream_dtor(struct rtpp_stream_priv *pvt)
368
0
{
369
0
    struct rtpp_stream *pub;
370
371
0
    pub = &(pvt->pub);
372
0
    rtpp_stream_fin(pub);
373
0
    if (pub->analyzer != NULL) {
374
0
         struct rtpa_stats rst;
375
0
         char ssrc_buf[11];
376
0
         const char *actor, *ssrc;
377
378
0
         actor = _rtpp_stream_get_actor(pvt);
379
0
         CALL_SMETHOD(pub->analyzer, get_stats, &rst);
380
0
         if (rst.ssrc_changes != 0) {
381
0
             snprintf(ssrc_buf, sizeof(ssrc_buf), SSRC_FMT, rst.last_ssrc.val);
382
0
             ssrc = ssrc_buf;
383
0
         } else {
384
0
             ssrc = "NONE";
385
0
         }
386
0
         RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "RTP stream from %s: "
387
0
           "SSRC=%s, ssrc_changes=%lu, psent=%lu, precvd=%lu, plost=%lu, pdups=%lu",
388
0
           actor, ssrc, rst.ssrc_changes, rst.psent, rst.precvd,
389
0
           rst.plost, rst.pdups);
390
0
         if (rst.psent > 0) {
391
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nsent", rst.psent);
392
0
         }
393
0
         if (rst.precvd > 0) {
394
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nrcvd", rst.precvd);
395
0
         }
396
0
         if (rst.pdups > 0) {
397
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_ndups", rst.pdups);
398
0
         }
399
0
         if (rst.plost > 0) {
400
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nlost", rst.plost);
401
0
         }
402
0
         if (rst.pecount > 0) {
403
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_perrs", rst.pecount);
404
0
         }
405
0
    }
406
0
    if (pvt->fd != NULL)
407
0
        RTPP_OBJ_DECREF(pvt->fd);
408
0
    if (pub->codecs != NULL)
409
0
        free(pub->codecs);
410
0
    if (pvt->rtps.uid != RTPP_UID_NONE)
411
0
        CALL_SMETHOD(pvt->proc_servers, unreg, pvt->rtps.uid);
412
0
    if (pub->resizer != NULL)
413
0
        rtp_resizer_free(pvt->rtpp_stats, pub->resizer);
414
0
    if (pub->rrc != NULL)
415
0
        RTPP_OBJ_DECREF(pub->rrc);
416
0
    for (unsigned int i = 0; i < pvt->pmod_data.nmodules; i++) {
417
0
        struct rtpp_refcnt *mdata_rcnt;
418
0
        mdata_rcnt = atomic_load(&(pvt->pmod_data.adp[i]));
419
0
        if (mdata_rcnt != NULL) {
420
0
            RC_DECREF(mdata_rcnt);
421
0
        }
422
0
    }
423
0
    if (pub->ttl != NULL)
424
0
        RTPP_OBJ_DECREF(pub->ttl);
425
0
    if (pvt->pub.pipe_type == PIPE_RTP) {
426
0
        CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt + 1);
427
0
    }
428
0
    CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt);
429
0
}
430
431
static void
432
player_predestroy_cb(struct rtpp_stats *rtpp_stats)
433
0
{
434
435
0
    CALL_SMETHOD(rtpp_stats, updatebyname, "nplrs_destroyed", 1);
436
0
}
437
438
static struct pproc_act
439
drop_packets(const struct pkt_proc_ctx *pktxp)
440
0
{
441
442
0
    return (PPROC_ACT_DROP);
443
0
}
444
445
static int
446
rtpp_stream_handle_play(struct rtpp_stream *self,
447
  const struct r_stream_h_play_args *ap)
448
0
{
449
0
    struct rtpp_stream_priv *pvt;
450
0
    int n;
451
0
    char *cp;
452
0
    struct rtpp_server *rsrv;
453
0
    uint16_t seq;
454
0
    uint32_t ssrc;
455
0
    const char *plerror, *codecs = ap->codecs;
456
0
    struct rtpp_server_ctor_args sca = {.name = ap->pname, .loop = ap->playcount,
457
0
      .ptime = ap->ptime, .guid = ap->guid};
458
459
0
    PUB2PVT(self, pvt);
460
461
0
    const struct packet_processor_if drop_on_pa_poi = {
462
0
        .descr = "drop_packets(player_active)",
463
0
        .arg = (void *)pvt,
464
0
        .key = (void *)(pvt + 2),
465
0
        .enqueue = &drop_packets
466
0
    };
467
468
0
    pthread_mutex_lock(&pvt->lock);
469
0
    plerror = "reason unknown";
470
0
    while (*codecs != '\0') {
471
0
        n = strtol(codecs, &cp, 10);
472
0
        if (cp == codecs) {
473
0
            plerror = "invalid codecs";
474
0
            goto e0;
475
0
        }
476
0
        codecs = cp;
477
0
        if (*codecs != '\0')
478
0
            codecs++;
479
0
        sca.codec = n;
480
0
        rsrv = rtpp_server_ctor(&sca);
481
0
        if (rsrv == NULL) {
482
0
            RTPP_LOG(pvt->pub.log, RTPP_LOG_DBUG, "rtpp_server_ctor(\"%s\", %d, %d) failed",
483
0
              ap->pname, n, ap->playcount);
484
0
            plerror = "rtpp_server_ctor() failed";
485
0
            if (sca.result == RTPP_SERV_NOENT)
486
0
                continue;
487
0
            goto e0;
488
0
        }
489
0
        rsrv->stuid = self->stuid;
490
0
        ssrc = CALL_SMETHOD(rsrv, get_ssrc);
491
0
        seq = CALL_SMETHOD(rsrv, get_seq);
492
0
        _s_rtps(pvt, rsrv->sruid, 0);
493
0
        int regres = CALL_SMETHOD(self->pproc_manager->reverse, reg, PPROC_ORD_PLAY,
494
0
          &drop_on_pa_poi);
495
0
        if (regres < 0) {
496
0
            plerror = "pproc_manager->reg() method failed";
497
0
            goto e1;
498
0
        }
499
0
        if (pvt->rtps.inact == 0) {
500
0
            CALL_SMETHOD(rsrv, start, ap->cmd->dtime->mono);
501
0
        }
502
0
        if (CALL_SMETHOD(pvt->proc_servers, reg, rsrv, pvt->rtps.inact) != 0) {
503
0
            plerror = "proc_servers->reg() method failed";
504
0
            goto e2;
505
0
        }
506
0
        pthread_mutex_unlock(&pvt->lock);
507
0
        rtpp_command_get_stats(ap->cmd)->nplrs_created.cnt++;
508
0
        RTPP_OBJ_DTOR_ATTACH(rsrv, (rtpp_refcnt_dtor_t)player_predestroy_cb,
509
0
          pvt->rtpp_stats);
510
0
        RTPP_OBJ_DECREF(rsrv);
511
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
512
0
          "%d times playing prompt %s codec %d: SSRC=" SSRC_FMT ", seq=%u",
513
0
          ap->playcount, ap->pname, n, ssrc, seq);
514
0
        return 0;
515
0
    }
516
0
    goto e0;
517
0
e2:
518
0
    CALL_SMETHOD(self->pproc_manager->reverse, unreg, drop_on_pa_poi.key);
519
0
e1:
520
0
    RTPP_OBJ_DECREF(rsrv);
521
0
e0:
522
0
    pthread_mutex_unlock(&pvt->lock);
523
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_ERR, "can't create player: %s", plerror);
524
0
    return -1;
525
0
}
526
527
static void
528
rtpp_stream_handle_noplay(struct rtpp_stream *self)
529
0
{
530
0
    struct rtpp_stream_priv *pvt;
531
0
    uint64_t ruid;
532
0
    int stopped;
533
534
0
    stopped = 0;
535
0
    PUB2PVT(self, pvt);
536
0
    pthread_mutex_lock(&pvt->lock);
537
0
    ruid = pvt->rtps.uid;
538
0
    pthread_mutex_unlock(&pvt->lock);
539
0
    if (ruid == RTPP_UID_NONE)
540
0
        return;
541
0
    if (CALL_SMETHOD(pvt->proc_servers, unreg, ruid) == 0) {
542
0
        pthread_mutex_lock(&pvt->lock);
543
0
        CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2);
544
0
        if (pvt->rtps.uid == ruid) {
545
0
            pvt->rtps.uid = RTPP_UID_NONE;
546
0
            pvt->rtps.inact = 0;
547
0
            stopped = 1;
548
0
        }
549
0
        pthread_mutex_unlock(&pvt->lock);
550
0
    }
551
0
    if (stopped != 0) {
552
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
553
0
          "stopping player at port %d", self->port);
554
0
    }
555
0
}
556
557
static int
558
rtpp_stream_isplayer_active(struct rtpp_stream *self)
559
0
{
560
0
    struct rtpp_stream_priv *pvt;
561
0
    int rval;
562
563
0
    PUB2PVT(self, pvt);
564
0
    pthread_mutex_lock(&pvt->lock);
565
0
    rval = (pvt->rtps.uid != RTPP_UID_NONE) ? 1 : 0;
566
0
    pthread_mutex_unlock(&pvt->lock);
567
0
    return (rval);
568
0
}
569
570
static void
571
rtpp_stream_finish_playback(struct rtpp_stream *self, uint64_t sruid)
572
0
{
573
0
    struct rtpp_stream_priv *pvt;
574
575
0
    PUB2PVT(self, pvt);
576
0
    pthread_mutex_lock(&pvt->lock);
577
0
    if (pvt->rtps.uid != RTPP_UID_NONE && pvt->rtps.uid == sruid) {
578
0
        CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2);
579
0
        _s_rtps(pvt, RTPP_UID_NONE, 1);
580
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
581
0
          "player at port %d has finished", self->port);
582
0
    }
583
0
    pthread_mutex_unlock(&pvt->lock);
584
0
}
585
586
/*
 * Public method: name of the party the stream belongs to, as returned by
 * _rtpp_stream_get_actor() ("caller" or "callee").
 */
static const char *
rtpp_stream_get_actor(struct rtpp_stream *self)
{
    struct rtpp_stream_priv *pvt;
    const char *actor;

    PUB2PVT(self, pvt);
    actor = _rtpp_stream_get_actor(pvt);
    return (actor);
}
594
595
static const char *
596
rtpp_stream_get_proto(struct rtpp_stream *self)
597
0
{
598
599
0
    return (PP_NAME(self->pipe_type));
600
0
}
601
602
static int
603
_rtpp_stream_latch(struct rtpp_stream_priv *pvt, double dtime,
604
  struct rtp_packet *packet)
605
0
{
606
0
    const char *actor, *ptype, *ssrc, *seq, *relatch;
607
0
    char ssrc_buf[11], seq_buf[6];
608
0
    char saddr[MAX_AP_STRBUF];
609
0
    int newlatch;
610
611
0
    if (pvt->latch_info.mode == RTPLM_FORCE_ON) {
612
0
        __rtpp_stream_fill_addr(pvt, packet);
613
0
        return (0);
614
0
    }
615
616
0
    if (pvt->last_update != 0 &&
617
0
      dtime - pvt->last_update < UPDATE_WINDOW) {
618
0
        return (0);
619
0
    }
620
621
0
    actor = _rtpp_stream_get_actor(pvt);
622
0
    ptype = _rtpp_stream_get_proto(pvt);
623
624
0
    if (pvt->pub.pipe_type == PIPE_RTP) {
625
0
        if (rtp_packet_parse(packet) == RTP_PARSER_OK) {
626
0
            pvt->latch_info.ssrc.val = packet->parsed->ssrc;
627
0
            pvt->latch_info.ssrc.inited = 1;
628
0
            pvt->latch_info.seq = packet->parsed->seq;
629
0
            pvt->latch_info.last_sync = dtime;
630
0
            snprintf(ssrc_buf, sizeof(ssrc_buf), SSRC_FMT, packet->parsed->ssrc);
631
0
            snprintf(seq_buf, sizeof(seq_buf), "%u", packet->parsed->seq);
632
0
            ssrc = ssrc_buf;
633
0
            seq = seq_buf;
634
0
        } else {
635
0
            pvt->latch_info.ssrc.val = 0;
636
0
            pvt->latch_info.ssrc.inited = 0;
637
0
            ssrc = seq = "INVALID";
638
0
        }
639
0
    } else {
640
0
        pvt->latch_info.ssrc.inited = 0;
641
0
        ssrc = seq = "UNKNOWN";
642
0
    }
643
644
0
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
645
0
    newlatch = SSRC_IS_BAD(&pvt->latch_info.ssrc) ? 0 : 1;
646
0
    if (pvt->latch_info.latched == 0) {
647
0
        relatch = (newlatch != 0) ? "latched in" : "not latched (bad SSRC)";
648
0
    } else {
649
0
        relatch = "re-latched";
650
0
    }
651
#if !defined(FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION)
652
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
653
      "%s's address %s: %s (%s), SSRC=%s, Seq=%s", actor, relatch,
654
      saddr, ptype, ssrc, seq);
655
#endif
656
0
    pvt->latch_info.latched = newlatch;
657
0
    return (1);
658
0
}
659
660
static int
661
rtpp_stream_latch(struct rtpp_stream *self, struct rtp_packet *packet)
662
0
{
663
0
    struct rtpp_stream_priv *pvt;
664
665
0
    PUB2PVT(self, pvt);
666
0
    pthread_mutex_lock(&pvt->lock);
667
0
    int rval = _rtpp_stream_latch(pvt, packet->rtime.mono, packet);
668
0
    pthread_mutex_unlock(&pvt->lock);
669
0
    return (rval);
670
0
}
671
672
static void
673
rtpp_stream_latch_setmode(struct rtpp_stream *self, enum rtpps_latch_mode mode)
674
0
{
675
0
    struct rtpp_stream_priv *pvt;
676
677
0
    PUB2PVT(self, pvt);
678
0
    pthread_mutex_lock(&pvt->lock);
679
0
    pvt->latch_info.mode = mode;
680
0
    pthread_mutex_unlock(&pvt->lock);
681
0
}
682
683
static enum rtpps_latch_mode
684
rtpp_stream_latch_getmode(struct rtpp_stream *self)
685
0
{
686
0
    struct rtpp_stream_priv *pvt;
687
688
0
    PUB2PVT(self, pvt);
689
0
    pthread_mutex_lock(&pvt->lock);
690
0
    enum rtpps_latch_mode mode = pvt->latch_info.mode;
691
0
    pthread_mutex_unlock(&pvt->lock);
692
0
    return (mode);
693
0
}
694
695
static void
696
_rtpp_stream_latch_sync(struct rtpp_stream_priv *pvt, double dtime,
697
  struct rtp_packet *packet)
698
0
{
699
0
    struct rtpps_latch *lip;
700
701
0
    lip = &pvt->latch_info;
702
0
    if (pvt->pub.pipe_type != PIPE_RTP || lip->ssrc.inited == 0)
703
0
        return;
704
0
    if (dtime - lip->last_sync < SEQ_SYNC_IVAL)
705
0
        return;
706
0
    if (rtp_packet_parse(packet) != RTP_PARSER_OK)
707
0
        return;
708
0
    if (lip->ssrc.val != packet->parsed->ssrc)
709
0
        return;
710
0
    lip->seq = packet->parsed->seq;
711
0
    lip->last_sync = dtime;
712
0
}
713
714
static int
715
_rtpp_stream_check_latch_override(struct rtpp_stream_priv *pvt,
716
  struct rtp_packet *packet, double dtime)
717
0
{
718
0
    const char *actor;
719
0
    char saddr[MAX_AP_STRBUF];
720
721
0
    if (pvt->pub.pipe_type == PIPE_RTCP || pvt->latch_info.ssrc.inited == 0)
722
0
        return (0);
723
0
    if (pvt->latch_info.mode == RTPLM_FORCE_ON)
724
0
        return (0);
725
0
    if (rtp_packet_parse(packet) != RTP_PARSER_OK)
726
0
        return (0);
727
0
    if (pvt->last_update == 0 || dtime - pvt->last_update > UPDATE_WINDOW) {
728
0
        if (packet->parsed->ssrc != pvt->latch_info.ssrc.val)
729
0
            return (0);
730
0
        if (SEQ_DIST(pvt->latch_info.seq, packet->parsed->seq) > 536)
731
0
            return (0);
732
0
    }
733
734
0
    actor = _rtpp_stream_get_actor(pvt);
735
736
0
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
737
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
738
0
      "%s's address re-latched: %s (%s), SSRC=" SSRC_FMT ", Seq=%u->%u", actor,
739
0
      saddr, "RTP", pvt->latch_info.ssrc.val, pvt->latch_info.seq,
740
0
      packet->parsed->seq);
741
742
0
    pvt->latch_info.seq = packet->parsed->seq;
743
0
    pvt->latch_info.last_sync = packet->rtime.mono;
744
0
    return (1);
745
0
}
746
747
static void
748
_rtpp_stream_plr_start(struct rtpp_stream_priv *pvt, double dtime)
749
0
{
750
751
0
    RTPP_DBG_ASSERT(pvt->rtps.inact != 0);
752
0
    CALL_SMETHOD(pvt->proc_servers, plr_start, pvt->rtps.uid, dtime);
753
0
    pvt->rtps.inact = 0;
754
0
}
755
756
static void
757
__rtpp_stream_fill_addr(struct rtpp_stream_priv *pvt, struct rtp_packet *packet)
758
0
{
759
0
    const char *actor, *ptype;
760
0
    char saddr[MAX_AP_STRBUF];
761
762
0
    pvt->untrusted_addr = 1;
763
0
    CALL_SMETHOD(pvt->rem_addr, set, sstosa(&packet->raddr), packet->rlen);
764
0
    if (CALL_SMETHOD(pvt->raddr_prev, isempty) ||
765
0
      CALL_SMETHOD(pvt->raddr_prev, cmp, sstosa(&packet->raddr), packet->rlen) != 0) {
766
0
        pvt->latch_info.latched = 1;
767
0
    }
768
0
    if (pvt->rtps.inact != 0 && pvt->fd != NULL) {
769
0
        _rtpp_stream_plr_start(pvt, packet->rtime.mono);
770
0
    }
771
772
0
    actor = _rtpp_stream_get_actor(pvt);
773
0
    ptype = _rtpp_stream_get_proto(pvt);
774
0
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
775
0
    const char *wice = (pvt->latch_info.mode == RTPLM_FORCE_ON) ? " (with ICE)" : "";
776
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
777
0
      "%s's address filled in%s: %s (%s)", actor, wice, saddr, ptype);
778
0
    return;
779
0
}
780
781
static int
782
rtpp_stream_guess_addr(struct rtpp_stream *self,
783
  struct rtp_packet *packet)
784
0
{
785
0
    int rport;
786
0
    const char *actor, *ptype;
787
0
    struct rtpp_stream_priv *pvt;
788
0
    struct sockaddr_storage ta;
789
790
0
    RTPP_DBG_ASSERT(self->pipe_type == PIPE_RTCP);
791
0
    PUB2PVT(self, pvt);
792
793
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty) &&
794
0
      CALL_SMETHOD(pvt->rem_addr, cmphost, sstosa(&packet->raddr))) {
795
0
        return (0);
796
0
    }
797
#if 0
798
    if (self->addr == NULL) {
799
        self->addr = malloc(packet->rlen);
800
        if (self->addr == NULL) {
801
            return (-1);
802
        }
803
    }
804
#endif
805
0
    actor = rtpp_stream_get_actor(self);
806
0
    ptype = rtpp_stream_get_proto(self);
807
0
    rport = ntohs(satosin(&packet->raddr)->sin_port);
808
0
    if (IS_LAST_PORT(rport)) {
809
0
        return (-1);
810
0
    }
811
812
0
    memcpy(&ta, &packet->raddr, packet->rlen);
813
0
    setport(sstosa(&ta), rport + 1);
814
815
0
    CALL_SMETHOD(pvt->rem_addr, set, sstosa(&ta), packet->rlen);
816
    /* Use guessed value as the only true one for asymmetric clients */
817
0
    pvt->latch_info.latched = self->asymmetric;
818
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "guessing %s port "
819
0
      "for %s to be %d", ptype, actor, rport + 1);
820
821
0
    return (0);
822
0
}
823
824
static void
825
rtpp_stream_prefill_addr(struct rtpp_stream *self, struct sockaddr **iapp,
826
  double dtime)
827
0
{
828
0
    struct rtpp_stream_priv *pvt;
829
0
    char saddr[MAX_AP_STRBUF];
830
0
    const char *actor, *ptype;
831
832
0
    PUB2PVT(self, pvt);
833
834
0
    actor = rtpp_stream_get_actor(self);
835
0
    ptype = rtpp_stream_get_proto(self);
836
0
    pthread_mutex_lock(&pvt->lock);
837
0
    if (pvt->hld_stat.status != 0) {
838
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "taking %s's %s stream off-hold",
839
0
            actor, ptype);
840
0
        pvt->hld_stat.status = 0;
841
0
    }
842
843
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty))
844
0
        pvt->last_update = dtime;
845
846
    /*
847
     * Unless the address provided by client historically
848
     * cannot be trusted and address is different from one
849
     * that we recorded update it.
850
     */
851
0
    if (pvt->untrusted_addr != 0) {
852
0
        pthread_mutex_unlock(&pvt->lock);
853
0
        return;
854
0
    }
855
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty) && CALL_SMETHOD(pvt->rem_addr,
856
0
      isaddrseq, *iapp)) {
857
0
        pthread_mutex_unlock(&pvt->lock);
858
0
        return;
859
0
    }
860
861
0
    addrport2char_r(*iapp, saddr, sizeof(saddr), ':');
862
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "pre-filling %s's %s address "
863
0
      "with %s", actor, ptype, saddr);
864
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty)) {
865
0
        if (pvt->latch_info.latched != 0) {
866
0
            CALL_SMETHOD(pvt->raddr_prev, copy, pvt->rem_addr);
867
0
        }
868
0
    }
869
0
    CALL_SMETHOD(pvt->rem_addr, set, *iapp, SA_LEN(*iapp));
870
0
    if (pvt->rtps.inact != 0 && pvt->fd != NULL) {
871
0
        _rtpp_stream_plr_start(pvt, dtime);
872
0
    }
873
0
    pthread_mutex_unlock(&pvt->lock);
874
0
}
875
876
static void rtpp_stream_reg_onhold(struct rtpp_stream *self)
877
0
{
878
0
    struct rtpp_stream_priv *pvt;
879
0
    const char *actor, *ptype;
880
881
0
    PUB2PVT(self, pvt);
882
0
    pthread_mutex_lock(&pvt->lock);
883
0
    if (pvt->hld_stat.status == 0) {
884
0
        actor = rtpp_stream_get_actor(self);
885
0
        ptype = rtpp_stream_get_proto(self);
886
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "putting %s's %s stream on hold",
887
0
           actor, ptype);
888
0
        pvt->hld_stat.status = 1;
889
0
    }
890
0
    pvt->hld_stat.cnt++;
891
0
    pthread_mutex_unlock(&pvt->lock);
892
0
}
893
894
static void
895
rtpp_stream_set_skt(struct rtpp_stream *self, struct rtpp_socket *new_skt)
896
0
{
897
0
    struct rtpp_stream_priv *pvt;
898
899
0
    PUB2PVT(self, pvt);
900
0
    pthread_mutex_lock(&pvt->lock);
901
0
    if (new_skt == NULL) {
902
0
        RTPP_DBG_ASSERT(pvt->fd != NULL);
903
0
        RTPP_OBJ_DECREF(pvt->fd);
904
0
        pvt->fd = NULL;
905
0
        pthread_mutex_unlock(&pvt->lock);
906
0
        return;
907
0
    }
908
0
    RTPP_DBG_ASSERT(pvt->fd == NULL);
909
0
    CALL_SMETHOD(new_skt, set_stuid, self->stuid);
910
0
    pvt->fd = new_skt;
911
0
    RTPP_OBJ_INCREF(pvt->fd);
912
0
    if (pvt->rtps.inact != 0 && !CALL_SMETHOD(pvt->rem_addr, isempty)) {
913
0
        _rtpp_stream_plr_start(pvt, getdtime());
914
0
    }
915
0
    pthread_mutex_unlock(&pvt->lock);
916
0
}
917
918
static struct rtpp_socket *
919
rtpp_stream_get_skt(struct rtpp_stream *self, HERETYPE mlp)
920
0
{
921
0
    struct rtpp_stream_priv *pvt;
922
0
    struct rtpp_socket *rval;
923
924
0
    PUB2PVT(self, pvt);
925
0
    pthread_mutex_lock(&pvt->lock);
926
0
    if (pvt->fd == NULL) {
927
0
        pthread_mutex_unlock(&pvt->lock);
928
0
        return (NULL);
929
0
    }
930
0
    RTPP_OBJ_INCREF(pvt->fd);
931
0
    rval = pvt->fd;
932
0
    pthread_mutex_unlock(&pvt->lock);
933
0
    return (rval);
934
0
}
935
936
static struct rtpp_socket *
937
rtpp_stream_update_skt(struct rtpp_stream *self, struct rtpp_socket *new_skt)
938
0
{
939
0
    struct rtpp_socket *old_skt;
940
0
    struct rtpp_stream_priv *pvt;
941
942
0
    RTPP_DBG_ASSERT(new_skt != NULL);
943
0
    PUB2PVT(self, pvt);
944
0
    pthread_mutex_lock(&pvt->lock);
945
0
    old_skt = pvt->fd;
946
0
    CALL_SMETHOD(new_skt, set_stuid, self->stuid);
947
0
    pvt->fd = new_skt;
948
0
    RTPP_OBJ_INCREF(pvt->fd);
949
0
    if (pvt->rtps.inact != 0 && !CALL_SMETHOD(pvt->rem_addr, isempty)) {
950
0
        _rtpp_stream_plr_start(pvt, getdtime());
951
0
    }
952
0
    pthread_mutex_unlock(&pvt->lock);
953
0
    return (old_skt);
954
0
}
955
956
static int
957
rtpp_stream_send_pkt(struct rtpp_stream *self, struct sthread_args *sap,
958
  struct rtp_packet *pkt)
959
0
{
960
0
    struct rtpp_stream_priv *pvt;
961
0
    int rval;
962
963
0
    PUB2PVT(self, pvt);
964
0
    pthread_mutex_lock(&pvt->lock);
965
0
    rval = CALL_SMETHOD(pvt->fd, send_pkt_na, sap, pvt->rem_addr, pkt,
966
0
      self->log);
967
0
    pthread_mutex_unlock(&pvt->lock);
968
0
    return (rval);
969
0
}
970
971
static int
972
rtpp_stream_send_pkt_to(struct rtpp_stream *self, struct sthread_args *sap,
973
  struct rtp_packet *pkt, struct rtpp_netaddr *rem_addr)
974
0
{
975
0
    struct rtpp_stream_priv *pvt;
976
0
    int rval;
977
978
0
    PUB2PVT(self, pvt);
979
0
    pthread_mutex_lock(&pvt->lock);
980
0
    rval = CALL_SMETHOD(pvt->fd, send_pkt_na, sap, rem_addr, pkt,
981
0
      self->log);
982
0
    pthread_mutex_unlock(&pvt->lock);
983
0
    return (rval);
984
0
}
985
986
static struct rtp_packet *
987
_rtpp_stream_recv_pkt(struct rtpp_stream_priv *pvt,
988
  const struct rtpp_timestamp *dtime)
989
0
{
990
0
    struct rtp_packet *pkt;
991
992
0
    pkt = CALL_SMETHOD(pvt->fd, rtp_recv, dtime, pvt->pub.laddr, pvt->pub.port);
993
0
    return (pkt);
994
0
}
995
996
static int
997
rtpp_stream_issendable(struct rtpp_stream *self)
998
0
{
999
0
    struct rtpp_stream_priv *pvt;
1000
1001
0
    PUB2PVT(self, pvt);
1002
0
    pthread_mutex_lock(&pvt->lock);
1003
0
    if (CALL_SMETHOD(pvt->rem_addr, isempty)) {
1004
0
        pthread_mutex_unlock(&pvt->lock);
1005
0
        return (0);
1006
0
    }
1007
0
    if (pvt->fd == NULL) {
1008
0
        pthread_mutex_unlock(&pvt->lock);
1009
0
        return (0);
1010
0
    }
1011
0
    pthread_mutex_unlock(&pvt->lock);
1012
0
    return (1);
1013
0
}
1014
1015
static int
1016
_rtpp_stream_islatched(struct rtpp_stream_priv *pvt)
1017
0
{
1018
0
    int rval;
1019
1020
0
    rval = pvt->latch_info.latched;
1021
0
    return (rval);
1022
0
}
1023
1024
static void
1025
rtpp_stream_locklatch(struct rtpp_stream *self)
1026
0
{
1027
0
    struct rtpp_stream_priv *pvt;
1028
1029
0
    PUB2PVT(self, pvt);
1030
0
    pthread_mutex_lock(&pvt->lock);
1031
0
    pvt->latch_info.latched = 1;
1032
0
    pthread_mutex_unlock(&pvt->lock);
1033
0
}
1034
1035
void
1036
rtpp_stream_get_stats(struct rtpp_stream *self, struct rtpp_acct_hold *ahp)
1037
0
{
1038
0
    struct rtpp_stream_priv *pvt;
1039
1040
0
    PUB2PVT(self, pvt);
1041
0
    pthread_mutex_lock(&pvt->lock);
1042
0
    *ahp = pvt->hld_stat;
1043
0
    pthread_mutex_unlock(&pvt->lock);
1044
0
}
1045
1046
static int
1047
_rtpp_stream_fill_addr(struct rtpp_stream_priv *pvt,
1048
  struct rtpp_weakref *rtcps_wrt, struct rtp_packet *packet)
1049
0
{
1050
0
    struct rtpp_stream *stp_rtcp;
1051
0
    int rval;
1052
1053
0
    __rtpp_stream_fill_addr(pvt, packet);
1054
0
    if (pvt->pub.stuid_rtcp == RTPP_UID_NONE) {
1055
0
        return (0);
1056
0
    }
1057
0
    stp_rtcp = CALL_SMETHOD(rtcps_wrt, get_by_idx,
1058
0
      pvt->pub.stuid_rtcp);
1059
0
    if (stp_rtcp == NULL) {
1060
0
        return (0);
1061
0
    }
1062
0
    rval = CALL_SMETHOD(stp_rtcp, guess_addr, packet);
1063
0
    RTPP_OBJ_DECREF(stp_rtcp);
1064
0
    return (rval);
1065
0
}
1066
1067
static struct rtp_packet *
1068
rtpp_stream_rx(struct rtpp_stream *self, struct rtpp_weakref *rtcps_wrt,
1069
  const struct rtpp_timestamp *dtime, struct rtpp_proc_rstats *rsp)
1070
0
{
1071
0
    struct rtp_packet *packet = NULL;
1072
0
    struct rtpp_stream_priv *pvt;
1073
1074
0
    PUB2PVT(self, pvt);
1075
0
    pthread_mutex_lock(&pvt->lock);
1076
0
    packet = _rtpp_stream_recv_pkt(pvt, dtime);
1077
0
    if (packet == NULL) {
1078
        /* Move on to the next session */
1079
0
        pthread_mutex_unlock(&pvt->lock);
1080
0
        return (NULL);
1081
0
    }
1082
0
    rsp->npkts_rcvd.cnt++;
1083
1084
0
    if (pvt->latch_info.mode == RTPLM_FORCE_OFF)
1085
0
        goto nolatch;
1086
1087
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty)) {
1088
        /* Check that the packet is authentic, drop if it isn't */
1089
0
        if (self->asymmetric == 0) {
1090
0
            if (CALL_SMETHOD(pvt->rem_addr, cmp, sstosa(&packet->raddr),
1091
0
              packet->rlen) != 0) {
1092
0
                if (_rtpp_stream_islatched(pvt) && \
1093
0
                  _rtpp_stream_check_latch_override(pvt, packet, dtime->mono) == 0) {
1094
                    /*
1095
                     * Continue, since there could be good packets in
1096
                     * queue.
1097
                     */
1098
0
                    CALL_SMETHOD(self->pcount, reg_ignr);
1099
0
                    goto discard_and_continue;
1100
0
                } else if (!_rtpp_stream_islatched(pvt)) {
1101
0
                    _rtpp_stream_latch(pvt, dtime->mono, packet);
1102
0
                }
1103
                /* Signal that an address has to be updated */
1104
0
                _rtpp_stream_fill_addr(pvt, rtcps_wrt, packet);
1105
0
            } else if (!_rtpp_stream_islatched(pvt)) {
1106
0
                _rtpp_stream_latch(pvt, dtime->mono, packet);
1107
0
            }
1108
0
        } else {
1109
            /*
1110
             * For asymmetric clients don't check
1111
             * source port since it may be different.
1112
             */
1113
0
            if (!CALL_SMETHOD(pvt->rem_addr, cmphost, sstosa(&packet->raddr))) {
1114
                /*
1115
                 * Continue, since there could be good packets in
1116
                 * queue.
1117
                 */
1118
0
                CALL_SMETHOD(self->pcount, reg_ignr);
1119
0
                goto discard_and_continue;
1120
0
            }
1121
0
        }
1122
0
    } else {
1123
        /* Update address recorded in the session */
1124
0
        _rtpp_stream_fill_addr(pvt, rtcps_wrt, packet);
1125
0
    }
1126
0
    _rtpp_stream_latch_sync(pvt, dtime->mono, packet);
1127
0
nolatch:
1128
0
    pthread_mutex_unlock(&pvt->lock);
1129
0
    return (packet);
1130
1131
0
discard_and_continue:
1132
0
    pthread_mutex_unlock(&pvt->lock);
1133
0
    RTPP_OBJ_DECREF(packet);
1134
0
    rsp->npkts_discard.cnt++;
1135
0
    return (RTPP_S_RX_DCONT);
1136
0
}
1137
1138
static struct rtpp_netaddr *
1139
rtpp_stream_get_rem_addr(struct rtpp_stream *self, int retempty)
1140
0
{
1141
0
    struct rtpp_stream_priv *pvt;
1142
0
    struct rtpp_netaddr *rval;
1143
1144
0
    PUB2PVT(self, pvt);
1145
0
    pthread_mutex_lock(&pvt->lock);
1146
0
    if (retempty == 0 && CALL_SMETHOD(pvt->rem_addr, isempty)) {
1147
0
        pthread_mutex_unlock(&pvt->lock);
1148
0
        return (NULL);
1149
0
    }
1150
0
    rval = pvt->rem_addr;
1151
0
    RTPP_OBJ_INCREF(rval);
1152
0
    pthread_mutex_unlock(&pvt->lock);
1153
0
    return (rval);
1154
0
}
1155
1156
static struct rtpp_stream *
1157
rtpp_stream_get_sender(struct rtpp_stream *self, const struct rtpp_cfg *cfsp)
1158
0
{
1159
0
    if (self->pipe_type == PIPE_RTP) {
1160
0
       return (CALL_SMETHOD(cfsp->rtp_streams_wrt, get_by_idx,
1161
0
         self->stuid_sendr));
1162
0
    }
1163
0
    return (CALL_SMETHOD(cfsp->rtcp_streams_wrt, get_by_idx,
1164
0
      self->stuid_sendr));
1165
0
}