Coverage Report

Created: 2023-09-25 06:44

/src/rtpproxy/src/rtpp_stream.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2004-2006 Maxim Sobolev <sobomax@FreeBSD.org>
3
 * Copyright (c) 2006-2015 Sippy Software, Inc., http://www.sippysoft.com
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions
8
 * are met:
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in the
13
 *    documentation and/or other materials provided with the distribution.
14
 *
15
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
 * SUCH DAMAGE.
26
 *
27
 */
28
29
#include <sys/types.h>
30
#include <sys/socket.h>
31
#include <netinet/in.h>
32
#include <pthread.h>
33
#include <stdatomic.h>
34
#include <stddef.h>
35
#include <stdlib.h>
36
#include <stdint.h>
37
#include <stdio.h>
38
#include <string.h>
39
40
#include "config.h"
41
42
#include "rtpp_ssrc.h"
43
#include "rtpa_stats.h"
44
#include "rtpp_defines.h"
45
#include "rtpp_log.h"
46
#include "rtpp_types.h"
47
#include "rtpp_log_obj.h"
48
#include "rtp.h"
49
#include "rtp_analyze.h"
50
#include "rtpp_analyzer.h"
51
#include "rtp_resizer.h"
52
#include "rtpp_time.h"
53
#include "rtpp_command.h"
54
#include "rtpp_command_args.h"
55
#include "rtpp_command_sub.h"
56
#include "rtpp_command_private.h"
57
#include "rtpp_genuid_singlet.h"
58
#include "rtp_info.h"
59
#include "rtp_packet.h"
60
#include "rtpp_mallocs.h"
61
#include "rtpp_refcnt.h"
62
#include "rtpp_network.h"
63
#include "rtpp_pcount.h"
64
#include "rtpp_pcnt_strm.h"
65
#include "rtpp_proc.h"
66
#include "rtpp_record.h"
67
#include "rtpp_stats.h"
68
#include "rtpp_stream.h"
69
#include "rtpp_stream_fin.h"
70
#include "rtpp_server.h"
71
#include "rtpp_socket.h"
72
#include "rtpp_weakref.h"
73
#include "rtpp_ttl.h"
74
#include "rtpp_pipe.h"
75
#include "rtpp_netaddr.h"
76
#include "rtpp_debug.h"
77
#include "rtpp_acct_pipe.h"
78
#include "rtpp_cfg.h"
79
#include "rtpp_proc_servers.h"
80
#include "advanced/pproc_manager.h"
81
#include "advanced/packet_processor.h"
82
83
0
/*
 * Minimum time, in seconds, between two refreshes of the latched
 * sequence number (see _rtpp_stream_latch_sync()).
 */
#define SEQ_SYNC_IVAL 1.0
84
85
struct rtpps_latch {
86
    int latched;
87
    struct rtpp_ssrc ssrc;
88
    int seq;
89
    double last_sync;
90
};
91
92
/*
 * Reference to the player ("rtpp_server") attached to a stream.
 */
struct rtps {
    /* UID of the player, RTPP_UID_NONE when no player is attached */
    uint64_t uid;
    /* Non-zero while the player is registered but not yet started */
    int inact;
};
96
97
struct rtpp_stream_priv
98
{
99
    struct rtpp_stream pub;
100
    struct rtpp_proc_servers *proc_servers;
101
    struct rtpp_stats *rtpp_stats;
102
    pthread_mutex_t lock;
103
    /* Weak reference to the "rtpp_server" (player) */
104
    struct rtps rtps;
105
    /* Timestamp of the last session update */
106
    double last_update;
107
    /* Flag that indicates whether or not address supplied by client can't be trusted */
108
    int untrusted_addr;
109
    /* Save previous address when doing update */
110
    struct rtpp_netaddr *raddr_prev;
111
    /* Flag which tells if we are allowed to update address with RTP src IP */
112
    struct rtpps_latch latch_info;
113
    /* Structure to track hold requests */
114
    struct rtpp_acct_hold hld_stat;
115
    /* Descriptor */
116
    struct rtpp_socket *fd;
117
    /* Remote source address */
118
    struct rtpp_netaddr *rem_addr;
119
    int npkts_resizer_in_idx;
120
    /* Placeholder for per-module structures */
121
    struct pmod_data pmod_data;
122
};
123
124
/* Forward declarations of the methods implemented in this file */
static void rtpp_stream_dtor(struct rtpp_stream_priv *pvt);
static int rtpp_stream_handle_play(struct rtpp_stream *self,
  const char *codecs, const char *pname, int playcount,
  struct rtpp_command *cmd, int ptime);
static void rtpp_stream_handle_noplay(struct rtpp_stream *self);
static int rtpp_stream_isplayer_active(struct rtpp_stream *self);
static void rtpp_stream_finish_playback(struct rtpp_stream *self,
  uint64_t sruid);
static const char *rtpp_stream_get_actor(struct rtpp_stream *self);
static const char *rtpp_stream_get_proto(struct rtpp_stream *self);
static int _rtpp_stream_latch(struct rtpp_stream_priv *pvt, double dtime,
  struct rtp_packet *packet);
static int rtpp_stream_latch(struct rtpp_stream *self,
  struct rtp_packet *packet);
static int _rtpp_stream_check_latch_override(struct rtpp_stream_priv *pvt,
  struct rtp_packet *packet, double dtime);
static void __rtpp_stream_fill_addr(struct rtpp_stream_priv *pvt,
  struct rtp_packet *packet);
static int rtpp_stream_guess_addr(struct rtpp_stream *self,
  struct rtp_packet *packet);
static void rtpp_stream_prefill_addr(struct rtpp_stream *self,
  struct sockaddr **iapp, double dtime);
static void rtpp_stream_set_skt(struct rtpp_stream *self,
  struct rtpp_socket *new_skt);
static struct rtpp_socket *rtpp_stream_get_skt(struct rtpp_stream *self);
static struct rtpp_socket *rtpp_stream_update_skt(struct rtpp_stream *self,
  struct rtpp_socket *new_skt);
static int rtpp_stream_send_pkt(struct rtpp_stream *self,
  struct sthread_args *sender, struct rtp_packet *pkt);
static struct rtp_packet *_rtpp_stream_recv_pkt(struct rtpp_stream_priv *pvt,
  const struct rtpp_timestamp *dtime);
