/src/ffmpeg/libavformat/udp.c
Line | Count | Source |
1 | | /* |
2 | | * UDP prototype streaming system |
3 | | * Copyright (c) 2000, 2001, 2002 Fabrice Bellard |
4 | | * |
5 | | * This file is part of FFmpeg. |
6 | | * |
7 | | * FFmpeg is free software; you can redistribute it and/or |
8 | | * modify it under the terms of the GNU Lesser General Public |
9 | | * License as published by the Free Software Foundation; either |
10 | | * version 2.1 of the License, or (at your option) any later version. |
11 | | * |
12 | | * FFmpeg is distributed in the hope that it will be useful, |
13 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
14 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
15 | | * Lesser General Public License for more details. |
16 | | * |
17 | | * You should have received a copy of the GNU Lesser General Public |
18 | | * License along with FFmpeg; if not, write to the Free Software |
19 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
20 | | */ |
21 | | |
22 | | /** |
23 | | * @file |
24 | | * UDP protocol |
25 | | */ |
26 | | |
27 | | #define _DEFAULT_SOURCE |
28 | | #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */ |
29 | | |
30 | | #include "avformat.h" |
31 | | #include "libavutil/avassert.h" |
32 | | #include "libavutil/mem.h" |
33 | | #include "libavutil/parseutils.h" |
34 | | #include "libavutil/fifo.h" |
35 | | #include "libavutil/intreadwrite.h" |
36 | | #include "libavutil/opt.h" |
37 | | #include "libavutil/log.h" |
38 | | #include "libavutil/time.h" |
39 | | #include "internal.h" |
40 | | #include "network.h" |
41 | | #include "os_support.h" |
42 | | #include "url.h" |
43 | | #include "ip.h" |
44 | | |
45 | | #ifdef __APPLE__ |
46 | | #include "TargetConditionals.h" |
47 | | #endif |
48 | | |
49 | | #if HAVE_UDPLITE_H |
50 | | #include "udplite.h" |
51 | | #else |
52 | | /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite. |
53 | | * So, we provide a fallback here. |
54 | | */ |
55 | 0 | #define UDPLITE_SEND_CSCOV 10 |
56 | 0 | #define UDPLITE_RECV_CSCOV 11 |
57 | | #endif |
58 | | |
59 | | #ifndef IPPROTO_UDPLITE |
60 | | #define IPPROTO_UDPLITE 136 |
61 | | #endif |
62 | | |
63 | | #if HAVE_W32THREADS |
64 | | #undef HAVE_PTHREAD_CANCEL |
65 | | #define HAVE_PTHREAD_CANCEL 1 |
66 | | #endif |
67 | | |
68 | | #if HAVE_PTHREAD_CANCEL |
69 | | #include "libavutil/thread.h" |
70 | | #endif |
71 | | |
72 | | #ifndef IPV6_ADD_MEMBERSHIP |
73 | | #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP |
74 | | #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP |
75 | | #endif |
76 | | |
/* Default "buffer_size" value when the context is opened for output. */
#define UDP_TX_BUF_SIZE 32768
/* Default "buffer_size" value when the context is opened for input. */
#define UDP_RX_BUF_SIZE 393216
/* Largest datagram payload handled; sizes the per-context scratch buffer and
 * is used as max_packet_size when reading. */
#define UDP_MAX_PKT_SIZE 65536
#define UDP_HEADER_SIZE 8
81 | | |
/**
 * Per-packet record that the receive thread places in front of every payload
 * it queues in the receive FIFO.
 */
typedef struct UDPQueuedPacketHeader {
    int pkt_size;                 /* payload length as returned by recvfrom() */
    struct sockaddr_storage addr; /* sender address filled in by recvfrom() */
    socklen_t addr_len;           /* length of addr as reported by recvfrom() */
} UDPQueuedPacketHeader;
87 | | |
/**
 * Private state of a UDP / UDP-Lite URLContext.
 *
 * Most integer and string members are filled in through the AVOption table
 * below, either directly or from the query string of the URL.
 */
typedef struct UDPContext {
    const AVClass *class;
    int udp_fd;                 /* socket descriptor */
    int ttl;                    /* "ttl" option, applied to multicast output */
    int udplite_coverage;       /* "udplite_coverage" option; non-zero selects a UDP-Lite socket */
    int buffer_size;            /* "buffer_size" option; negative means "use the default" */
    int pkt_size;               /* "pkt_size" option; max_packet_size when writing */
    int is_multicast;           /* set when the destination is a multicast address */
    int is_broadcast;           /* "broadcast" option */
    int local_port;             /* "localport" option; replaced by the bound port after bind */
    int reuse_socket;           /* "reuse" option; -1 means not specified */
    int overrun_nonfatal;       /* "overrun_nonfatal" option */
    struct sockaddr_storage dest_addr; /* remote address */
    int dest_addr_len;          /* number of valid bytes in dest_addr */
    int is_connected;           /* "connect" option; non-zero once connect() has been requested */

    /* Circular Buffer variables for use in UDP receive code */
    int circular_buffer_size;   /* "fifo_size" option; scaled by 188 on open */
    AVFifo *rx_fifo;            /* filled by the receive thread */
    AVFifo *tx_fifo;            /* drained by the transmit thread */
    int circular_buffer_error;  /* error code recorded by a worker thread */
    int64_t bitrate;            /* number of bits to send per second */
    int64_t burst_bits;         /* "burst_bits" option */
    int close_req;              /* tells the transmit thread to exit once the FIFO is drained */
#if HAVE_PTHREAD_CANCEL
    pthread_t circular_buffer_thread;
    pthread_mutex_t mutex;      /* held by the worker threads around FIFO and error-state access */
    pthread_cond_t cond;        /* signalled by the rx thread; waited on by the tx thread */
    int thread_started;
#endif
    /* Scratch space for one queued packet: header followed by payload. */
    uint8_t tmp[UDP_MAX_PKT_SIZE + sizeof(UDPQueuedPacketHeader)];
    int remaining_in_dg;
    char *localaddr;            /* "localaddr" option */
    int timeout;                /* "timeout" option; copied to the URLContext rw_timeout */
    int dscp;                   /* "dscp" option; negative disables setting IP_TOS */
    struct sockaddr_storage local_addr_storage; /* local address kept for multicast joins */
    char *sources;              /* "sources" option string */
    char *block;                /* "block" option string */
    IPSourceFilters filters;    /* parsed form of sources / block */
    struct sockaddr_storage last_recv_addr; /* reported by ff_udp_get_last_recv_addr() */
    socklen_t last_recv_addr_len;
} UDPContext;
130 | | |
/* Shorthands used only by the option table below. */
#define OFFSET(x) offsetof(UDPContext, x)
#define D AV_OPT_FLAG_DECODING_PARAM
#define E AV_OPT_FLAG_ENCODING_PARAM
/*
 * Options shared by the "udp" and "udplite" classes.
 * Several options are listed under two names that write to the same field
 * ("localport"/"local_port", "reuse"/"reuse_socket").
 */
