Coverage Report

Created: 2025-12-31 07:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ffmpeg/libavformat/udp.c
Line
Count
Source
1
/*
2
 * UDP prototype streaming system
3
 * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4
 *
5
 * This file is part of FFmpeg.
6
 *
7
 * FFmpeg is free software; you can redistribute it and/or
8
 * modify it under the terms of the GNU Lesser General Public
9
 * License as published by the Free Software Foundation; either
10
 * version 2.1 of the License, or (at your option) any later version.
11
 *
12
 * FFmpeg is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
 * Lesser General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU Lesser General Public
18
 * License along with FFmpeg; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
 */
21
22
/**
23
 * @file
24
 * UDP protocol
25
 */
26
27
#define _DEFAULT_SOURCE
28
#define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */
29
30
#include "avformat.h"
31
#include "libavutil/avassert.h"
32
#include "libavutil/mem.h"
33
#include "libavutil/parseutils.h"
34
#include "libavutil/fifo.h"
35
#include "libavutil/intreadwrite.h"
36
#include "libavutil/opt.h"
37
#include "libavutil/log.h"
38
#include "libavutil/time.h"
39
#include "internal.h"
40
#include "network.h"
41
#include "os_support.h"
42
#include "url.h"
43
#include "ip.h"
44
45
#ifdef __APPLE__
46
#include "TargetConditionals.h"
47
#endif
48
49
#if HAVE_UDPLITE_H
50
#include "udplite.h"
51
#else
52
/* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
53
 * So, we provide a fallback here.
54
 */
55
0
#define UDPLITE_SEND_CSCOV                               10
56
0
#define UDPLITE_RECV_CSCOV                               11
57
#endif
58
59
#ifndef IPPROTO_UDPLITE
60
#define IPPROTO_UDPLITE                                  136
61
#endif
62
63
#if HAVE_W32THREADS
64
#undef HAVE_PTHREAD_CANCEL
65
#define HAVE_PTHREAD_CANCEL 1
66
#endif
67
68
#if HAVE_PTHREAD_CANCEL
69
#include "libavutil/thread.h"
70
#endif
71
72
#ifndef IPV6_ADD_MEMBERSHIP
73
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
74
#define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
75
#endif
76
77
0
/* Default for the buffer_size option when the context is opened for output. */
#define UDP_TX_BUF_SIZE  32768
/* Default for the buffer_size option when the context is opened for input. */
#define UDP_RX_BUF_SIZE  393216
/* Largest datagram handled; sizes the scratch buffer and the read-side
 * max_packet_size. */
#define UDP_MAX_PKT_SIZE 65536
#define UDP_HEADER_SIZE  8
81
82
/**
 * Header written in front of every datagram queued in the receive FIFO:
 * the payload size and the sender address as filled in by recvfrom().
 */