static int rtpp_stream_issendable(struct rtpp_stream *self);
static int _rtpp_stream_islatched(struct rtpp_stream_priv *pvt);
static void rtpp_stream_locklatch(struct rtpp_stream *self);
static void rtpp_stream_reg_onhold(struct rtpp_stream *self);
void rtpp_stream_get_stats(struct rtpp_stream *self,
  struct rtpp_acct_hold *ahp);
static struct rtp_packet *rtpp_stream_rx(struct rtpp_stream *self,
  struct rtpp_weakref *rtcps_wrt, const struct rtpp_timestamp *dtime,
  struct rtpp_proc_rstats *rsp);
static struct rtpp_netaddr *rtpp_stream_get_rem_addr(struct rtpp_stream *self,
  int retempty);
static struct rtpp_stream *rtpp_stream_get_sender(struct rtpp_stream *self,
  const struct rtpp_cfg *cfsp);
161
162
DEFINE_SMETHODS(rtpp_stream,
163
    .handle_play = &rtpp_stream_handle_play,
164
    .handle_noplay = &rtpp_stream_handle_noplay,
165
    .isplayer_active = &rtpp_stream_isplayer_active,
166
    .finish_playback = &rtpp_stream_finish_playback,
167
    .get_actor = &rtpp_stream_get_actor,
168
    .get_proto = &rtpp_stream_get_proto,
169
    .prefill_addr = &rtpp_stream_prefill_addr,
170
    .set_skt = &rtpp_stream_set_skt,
171
    .get_skt = &rtpp_stream_get_skt,
172
    .update_skt = &rtpp_stream_update_skt,
173
    .send_pkt = &rtpp_stream_send_pkt,
174
    .guess_addr = &rtpp_stream_guess_addr,
175
    .issendable = &rtpp_stream_issendable,
176
    .locklatch = &rtpp_stream_locklatch,
177
    .reg_onhold = &rtpp_stream_reg_onhold,
178
    .get_stats = &rtpp_stream_get_stats,
179
    .rx = &rtpp_stream_rx,
180
    .get_rem_addr = &rtpp_stream_get_rem_addr,
181
    .latch = &rtpp_stream_latch,
182
    .get_sender = &rtpp_stream_get_sender,
183
);
184
185
static enum pproc_action
186
analyze_rtp_packet(const struct pkt_proc_ctx *pktxp)
187
0
{
188
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
189
0
    struct rtp_packet *packet = pktxp->pktp;
190
191
0
    CALL_SMETHOD(stp_in->pcnt_strm, reg_pktin, packet);
192
0
    if (CALL_SMETHOD(stp_in->analyzer, update, packet) == UPDATE_SSRC_CHG) {
193
0
        CALL_SMETHOD(stp_in, latch, packet);
194
0
    }
195
0
    return (PPROC_ACT_NOP);
196
0
}
197
198
static enum pproc_action
199
analyze_rtcp_packet(const struct pkt_proc_ctx *pktxp)
200
0
{
201
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
202
0
    struct rtp_packet *packet = pktxp->pktp;
203
204
0
    CALL_SMETHOD(stp_in->pcnt_strm, reg_pktin, packet);
205
0
    return (PPROC_ACT_NOP);
206
0
}
207
208
static enum pproc_action
209
resizer_injest(const struct pkt_proc_ctx *pktxp)
210
0
{
211
0
    struct rtpp_stream *stp_in = pktxp->strmp_in;
212
0
    struct rtp_packet *packet = pktxp->pktp;
213
0
    struct rtpp_stream_priv *pvt;
214
215
0
    if (stp_in->resizer != NULL) {
216
0
        rtp_resizer_enqueue(stp_in->resizer, &packet, pktxp->rsp);
217
0
        if (packet == NULL) {
218
0
            if (pktxp->rsp != NULL) {
219
0
                pktxp->rsp->npkts_resizer_in.cnt++;
220
0
            } else {
221
0
                pvt = (struct rtpp_stream_priv *)pktxp->pproc->arg;
222
0
                CALL_SMETHOD(pvt->rtpp_stats, updatebyidx, pvt->npkts_resizer_in_idx, 1);
223
0
            }
224
0
        }
225
0
    }
226
0
    return ((packet == NULL) ? PPROC_ACT_TAKE : PPROC_ACT_NOP);
227
0
}
228
229
struct rtpp_stream *
230
rtpp_stream_ctor(const struct r_stream_ctor_args *ap)
231
0
{
232
0
    struct rtpp_stream_priv *pvt;
233
0
    size_t alen;
234
235
0
    alen = offsetof(struct rtpp_stream_priv, pmod_data.adp) + 
236
0
      (ap->nmodules * sizeof(pvt->pmod_data.adp[0]));
237
0
    pvt = rtpp_rzmalloc(alen, PVT_RCOFFS(pvt));
238
0
    if (pvt == NULL) {
239
0
        goto e0;
240
0
    }
241
0
    if (pthread_mutex_init(&pvt->lock, NULL) != 0) {
242
0
        goto e1;
243
0
    }
244
0
    pvt->pub.pproc_manager = CALL_SMETHOD(ap->pproc_manager, clone);
245
0
    if (pvt->pub.pproc_manager == NULL) {
246
0
        goto e2;
247
0
    }
248
0
    if (ap->pipe_type == PIPE_RTP) {
249
0
        pvt->pub.analyzer = rtpp_analyzer_ctor(ap->log);
250
0
        if (pvt->pub.analyzer == NULL) {
251
0
            goto e3;
252
0
        }
253
254
0
        const struct packet_processor_if resize_packet_poi = {
255
0
            .descr = "resize_packet",
256
0
            .arg = (void *)pvt,
257
0
            .key = (void *)(pvt + 1),
258
0
            .enqueue = &resizer_injest
259
0
        };
260
0
        if (CALL_SMETHOD(pvt->pub.pproc_manager, reg, PPROC_ORD_RESIZE, &resize_packet_poi) < 0)
261
0
            goto e4;
262
0
        pvt->npkts_resizer_in_idx = CALL_SMETHOD(ap->rtpp_stats, getidxbyname,
263
0
          "npkts_resizer_in");
264
0
        if (pvt->npkts_resizer_in_idx == -1)
265
0
            goto e5;
266
267
0
    }
268
269
0
    const struct packet_processor_if analyze_packet_poi = {
270
0
        .descr = "analyze_packet",
271
0
        .arg = (void *)pvt->pub.analyzer,
272
0
        .key = (void *)pvt,
273
0
        .enqueue = (ap->pipe_type == PIPE_RTP) ? &analyze_rtp_packet : &analyze_rtcp_packet,
274
0
    };
275
0
    if (CALL_SMETHOD(pvt->pub.pproc_manager, reg, PPROC_ORD_ANALYZE, &analyze_packet_poi) < 0)
276
0
        goto e5;
277
278
0
    pvt->pub.pcnt_strm = rtpp_pcnt_strm_ctor();
279
0
    if (pvt->pub.pcnt_strm == NULL) {
280
0
        goto e6;
281
0
    }
282
0
    pvt->raddr_prev = rtpp_netaddr_ctor();
283
0
    if (pvt->raddr_prev == NULL) {
284
0
        goto e7;
285
0
    }
286
0
    pvt->rem_addr = rtpp_netaddr_ctor();
287
0
    if (pvt->rem_addr == NULL) {
288
0
        goto e8;
289
0
    }
290
0
    pvt->proc_servers = ap->proc_servers;
291
0
    RTPP_OBJ_INCREF(ap->proc_servers);
292
0
    pvt->rtpp_stats = ap->rtpp_stats;
293
0
    pvt->pub.log = ap->log;
294
0
    RTPP_OBJ_INCREF(ap->log);
295
0
    pvt->pub.side = ap->side;
296
0
    pvt->pub.pipe_type = ap->pipe_type;
297
298
0
    rtpp_gen_uid(&pvt->pub.stuid);
299
0
    pvt->pub.seuid = ap->seuid;
300
0
    for (unsigned int i = 0; i < ap->nmodules; i++) {
301
0
        atomic_init(&(pvt->pmod_data.adp[i]), NULL);
302
0
    }
303
0
    pvt->pmod_data.nmodules = ap->nmodules;
304
0
    pvt->pub.pmod_datap = &(pvt->pmod_data);
305
0
    PUBINST_FININIT(&pvt->pub, pvt, rtpp_stream_dtor);
306
0
    return (&pvt->pub);
307
308
0
e8:
309
0
    RTPP_OBJ_DECREF(pvt->raddr_prev);
310
0
e7:
311
0
    RTPP_OBJ_DECREF(pvt->pub.pcnt_strm);
312
0
e6:
313
0
    CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt);