static const AVOption options[] = {
    { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
    { "bitrate", "Bits to send per second", OFFSET(bitrate), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
    { "burst_bits", "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
    { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E },
    { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
    { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
    { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
    { "pkt_size", "Maximum UDP packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 1472 }, -1, INT_MAX, .flags = D|E },
    { "reuse", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, D|E },
    { "reuse_socket", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E },
    { "broadcast", "explicitly allow or disallow broadcast destination", OFFSET(is_broadcast), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, E },
    { "ttl", "Time to live (multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, { .i64 = 16 }, 0, 255, E },
    { "dscp", "DSCP class for outgoing packets", OFFSET(dscp), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, 63, E },
    { "connect", "set if connect() should be called on socket", OFFSET(is_connected), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, .flags = D|E },
    { "fifo_size", "set the UDP circular buffer size (in 188-byte packets)", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = HAVE_PTHREAD_CANCEL ? 7*4096 : 0}, 0, INT_MAX, D },
    { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, D },
    { "timeout", "set raise error timeout, in microseconds (only in read mode)",OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D },
    { "sources", "Source list", OFFSET(sources), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
    { "block", "Block list", OFFSET(block), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
    { NULL }
};
156 | | |
/* AVClass named "udp"; exposes the shared option table. */
static const AVClass udp_class = {
    .class_name = "udp",
    .item_name = av_default_item_name,
    .option = options,
    .version = LIBAVUTIL_VERSION_INT,
};
163 | | |
/* AVClass named "udplite"; uses the same option table as udp_class. */
static const AVClass udplite_context_class = {
    .class_name = "udplite",
    .item_name = av_default_item_name,
    .option = options,
    .version = LIBAVUTIL_VERSION_INT,
};
170 | | |
171 | | static int udp_set_multicast_ttl(int sockfd, int mcastTTL, |
172 | | struct sockaddr *addr, |
173 | | void *logctx) |
174 | 0 | { |
175 | 0 | int protocol, cmd; |
176 | | |
177 | | /* There is some confusion in the world whether IP_MULTICAST_TTL |
178 | | * takes a byte or an int as an argument. |
179 | | * BSD seems to indicate byte so we are going with that and use |
180 | | * int and fall back to byte to be safe */ |
181 | 0 | switch (addr->sa_family) { |
182 | 0 | #ifdef IP_MULTICAST_TTL |
183 | 0 | case AF_INET: |
184 | 0 | protocol = IPPROTO_IP; |
185 | 0 | cmd = IP_MULTICAST_TTL; |
186 | 0 | break; |
187 | 0 | #endif |
188 | 0 | #ifdef IPV6_MULTICAST_HOPS |
189 | 0 | case AF_INET6: |
190 | 0 | protocol = IPPROTO_IPV6; |
191 | 0 | cmd = IPV6_MULTICAST_HOPS; |
192 | 0 | break; |
193 | 0 | #endif |
194 | 0 | default: |
195 | 0 | return 0; |
196 | 0 | } |
197 | | |
198 | 0 | if (setsockopt(sockfd, protocol, cmd, &mcastTTL, sizeof(mcastTTL)) < 0) { |
199 | | /* BSD compatibility */ |
200 | 0 | unsigned char ttl = (unsigned char) mcastTTL; |
201 | |
|
202 | 0 | ff_log_net_error(logctx, AV_LOG_DEBUG, "setsockopt(IPV4/IPV6 MULTICAST TTL)"); |
203 | 0 | if (setsockopt(sockfd, protocol, cmd, &ttl, sizeof(ttl)) < 0) { |
204 | 0 | ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV4/IPV6 MULTICAST TTL)"); |
205 | 0 | return ff_neterrno(); |
206 | 0 | } |
207 | 0 | } |
208 | | |
209 | 0 | return 0; |
210 | 0 | } |
211 | | |
212 | | static int udp_join_multicast_group(int sockfd, struct sockaddr *addr, |
213 | | struct sockaddr *local_addr, void *logctx) |
214 | 0 | { |
215 | 0 | #ifdef IP_ADD_MEMBERSHIP |
216 | 0 | if (addr->sa_family == AF_INET) { |
217 | 0 | struct ip_mreq mreq; |
218 | |
|
219 | 0 | mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; |
220 | 0 | if (local_addr) |
221 | 0 | mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr; |
222 | 0 | else |
223 | 0 | mreq.imr_interface.s_addr = INADDR_ANY; |
224 | 0 | if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) { |
225 | 0 | ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)"); |
226 | 0 | return ff_neterrno(); |
227 | 0 | } |
228 | 0 | } |
229 | 0 | #endif |
230 | 0 | #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6) |
231 | 0 | if (addr->sa_family == AF_INET6) { |
232 | 0 | struct ipv6_mreq mreq6; |
233 | |
|
234 | 0 | memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr)); |
235 | | //TODO: Interface index should be looked up from local_addr |
236 | 0 | mreq6.ipv6mr_interface = 0; |
237 | 0 | if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) { |
238 | 0 | ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)"); |
239 | 0 | return ff_neterrno(); |
240 | 0 | } |
241 | 0 | } |
242 | 0 | #endif |
243 | 0 | return 0; |
244 | 0 | } |
245 | | |
246 | | static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr, |
247 | | struct sockaddr *local_addr, void *logctx) |
248 | 0 | { |
249 | 0 | #ifdef IP_DROP_MEMBERSHIP |
250 | 0 | if (addr->sa_family == AF_INET) { |
251 | 0 | struct ip_mreq mreq; |
252 | |
|
253 | 0 | mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; |
254 | 0 | if (local_addr) |
255 | 0 | mreq.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr; |
256 | 0 | else |
257 | 0 | mreq.imr_interface.s_addr = INADDR_ANY; |
258 | 0 | if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) { |
259 | 0 | ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)"); |
260 | 0 | return -1; |
261 | 0 | } |
262 | 0 | } |
263 | 0 | #endif |
264 | 0 | #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6) |
265 | 0 | if (addr->sa_family == AF_INET6) { |
266 | 0 | struct ipv6_mreq mreq6; |
267 | |
|
268 | 0 | memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr)); |
269 | | //TODO: Interface index should be looked up from local_addr |
270 | 0 | mreq6.ipv6mr_interface = 0; |
271 | 0 | if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) { |
272 | 0 | ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)"); |
273 | 0 | return -1; |
274 | 0 | } |
275 | 0 | } |
276 | 0 | #endif |
277 | 0 | return 0; |
278 | 0 | } |
279 | | |
280 | | static int udp_set_multicast_sources(URLContext *h, |
281 | | int sockfd, struct sockaddr *addr, |
282 | | int addr_len, struct sockaddr_storage *local_addr, |
283 | | struct sockaddr_storage *sources, |
284 | | int nb_sources, int include) |
285 | 0 | { |
286 | 0 | if (addr->sa_family != AF_INET) { |
287 | 0 | #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) |
288 | | /* For IPv4 prefer the old approach, as that alone works reliably on |
289 | | * Windows and it also supports supplying the interface based on its |
290 | | * address. */ |
291 | 0 | for (int i = 0; i < nb_sources; i++) { |
292 | 0 | struct group_source_req mreqs; |
293 | 0 | int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6; |
294 | | |
295 | | //TODO: Interface index should be looked up from local_addr |
296 | 0 | mreqs.gsr_interface = 0; |
297 | 0 | memcpy(&mreqs.gsr_group, addr, addr_len); |
298 | 0 | memcpy(&mreqs.gsr_source, &sources[i], sizeof(*sources)); |
299 | |
|
300 | 0 | if (setsockopt(sockfd, level, |
301 | 0 | include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE, |
302 | 0 | (const void *)&mreqs, sizeof(mreqs)) < 0) { |
303 | 0 | if (include) |
304 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)"); |
305 | 0 | else |
306 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)"); |
307 | 0 | return ff_neterrno(); |
308 | 0 | } |
309 | 0 | } |
310 | 0 | return 0; |
311 | | #else |
312 | | av_log(h, AV_LOG_ERROR, |
313 | | "Setting multicast sources only supported for IPv4\n"); |
314 | | return AVERROR(EINVAL); |
315 | | #endif |
316 | 0 | } |
317 | 0 | #if HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE) |
318 | 0 | for (int i = 0; i < nb_sources; i++) { |
319 | 0 | struct ip_mreq_source mreqs; |
320 | 0 | if (sources[i].ss_family != AF_INET) { |
321 | 0 | av_log(h, AV_LOG_ERROR, "Source/block address %d is of incorrect protocol family\n", i + 1); |
322 | 0 | return AVERROR(EINVAL); |
323 | 0 | } |
324 | | |
325 | 0 | mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; |
326 | 0 | if (local_addr) |
327 | 0 | mreqs.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr; |
328 | 0 | else |
329 | 0 | mreqs.imr_interface.s_addr = INADDR_ANY; |
330 | 0 | mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)&sources[i])->sin_addr.s_addr; |
331 | |
|
332 | 0 | if (setsockopt(sockfd, IPPROTO_IP, |
333 | 0 | include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE, |
334 | 0 | (const void *)&mreqs, sizeof(mreqs)) < 0) { |
335 | 0 | if (include) |
336 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)"); |
337 | 0 | else |
338 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)"); |
339 | 0 | return ff_neterrno(); |
340 | 0 | } |
341 | 0 | } |
342 | | #else |
343 | | return AVERROR(ENOSYS); |
344 | | #endif |
345 | 0 | return 0; |
346 | 0 | } |
347 | | static int udp_set_url(URLContext *h, |
348 | | struct sockaddr_storage *addr, |
349 | | const char *hostname, int port) |
350 | 0 | { |
351 | 0 | struct addrinfo *res0; |
352 | 0 | int addr_len; |
353 | |
|
354 | 0 | res0 = ff_ip_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0); |
355 | 0 | if (!res0) return AVERROR(EIO); |
356 | 0 | memcpy(addr, res0->ai_addr, res0->ai_addrlen); |
357 | 0 | addr_len = res0->ai_addrlen; |
358 | 0 | freeaddrinfo(res0); |
359 | |
|
360 | 0 | return addr_len; |
361 | 0 | } |
362 | | |
363 | | static int udp_socket_create(URLContext *h, struct sockaddr_storage *addr, |
364 | | socklen_t *addr_len, const char *localaddr) |
365 | 0 | { |
366 | 0 | UDPContext *s = h->priv_data; |
367 | 0 | int udp_fd = -1; |
368 | 0 | struct addrinfo *res0, *res; |
369 | 0 | int family = AF_UNSPEC; |
370 | |
|
371 | 0 | if (((struct sockaddr *) &s->dest_addr)->sa_family) |
372 | 0 | family = ((struct sockaddr *) &s->dest_addr)->sa_family; |
373 | 0 | res0 = ff_ip_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL, |
374 | 0 | s->local_port, |
375 | 0 | SOCK_DGRAM, family, AI_PASSIVE); |
376 | 0 | if (!res0) |
377 | 0 | goto fail; |
378 | 0 | for (res = res0; res; res=res->ai_next) { |
379 | 0 | if (s->udplite_coverage) |
380 | 0 | udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE, h); |
381 | 0 | else |
382 | 0 | udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0, h); |
383 | 0 | if (udp_fd != -1) break; |
384 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "socket"); |
385 | 0 | } |
386 | |
|
387 | 0 | if (udp_fd < 0) |
388 | 0 | goto fail; |
389 | | |
390 | 0 | memcpy(addr, res->ai_addr, res->ai_addrlen); |
391 | 0 | *addr_len = res->ai_addrlen; |
392 | |
|
393 | 0 | freeaddrinfo(res0); |
394 | |
|
395 | 0 | return udp_fd; |
396 | | |
397 | 0 | fail: |
398 | 0 | if (udp_fd >= 0) |
399 | 0 | closesocket(udp_fd); |
400 | 0 | if(res0) |
401 | 0 | freeaddrinfo(res0); |
402 | 0 | return -1; |
403 | 0 | } |
404 | | |
405 | | static int udp_port(struct sockaddr_storage *addr, int addr_len) |
406 | 0 | { |
407 | 0 | char sbuf[sizeof(int)*3+1]; |
408 | 0 | int error; |
409 | |
|
410 | 0 | if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) { |
411 | 0 | av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error)); |
412 | 0 | return -1; |
413 | 0 | } |
414 | | |
415 | 0 | return strtol(sbuf, NULL, 10); |
416 | 0 | } |
417 | | |
418 | | |
419 | | /** |
420 | | * If no filename is given to av_open_input_file because you want to |
421 | | * get the local port first, then you must call this function to set |
422 | | * the remote server address. |
423 | | * |
424 | | * url syntax: udp://host:port[?option=val...] |
425 | | * option: 'ttl=n' : set the ttl value (for multicast only) |
426 | | * 'localport=n' : set the local port |
427 | | * 'pkt_size=n' : set max packet size |
428 | | * 'reuse=1' : enable reusing the socket |
429 | | * 'overrun_nonfatal=1': survive in case of circular buffer overrun |
430 | | * |
431 | | * @param h media file context |
432 | | * @param uri of the remote server |
433 | | * @return zero if no error. |
434 | | */ |
435 | | int ff_udp_set_remote_url(URLContext *h, const char *uri) |
436 | 0 | { |
437 | 0 | UDPContext *s = h->priv_data; |
438 | 0 | char hostname[256], buf[10]; |
439 | 0 | int port; |
440 | 0 | const char *p; |
441 | |
|
442 | 0 | av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri); |
443 | | |
444 | | /* set the destination address */ |
445 | 0 | s->dest_addr_len = udp_set_url(h, &s->dest_addr, hostname, port); |
446 | 0 | if (s->dest_addr_len < 0) { |
447 | 0 | return AVERROR(EIO); |
448 | 0 | } |
449 | 0 | s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr); |
450 | 0 | p = strchr(uri, '?'); |
451 | 0 | if (p) { |
452 | 0 | if (av_find_info_tag(buf, sizeof(buf), "connect", p)) { |
453 | 0 | int was_connected = s->is_connected; |
454 | 0 | s->is_connected = strtol(buf, NULL, 10); |
455 | 0 | if (s->is_connected && !was_connected) { |
456 | 0 | if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr, |
457 | 0 | s->dest_addr_len)) { |
458 | 0 | s->is_connected = 0; |
459 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "connect"); |
460 | 0 | return AVERROR(EIO); |
461 | 0 | } |
462 | 0 | } |
463 | 0 | } |
464 | 0 | } |
465 | | |
466 | 0 | return 0; |
467 | 0 | } |
468 | | |
469 | | /** |
470 | | * This function is identical to ff_udp_set_remote_url, except that it takes a sockaddr directly. |
471 | | */ |
472 | | int ff_udp_set_remote_addr(URLContext *h, const struct sockaddr *dest_addr, socklen_t dest_addr_len, int do_connect) |
473 | 0 | { |
474 | 0 | UDPContext *s = h->priv_data; |
475 | | |
476 | | /* set the destination address */ |
477 | 0 | if ((size_t)dest_addr_len > sizeof(s->dest_addr)) |
478 | 0 | return AVERROR(EIO); |
479 | 0 | s->dest_addr_len = dest_addr_len; |
480 | 0 | memcpy(&s->dest_addr, dest_addr, dest_addr_len); |
481 | |
|
482 | 0 | s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr); |
483 | 0 | if (do_connect >= 0) { |
484 | 0 | int was_connected = s->is_connected; |
485 | 0 | s->is_connected = do_connect; |
486 | 0 | if (s->is_connected && !was_connected) { |
487 | 0 | if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr, |
488 | 0 | s->dest_addr_len)) { |
489 | 0 | s->is_connected = 0; |
490 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "connect"); |
491 | 0 | return AVERROR(EIO); |
492 | 0 | } |
493 | 0 | } |
494 | 0 | } |
495 | | |
496 | 0 | return 0; |
497 | 0 | } |
498 | | |
499 | | /** |
500 | | * Return the local port used by the UDP connection |
501 | | * @param h media file context |
502 | | * @return the local port number |
503 | | */ |
504 | | int ff_udp_get_local_port(URLContext *h) |
505 | 0 | { |
506 | 0 | UDPContext *s = h->priv_data; |
507 | 0 | return s->local_port; |
508 | 0 | } |
509 | | |
510 | | void ff_udp_get_last_recv_addr(URLContext *h, struct sockaddr_storage *addr, socklen_t *addr_len) |
511 | 0 | { |
512 | 0 | UDPContext *s = h->priv_data; |
513 | 0 | *addr = s->last_recv_addr; |
514 | 0 | *addr_len = s->last_recv_addr_len; |
515 | 0 | } |
516 | | |
517 | | /** |
518 | | * Return the udp file handle for select() usage to wait for several RTP |
519 | | * streams at the same time. |
520 | | * @param h media file context |
521 | | */ |
522 | | static int udp_get_file_handle(URLContext *h) |
523 | 0 | { |
524 | 0 | UDPContext *s = h->priv_data; |
525 | 0 | return s->udp_fd; |
526 | 0 | } |
527 | | |
528 | | #if HAVE_PTHREAD_CANCEL |
529 | | static void *circular_buffer_task_rx( void *_URLContext) |
530 | 0 | { |
531 | 0 | URLContext *h = _URLContext; |
532 | 0 | UDPContext *s = h->priv_data; |
533 | 0 | int old_cancelstate; |
534 | |
|
535 | 0 | ff_thread_setname("udp-rx"); |
536 | |
|
537 | 0 | pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); |
538 | 0 | pthread_mutex_lock(&s->mutex); |
539 | 0 | if (ff_socket_nonblock(s->udp_fd, 0) < 0) { |
540 | 0 | av_log(h, AV_LOG_ERROR, "Failed to set blocking mode"); |
541 | 0 | s->circular_buffer_error = AVERROR(EIO); |
542 | 0 | goto end; |
543 | 0 | } |
544 | 0 | while(1) { |
545 | 0 | UDPQueuedPacketHeader pkt_header; |
546 | 0 | pkt_header.addr_len = sizeof(pkt_header.addr); |
547 | |
|
548 | 0 | pthread_mutex_unlock(&s->mutex); |
549 | | /* Blocking operations are always cancellation points; |
550 | | see "General Information" / "Thread Cancellation Overview" |
551 | | in Single Unix. */ |
552 | 0 | pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); |
553 | 0 | pkt_header.pkt_size = recvfrom(s->udp_fd, s->tmp + sizeof(pkt_header), sizeof(s->tmp) - sizeof(pkt_header), 0, (struct sockaddr *)&pkt_header.addr, &pkt_header.addr_len); |
554 | 0 | pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); |
555 | 0 | pthread_mutex_lock(&s->mutex); |
556 | 0 | if (pkt_header.pkt_size < 0) { |
557 | 0 | if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) { |
558 | 0 | s->circular_buffer_error = ff_neterrno(); |
559 | 0 | goto end; |
560 | 0 | } |
561 | 0 | continue; |
562 | 0 | } |
563 | 0 | if (ff_ip_check_source_lists(&pkt_header.addr, &s->filters)) |
564 | 0 | continue; |
565 | 0 | memcpy(s->tmp, &pkt_header, sizeof(pkt_header)); |
566 | |
|
567 | 0 | if (av_fifo_can_write(s->rx_fifo) < pkt_header.pkt_size + sizeof(pkt_header)) { |
568 | | /* No Space left */ |
569 | 0 | if (s->overrun_nonfatal) { |
570 | 0 | av_log(h, AV_LOG_WARNING, "Circular buffer overrun. " |
571 | 0 | "Surviving due to overrun_nonfatal option\n"); |
572 | 0 | continue; |
573 | 0 | } else { |
574 | 0 | av_log(h, AV_LOG_ERROR, "Circular buffer overrun. " |
575 | 0 | "To avoid, increase fifo_size URL option. " |
576 | 0 | "To survive in such case, use overrun_nonfatal option\n"); |
577 | 0 | s->circular_buffer_error = AVERROR(EIO); |
578 | 0 | goto end; |
579 | 0 | } |
580 | 0 | } |
581 | 0 | av_fifo_write(s->rx_fifo, s->tmp, pkt_header.pkt_size + sizeof(pkt_header)); |
582 | 0 | pthread_cond_signal(&s->cond); |
583 | 0 | } |
584 | | |
585 | 0 | end: |
586 | 0 | pthread_cond_signal(&s->cond); |
587 | 0 | pthread_mutex_unlock(&s->mutex); |
588 | 0 | return NULL; |
589 | 0 | } |
590 | | |
591 | | static void *circular_buffer_task_tx( void *_URLContext) |
592 | 0 | { |
593 | 0 | URLContext *h = _URLContext; |
594 | 0 | UDPContext *s = h->priv_data; |
595 | 0 | int64_t target_timestamp = av_gettime_relative(); |
596 | 0 | int64_t start_timestamp = av_gettime_relative(); |
597 | 0 | int64_t sent_bits = 0; |
598 | 0 | int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0; |
599 | 0 | int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0; |
600 | |
|
601 | 0 | ff_thread_setname("udp-tx"); |
602 | |
|
603 | 0 | pthread_mutex_lock(&s->mutex); |
604 | |
|
605 | 0 | if (ff_socket_nonblock(s->udp_fd, 0) < 0) { |
606 | 0 | av_log(h, AV_LOG_ERROR, "Failed to set blocking mode"); |
607 | 0 | s->circular_buffer_error = AVERROR(EIO); |
608 | 0 | goto end; |
609 | 0 | } |
610 | | |
611 | 0 | for(;;) { |
612 | 0 | int len; |
613 | 0 | const uint8_t *p; |
614 | 0 | uint8_t tmp[4]; |
615 | 0 | int64_t timestamp; |
616 | |
|
617 | 0 | len = av_fifo_can_read(s->tx_fifo); |
618 | |
|
619 | 0 | while (len<4) { |
620 | 0 | if (s->close_req) |
621 | 0 | goto end; |
622 | 0 | pthread_cond_wait(&s->cond, &s->mutex); |
623 | 0 | len = av_fifo_can_read(s->tx_fifo); |
624 | 0 | } |
625 | | |
626 | 0 | av_fifo_read(s->tx_fifo, tmp, 4); |
627 | 0 | len = AV_RL32(tmp); |
628 | |
|
629 | 0 | av_assert0(len >= 0); |
630 | 0 | av_assert0(len <= sizeof(s->tmp)); |
631 | | |
632 | 0 | av_fifo_read(s->tx_fifo, s->tmp, len); |
633 | |
|
634 | 0 | pthread_mutex_unlock(&s->mutex); |
635 | |
|
636 | 0 | if (s->bitrate) { |
637 | 0 | timestamp = av_gettime_relative(); |
638 | 0 | if (timestamp < target_timestamp) { |
639 | 0 | int64_t delay = target_timestamp - timestamp; |
640 | 0 | if (delay > max_delay) { |
641 | 0 | delay = max_delay; |
642 | 0 | start_timestamp = timestamp + delay; |
643 | 0 | sent_bits = 0; |
644 | 0 | } |
645 | 0 | av_usleep(delay); |
646 | 0 | } else { |
647 | 0 | if (timestamp - burst_interval > target_timestamp) { |
648 | 0 | start_timestamp = timestamp - burst_interval; |
649 | 0 | sent_bits = 0; |
650 | 0 | } |
651 | 0 | } |
652 | 0 | sent_bits += len * 8; |
653 | 0 | target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate; |
654 | 0 | } |
655 | |
|
656 | 0 | p = s->tmp; |
657 | 0 | while (len) { |
658 | 0 | int ret; |
659 | 0 | av_assert0(len > 0); |
660 | 0 | if (!s->is_connected) { |
661 | 0 | ret = sendto (s->udp_fd, p, len, 0, |
662 | 0 | (struct sockaddr *) &s->dest_addr, |
663 | 0 | s->dest_addr_len); |
664 | 0 | } else |
665 | 0 | ret = send(s->udp_fd, p, len, 0); |
666 | 0 | if (ret >= 0) { |
667 | 0 | len -= ret; |
668 | 0 | p += ret; |
669 | 0 | } else { |
670 | 0 | ret = ff_neterrno(); |
671 | 0 | if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) { |
672 | 0 | pthread_mutex_lock(&s->mutex); |
673 | 0 | s->circular_buffer_error = ret; |
674 | 0 | pthread_mutex_unlock(&s->mutex); |
675 | 0 | return NULL; |
676 | 0 | } |
677 | 0 | } |
678 | 0 | } |
679 | | |
680 | 0 | pthread_mutex_lock(&s->mutex); |
681 | 0 | } |
682 | | |
683 | 0 | end: |
684 | 0 | pthread_mutex_unlock(&s->mutex); |
685 | 0 | return NULL; |
686 | 0 | } |
687 | | |
688 | | |
689 | | #endif |
690 | | |
691 | | /* put it in UDP context */ |
692 | | /* return non zero if error */ |
693 | | static int udp_open(URLContext *h, const char *uri, int flags) |
694 | 0 | { |
695 | 0 | char hostname[1024]; |
696 | 0 | int port, udp_fd = -1, tmp, bind_ret = -1; |
697 | 0 | UDPContext *s = h->priv_data; |
698 | 0 | int is_output; |
699 | 0 | const char *p; |
700 | 0 | struct sockaddr_storage my_addr; |
701 | 0 | socklen_t len; |
702 | 0 | int ret; |
703 | |
|
704 | 0 | h->is_streamed = 1; |
705 | |
|
706 | 0 | is_output = !(flags & AVIO_FLAG_READ); |
707 | 0 | if (s->buffer_size < 0) |
708 | 0 | s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_RX_BUF_SIZE; |
709 | |
|
710 | 0 | p = strchr(uri, '?'); |
711 | 0 | if (p) { |
712 | 0 | ret = ff_parse_opts_from_query_string(s, p, 1); |
713 | 0 | if (ret < 0) |
714 | 0 | goto fail; |
715 | 0 | } |
716 | 0 | if (!HAVE_PTHREAD_CANCEL) { |
717 | 0 | int64_t optvals[] = {s->overrun_nonfatal, s->bitrate, s->circular_buffer_size}; |
718 | 0 | const char* optnames[] = { "overrun_nonfatal", "bitrate", "fifo_size"}; |
719 | 0 | for (unsigned i = 0; i < FF_ARRAY_ELEMS(optvals); i++) { |
720 | 0 | if (optvals[i]) |
721 | 0 | av_log(h, AV_LOG_WARNING, |
722 | 0 | "'%s' option was set but it is not supported " |
723 | 0 | "on this build (pthread support is required)\n", optnames[i]); |
724 | 0 | } |
725 | 0 | } |
726 | 0 | if (s->sources) { |
727 | 0 | if ((ret = ff_ip_parse_sources(h, s->sources, &s->filters)) < 0) |
728 | 0 | goto fail; |
729 | 0 | } |
730 | 0 | if (s->block) { |
731 | 0 | if ((ret = ff_ip_parse_blocks(h, s->block, &s->filters)) < 0) |
732 | 0 | goto fail; |
733 | 0 | } |
734 | | |
735 | | /* handling needed to support options picking from both AVOption and URL */ |
736 | 0 | s->circular_buffer_size *= 188; |
737 | 0 | if (flags & AVIO_FLAG_WRITE) { |
738 | 0 | h->max_packet_size = s->pkt_size; |
739 | 0 | } else { |
740 | 0 | h->max_packet_size = UDP_MAX_PKT_SIZE; |
741 | 0 | } |
742 | 0 | h->rw_timeout = s->timeout; |
743 | | |
744 | | /* fill the dest addr */ |
745 | 0 | av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri); |
746 | | |
747 | | /* XXX: fix av_url_split */ |
748 | 0 | if (hostname[0] == '\0' || hostname[0] == '?') { |
749 | | /* only accepts null hostname if input */ |
750 | 0 | if (!(flags & AVIO_FLAG_READ)) { |
751 | 0 | ret = AVERROR(EINVAL); |
752 | 0 | goto fail; |
753 | 0 | } |
754 | 0 | } else { |
755 | 0 | if ((ret = ff_udp_set_remote_url(h, uri)) < 0) |
756 | 0 | goto fail; |
757 | 0 | } |
758 | | |
759 | 0 | if ((s->is_multicast || s->local_port < 0) && (h->flags & AVIO_FLAG_READ)) |
760 | 0 | s->local_port = port; |
761 | |
|
762 | 0 | udp_fd = udp_socket_create(h, &my_addr, &len, s->localaddr); |
763 | 0 | if (udp_fd < 0) { |
764 | 0 | ret = AVERROR(EIO); |
765 | 0 | goto fail; |
766 | 0 | } |
767 | | |
768 | 0 | s->local_addr_storage=my_addr; //store for future multicast join |
769 | | |
770 | | /* Follow the requested reuse option, unless it's multicast in which |
771 | | * case enable reuse unless explicitly disabled. |
772 | | */ |
773 | 0 | if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) { |
774 | 0 | s->reuse_socket = 1; |
775 | 0 | if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0) { |
776 | 0 | ret = ff_neterrno(); |
777 | 0 | goto fail; |
778 | 0 | } |
779 | 0 | } |
780 | | |
781 | 0 | if (s->is_broadcast) { |
782 | 0 | #ifdef SO_BROADCAST |
783 | 0 | if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0) { |
784 | 0 | ret = ff_neterrno(); |
785 | 0 | goto fail; |
786 | 0 | } |
787 | | #else |
788 | | ret = AVERROR(ENOSYS); |
789 | | goto fail; |
790 | | #endif |
791 | 0 | } |
792 | | |
793 | | /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving. |
794 | | * The receiver coverage has to be less than or equal to the sender coverage. |
795 | | * Otherwise, the receiver will drop all packets. |
796 | | */ |
797 | 0 | if (s->udplite_coverage) { |
798 | 0 | if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0) |
799 | 0 | av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available"); |
800 | |
|
801 | 0 | if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0) |
802 | 0 | av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available"); |
803 | 0 | } |
804 | |
|
805 | 0 | if (s->dscp >= 0) { |
806 | 0 | int dscp = s->dscp << 2; |
807 | 0 | if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0) { |
808 | 0 | ret = ff_neterrno(); |
809 | 0 | goto fail; |
810 | 0 | } |
811 | 0 | } |
812 | | |
813 | | /* If multicast, try binding the multicast address first, to avoid |
814 | | * receiving UDP packets from other sources aimed at the same UDP |
815 | | * port. This fails on windows. This makes sending to the same address |
816 | | * using sendto() fail, so only do it if we're opened in read-only mode. */ |
817 | 0 | if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) { |
818 | 0 | bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len); |
819 | 0 | } |
820 | | /* bind to the local address if not multicast or if the multicast |
821 | | * bind failed */ |
822 | | /* the bind is needed to give a port to the socket now */ |
823 | 0 | if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) { |
824 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "bind failed"); |
825 | 0 | ret = ff_neterrno(); |
826 | 0 | goto fail; |
827 | 0 | } |
828 | | |
829 | 0 | len = sizeof(my_addr); |
830 | 0 | getsockname(udp_fd, (struct sockaddr *)&my_addr, &len); |
831 | 0 | s->local_port = udp_port(&my_addr, len); |
832 | |
|
833 | 0 | if (s->is_multicast) { |
834 | 0 | if (h->flags & AVIO_FLAG_WRITE) { |
835 | | /* output */ |
836 | 0 | if ((ret = udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr, h)) < 0) |
837 | 0 | goto fail; |
838 | 0 | } |
839 | 0 | if (h->flags & AVIO_FLAG_READ) { |
840 | | /* input */ |
841 | 0 | if (s->filters.nb_include_addrs) { |
842 | 0 | if ((ret = udp_set_multicast_sources(h, udp_fd, |
843 | 0 | (struct sockaddr *)&s->dest_addr, |
844 | 0 | s->dest_addr_len, &s->local_addr_storage, |
845 | 0 | s->filters.include_addrs, |
846 | 0 | s->filters.nb_include_addrs, 1)) < 0) |
847 | 0 | goto fail; |
848 | 0 | } else { |
849 | 0 | if ((ret = udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr, |
850 | 0 | (struct sockaddr *)&s->local_addr_storage, h)) < 0) |
851 | 0 | goto fail; |
852 | 0 | } |
853 | 0 | if (s->filters.nb_exclude_addrs) { |
854 | 0 | if ((ret = udp_set_multicast_sources(h, udp_fd, |
855 | 0 | (struct sockaddr *)&s->dest_addr, |
856 | 0 | s->dest_addr_len, &s->local_addr_storage, |
857 | 0 | s->filters.exclude_addrs, |
858 | 0 | s->filters.nb_exclude_addrs, 0)) < 0) |
859 | 0 | goto fail; |
860 | 0 | } |
861 | 0 | } |
862 | 0 | } |
863 | | |
864 | 0 | if (is_output) { |
865 | | /* limit the tx buf size to limit latency */ |
866 | 0 | tmp = s->buffer_size; |
867 | 0 | if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) { |
868 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)"); |
869 | 0 | ret = ff_neterrno(); |
870 | 0 | goto fail; |
871 | 0 | } |
872 | 0 | } else { |
873 | | /* set udp recv buffer size to the requested value (default UDP_RX_BUF_SIZE) */ |
874 | 0 | tmp = s->buffer_size; |
875 | 0 | if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) { |
876 | 0 | ff_log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)"); |
877 | 0 | } |
878 | 0 | len = sizeof(tmp); |
879 | 0 | if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) { |
880 | 0 | ff_log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)"); |
881 | 0 | } else { |
882 | 0 | av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp); |
883 | 0 | if(tmp < s->buffer_size) |
884 | 0 | av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d\n", s->buffer_size, tmp); |
885 | 0 | } |
886 | | |
887 | | /* make the socket non-blocking */ |
888 | 0 | ff_socket_nonblock(udp_fd, 1); |
889 | 0 | } |
890 | 0 | if (s->is_connected) { |
891 | 0 | if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) { |
892 | 0 | ff_log_net_error(h, AV_LOG_ERROR, "connect"); |
893 | 0 | ret = ff_neterrno(); |
894 | 0 | goto fail; |
895 | 0 | } |
896 | 0 | } |
897 | | |
898 | 0 | s->udp_fd = udp_fd; |
899 | |
|
900 | 0 | #if HAVE_PTHREAD_CANCEL |
901 | | /* |
902 | | Create thread in case of: |
903 | | 1. Input and circular_buffer_size is set |
904 | | 2. Output and bitrate and circular_buffer_size is set |
905 | | */ |
906 | |
|
907 | 0 | if (is_output && s->bitrate && !s->circular_buffer_size) { |
908 | | /* Warn user in case of 'circular_buffer_size' is not set */ |
909 | 0 | av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n"); |
910 | 0 | } |
911 | |
|
912 | 0 | if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) { |
913 | | /* start the task going */ |
914 | 0 | AVFifo *fifo = av_fifo_alloc2(s->circular_buffer_size, 1, 0); |
915 | 0 | if (!fifo) { |
916 | 0 | ret = AVERROR(ENOMEM); |
917 | 0 | goto fail; |
918 | 0 | } |
919 | 0 | if (is_output) |
920 | 0 | s->tx_fifo = fifo; |
921 | 0 | else |
922 | 0 | s->rx_fifo = fifo; |
923 | 0 | ret = pthread_mutex_init(&s->mutex, NULL); |
924 | 0 | if (ret != 0) { |
925 | 0 | av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret)); |
926 | 0 | ret = AVERROR(ret); |
927 | 0 | goto fail; |
928 | 0 | } |
929 | 0 | ret = pthread_cond_init(&s->cond, NULL); |
930 | 0 | if (ret != 0) { |
931 | 0 | av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret)); |
932 | 0 | ret = AVERROR(ret); |
933 | 0 | goto cond_fail; |
934 | 0 | } |
935 | 0 | ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h); |
936 | 0 | if (ret != 0) { |
937 | 0 | av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret)); |
938 | 0 | ret = AVERROR(ret); |
939 | 0 | goto thread_fail; |
940 | 0 | } |
941 | 0 | s->thread_started = 1; |
942 | 0 | } |
943 | 0 | #endif |
944 | | |
945 | 0 | return 0; |
946 | 0 | #if HAVE_PTHREAD_CANCEL |
947 | 0 | thread_fail: |
948 | 0 | pthread_cond_destroy(&s->cond); |
949 | 0 | cond_fail: |
950 | 0 | pthread_mutex_destroy(&s->mutex); |
951 | 0 | #endif |
952 | 0 | fail: |
953 | 0 | if (udp_fd >= 0) |
954 | 0 | closesocket(udp_fd); |
955 | 0 | av_fifo_freep2(&s->rx_fifo); |
956 | 0 | av_fifo_freep2(&s->tx_fifo); |
957 | 0 | ff_ip_reset_filters(&s->filters); |
958 | 0 | return ret; |
959 | 0 | } |
960 | | |
961 | | static int udplite_open(URLContext *h, const char *uri, int flags) |
962 | 0 | { |
963 | 0 | UDPContext *s = h->priv_data; |
964 | | |
965 | | // set default checksum coverage |
966 | 0 | s->udplite_coverage = UDP_HEADER_SIZE; |
967 | |
|
968 | 0 | return udp_open(h, uri, flags); |
969 | 0 | } |
970 | | |
971 | | static int udp_read(URLContext *h, uint8_t *buf, int size) |
972 | 0 | { |
973 | 0 | UDPContext *s = h->priv_data; |
974 | 0 | int ret; |
975 | 0 | #if HAVE_PTHREAD_CANCEL |
976 | 0 | int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK; |
977 | |
|
978 | 0 | if (s->rx_fifo) { |
979 | 0 | pthread_mutex_lock(&s->mutex); |
980 | 0 | do { |
981 | 0 | avail = av_fifo_can_read(s->rx_fifo); |
982 | 0 | if (avail) { // >=size) { |
983 | 0 | UDPQueuedPacketHeader header; |
984 | |
|
985 | 0 | av_fifo_read(s->rx_fifo, &header, sizeof(header)); |
986 | |
|
987 | 0 | s->last_recv_addr = header.addr; |
988 | 0 | s->last_recv_addr_len = header.addr_len; |
989 | |
|
990 | 0 | avail = header.pkt_size; |
991 | 0 | if(avail > size){ |
992 | 0 | av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n"); |
993 | 0 | avail = size; |
994 | 0 | } |
995 | |
|
996 | 0 | av_fifo_read(s->rx_fifo, buf, avail); |
997 | 0 | av_fifo_drain2(s->rx_fifo, header.pkt_size - avail); |
998 | 0 | pthread_mutex_unlock(&s->mutex); |
999 | 0 | return avail; |
1000 | 0 | } else if(s->circular_buffer_error){ |
1001 | 0 | int err = s->circular_buffer_error; |
1002 | 0 | pthread_mutex_unlock(&s->mutex); |
1003 | 0 | return err; |
1004 | 0 | } else if(nonblock) { |
1005 | 0 | pthread_mutex_unlock(&s->mutex); |
1006 | 0 | return AVERROR(EAGAIN); |
1007 | 0 | } else { |
1008 | | /* FIXME: using the monotonic clock would be better, |
1009 | | but it does not exist on all supported platforms. */ |
1010 | 0 | int64_t t = av_gettime() + 100000; |
1011 | 0 | struct timespec tv = { .tv_sec = t / 1000000, |
1012 | 0 | .tv_nsec = (t % 1000000) * 1000 }; |
1013 | 0 | int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv); |
1014 | 0 | if (err) { |
1015 | 0 | pthread_mutex_unlock(&s->mutex); |
1016 | 0 | return AVERROR(err == ETIMEDOUT ? EAGAIN : err); |
1017 | 0 | } |
1018 | 0 | nonblock = 1; |
1019 | 0 | } |
1020 | 0 | } while(1); |
1021 | 0 | } |
1022 | 0 | #endif |
1023 | | |
1024 | 0 | if (!(h->flags & AVIO_FLAG_NONBLOCK)) { |
1025 | 0 | ret = ff_network_wait_fd(s->udp_fd, 0); |
1026 | 0 | if (ret < 0) |
1027 | 0 | return ret; |
1028 | 0 | } |
1029 | 0 | s->last_recv_addr_len = sizeof(s->last_recv_addr); |
1030 | 0 | ret = recvfrom(s->udp_fd, buf, size, 0, (struct sockaddr *)&s->last_recv_addr, &s->last_recv_addr_len); |
1031 | 0 | if (ret < 0) |
1032 | 0 | return ff_neterrno(); |
1033 | 0 | if (ff_ip_check_source_lists(&s->last_recv_addr, &s->filters)) |
1034 | 0 | return AVERROR(EINTR); |
1035 | 0 | return ret; |
1036 | 0 | } |
1037 | | |
1038 | | static int udp_write(URLContext *h, const uint8_t *buf, int size) |
1039 | 0 | { |
1040 | 0 | UDPContext *s = h->priv_data; |
1041 | 0 | int ret; |
1042 | |
|
1043 | 0 | #if HAVE_PTHREAD_CANCEL |
1044 | 0 | if (s->tx_fifo) { |
1045 | 0 | uint8_t tmp[4]; |
1046 | |
|
1047 | 0 | pthread_mutex_lock(&s->mutex); |
1048 | | |
1049 | | /* |
1050 | | Return error if last tx failed. |
1051 | | Here we can't know on which packet error was, but it needs to know that error exists. |
1052 | | */ |
1053 | 0 | if (s->circular_buffer_error<0) { |
1054 | 0 | int err = s->circular_buffer_error; |
1055 | 0 | pthread_mutex_unlock(&s->mutex); |
1056 | 0 | return err; |
1057 | 0 | } |
1058 | | |
1059 | 0 | if (av_fifo_can_write(s->tx_fifo) < size + 4) { |
1060 | | /* What about a partial packet tx ? */ |
1061 | 0 | pthread_mutex_unlock(&s->mutex); |
1062 | 0 | return AVERROR(ENOMEM); |
1063 | 0 | } |
1064 | 0 | AV_WL32(tmp, size); |
1065 | 0 | av_fifo_write(s->tx_fifo, tmp, 4); /* size of packet */ |
1066 | 0 | av_fifo_write(s->tx_fifo, buf, size); /* the data */ |
1067 | 0 | pthread_cond_signal(&s->cond); |
1068 | 0 | pthread_mutex_unlock(&s->mutex); |
1069 | 0 | return size; |
1070 | 0 | } |
1071 | 0 | #endif |
1072 | 0 | if (!(h->flags & AVIO_FLAG_NONBLOCK)) { |
1073 | 0 | ret = ff_network_wait_fd(s->udp_fd, 1); |
1074 | 0 | if (ret < 0) |
1075 | 0 | return ret; |
1076 | 0 | } |
1077 | | |
1078 | 0 | if (!s->is_connected) { |
1079 | 0 | ret = sendto (s->udp_fd, buf, size, 0, |
1080 | 0 | (struct sockaddr *) &s->dest_addr, |
1081 | 0 | s->dest_addr_len); |
1082 | 0 | } else |
1083 | 0 | ret = send(s->udp_fd, buf, size, 0); |
1084 | |
|
1085 | 0 | return ret < 0 ? ff_neterrno() : ret; |
1086 | 0 | } |
1087 | | |
1088 | | static int udp_close(URLContext *h) |
1089 | 0 | { |
1090 | 0 | UDPContext *s = h->priv_data; |
1091 | |
|
1092 | 0 | #if HAVE_PTHREAD_CANCEL |
1093 | | // Request close once writing is finished |
1094 | 0 | if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) { |
1095 | 0 | pthread_mutex_lock(&s->mutex); |
1096 | 0 | s->close_req = 1; |
1097 | 0 | pthread_cond_signal(&s->cond); |
1098 | 0 | pthread_mutex_unlock(&s->mutex); |
1099 | 0 | } |
1100 | 0 | #endif |
1101 | |
|
1102 | 0 | if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) |
1103 | 0 | udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr, |
1104 | 0 | (struct sockaddr *)&s->local_addr_storage, h); |
1105 | 0 | #if HAVE_PTHREAD_CANCEL |
1106 | 0 | if (s->thread_started) { |
1107 | 0 | int ret; |
1108 | | // Cancel only read, as write has been signaled as success to the user |
1109 | 0 | if (h->flags & AVIO_FLAG_READ) { |
1110 | | #ifdef _WIN32 |
1111 | | /* recvfrom() is not a cancellation point for win32, so we shutdown |
1112 | | * the socket and abort pending IO, subsequent recvfrom() calls |
1113 | | * will fail with WSAESHUTDOWN causing the thread to exit. */ |
1114 | | shutdown(s->udp_fd, SD_RECEIVE); |
1115 | | CancelIoEx((HANDLE)(SOCKET)s->udp_fd, NULL); |
1116 | | #else |
1117 | 0 | pthread_cancel(s->circular_buffer_thread); |
1118 | 0 | #endif |
1119 | 0 | } |
1120 | 0 | ret = pthread_join(s->circular_buffer_thread, NULL); |
1121 | 0 | if (ret != 0) |
1122 | 0 | av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret)); |
1123 | 0 | pthread_mutex_destroy(&s->mutex); |
1124 | 0 | pthread_cond_destroy(&s->cond); |
1125 | 0 | } |
1126 | 0 | #endif |
1127 | 0 | closesocket(s->udp_fd); |
1128 | 0 | av_fifo_freep2(&s->rx_fifo); |
1129 | 0 | av_fifo_freep2(&s->tx_fifo); |
1130 | 0 | ff_ip_reset_filters(&s->filters); |
1131 | 0 | return 0; |
1132 | 0 | } |
1133 | | |
1134 | | const URLProtocol ff_udp_protocol = { |
1135 | | .name = "udp", |
1136 | | .url_open = udp_open, |
1137 | | .url_read = udp_read, |
1138 | | .url_write = udp_write, |
1139 | | .url_close = udp_close, |
1140 | | .url_get_file_handle = udp_get_file_handle, |
1141 | | .priv_data_size = sizeof(UDPContext), |
1142 | | .priv_data_class = &udp_class, |
1143 | | .flags = URL_PROTOCOL_FLAG_NETWORK, |
1144 | | }; |
1145 | | |
1146 | | const URLProtocol ff_udplite_protocol = { |
1147 | | .name = "udplite", |
1148 | | .url_open = udplite_open, |
1149 | | .url_read = udp_read, |
1150 | | .url_write = udp_write, |
1151 | | .url_close = udp_close, |
1152 | | .url_get_file_handle = udp_get_file_handle, |
1153 | | .priv_data_size = sizeof(UDPContext), |
1154 | | .priv_data_class = &udplite_context_class, |
1155 | | .flags = URL_PROTOCOL_FLAG_NETWORK, |
1156 | | }; |