typedef struct UDPQueuedPacketHeader {
    int pkt_size;                  /* payload length in bytes */
    struct sockaddr_storage addr;  /* address the datagram came from */
    socklen_t addr_len;            /* number of valid bytes in addr */
} UDPQueuedPacketHeader;
87
88
typedef struct UDPContext {
89
    const AVClass *class;
90
    int udp_fd;
91
    int ttl;
92
    int udplite_coverage;
93
    int buffer_size;
94
    int pkt_size;
95
    int is_multicast;
96
    int is_broadcast;
97
    int local_port;
98
    int reuse_socket;
99
    int overrun_nonfatal;
100
    struct sockaddr_storage dest_addr;
101
    int dest_addr_len;
102
    int is_connected;
103
104
    /* Circular Buffer variables for use in UDP receive code */
105
    int circular_buffer_size;
106
    AVFifo *rx_fifo;
107
    AVFifo *tx_fifo;
108
    int circular_buffer_error;
109
    int64_t bitrate; /* number of bits to send per second */
110
    int64_t burst_bits;
111
    int close_req;
112
#if HAVE_PTHREAD_CANCEL
113
    pthread_t circular_buffer_thread;
114
    pthread_mutex_t mutex;
115
    pthread_cond_t cond;
116
    int thread_started;
117
#endif
118
    uint8_t tmp[UDP_MAX_PKT_SIZE + sizeof(UDPQueuedPacketHeader)];
119
    int remaining_in_dg;
120
    char *localaddr;
121
    int timeout;
122
    int dscp;
123
    struct sockaddr_storage local_addr_storage;
124
    char *sources;
125
    char *block;
126
    IPSourceFilters filters;
127
    struct sockaddr_storage last_recv_addr;
128
    socklen_t last_recv_addr_len;
129
} UDPContext;
130
131
#define OFFSET(x) offsetof(UDPContext, x)
132
#define D AV_OPT_FLAG_DECODING_PARAM
133
#define E AV_OPT_FLAG_ENCODING_PARAM
134
static const AVOption options[] = {
135
    { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
136
    { "bitrate",        "Bits to send per second",                         OFFSET(bitrate),        AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
137
    { "burst_bits",     "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits),   AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
138
    { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
139
    { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
140
    { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
141
    { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
142
    { "pkt_size",       "Maximum UDP packet size",                         OFFSET(pkt_size),       AV_OPT_TYPE_INT,    { .i64 = 1472 },  -1, INT_MAX, .flags = D|E },
143
    { "reuse",          "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       D|E },
144
    { "reuse_socket",   "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       .flags = D|E },
145
    { "broadcast", "explicitly allow or disallow broadcast destination",   OFFSET(is_broadcast),   AV_OPT_TYPE_BOOL,   { .i64 = 0  },     0, 1,       E },
146
    { "ttl",            "Time to live (multicast only)",                   OFFSET(ttl),            AV_OPT_TYPE_INT,    { .i64 = 16 },     0, 255,     E },
147
    { "dscp",           "DSCP class for outgoing packets",                 OFFSET(dscp),           AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, 63,      E },
148
    { "connect",        "set if connect() should be called on socket",     OFFSET(is_connected),   AV_OPT_TYPE_BOOL,   { .i64 =  0 },     0, 1,       .flags = D|E },
149
    { "fifo_size",      "set the UDP circular buffer size (in 188-byte packets)", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = HAVE_PTHREAD_CANCEL ? 7*4096 : 0}, 0, INT_MAX, D },
150
    { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
151
    { "timeout",        "set raise error timeout, in microseconds (only in read mode)",OFFSET(timeout),         AV_OPT_TYPE_INT,  {.i64 = 0}, 0, INT_MAX, D },
152
    { "sources",        "Source list",                                     OFFSET(sources),        AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
153
    { "block",          "Block list",                                      OFFSET(block),          AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
154
    { NULL }
155
};
156
157
static const AVClass udp_class = {
158
    .class_name = "udp",
159
    .item_name  = av_default_item_name,
160
    .option     = options,
161
    .version    = LIBAVUTIL_VERSION_INT,
162
};
163
164
static const AVClass udplite_context_class = {
165
    .class_name     = "udplite",
166
    .item_name      = av_default_item_name,
167
    .option         = options,
168
    .version        = LIBAVUTIL_VERSION_INT,
169
};
170
171
static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
172
                                 struct sockaddr *addr,
173
                                 void *logctx)
174
0
{
175
0
    int protocol, cmd;
176
177
    /* There is some confusion in the world whether IP_MULTICAST_TTL
178
     * takes a byte or an int as an argument.
179
     * BSD seems to indicate byte so we are going with that and use
180
     * int and fall back to byte to be safe */
181
0
    switch (addr->sa_family) {
182
0
#ifdef IP_MULTICAST_TTL
183
0
        case AF_INET:
184
0
            protocol = IPPROTO_IP;
185
0
            cmd      = IP_MULTICAST_TTL;
186
0
            break;
187
0
#endif
188
0
#ifdef IPV6_MULTICAST_HOPS
189
0
        case AF_INET6:
190
0
            protocol = IPPROTO_IPV6;
191
0
            cmd      = IPV6_MULTICAST_HOPS;
192
0
            break;
193
0
#endif
194
0
        default:
195
0
            return 0;
196
0
    }
197
198
0
    if (setsockopt(sockfd, protocol, cmd, &mcastTTL, sizeof(mcastTTL)) < 0) {
199
        /* BSD compatibility */
200
0
        unsigned char ttl = (unsigned char) mcastTTL;
201
202
0
        ff_log_net_error(logctx, AV_LOG_DEBUG, "setsockopt(IPV4/IPV6 MULTICAST TTL)");
203
0
        if (setsockopt(sockfd, protocol, cmd, &ttl, sizeof(ttl)) < 0) {
204
0
            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV4/IPV6 MULTICAST TTL)");
205
0
            return ff_neterrno();
206
0
        }
207
0
    }
208
209
0
    return 0;
210
0
}
211
212
static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,
213
                                    struct sockaddr *local_addr, void *logctx)
214
0
{
215
0
#ifdef IP_ADD_MEMBERSHIP
216
0
    if (addr->sa_family == AF_INET) {
217
0
        struct ip_mreq mreq;
218
219
0
        mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
220
0
        if (local_addr)
221
0
            mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
222
0
        else
223
0
            mreq.imr_interface.s_addr = INADDR_ANY;
224
0
        if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
225
0
            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
226
0
            return ff_neterrno();
227
0
        }
228
0
    }
229
0
#endif
230
0
#if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
231
0
    if (addr->sa_family == AF_INET6) {
232
0
        struct ipv6_mreq mreq6;
233
234
0
        memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
235
        //TODO: Interface index should be looked up from local_addr
236
0
        mreq6.ipv6mr_interface = 0;
237
0
        if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
238
0
            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
239
0
            return ff_neterrno();
240
0
        }
241
0
    }
242
0
#endif
243
0
    return 0;
244
0
}
245
246
static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,
247
                                     struct sockaddr *local_addr, void *logctx)
248
0
{
249
0
#ifdef IP_DROP_MEMBERSHIP
250
0
    if (addr->sa_family == AF_INET) {
251
0
        struct ip_mreq mreq;
252
253
0
        mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
254
0
        if (local_addr)
255
0
            mreq.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
256
0
        else
257
0
            mreq.imr_interface.s_addr = INADDR_ANY;
258
0
        if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
259
0
            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
260
0
            return -1;
261
0
        }
262
0
    }
263
0
#endif
264
0
#if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
265
0
    if (addr->sa_family == AF_INET6) {
266
0
        struct ipv6_mreq mreq6;
267
268
0
        memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
269
        //TODO: Interface index should be looked up from local_addr
270
0
        mreq6.ipv6mr_interface = 0;
271
0
        if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
272
0
            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
273
0
            return -1;
274
0
        }
275
0
    }
276
0
#endif
277
0
    return 0;
278
0
}
279
280
/**
 * Install per-source multicast filters on the socket.
 *
 * IPv4 groups go through the ip_mreq_source options (IP_ADD_SOURCE_MEMBERSHIP
 * / IP_BLOCK_SOURCE); every other family goes through the protocol
 * independent group_source_req options when the build provides them.
 *
 * @param h          URL context, used for logging
 * @param sockfd     socket to configure
 * @param addr       multicast group address
 * @param addr_len   length of addr in bytes
 * @param local_addr optional local address selecting the IPv4 interface;
 *                   NULL means INADDR_ANY
 * @param sources    array of source addresses
 * @param nb_sources number of entries in sources
 * @param include    non-zero to accept only these sources, zero to block them
 * @return 0 on success, a negative error code otherwise
 */
static int udp_set_multicast_sources(URLContext *h,
                                     int sockfd, struct sockaddr *addr,
                                     int addr_len, struct sockaddr_storage *local_addr,
                                     struct sockaddr_storage *sources,
                                     int nb_sources, int include)
{
    if (addr->sa_family != AF_INET) {
#if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE)
        /* For IPv4 prefer the old approach, as that alone works reliably on
         * Windows and it also supports supplying the interface based on its
         * address. */
        for (int src = 0; src < nb_sources; src++) {
            struct group_source_req req;
            int level   = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
            int optname = include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE;

            //TODO: Interface index should be looked up from local_addr
            req.gsr_interface = 0;
            memcpy(&req.gsr_group, addr, addr_len);
            memcpy(&req.gsr_source, &sources[src], sizeof(*sources));

            if (setsockopt(sockfd, level, optname,
                           (const void *)&req, sizeof(req)) < 0) {
                ff_log_net_error(h, AV_LOG_ERROR,
                                 include ? "setsockopt(MCAST_JOIN_SOURCE_GROUP)"
                                         : "setsockopt(MCAST_BLOCK_SOURCE)");
                return ff_neterrno();
            }
        }
        return 0;
#else
        av_log(h, AV_LOG_ERROR,
               "Setting multicast sources only supported for IPv4\n");
        return AVERROR(EINVAL);
#endif
    }
#if HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
    for (int src = 0; src < nb_sources; src++) {
        struct ip_mreq_source req;

        if (sources[src].ss_family != AF_INET) {
            av_log(h, AV_LOG_ERROR, "Source/block address %d is of incorrect protocol family\n", src + 1);
            return AVERROR(EINVAL);
        }

        req.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
        if (!local_addr)
            req.imr_interface.s_addr = INADDR_ANY;
        else
            req.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
        req.imr_sourceaddr.s_addr = ((struct sockaddr_in *)&sources[src])->sin_addr.s_addr;

        if (setsockopt(sockfd, IPPROTO_IP,
                       include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
                       (const void *)&req, sizeof(req)) < 0) {
            ff_log_net_error(h, AV_LOG_ERROR,
                             include ? "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)"
                                     : "setsockopt(IP_BLOCK_SOURCE)");
            return ff_neterrno();
        }
    }
#else
    return AVERROR(ENOSYS);
#endif
    return 0;
}
347
/**
 * Resolve hostname:port as a datagram endpoint and copy the first result
 * into addr.
 *
 * @param h        URL context passed to the resolver
 * @param addr     receives the resolved socket address
 * @param hostname host to resolve
 * @param port     port number
 * @return length of the stored address, or AVERROR(EIO) if resolution fails
 */
static int udp_set_url(URLContext *h,
                       struct sockaddr_storage *addr,
                       const char *hostname, int port)
{
    struct addrinfo *resolved =
        ff_ip_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
    int len;

    if (!resolved)
        return AVERROR(EIO);

    len = resolved->ai_addrlen;
    memcpy(addr, resolved->ai_addr, resolved->ai_addrlen);
    freeaddrinfo(resolved);

    return len;
}
362
363
/**
 * Create the datagram socket for this context.
 *
 * The local address/port is resolved in passive mode, restricted to the
 * family of the destination address when one is already set. The first
 * candidate for which a socket can be created wins. A UDP-Lite socket is
 * requested when udplite_coverage is non-zero.
 *
 * @param h         URL context
 * @param addr      receives the local address of the chosen candidate
 * @param addr_len  receives the length of that address
 * @param localaddr local address to resolve; NULL or "" lets the resolver
 *                  choose
 * @return the new socket descriptor, or -1 on failure
 */
static int udp_socket_create(URLContext *h, struct sockaddr_storage *addr,
                             socklen_t *addr_len, const char *localaddr)
{
    UDPContext *s = h->priv_data;
    const struct sockaddr *dest = (struct sockaddr *) &s->dest_addr;
    const char *node = (localaddr && localaddr[0]) ? localaddr : NULL;
    int family = dest->sa_family ? dest->sa_family : AF_UNSPEC;
    int protocol = s->udplite_coverage ? IPPROTO_UDPLITE : 0;
    struct addrinfo *candidates, *cur;
    int fd = -1;

    candidates = ff_ip_resolve_host(h, node, s->local_port,
                                    SOCK_DGRAM, family, AI_PASSIVE);
    if (!candidates)
        return -1;

    for (cur = candidates; cur; cur = cur->ai_next) {
        fd = ff_socket(cur->ai_family, SOCK_DGRAM, protocol, h);
        if (fd != -1)
            break;
        ff_log_net_error(h, AV_LOG_ERROR, "socket");
    }

    if (fd < 0) {
        /* no candidate produced a usable socket */
        freeaddrinfo(candidates);
        return -1;
    }

    memcpy(addr, cur->ai_addr, cur->ai_addrlen);
    *addr_len = cur->ai_addrlen;
    freeaddrinfo(candidates);

    return fd;
}
404
405
static int udp_port(struct sockaddr_storage *addr, int addr_len)
406
0
{
407
0
    char sbuf[sizeof(int)*3+1];
408
0
    int error;
409
410
0
    if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,  sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
411
0
        av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
412
0
        return -1;
413
0
    }
414
415
0
    return strtol(sbuf, NULL, 10);
416
0
}
417
418
419
/**
420
 * If no filename is given to av_open_input_file because you want to
421
 * get the local port first, then you must call this function to set
422
 * the remote server address.
423
 *
424
 * url syntax: udp://host:port[?option=val...]
425
 * option: 'ttl=n'       : set the ttl value (for multicast only)
426
 *         'localport=n' : set the local port
427
 *         'pkt_size=n'  : set max packet size
428
 *         'reuse=1'     : enable reusing the socket
429
 *         'overrun_nonfatal=1': survive in case of circular buffer overrun
430
 *
431
 * @param h media file context
432
 * @param uri of the remote server
433
 * @return zero if no error.
434
 */
435
int ff_udp_set_remote_url(URLContext *h, const char *uri)
436
0
{
437
0
    UDPContext *s = h->priv_data;
438
0
    char hostname[256], buf[10];
439
0
    int port;
440
0
    const char *p;
441
442
0
    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
443
444
    /* set the destination address */
445
0
    s->dest_addr_len = udp_set_url(h, &s->dest_addr, hostname, port);
446
0
    if (s->dest_addr_len < 0) {
447
0
        return AVERROR(EIO);
448
0
    }
449
0
    s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
450
0
    p = strchr(uri, '?');
451
0
    if (p) {
452
0
        if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
453
0
            int was_connected = s->is_connected;
454
0
            s->is_connected = strtol(buf, NULL, 10);
455
0
            if (s->is_connected && !was_connected) {
456
0
                if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
457
0
                            s->dest_addr_len)) {
458
0
                    s->is_connected = 0;
459
0
                    ff_log_net_error(h, AV_LOG_ERROR, "connect");
460
0
                    return AVERROR(EIO);
461
0
                }
462
0
            }
463
0
        }
464
0
    }