314
0
e5:
315
0
    if (ap->pipe_type == PIPE_RTP) {
316
0
        CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt + 1);
317
0
    }
318
0
e4:
319
0
    if (ap->pipe_type == PIPE_RTP) {
320
0
        RTPP_OBJ_DECREF(pvt->pub.analyzer);
321
0
    }
322
0
e3:
323
0
    RTPP_OBJ_DECREF(pvt->pub.pproc_manager);
324
0
e2:
325
0
    pthread_mutex_destroy(&pvt->lock);
326
0
e1:
327
0
    RTPP_OBJ_DECREF(&(pvt->pub));
328
0
    free(pvt);
329
0
e0:
330
0
    return (NULL);
331
0
}
332
333
static inline void
334
_s_rtps(struct rtpp_stream_priv *pvt, uint64_t rtps, int replace)
335
0
{
336
337
0
    RTPP_DBG_ASSERT(pvt->pub.pipe_type == PIPE_RTP);
338
0
    if (replace == 0) {
339
0
        RTPP_DBG_ASSERT(pvt->rtps.uid == RTPP_UID_NONE);
340
0
    }
341
0
    pvt->rtps.uid = rtps;
342
0
    if (CALL_SMETHOD(pvt->rem_addr, isempty) || pvt->fd == NULL) {
343
0
        pvt->rtps.inact = 1;
344
0
    }
345
0
}
346
347
static const char *
348
_rtpp_stream_get_actor(struct rtpp_stream_priv *pvt)
349
0
{
350
351
0
    return ((pvt->pub.side == RTPP_SSIDE_CALLER) ? "caller" : "callee");
352
0
}
353
354
static const char *
355
_rtpp_stream_get_proto(struct rtpp_stream_priv *pvt)
356
0
{
357
358
0
    return (PP_NAME(pvt->pub.pipe_type));
359
0
}
360
361
static void
362
rtpp_stream_dtor(struct rtpp_stream_priv *pvt)
363
0
{
364
0
    struct rtpp_stream *pub;
365
366
0
    pub = &(pvt->pub);
367
0
    rtpp_stream_fin(pub);
368
0
    if (pub->analyzer != NULL) {
369
0
         struct rtpa_stats rst;
370
0
         char ssrc_buf[11];
371
0
         const char *actor, *ssrc;
372
373
0
         actor = _rtpp_stream_get_actor(pvt);
374
0
         CALL_SMETHOD(pub->analyzer, get_stats, &rst);
375
0
         if (rst.ssrc_changes != 0) {
376
0
             snprintf(ssrc_buf, sizeof(ssrc_buf), SSRC_FMT, rst.last_ssrc.val);
377
0
             ssrc = ssrc_buf;
378
0
         } else {
379
0
             ssrc = "NONE";
380
0
         }
381
0
         RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "RTP stream from %s: "
382
0
           "SSRC=%s, ssrc_changes=%lu, psent=%lu, precvd=%lu, plost=%lu, pdups=%lu",
383
0
           actor, ssrc, rst.ssrc_changes, rst.psent, rst.precvd,
384
0
           rst.plost, rst.pdups);
385
0
         if (rst.psent > 0) {
386
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nsent", rst.psent);
387
0
         }
388
0
         if (rst.precvd > 0) {
389
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nrcvd", rst.precvd);
390
0
         }
391
0
         if (rst.pdups > 0) {
392
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_ndups", rst.pdups);
393
0
         }
394
0
         if (rst.plost > 0) {
395
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_nlost", rst.plost);
396
0
         }
397
0
         if (rst.pecount > 0) {
398
0
             CALL_SMETHOD(pvt->rtpp_stats, updatebyname, "rtpa_perrs", rst.pecount);
399
0
         }
400
0
         RTPP_OBJ_DECREF(pvt->pub.analyzer);
401
0
    }
402
0
    if (pvt->fd != NULL)
403
0
        RTPP_OBJ_DECREF(pvt->fd);
404
0
    if (pub->codecs != NULL)
405
0
        free(pub->codecs);
406
0
    if (pvt->rtps.uid != RTPP_UID_NONE)
407
0
        CALL_SMETHOD(pvt->proc_servers, unreg, pvt->rtps.uid);
408
0
    if (pub->resizer != NULL)
409
0
        rtp_resizer_free(pvt->rtpp_stats, pub->resizer);