465
466
0
    return 0;
467
0
}
468
469
/**
470
 * This function is identical to ff_udp_set_remote_url, except that it takes a sockaddr directly.
471
 */
472
int ff_udp_set_remote_addr(URLContext *h, const struct sockaddr *dest_addr, socklen_t dest_addr_len, int do_connect)
473
0
{
474
0
    UDPContext *s = h->priv_data;
475
476
    /* set the destination address */
477
0
    if ((size_t)dest_addr_len > sizeof(s->dest_addr))
478
0
        return AVERROR(EIO);
479
0
    s->dest_addr_len = dest_addr_len;
480
0
    memcpy(&s->dest_addr, dest_addr, dest_addr_len);
481
482
0
    s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
483
0
    if (do_connect >= 0) {
484
0
        int was_connected = s->is_connected;
485
0
        s->is_connected = do_connect;
486
0
        if (s->is_connected && !was_connected) {
487
0
            if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
488
0
                        s->dest_addr_len)) {
489
0
                s->is_connected = 0;
490
0
                ff_log_net_error(h, AV_LOG_ERROR, "connect");
491
0
                return AVERROR(EIO);
492
0
            }
493
0
        }
494
0
    }
495
496
0
    return 0;
497
0
}
498
499
/**
500
 * Return the local port used by the UDP connection
501
 * @param h media file context
502
 * @return the local port number
503
 */
504
int ff_udp_get_local_port(URLContext *h)
505
0
{
506
0
    UDPContext *s = h->priv_data;
507
0
    return s->local_port;
508
0
}
509
510
/**
 * Copy out the last_recv_addr / last_recv_addr_len pair stored in the
 * UDP context.
 * @param h        media file context
 * @param addr     receives the stored address
 * @param addr_len receives the stored address length
 */
void ff_udp_get_last_recv_addr(URLContext *h, struct sockaddr_storage *addr, socklen_t *addr_len)
{
    const UDPContext *ctx = h->priv_data;

    *addr     = ctx->last_recv_addr;
    *addr_len = ctx->last_recv_addr_len;
}
516
517
/**
518
 * Return the udp file handle for select() usage to wait for several RTP
519
 * streams at the same time.
520
 * @param h media file context
521
 */
522
static int udp_get_file_handle(URLContext *h)
523
0
{
524
0
    UDPContext *s = h->priv_data;
525
0
    return s->udp_fd;
526
0
}
527
528
#if HAVE_PTHREAD_CANCEL
529
static void *circular_buffer_task_rx( void *_URLContext)
530
0
{
531
0
    URLContext *h = _URLContext;
532
0
    UDPContext *s = h->priv_data;
533
0
    int old_cancelstate;
534
535
0
    ff_thread_setname("udp-rx");
536
537
0
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
538
0
    pthread_mutex_lock(&s->mutex);
539
0
    if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
540
0
        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
541
0
        s->circular_buffer_error = AVERROR(EIO);
542
0
        goto end;
543
0
    }
544
0
    while(1) {
545
0
        UDPQueuedPacketHeader pkt_header;
546
0
        pkt_header.addr_len = sizeof(pkt_header.addr);
547
548
0
        pthread_mutex_unlock(&s->mutex);
549
        /* Blocking operations are always cancellation points;
550
           see "General Information" / "Thread Cancellation Overview"
551
           in Single Unix. */
552
0
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
553
0
        pkt_header.pkt_size = recvfrom(s->udp_fd, s->tmp + sizeof(pkt_header), sizeof(s->tmp) - sizeof(pkt_header), 0, (struct sockaddr *)&pkt_header.addr, &pkt_header.addr_len);
554
0
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
555
0
        pthread_mutex_lock(&s->mutex);
556
0
        if (pkt_header.pkt_size < 0) {
557
0
            if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
558
0
                s->circular_buffer_error = ff_neterrno();
559
0
                goto end;
560
0
            }
561
0
            continue;
562
0
        }
563
0
        if (ff_ip_check_source_lists(&pkt_header.addr, &s->filters))
564
0
            continue;
565
0
        memcpy(s->tmp, &pkt_header, sizeof(pkt_header));
566
567
0
        if (av_fifo_can_write(s->rx_fifo) < pkt_header.pkt_size + sizeof(pkt_header)) {
568
            /* No Space left */
569
0
            if (s->overrun_nonfatal) {
570
0
                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
571
0
                        "Surviving due to overrun_nonfatal option\n");
572
0
                continue;
573
0
            } else {
574
0
                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
575
0
                        "To avoid, increase fifo_size URL option. "
576
0
                        "To survive in such case, use overrun_nonfatal option\n");
577
0
                s->circular_buffer_error = AVERROR(EIO);
578
0
                goto end;
579
0
            }
580
0
        }
581
0
        av_fifo_write(s->rx_fifo, s->tmp, pkt_header.pkt_size + sizeof(pkt_header));
582
0
        pthread_cond_signal(&s->cond);
583
0
    }
584
585
0
end:
586
0
    pthread_cond_signal(&s->cond);
587
0
    pthread_mutex_unlock(&s->mutex);
588
0
    return NULL;