410
0
    if (pub->rrc != NULL)
411
0
        RTPP_OBJ_DECREF(pub->rrc);
412
0
    if (pub->pcount != NULL)
413
0
        RTPP_OBJ_DECREF(pub->pcount);
414
0
    for (unsigned int i = 0; i < pvt->pmod_data.nmodules; i++) {
415
0
        struct rtpp_refcnt *mdata_rcnt;
416
0
        mdata_rcnt = atomic_load(&(pvt->pmod_data.adp[i]));
417
0
        if (mdata_rcnt != NULL) {
418
0
            RC_DECREF(mdata_rcnt);
419
0
        }
420
0
    }
421
0
    if (pub->ttl != NULL)
422
0
        RTPP_OBJ_DECREF(pub->ttl);
423
0
    RTPP_OBJ_DECREF(pub->pcnt_strm);
424
0
    RTPP_OBJ_DECREF(pvt->pub.log);
425
0
    RTPP_OBJ_DECREF(pvt->rem_addr);
426
0
    RTPP_OBJ_DECREF(pvt->raddr_prev);
427
0
    RTPP_OBJ_DECREF(pvt->proc_servers);
428
0
    if (pvt->pub.pipe_type == PIPE_RTP) {
429
0
        CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt + 1);
430
0
    }
431
0
    CALL_SMETHOD(pvt->pub.pproc_manager, unreg, pvt);
432
0
    RTPP_OBJ_DECREF(pvt->pub.pproc_manager);
433
434
0
    pthread_mutex_destroy(&pvt->lock);
435
0
    free(pvt);
436
0
}
437
438
static void
439
player_predestroy_cb(struct rtpp_stats *rtpp_stats)
440
0
{
441
442
0
    CALL_SMETHOD(rtpp_stats, updatebyname, "nplrs_destroyed", 1);
443
0
}
444
445
static enum pproc_action
446
drop_packets(const struct pkt_proc_ctx *pktxp)
447
0
{
448
449
0
    return (PPROC_ACT_DROP);
450
0
}
451
452
static int
453
rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs,
454
  const char *pname, int playcount, struct rtpp_command *cmd, int ptime)
455
0
{
456
0
    struct rtpp_stream_priv *pvt;
457
0
    int n;
458
0
    char *cp;
459
0
    struct rtpp_server *rsrv;
460
0
    uint16_t seq;
461
0
    uint32_t ssrc;
462
0
    const char *plerror;
463
0
    struct rtpp_server_ctor_args sca = {.name = pname, .loop = playcount,
464
0
      .ptime = ptime};
465
466
0
    PUB2PVT(self, pvt);
467
468
0
    const struct packet_processor_if drop_on_pa_poi = {
469
0
        .descr = "drop_packets(player_active)",
470
0
        .arg = (void *)pvt,
471
0
        .key = (void *)(pvt + 2),
472
0
        .enqueue = &drop_packets
473
0
    };
474
475
0
    pthread_mutex_lock(&pvt->lock);
476
0
    plerror = "reason unknown";
477
0
    while (*codecs != '\0') {
478
0
        n = strtol(codecs, &cp, 10);
479
0
        if (cp == codecs) {
480
0
            plerror = "invalid codecs";
481
0
            goto e0;
482
0
        }
483
0
        codecs = cp;
484
0
        if (*codecs != '\0')
485
0
            codecs++;
486
0
        sca.codec = n;
487
0
        rsrv = rtpp_server_ctor(&sca);
488
0
        if (rsrv == NULL) {
489
0
            RTPP_LOG(pvt->pub.log, RTPP_LOG_DBUG, "rtpp_server_ctor(\"%s\", %d, %d) failed",
490
0
              pname, n, playcount);
491
0
            plerror = "rtpp_server_ctor() failed";
492
0
            if (sca.result == RTPP_SERV_NOENT)
493
0
                continue;
494
0
            goto e0;
495
0
        }
496
0
        rsrv->stuid = self->stuid;
497
0
        ssrc = CALL_SMETHOD(rsrv, get_ssrc);
498
0
        seq = CALL_SMETHOD(rsrv, get_seq);
499
0
        _s_rtps(pvt, rsrv->sruid, 0);
500
0
        int regres = CALL_SMETHOD(self->pproc_manager->reverse, reg, PPROC_ORD_PLAY,
501
0
          &drop_on_pa_poi);
502
0
        if (regres < 0) {
503
0
            plerror = "pproc_manager->reg() method failed";
504
0
            goto e1;
505
0
        }
506
0
        if (pvt->rtps.inact == 0) {
507
0
            CALL_SMETHOD(rsrv, start, cmd->dtime->mono);
508
0
        }
509
0
        if (CALL_SMETHOD(pvt->proc_servers, reg, rsrv, pvt->rtps.inact) != 0) {
510
0
            plerror = "proc_servers->reg() method failed";
511
0
            goto e2;
512
0
        }
513
0
        pthread_mutex_unlock(&pvt->lock);
514
0
        cmd->csp->nplrs_created.cnt++;
515
0
        CALL_SMETHOD(rsrv->rcnt, reg_pd, (rtpp_refcnt_dtor_t)player_predestroy_cb,
516
0
          pvt->rtpp_stats);
517
0
        RTPP_OBJ_DECREF(rsrv);
518
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
519
0
          "%d times playing prompt %s codec %d: SSRC=" SSRC_FMT ", seq=%u",
520
0
          playcount, pname, n, ssrc, seq);
521
0
        return 0;
522
0
    }
523
0
    goto e0;
524
0
e2:
525
0
    CALL_SMETHOD(self->pproc_manager->reverse, unreg, drop_on_pa_poi.key);
526
0
e1:
527
0
    RTPP_OBJ_DECREF(rsrv);
528
0
e0:
529
0
    pthread_mutex_unlock(&pvt->lock);
530
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_ERR, "can't create player: %s", plerror);
531
0
    return -1;