589
0
}
590
591
static void *circular_buffer_task_tx( void *_URLContext)
592
0
{
593
0
    URLContext *h = _URLContext;
594
0
    UDPContext *s = h->priv_data;
595
0
    int64_t target_timestamp = av_gettime_relative();
596
0
    int64_t start_timestamp = av_gettime_relative();
597
0
    int64_t sent_bits = 0;
598
0
    int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
599
0
    int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
600
601
0
    ff_thread_setname("udp-tx");
602
603
0
    pthread_mutex_lock(&s->mutex);
604
605
0
    if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
606
0
        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
607
0
        s->circular_buffer_error = AVERROR(EIO);
608
0
        goto end;
609
0
    }
610
611
0
    for(;;) {
612
0
        int len;
613
0
        const uint8_t *p;
614
0
        uint8_t tmp[4];
615
0
        int64_t timestamp;
616
617
0
        len = av_fifo_can_read(s->tx_fifo);
618
619
0
        while (len<4) {
620
0
            if (s->close_req)
621
0
                goto end;
622
0
            pthread_cond_wait(&s->cond, &s->mutex);
623
0
            len = av_fifo_can_read(s->tx_fifo);
624
0
        }
625
626
0
        av_fifo_read(s->tx_fifo, tmp, 4);
627
0
        len = AV_RL32(tmp);
628
629
0
        av_assert0(len >= 0);
630
0
        av_assert0(len <= sizeof(s->tmp));
631
632
0
        av_fifo_read(s->tx_fifo, s->tmp, len);
633
634
0
        pthread_mutex_unlock(&s->mutex);
635
636
0
        if (s->bitrate) {
637
0
            timestamp = av_gettime_relative();
638
0
            if (timestamp < target_timestamp) {
639
0
                int64_t delay = target_timestamp - timestamp;
640
0
                if (delay > max_delay) {
641
0
                    delay = max_delay;
642
0
                    start_timestamp = timestamp + delay;
643
0
                    sent_bits = 0;
644
0
                }
645
0
                av_usleep(delay);
646
0
            } else {
647
0
                if (timestamp - burst_interval > target_timestamp) {
648
0
                    start_timestamp = timestamp - burst_interval;
649
0
                    sent_bits = 0;
650
0
                }
651
0
            }
652
0
            sent_bits += len * 8;
653
0
            target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
654
0
        }
655
656
0
        p = s->tmp;
657
0
        while (len) {
658
0
            int ret;
659
0
            av_assert0(len > 0);
660
0
            if (!s->is_connected) {
661
0
                ret = sendto (s->udp_fd, p, len, 0,
662
0
                            (struct sockaddr *) &s->dest_addr,
663
0
                            s->dest_addr_len);
664
0
            } else
665
0
                ret = send(s->udp_fd, p, len, 0);
666
0
            if (ret >= 0) {
667
0
                len -= ret;
668
0
                p   += ret;
669
0
            } else {
670
0
                ret = ff_neterrno();
671
0
                if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
672
0
                    pthread_mutex_lock(&s->mutex);
673
0
                    s->circular_buffer_error = ret;
674
0
                    pthread_mutex_unlock(&s->mutex);
675
0
                    return NULL;
676
0
                }
677
0
            }
678
0
        }
679
680
0
        pthread_mutex_lock(&s->mutex);
681
0
    }
682
683
0
end:
684
0
    pthread_mutex_unlock(&s->mutex);
685
0
    return NULL;
686
0
}
687
688
689
#endif
690
691
/* put it in UDP context */
692
/* return non zero if error */
693
static int udp_open(URLContext *h, const char *uri, int flags)
694
0
{
695
0
    char hostname[1024];
696
0
    int port, udp_fd = -1, tmp, bind_ret = -1;
697
0
    UDPContext *s = h->priv_data;
698
0
    int is_output;
699
0
    const char *p;
700
0
    struct sockaddr_storage my_addr;
701
0
    socklen_t len;
702
0
    int ret;
703
704
0
    h->is_streamed = 1;
705
706
0
    is_output = !(flags & AVIO_FLAG_READ);
707
0
    if (s->buffer_size < 0)
708
0
        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_RX_BUF_SIZE;
709
710
0
    p = strchr(uri, '?');
711
0
    if (p) {
712
0
        ret = ff_parse_opts_from_query_string(s, p, 1);
713
0
        if (ret < 0)
714
0
            goto fail;
715
0
    }
716
0
    if (!HAVE_PTHREAD_CANCEL) {
717
0
        int64_t      optvals[] = {s->overrun_nonfatal, s->bitrate, s->circular_buffer_size};
718
0
        const char* optnames[] = {  "overrun_nonfatal",  "bitrate",  "fifo_size"};
719
0
        for (unsigned i = 0; i < FF_ARRAY_ELEMS(optvals); i++) {
720
0
            if (optvals[i])
721
0
                av_log(h, AV_LOG_WARNING,
722
0
                       "'%s' option was set but it is not supported "
723
0
                       "on this build (pthread support is required)\n", optnames[i]);
724
0
        }
725
0
    }
726
0
    if (s->sources) {
727
0
        if ((ret = ff_ip_parse_sources(h, s->sources, &s->filters)) < 0)
728
0
            goto fail;
729
0
    }
730
0
    if (s->block) {
731
0
        if ((ret = ff_ip_parse_blocks(h, s->block, &s->filters)) < 0)
732
0
            goto fail;
733
0
    }
734
735
    /* handling needed to support options picking from both AVOption and URL */
736
0
    s->circular_buffer_size *= 188;
737
0
    if (flags & AVIO_FLAG_WRITE) {
738
0
        h->max_packet_size = s->pkt_size;
739
0
    } else {
740
0
        h->max_packet_size = UDP_MAX_PKT_SIZE;
741
0
    }
742
0
    h->rw_timeout = s->timeout;
743
744
    /* fill the dest addr */
745
0
    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
746
747
    /* XXX: fix av_url_split */
748
0
    if (hostname[0] == '\0' || hostname[0] == '?') {
749
        /* only accepts null hostname if input */
750
0
        if (!(flags & AVIO_FLAG_READ)) {
751
0
            ret = AVERROR(EINVAL);
752
0
            goto fail;
753
0
        }
754
0
    } else {
755
0
        if ((ret = ff_udp_set_remote_url(h, uri)) < 0)
756
0
            goto fail;
757
0
    }
758
759
0
    if ((s->is_multicast || s->local_port < 0) && (h->flags & AVIO_FLAG_READ))
760
0
        s->local_port = port;
761
762
0
    udp_fd = udp_socket_create(h, &my_addr, &len, s->localaddr);
763
0
    if (udp_fd < 0) {
764
0
        ret = AVERROR(EIO);
765
0
        goto fail;
766
0
    }
767
768
0
    s->local_addr_storage=my_addr; //store for future multicast join
769
770
    /* Follow the requested reuse option, unless it's multicast in which
771
     * case enable reuse unless explicitly disabled.
772
     */
773
0
    if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
774
0
        s->reuse_socket = 1;
775
0
        if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0) {
776
0
            ret = ff_neterrno();
777
0
            goto fail;
778
0
        }
779
0
    }
780
781
0
    if (s->is_broadcast) {
782
0
#ifdef SO_BROADCAST
783
0
        if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0) {
784
0
            ret = ff_neterrno();
785
0
            goto fail;
786
0
        }
787
#else
788
        ret = AVERROR(ENOSYS);
789
        goto fail;
790
#endif
791
0
    }
792
793
    /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
794
     * The receiver coverage has to be less than or equal to the sender coverage.
795
     * Otherwise, the receiver will drop all packets.
796
     */
797
0
    if (s->udplite_coverage) {
798
0
        if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
799
0
            av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available");
800
801
0
        if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
802
0
            av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available");
803
0
    }
804
805
0
    if (s->dscp >= 0) {
806
0
        int dscp = s->dscp << 2;
807
0
        if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0) {
808
0
            ret = ff_neterrno();
809
0
            goto fail;
810
0
        }
811
0
    }
812
813
    /* If multicast, try binding the multicast address first, to avoid
814
     * receiving UDP packets from other sources aimed at the same UDP
815
     * port. This fails on windows. This makes sending to the same address
816
     * using sendto() fail, so only do it if we're opened in read-only mode. */
817
0
    if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) {
818
0
        bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
819
0
    }
820
    /* bind to the local address if not multicast or if the multicast
821
     * bind failed */
822
    /* the bind is needed to give a port to the socket now */
823
0
    if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
824
0
        ff_log_net_error(h, AV_LOG_ERROR, "bind failed");
825
0
        ret = ff_neterrno();
826
0
        goto fail;
827
0
    }
828
829
0
    len = sizeof(my_addr);
830
0
    getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
831
0
    s->local_port = udp_port(&my_addr, len);
832
833
0
    if (s->is_multicast) {
834
0
        if (h->flags & AVIO_FLAG_WRITE) {
835
            /* output */
836
0
            if ((ret = udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr, h)) < 0)
837
0
                goto fail;
838
0
        }
839
0
        if (h->flags & AVIO_FLAG_READ) {
840
            /* input */
841
0
            if (s->filters.nb_include_addrs) {
842
0
                if ((ret = udp_set_multicast_sources(h, udp_fd,
843
0
                                              (struct sockaddr *)&s->dest_addr,
844
0
                                              s->dest_addr_len, &s->local_addr_storage,
845
0
                                              s->filters.include_addrs,
846
0
                                              s->filters.nb_include_addrs, 1)) < 0)
847
0
                    goto fail;
848
0
            } else {
849
0
                if ((ret = udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,
850
0
                                                    (struct sockaddr *)&s->local_addr_storage, h)) < 0)
851
0
                    goto fail;
852
0
            }
853
0
            if (s->filters.nb_exclude_addrs) {
854
0
                if ((ret = udp_set_multicast_sources(h, udp_fd,
855
0
                                              (struct sockaddr *)&s->dest_addr,
856
0
                                              s->dest_addr_len, &s->local_addr_storage,
857
0
                                              s->filters.exclude_addrs,
858
0
                                              s->filters.nb_exclude_addrs, 0)) < 0)
859
0
                    goto fail;
860
0
            }
861
0
        }
862
0
    }
863
864
0
    if (is_output) {
865
        /* limit the tx buf size to limit latency */
866
0
        tmp = s->buffer_size;
867
0
        if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
868
0
            ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
869
0
            ret = ff_neterrno();
870
0
            goto fail;
871
0
        }
872
0
    } else {
873
        /* set udp recv buffer size to the requested value (default UDP_RX_BUF_SIZE) */
874
0
        tmp = s->buffer_size;
875
0
        if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
876
0
            ff_log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
877
0
        }
878
0
        len = sizeof(tmp);
879
0
        if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
880
0
            ff_log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
881
0
        } else {
882
0
            av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
883
0
            if(tmp < s->buffer_size)
884
0
                av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d\n", s->buffer_size, tmp);
885
0
        }
886
887
        /* make the socket non-blocking */
888
0
        ff_socket_nonblock(udp_fd, 1);
889
0
    }
890
0
    if (s->is_connected) {
891
0
        if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
892
0
            ff_log_net_error(h, AV_LOG_ERROR, "connect");
893
0
            ret = ff_neterrno();
894
0
            goto fail;
895
0
        }
896
0
    }
897
898
0
    s->udp_fd = udp_fd;
899
900
0
#if HAVE_PTHREAD_CANCEL
901
    /*
902
      Create thread in case of:
903
      1. Input and circular_buffer_size is set
904
      2. Output and bitrate and circular_buffer_size is set
905
    */
906
907
0
    if (is_output && s->bitrate && !s->circular_buffer_size) {
908
        /* Warn user in case of 'circular_buffer_size' is not set */
909
0
        av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
910
0
    }
911
912
0
    if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
913
        /* start the task going */
914
0
        AVFifo *fifo = av_fifo_alloc2(s->circular_buffer_size, 1, 0);
915
0
        if (!fifo) {
916
0
            ret = AVERROR(ENOMEM);
917
0
            goto fail;
918
0
        }