532
0
}
533
534
static void
535
rtpp_stream_handle_noplay(struct rtpp_stream *self)
536
0
{
537
0
    struct rtpp_stream_priv *pvt;
538
0
    uint64_t ruid;
539
0
    int stopped;
540
541
0
    stopped = 0;
542
0
    PUB2PVT(self, pvt);
543
0
    pthread_mutex_lock(&pvt->lock);
544
0
    ruid = pvt->rtps.uid;
545
0
    pthread_mutex_unlock(&pvt->lock);
546
0
    if (ruid == RTPP_UID_NONE)
547
0
        return;
548
0
    if (CALL_SMETHOD(pvt->proc_servers, unreg, ruid) == 0) {
549
0
        pthread_mutex_lock(&pvt->lock);
550
0
        CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2);
551
0
        if (pvt->rtps.uid == ruid) {
552
0
            pvt->rtps.uid = RTPP_UID_NONE;
553
0
            pvt->rtps.inact = 0;
554
0
            stopped = 1;
555
0
        }
556
0
        pthread_mutex_unlock(&pvt->lock);
557
0
    }
558
0
    if (stopped != 0) {
559
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
560
0
          "stopping player at port %d", self->port);
561
0
    }
562
0
}
563
564
static int
565
rtpp_stream_isplayer_active(struct rtpp_stream *self)
566
0
{
567
0
    struct rtpp_stream_priv *pvt;
568
0
    int rval;
569
570
0
    PUB2PVT(self, pvt);
571
0
    pthread_mutex_lock(&pvt->lock);
572
0
    rval = (pvt->rtps.uid != RTPP_UID_NONE) ? 1 : 0;
573
0
    pthread_mutex_unlock(&pvt->lock);
574
0
    return (rval);
575
0
}
576
577
static void
578
rtpp_stream_finish_playback(struct rtpp_stream *self, uint64_t sruid)
579
0
{
580
0
    struct rtpp_stream_priv *pvt;
581
582
0
    PUB2PVT(self, pvt);
583
0
    pthread_mutex_lock(&pvt->lock);
584
0
    if (pvt->rtps.uid != RTPP_UID_NONE && pvt->rtps.uid == sruid) {
585
0
        CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2);
586
0
        _s_rtps(pvt, RTPP_UID_NONE, 1);
587
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
588
0
          "player at port %d has finished", self->port);
589
0
    }
590
0
    pthread_mutex_unlock(&pvt->lock);
591
0
}
592
593
/* Public wrapper around _rtpp_stream_get_actor() */
static const char *
rtpp_stream_get_actor(struct rtpp_stream *self)
{
    struct rtpp_stream_priv *pvt;
    const char *actor;

    PUB2PVT(self, pvt);
    actor = _rtpp_stream_get_actor(pvt);
    return (actor);
}
601
602
static const char *
603
rtpp_stream_get_proto(struct rtpp_stream *self)
604
0
{
605
606
0
    return (PP_NAME(self->pipe_type));
607
0
}
608
609
static int
610
_rtpp_stream_latch(struct rtpp_stream_priv *pvt, double dtime,
611
  struct rtp_packet *packet)
612
0
{
613
0
    const char *actor, *ptype, *ssrc, *seq, *relatch;
614
0
    char ssrc_buf[11], seq_buf[6];
615
0
    char saddr[MAX_AP_STRBUF];
616
0
    int newlatch;
617
618
0
    if (pvt->last_update != 0 && \
619
0
      dtime - pvt->last_update < UPDATE_WINDOW) {
620
0
        return (0);
621
0
    }
622
623
0
    actor = _rtpp_stream_get_actor(pvt);
624
0
    ptype = _rtpp_stream_get_proto(pvt);
625
626
0
    if (pvt->pub.pipe_type == PIPE_RTP) {
627
0
        if (rtp_packet_parse(packet) == RTP_PARSER_OK) {
628
0
            pvt->latch_info.ssrc.val = packet->parsed->ssrc;
629
0
            pvt->latch_info.ssrc.inited = 1;
630
0
            pvt->latch_info.seq = packet->parsed->seq;
631
0
            pvt->latch_info.last_sync = dtime;
632
0
            snprintf(ssrc_buf, sizeof(ssrc_buf), SSRC_FMT, packet->parsed->ssrc);
633
0
            snprintf(seq_buf, sizeof(seq_buf), "%u", packet->parsed->seq);
634
0
            ssrc = ssrc_buf;
635
0
            seq = seq_buf;
636
0
        } else {
637
0
            pvt->latch_info.ssrc.val = 0;
638
0
            pvt->latch_info.ssrc.inited = 0;
639
0
            ssrc = seq = "INVALID";
640
0
        }
641
0
    } else {
642
0
        pvt->latch_info.ssrc.inited = 0;
643
0
        ssrc = seq = "UNKNOWN";
644
0
    }
645
646
0
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
647
0
    newlatch = SSRC_IS_BAD(&pvt->latch_info.ssrc) ? 0 : 1;
648
0
    if (pvt->latch_info.latched == 0) {
649
0
        relatch = (newlatch != 0) ? "latched in" : "not latched (bad SSRC)";
650
0
    } else {
651
0
        relatch = "re-latched";
652
0
    }
653
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
654
0
      "%s's address %s: %s (%s), SSRC=%s, Seq=%s", actor, relatch,
655
0
      saddr, ptype, ssrc, seq);
656
0
    pvt->latch_info.latched = newlatch;
657
0
    return (1);
658
0
}
659
660
static int
661
rtpp_stream_latch(struct rtpp_stream *self, struct rtp_packet *packet)
662
0
{
663
0
    struct rtpp_stream_priv *pvt;
664
665
0
    PUB2PVT(self, pvt);
666
0
    pthread_mutex_lock(&pvt->lock);
667
0
    int rval = _rtpp_stream_latch(pvt, packet->rtime.mono, packet);
668
0
    pthread_mutex_unlock(&pvt->lock);
669
0
    return (rval);
670
0
}
671
672
static void
673
_rtpp_stream_latch_sync(struct rtpp_stream_priv *pvt, double dtime,
674
  struct rtp_packet *packet)
675
0
{
676
0
    struct rtpps_latch *lip;
677
678
0
    lip = &pvt->latch_info;
679
0
    if (pvt->pub.pipe_type != PIPE_RTP || lip->ssrc.inited == 0)
680
0
        return;
681
0
    if (dtime - lip->last_sync < SEQ_SYNC_IVAL)
682
0
        return;
683
0
    if (rtp_packet_parse(packet) != RTP_PARSER_OK)
684
0
        return;
685
0
    if (lip->ssrc.val != packet->parsed->ssrc)
686
0
        return;
687
0
    lip->seq = packet->parsed->seq;
688
0
    lip->last_sync = dtime;
689
0
}
690
691
static int
692
_rtpp_stream_check_latch_override(struct rtpp_stream_priv *pvt,
693
  struct rtp_packet *packet, double dtime)