919
0
        if (is_output)
920
0
            s->tx_fifo = fifo;
921
0
        else
922
0
            s->rx_fifo = fifo;
923
0
        ret = pthread_mutex_init(&s->mutex, NULL);
924
0
        if (ret != 0) {
925
0
            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
926
0
            ret = AVERROR(ret);
927
0
            goto fail;
928
0
        }
929
0
        ret = pthread_cond_init(&s->cond, NULL);
930
0
        if (ret != 0) {
931
0
            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
932
0
            ret = AVERROR(ret);
933
0
            goto cond_fail;
934
0
        }
935
0
        ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
936
0
        if (ret != 0) {
937
0
            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
938
0
            ret = AVERROR(ret);
939
0
            goto thread_fail;
940
0
        }
941
0
        s->thread_started = 1;
942
0
    }
943
0
#endif
944
945
0
    return 0;
946
0
#if HAVE_PTHREAD_CANCEL
947
0
 thread_fail:
948
0
    pthread_cond_destroy(&s->cond);
949
0
 cond_fail:
950
0
    pthread_mutex_destroy(&s->mutex);
951
0
#endif
952
0
 fail:
953
0
    if (udp_fd >= 0)
954
0
        closesocket(udp_fd);
955
0
    av_fifo_freep2(&s->rx_fifo);
956
0
    av_fifo_freep2(&s->tx_fifo);
957
0
    ff_ip_reset_filters(&s->filters);
958
0
    return ret;
959
0
}
960
961
/**
 * url_open callback for the "udplite" protocol.
 *
 * Seeds the context's UDP-Lite checksum coverage with UDP_HEADER_SIZE and
 * then hands over to udp_open(), which applies a non-zero coverage to the
 * socket via the UDPLITE_SEND_CSCOV / UDPLITE_RECV_CSCOV options.
 *
 * @param h     URL context; priv_data is the UDPContext
 * @param uri   URI to open, forwarded unchanged
 * @param flags AVIO_FLAG_* open flags, forwarded unchanged
 * @return whatever udp_open() returns
 */
static int udplite_open(URLContext *h, const char *uri, int flags)
{
    /* default checksum coverage for UDP-Lite */
    ((UDPContext *)h->priv_data)->udplite_coverage = UDP_HEADER_SIZE;

    return udp_open(h, uri, flags);
}
970
971
/**
 * Read one datagram into buf.
 *
 * If a receive FIFO exists (s->rx_fifo), the next queued packet is taken
 * from it while holding s->mutex. Otherwise the socket is read directly
 * with recvfrom(). Either way the sender address of the packet is stored in
 * s->last_recv_addr / s->last_recv_addr_len.
 *
 * @param h    URL context; priv_data is the UDPContext
 * @param buf  destination buffer
 * @param size capacity of buf in bytes
 * @return number of bytes stored in buf, otherwise an error value:
 *         the content of s->circular_buffer_error, AVERROR(EAGAIN) when
 *         no packet is available (FIFO mode), the result of
 *         ff_network_wait_fd() / ff_neterrno(), or AVERROR(EINTR) when
 *         ff_ip_check_source_lists() reports non-zero for the sender.
 */
static int udp_read(URLContext *h, uint8_t *buf, int size)
{
    UDPContext *s = h->priv_data;
    int ret;

#if HAVE_PTHREAD_CANCEL
    if (s->rx_fifo) {
        /* A blocking caller is allowed exactly one timed wait. */
        int may_wait = !(h->flags & AVIO_FLAG_NONBLOCK);

        pthread_mutex_lock(&s->mutex);
        for (;;) {
            int queued = av_fifo_can_read(s->rx_fifo);

            if (queued) {
                UDPQueuedPacketHeader header;
                int copy_len;

                /* Each queued packet is preceded by its header. */
                av_fifo_read(s->rx_fifo, &header, sizeof(header));

                s->last_recv_addr     = header.addr;
                s->last_recv_addr_len = header.addr_len;

                copy_len = header.pkt_size;
                if (copy_len > size) {
                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
                    copy_len = size;
                }

                /* Copy what fits, then discard the remainder of the packet. */
                av_fifo_read(s->rx_fifo, buf, copy_len);
                av_fifo_drain2(s->rx_fifo, header.pkt_size - copy_len);
                ret = copy_len;
                break;
            }

            if (s->circular_buffer_error) {
                /* Nothing queued and an error was recorded: report it. */
                ret = s->circular_buffer_error;
                break;
            }

            if (!may_wait) {
                ret = AVERROR(EAGAIN);
                break;
            }

            {
                /* FIXME: using the monotonic clock would be better,
                   but it does not exist on all supported platforms. */
                int64_t deadline_us = av_gettime() + 100000;
                struct timespec deadline = {
                    .tv_sec  =  deadline_us / 1000000,
                    .tv_nsec = (deadline_us % 1000000) * 1000,
                };
                int wait_err = pthread_cond_timedwait(&s->cond, &s->mutex, &deadline);

                if (wait_err) {
                    ret = AVERROR(wait_err == ETIMEDOUT ? EAGAIN : wait_err);
                    break;
                }
            }

            /* Woken up: look at the FIFO once more, but do not wait again. */
            may_wait = 0;
        }
        pthread_mutex_unlock(&s->mutex);

        return ret;
    }
#endif

    /* No FIFO: read straight from the socket. */
    if (!(h->flags & AVIO_FLAG_NONBLOCK) &&
        (ret = ff_network_wait_fd(s->udp_fd, 0)) < 0)
        return ret;

    s->last_recv_addr_len = sizeof(s->last_recv_addr);
    ret = recvfrom(s->udp_fd, buf, size, 0,
                   (struct sockaddr *)&s->last_recv_addr,
                   &s->last_recv_addr_len);
    if (ret < 0)
        return ff_neterrno();

    /* NOTE(review): a non-zero result presumably means the sender is
     * rejected by the configured source filters - confirm in ip.c. */
    return ff_ip_check_source_lists(&s->last_recv_addr, &s->filters)
           ? AVERROR(EINTR) : ret;
}
1037
1038
/**
 * Send one datagram.
 *
 * If a transmit FIFO exists (s->tx_fifo), the packet is queued in it as a
 * 4-byte little-endian length followed by the payload, and s->cond is
 * signalled. Otherwise the data is sent on the socket right away, using
 * send() for a connected socket and sendto() with s->dest_addr if not.
 *
 * @param h    URL context; priv_data is the UDPContext
 * @param buf  payload to send
 * @param size payload length in bytes
 * @return size when the packet was queued, the send()/sendto() result when
 *         sent directly, otherwise a negative value: a previously recorded
 *         s->circular_buffer_error, AVERROR(ENOMEM) when the FIFO lacks
 *         room for size + 4 bytes, or the result of ff_network_wait_fd() /
 *         ff_neterrno().
 */