694
0
{
695
0
    const char *actor;
696
0
    char saddr[MAX_AP_STRBUF];
697
698
0
    if (pvt->pub.pipe_type == PIPE_RTCP || pvt->latch_info.ssrc.inited == 0)
699
0
        return (0);
700
0
    if (rtp_packet_parse(packet) != RTP_PARSER_OK)
701
0
        return (0);
702
0
    if (pvt->last_update == 0 || dtime - pvt->last_update > UPDATE_WINDOW) {
703
0
        if (packet->parsed->ssrc != pvt->latch_info.ssrc.val)
704
0
            return (0);
705
0
        if (SEQ_DIST(pvt->latch_info.seq, packet->parsed->seq) > 536)
706
0
            return (0);
707
0
    }
708
709
0
    actor = _rtpp_stream_get_actor(pvt);
710
711
0
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
712
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
713
0
      "%s's address re-latched: %s (%s), SSRC=" SSRC_FMT ", Seq=%u->%u", actor,
714
0
      saddr, "RTP", pvt->latch_info.ssrc.val, pvt->latch_info.seq,
715
0
      packet->parsed->seq);
716
717
0
    pvt->latch_info.seq = packet->parsed->seq;
718
0
    pvt->latch_info.last_sync = packet->rtime.mono;
719
0
    return (1);
720
0
}
721
722
static void
723
_rtpp_stream_plr_start(struct rtpp_stream_priv *pvt, double dtime)
724
0
{
725
726
0
    RTPP_DBG_ASSERT(pvt->rtps.inact != 0);
727
0
    CALL_SMETHOD(pvt->proc_servers, plr_start, pvt->rtps.uid, dtime);
728
0
    pvt->rtps.inact = 0;
729
0
}
730
731
static void
732
__rtpp_stream_fill_addr(struct rtpp_stream_priv *pvt, struct rtp_packet *packet)
733
0
{
734
0
    const char *actor, *ptype;
735
0
    char saddr[MAX_AP_STRBUF];
736
737
0
    pvt->untrusted_addr = 1;
738
0
    CALL_SMETHOD(pvt->rem_addr, set, sstosa(&packet->raddr), packet->rlen);
739
0
    if (CALL_SMETHOD(pvt->raddr_prev, isempty) ||
740
0
      CALL_SMETHOD(pvt->raddr_prev, cmp, sstosa(&packet->raddr), packet->rlen) != 0) {
741
0
        pvt->latch_info.latched = 1;
742
0
    }
743
0
    if (pvt->rtps.inact != 0 && pvt->fd != NULL) {
744
0
        _rtpp_stream_plr_start(pvt, packet->rtime.mono);
745
0
    }
746
747
0
    actor = _rtpp_stream_get_actor(pvt);
748
0
    ptype = _rtpp_stream_get_proto(pvt);
749
0
    addrport2char_r(sstosa(&packet->raddr), saddr, sizeof(saddr), ':');
750
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
751
0
      "%s's address filled in: %s (%s)", actor, saddr, ptype);
752
0
    return;
753
0
}
754
755
static int
756
rtpp_stream_guess_addr(struct rtpp_stream *self,
757
  struct rtp_packet *packet)
758
0
{
759
0
    int rport;
760
0
    const char *actor, *ptype;
761
0
    struct rtpp_stream_priv *pvt;
762
0
    struct sockaddr_storage ta;
763
764
0
    RTPP_DBG_ASSERT(self->pipe_type == PIPE_RTCP);
765
0
    PUB2PVT(self, pvt);
766
767
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty) &&
768
0
      CALL_SMETHOD(pvt->rem_addr, cmphost, sstosa(&packet->raddr))) {
769
0
        return (0);
770
0
    }
771
#if 0
772
    if (self->addr == NULL) {
773
        self->addr = malloc(packet->rlen);
774
        if (self->addr == NULL) {
775
            return (-1);
776
        }
777
    }
778
#endif
779
0
    actor = rtpp_stream_get_actor(self);
780
0
    ptype = rtpp_stream_get_proto(self);
781
0
    rport = ntohs(satosin(&packet->raddr)->sin_port);
782
0
    if (IS_LAST_PORT(rport)) {
783
0
        return (-1);
784
0
    }
785
786
0
    memcpy(&ta, &packet->raddr, packet->rlen);
787
0
    setport(sstosa(&ta), rport + 1);
788
789
0
    CALL_SMETHOD(pvt->rem_addr, set, sstosa(&ta), packet->rlen);
790
    /* Use guessed value as the only true one for asymmetric clients */
791
0
    pvt->latch_info.latched = self->asymmetric;
792
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "guessing %s port "
793
0
      "for %s to be %d", ptype, actor, rport + 1);
794
795
0
    return (0);
796
0
}
797
798
static void
799
rtpp_stream_prefill_addr(struct rtpp_stream *self, struct sockaddr **iapp,
800
  double dtime)
801
0
{
802
0
    struct rtpp_stream_priv *pvt;
803
0
    char saddr[MAX_AP_STRBUF];
804
0
    const char *actor, *ptype;
805
806
0
    PUB2PVT(self, pvt);
807
808
0
    actor = rtpp_stream_get_actor(self);
809
0
    ptype = rtpp_stream_get_proto(self);
810
0
    pthread_mutex_lock(&pvt->lock);
811
0
    if (pvt->hld_stat.status != 0) {
812
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "taking %s's %s stream off-hold",
813
0
            actor, ptype);
814
0
        pvt->hld_stat.status = 0;
815
0
    }
816
817
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty))
818
0
        pvt->last_update = dtime;
819
820
    /*
821
     * Unless the address provided by client historically
822
     * cannot be trusted and address is different from one
823
     * that we recorded update it.
824
     */
825
0
    if (pvt->untrusted_addr != 0) {
826
0
        pthread_mutex_unlock(&pvt->lock);
827
0
        return;
828
0
    }
829
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty) && CALL_SMETHOD(pvt->rem_addr,
830
0
      isaddrseq, *iapp)) {
831
0
        pthread_mutex_unlock(&pvt->lock);
832
0
        return;
833
0
    }
834
835
0
    addrport2char_r(*iapp, saddr, sizeof(saddr), ':');
836
0
    RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "pre-filling %s's %s address "
837
0
      "with %s", actor, ptype, saddr);
838
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty)) {
839
0
        if (pvt->latch_info.latched != 0) {
840
0
            CALL_SMETHOD(pvt->raddr_prev, copy, pvt->rem_addr);
841
0
        }
842
0
    }