static int udp_write(URLContext *h, const uint8_t *buf, int size)
{
    UDPContext *s = h->priv_data;
    int ret;

#if HAVE_PTHREAD_CANCEL
    if (s->tx_fifo) {
        uint8_t len_prefix[4];

        pthread_mutex_lock(&s->mutex);

        if (s->circular_buffer_error < 0) {
            /*
              An earlier transmission failed. We cannot tell which packet
              it was, but the caller has to learn that an error exists.
            */
            ret = s->circular_buffer_error;
        } else if (av_fifo_can_write(s->tx_fifo) < size + 4) {
            /* What about a partial packet tx ? */
            ret = AVERROR(ENOMEM);
        } else {
            AV_WL32(len_prefix, size);
            av_fifo_write(s->tx_fifo, len_prefix, 4); /* size of packet */
            av_fifo_write(s->tx_fifo, buf, size);     /* the data */
            pthread_cond_signal(&s->cond);
            ret = size;
        }

        pthread_mutex_unlock(&s->mutex);
        return ret;
    }
#endif

    /* No FIFO: write straight to the socket. */
    if (!(h->flags & AVIO_FLAG_NONBLOCK) &&
        (ret = ff_network_wait_fd(s->udp_fd, 1)) < 0)
        return ret;

    if (s->is_connected)
        ret = send(s->udp_fd, buf, size, 0);
    else
        ret = sendto(s->udp_fd, buf, size, 0,
                     (struct sockaddr *) &s->dest_addr,
                     s->dest_addr_len);

    if (ret < 0)
        return ff_neterrno();
    return ret;
}
1087
1088
/**
 * Tear down a UDP context.
 *
 * Steps, in this order:
 *  - for a started thread on a context not opened for reading, set
 *    s->close_req and signal s->cond under s->mutex;
 *  - for a multicast context opened for reading, leave the group;
 *  - for a started thread, stop it if the context is opened for reading
 *    (pthread_cancel(), or shutdown() + CancelIoEx() on _WIN32), join it,
 *    and destroy the mutex and the condition variable;
 *  - close the socket, free both FIFOs and reset the address filters.
 *
 * @param h URL context; priv_data is the UDPContext
 * @return always 0
 */
static int udp_close(URLContext *h)
{
    UDPContext *s = h->priv_data;
    const int opened_for_read = h->flags & AVIO_FLAG_READ;

#if HAVE_PTHREAD_CANCEL
    /* Request close once writing is finished */
    if (s->thread_started && !opened_for_read) {
        pthread_mutex_lock(&s->mutex);
        s->close_req = 1;
        pthread_cond_signal(&s->cond);
        pthread_mutex_unlock(&s->mutex);
    }
#endif

    if (s->is_multicast && opened_for_read) {
        udp_leave_multicast_group(s->udp_fd,
                                  (struct sockaddr *)&s->dest_addr,
                                  (struct sockaddr *)&s->local_addr_storage,
                                  h);
    }

#if HAVE_PTHREAD_CANCEL
    if (s->thread_started) {
        int join_err;

        /* Cancel only read, as write has been signaled as success to the user */
        if (opened_for_read) {
#ifdef _WIN32
            /* recvfrom() is not a cancellation point for win32, so we shutdown
             * the socket and abort pending IO, subsequent recvfrom() calls
             * will fail with WSAESHUTDOWN causing the thread to exit. */
            shutdown(s->udp_fd, SD_RECEIVE);
            CancelIoEx((HANDLE)(SOCKET)s->udp_fd, NULL);
#else
            pthread_cancel(s->circular_buffer_thread);
#endif
        }

        join_err = pthread_join(s->circular_buffer_thread, NULL);
        if (join_err != 0)
            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(join_err));

        pthread_mutex_destroy(&s->mutex);
        pthread_cond_destroy(&s->cond);
    }
#endif

    closesocket(s->udp_fd);
    av_fifo_freep2(&s->rx_fifo);
    av_fifo_freep2(&s->tx_fifo);
    ff_ip_reset_filters(&s->filters);

    return 0;
}
1133
1134
const URLProtocol ff_udp_protocol = {
1135
    .name                = "udp",
1136
    .url_open            = udp_open,
1137
    .url_read            = udp_read,
1138
    .url_write           = udp_write,
1139
    .url_close           = udp_close,
1140
    .url_get_file_handle = udp_get_file_handle,
1141
    .priv_data_size      = sizeof(UDPContext),
1142
    .priv_data_class     = &udp_class,
1143
    .flags               = URL_PROTOCOL_FLAG_NETWORK,
1144
};
1145
1146
const URLProtocol ff_udplite_protocol = {
1147
    .name                = "udplite",
1148
    .url_open            = udplite_open,
1149
    .url_read            = udp_read,
1150
    .url_write           = udp_write,
1151
    .url_close           = udp_close,
1152
    .url_get_file_handle = udp_get_file_handle,
1153
    .priv_data_size      = sizeof(UDPContext),
1154
    .priv_data_class     = &udplite_context_class,
1155
    .flags               = URL_PROTOCOL_FLAG_NETWORK,
1156
};