843
0
    CALL_SMETHOD(pvt->rem_addr, set, *iapp, SA_LEN(*iapp));
844
0
    if (pvt->rtps.inact != 0 && pvt->fd != NULL) {
845
0
        _rtpp_stream_plr_start(pvt, dtime);
846
0
    }
847
0
    pthread_mutex_unlock(&pvt->lock);
848
0
}
849
850
static void rtpp_stream_reg_onhold(struct rtpp_stream *self)
851
0
{
852
0
    struct rtpp_stream_priv *pvt;
853
0
    const char *actor, *ptype;
854
855
0
    PUB2PVT(self, pvt);
856
0
    pthread_mutex_lock(&pvt->lock);
857
0
    if (pvt->hld_stat.status == 0) {
858
0
        actor = rtpp_stream_get_actor(self);
859
0
        ptype = rtpp_stream_get_proto(self);
860
0
        RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "putting %s's %s stream on hold",
861
0
           actor, ptype);
862
0
        pvt->hld_stat.status = 1;
863
0
    }
864
0
    pvt->hld_stat.cnt++;
865
0
    pthread_mutex_unlock(&pvt->lock);
866
0
}
867
868
static void
869
rtpp_stream_set_skt(struct rtpp_stream *self, struct rtpp_socket *new_skt)
870
0
{
871
0
    struct rtpp_stream_priv *pvt;
872
873
0
    PUB2PVT(self, pvt);
874
0
    pthread_mutex_lock(&pvt->lock);
875
0
    if (new_skt == NULL) {
876
0
        RTPP_DBG_ASSERT(pvt->fd != NULL);
877
0
        RTPP_OBJ_DECREF(pvt->fd);
878
0
        pvt->fd = NULL;
879
0
        pthread_mutex_unlock(&pvt->lock);
880
0
        return;
881
0
    }
882
0
    RTPP_DBG_ASSERT(pvt->fd == NULL);
883
0
    CALL_METHOD(new_skt, set_stuid, self->stuid);
884
0
    pvt->fd = new_skt;
885
0
    RTPP_OBJ_INCREF(pvt->fd);
886
0
    if (pvt->rtps.inact != 0 && !CALL_SMETHOD(pvt->rem_addr, isempty)) {
887
0
        _rtpp_stream_plr_start(pvt, getdtime());
888
0
    }
889
0
    pthread_mutex_unlock(&pvt->lock);
890
0
}
891
892
static struct rtpp_socket *
893
rtpp_stream_get_skt(struct rtpp_stream *self)
894
0
{
895
0
    struct rtpp_stream_priv *pvt;
896
0
    struct rtpp_socket *rval;
897
898
0
    PUB2PVT(self, pvt);
899
0
    pthread_mutex_lock(&pvt->lock);
900
0
    if (pvt->fd == NULL) {
901
0
        pthread_mutex_unlock(&pvt->lock);
902
0
        return (NULL);
903
0
    }
904
0
    RTPP_OBJ_INCREF(pvt->fd);
905
0
    rval = pvt->fd;
906
0
    pthread_mutex_unlock(&pvt->lock);
907
0
    return (rval);
908
0
}
909
910
static struct rtpp_socket *
911
rtpp_stream_update_skt(struct rtpp_stream *self, struct rtpp_socket *new_skt)
912
0
{
913
0
    struct rtpp_socket *old_skt;
914
0
    struct rtpp_stream_priv *pvt;
915
916
0
    RTPP_DBG_ASSERT(new_skt != NULL);
917
0
    PUB2PVT(self, pvt);
918
0
    pthread_mutex_lock(&pvt->lock);
919
0
    old_skt = pvt->fd;
920
0
    CALL_METHOD(new_skt, set_stuid, self->stuid);
921
0
    pvt->fd = new_skt;
922
0
    RTPP_OBJ_INCREF(pvt->fd);
923
0
    if (pvt->rtps.inact != 0 && !CALL_SMETHOD(pvt->rem_addr, isempty)) {
924
0
        _rtpp_stream_plr_start(pvt, getdtime());
925
0
    }
926
0
    pthread_mutex_unlock(&pvt->lock);
927
0
    return (old_skt);
928
0
}
929
930
static int
931
rtpp_stream_send_pkt(struct rtpp_stream *self, struct sthread_args *sap,
932
  struct rtp_packet *pkt)
933
0
{
934
0
    struct rtpp_stream_priv *pvt;
935
0
    int rval;
936
937
0
    PUB2PVT(self, pvt);
938
0
    pthread_mutex_lock(&pvt->lock);
939
0
    rval = CALL_METHOD(pvt->fd, send_pkt_na, sap, pvt->rem_addr, pkt,
940
0
      self->log);
941
0
    pthread_mutex_unlock(&pvt->lock);
942
0
    return (rval);
943
0
}
944
945
static struct rtp_packet *
946
_rtpp_stream_recv_pkt(struct rtpp_stream_priv *pvt,
947
  const struct rtpp_timestamp *dtime)
948
0
{
949
0
    struct rtp_packet *pkt;
950
951
0
    pkt = CALL_METHOD(pvt->fd, rtp_recv, dtime, pvt->pub.laddr, pvt->pub.port);
952
0
    return (pkt);
953
0
}
954
955
static int
956
rtpp_stream_issendable(struct rtpp_stream *self)
957
0
{
958
0
    struct rtpp_stream_priv *pvt;
959
960
0
    PUB2PVT(self, pvt);
961
0
    pthread_mutex_lock(&pvt->lock);
962
0
    if (CALL_SMETHOD(pvt->rem_addr, isempty)) {
963
0
        pthread_mutex_unlock(&pvt->lock);
964
0
        return (0);
965
0
    }
966
0
    if (pvt->fd == NULL) {
967
0
        pthread_mutex_unlock(&pvt->lock);
968
0
        return (0);
969
0
    }
970
0
    pthread_mutex_unlock(&pvt->lock);
971
0
    return (1);
972
0
}
973
974
static int
975
_rtpp_stream_islatched(struct rtpp_stream_priv *pvt)
976
0
{
977
0
    int rval;
978
979
0
    rval = pvt->latch_info.latched;
980
0
    return (rval);
981
0
}
982
983
static void
984
rtpp_stream_locklatch(struct rtpp_stream *self)
985
0
{
986
0
    struct rtpp_stream_priv *pvt;
987
988
0
    PUB2PVT(self, pvt);
989
0
    pthread_mutex_lock(&pvt->lock);
990
0
    pvt->latch_info.latched = 1;
991
0
    pthread_mutex_unlock(&pvt->lock);
992
0
}
993
994
void
995
rtpp_stream_get_stats(struct rtpp_stream *self, struct rtpp_acct_hold *ahp)
996
0
{
997
0
    struct rtpp_stream_priv *pvt;
998
999
0
    PUB2PVT(self, pvt);
1000
0
    pthread_mutex_lock(&pvt->lock);
1001
0
    *ahp = pvt->hld_stat;
1002
0
    pthread_mutex_unlock(&pvt->lock);
1003
0
}
1004
1005
static int
1006
_rtpp_stream_fill_addr(struct rtpp_stream_priv *pvt,
1007
  struct rtpp_weakref *rtcps_wrt, struct rtp_packet *packet)
1008
0
{
1009
0
    struct rtpp_stream *stp_rtcp;
1010
0
    int rval;
1011
1012
0
    __rtpp_stream_fill_addr(pvt, packet);
1013
0
    if (pvt->pub.stuid_rtcp == RTPP_UID_NONE) {
1014
0
        return (0);
1015
0
    }
1016
0
    stp_rtcp = CALL_SMETHOD(rtcps_wrt, get_by_idx,
1017
0
      pvt->pub.stuid_rtcp);
1018
0
    if (stp_rtcp == NULL) {
1019
0
        return (0);
1020
0
    }
1021
0
    rval = CALL_SMETHOD(stp_rtcp, guess_addr, packet);
1022
0
    RTPP_OBJ_DECREF(stp_rtcp);
1023
0
    return (rval);
1024
0
}
1025
1026
static struct rtp_packet *
1027
rtpp_stream_rx(struct rtpp_stream *self, struct rtpp_weakref *rtcps_wrt,
1028
  const struct rtpp_timestamp *dtime, struct rtpp_proc_rstats *rsp)
1029
0
{
1030
0
    struct rtp_packet *packet = NULL;
1031
0
    struct rtpp_stream_priv *pvt;
1032
1033
0
    PUB2PVT(self, pvt);
1034
0
    pthread_mutex_lock(&pvt->lock);
1035
0
    packet = _rtpp_stream_recv_pkt(pvt, dtime);
1036
0
    if (packet == NULL) {
1037
        /* Move on to the next session */
1038
0
        pthread_mutex_unlock(&pvt->lock);
1039
0
        return (NULL);
1040
0
    }
1041
0
    rsp->npkts_rcvd.cnt++;
1042
1043
0
    if (!CALL_SMETHOD(pvt->rem_addr, isempty)) {
1044
        /* Check that the packet is authentic, drop if it isn't */
1045
0
        if (self->asymmetric == 0) {
1046
0
            if (CALL_SMETHOD(pvt->rem_addr, cmp, sstosa(&packet->raddr),
1047
0
              packet->rlen) != 0) {
1048
0
                if (_rtpp_stream_islatched(pvt) && \
1049
0
                  _rtpp_stream_check_latch_override(pvt, packet, dtime->mono) == 0) {
1050
                    /*
1051
                     * Continue, since there could be good packets in
1052
                     * queue.
1053
                     */
1054
0
                    CALL_SMETHOD(self->pcount, reg_ignr);
1055
0
                    goto discard_and_continue;
1056
0
                } else if (!_rtpp_stream_islatched(pvt)) {
1057
0
                    _rtpp_stream_latch(pvt, dtime->mono, packet);
1058
0
                }
1059
                /* Signal that an address has to be updated */
1060
0
                _rtpp_stream_fill_addr(pvt, rtcps_wrt, packet);
1061
0
            } else if (!_rtpp_stream_islatched(pvt)) {
1062
0
                _rtpp_stream_latch(pvt, dtime->mono, packet);
1063
0
            }
1064
0
        } else {
1065
            /*
1066
             * For asymmetric clients don't check
1067
             * source port since it may be different.
1068
             */
1069
0
            if (!CALL_SMETHOD(pvt->rem_addr, cmphost, sstosa(&packet->raddr))) {
1070
                /*
1071
                 * Continue, since there could be good packets in
1072
                 * queue.
1073
                 */
1074
0
                CALL_SMETHOD(self->pcount, reg_ignr);
1075
0
                goto discard_and_continue;
1076
0
            }
1077
0
        }
1078
0
    } else {
1079
        /* Update address recorded in the session */
1080
0
        _rtpp_stream_fill_addr(pvt, rtcps_wrt, packet);
1081
0
    }
1082
0
    _rtpp_stream_latch_sync(pvt, dtime->mono, packet);
1083
0
    pthread_mutex_unlock(&pvt->lock);
1084
0
    return (packet);
1085
1086
0
discard_and_continue:
1087
0
    pthread_mutex_unlock(&pvt->lock);
1088
0
    RTPP_OBJ_DECREF(packet);
1089
0
    rsp->npkts_discard.cnt++;
1090
0
    return (RTPP_S_RX_DCONT);
1091
0
}
1092
1093
static struct rtpp_netaddr *
1094
rtpp_stream_get_rem_addr(struct rtpp_stream *self, int retempty)
1095
0
{
1096
0
    struct rtpp_stream_priv *pvt;
1097
0
    struct rtpp_netaddr *rval;
1098
1099
0
    PUB2PVT(self, pvt);
1100
0
    pthread_mutex_lock(&pvt->lock);
1101
0
    if (retempty == 0 && CALL_SMETHOD(pvt->rem_addr, isempty)) {
1102
0
        pthread_mutex_unlock(&pvt->lock);
1103
0
        return (NULL);
1104
0
    }
1105
0
    rval = pvt->rem_addr;
1106
0
    RTPP_OBJ_INCREF(rval);
1107
0
    pthread_mutex_unlock(&pvt->lock);
1108
0
    return (rval);
1109
0
}
1110
1111
static struct rtpp_stream *
1112
rtpp_stream_get_sender(struct rtpp_stream *self, const struct rtpp_cfg *cfsp)
1113
0
{
1114
0
    if (self->pipe_type == PIPE_RTP) {
1115
0
       return (CALL_SMETHOD(cfsp->rtp_streams_wrt, get_by_idx,
1116
0
         self->stuid_sendr));
1117
0
    }
1118
0
    return (CALL_SMETHOD(cfsp->rtcp_streams_wrt, get_by_idx,
1119
0
      self->stuid_sendr));
1120
0
}