Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/tcpclient.py: 24%
139 statements
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
1#
2# Copyright 2014 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""A non-blocking TCP connection factory.
17"""
19import functools
20import socket
21import numbers
22import datetime
23import ssl
25from tornado.concurrent import Future, future_add_done_callback
26from tornado.ioloop import IOLoop
27from tornado.iostream import IOStream
28from tornado import gen
29from tornado.netutil import Resolver
30from tornado.gen import TimeoutError
32from typing import Any, Union, Dict, Tuple, List, Callable, Iterator, Optional, Set
34_INITIAL_CONNECT_TIMEOUT = 0.3
37class _Connector(object):
38 """A stateless implementation of the "Happy Eyeballs" algorithm.
40 "Happy Eyeballs" is documented in RFC6555 as the recommended practice
41 for when both IPv4 and IPv6 addresses are available.
43 In this implementation, we partition the addresses by family, and
44 make the first connection attempt to whichever address was
45 returned first by ``getaddrinfo``. If that connection fails or
46 times out, we begin a connection in parallel to the first address
47 of the other family. If there are additional failures we retry
48 with other addresses, keeping one connection attempt per family
49 in flight at a time.
51 http://tools.ietf.org/html/rfc6555
53 """
55 def __init__(
56 self,
57 addrinfo: List[Tuple],
58 connect: Callable[
59 [socket.AddressFamily, Tuple], Tuple[IOStream, "Future[IOStream]"]
60 ],
61 ) -> None:
62 self.io_loop = IOLoop.current()
63 self.connect = connect
65 self.future = (
66 Future()
67 ) # type: Future[Tuple[socket.AddressFamily, Any, IOStream]]
68 self.timeout = None # type: Optional[object]
69 self.connect_timeout = None # type: Optional[object]
70 self.last_error = None # type: Optional[Exception]
71 self.remaining = len(addrinfo)
72 self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
73 self.streams = set() # type: Set[IOStream]
75 @staticmethod
76 def split(
77 addrinfo: List[Tuple],
78 ) -> Tuple[
79 List[Tuple[socket.AddressFamily, Tuple]],
80 List[Tuple[socket.AddressFamily, Tuple]],
81 ]:
82 """Partition the ``addrinfo`` list by address family.
84 Returns two lists. The first list contains the first entry from
85 ``addrinfo`` and all others with the same family, and the
86 second list contains all other addresses (normally one list will
87 be AF_INET and the other AF_INET6, although non-standard resolvers
88 may return additional families).
89 """
90 primary = []
91 secondary = []
92 primary_af = addrinfo[0][0]
93 for af, addr in addrinfo:
94 if af == primary_af:
95 primary.append((af, addr))
96 else:
97 secondary.append((af, addr))
98 return primary, secondary
100 def start(
101 self,
102 timeout: float = _INITIAL_CONNECT_TIMEOUT,
103 connect_timeout: Optional[Union[float, datetime.timedelta]] = None,
104 ) -> "Future[Tuple[socket.AddressFamily, Any, IOStream]]":
105 self.try_connect(iter(self.primary_addrs))
106 self.set_timeout(timeout)
107 if connect_timeout is not None:
108 self.set_connect_timeout(connect_timeout)
109 return self.future
111 def try_connect(self, addrs: Iterator[Tuple[socket.AddressFamily, Tuple]]) -> None:
112 try:
113 af, addr = next(addrs)
114 except StopIteration:
115 # We've reached the end of our queue, but the other queue
116 # might still be working. Send a final error on the future
117 # only when both queues are finished.
118 if self.remaining == 0 and not self.future.done():
119 self.future.set_exception(
120 self.last_error or IOError("connection failed")
121 )
122 return
123 stream, future = self.connect(af, addr)
124 self.streams.add(stream)
125 future_add_done_callback(
126 future, functools.partial(self.on_connect_done, addrs, af, addr)
127 )
129 def on_connect_done(
130 self,
131 addrs: Iterator[Tuple[socket.AddressFamily, Tuple]],
132 af: socket.AddressFamily,
133 addr: Tuple,
134 future: "Future[IOStream]",
135 ) -> None:
136 self.remaining -= 1
137 try:
138 stream = future.result()
139 except Exception as e:
140 if self.future.done():
141 return
142 # Error: try again (but remember what happened so we have an
143 # error to raise in the end)
144 self.last_error = e
145 self.try_connect(addrs)
146 if self.timeout is not None:
147 # If the first attempt failed, don't wait for the
148 # timeout to try an address from the secondary queue.
149 self.io_loop.remove_timeout(self.timeout)
150 self.on_timeout()
151 return
152 self.clear_timeouts()
153 if self.future.done():
154 # This is a late arrival; just drop it.
155 stream.close()
156 else:
157 self.streams.discard(stream)
158 self.future.set_result((af, addr, stream))
159 self.close_streams()
161 def set_timeout(self, timeout: float) -> None:
162 self.timeout = self.io_loop.add_timeout(
163 self.io_loop.time() + timeout, self.on_timeout
164 )
166 def on_timeout(self) -> None:
167 self.timeout = None
168 if not self.future.done():
169 self.try_connect(iter(self.secondary_addrs))
171 def clear_timeout(self) -> None:
172 if self.timeout is not None:
173 self.io_loop.remove_timeout(self.timeout)
175 def set_connect_timeout(
176 self, connect_timeout: Union[float, datetime.timedelta]
177 ) -> None:
178 self.connect_timeout = self.io_loop.add_timeout(
179 connect_timeout, self.on_connect_timeout
180 )
182 def on_connect_timeout(self) -> None:
183 if not self.future.done():
184 self.future.set_exception(TimeoutError())
185 self.close_streams()
187 def clear_timeouts(self) -> None:
188 if self.timeout is not None:
189 self.io_loop.remove_timeout(self.timeout)
190 if self.connect_timeout is not None:
191 self.io_loop.remove_timeout(self.connect_timeout)
193 def close_streams(self) -> None:
194 for stream in self.streams:
195 stream.close()
198class TCPClient(object):
199 """A non-blocking TCP connection factory.
201 .. versionchanged:: 5.0
202 The ``io_loop`` argument (deprecated since version 4.1) has been removed.
203 """
205 def __init__(self, resolver: Optional[Resolver] = None) -> None:
206 if resolver is not None:
207 self.resolver = resolver
208 self._own_resolver = False
209 else:
210 self.resolver = Resolver()
211 self._own_resolver = True
213 def close(self) -> None:
214 if self._own_resolver:
215 self.resolver.close()
217 async def connect(
218 self,
219 host: str,
220 port: int,
221 af: socket.AddressFamily = socket.AF_UNSPEC,
222 ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
223 max_buffer_size: Optional[int] = None,
224 source_ip: Optional[str] = None,
225 source_port: Optional[int] = None,
226 timeout: Optional[Union[float, datetime.timedelta]] = None,
227 ) -> IOStream:
228 """Connect to the given host and port.
230 Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
231 ``ssl_options`` is not None).
233 Using the ``source_ip`` kwarg, one can specify the source
234 IP address to use when establishing the connection.
235 In case the user needs to resolve and
236 use a specific interface, it has to be handled outside
237 of Tornado as this depends very much on the platform.
239 Raises `TimeoutError` if the input future does not complete before
240 ``timeout``, which may be specified in any form allowed by
241 `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or an absolute time
242 relative to `.IOLoop.time`)
244 Similarly, when the user requires a certain source port, it can
245 be specified using the ``source_port`` arg.
247 .. versionchanged:: 4.5
248 Added the ``source_ip`` and ``source_port`` arguments.
250 .. versionchanged:: 5.0
251 Added the ``timeout`` argument.
252 """
253 if timeout is not None:
254 if isinstance(timeout, numbers.Real):
255 timeout = IOLoop.current().time() + timeout
256 elif isinstance(timeout, datetime.timedelta):
257 timeout = IOLoop.current().time() + timeout.total_seconds()
258 else:
259 raise TypeError("Unsupported timeout %r" % timeout)
260 if timeout is not None:
261 addrinfo = await gen.with_timeout(
262 timeout, self.resolver.resolve(host, port, af)
263 )
264 else:
265 addrinfo = await self.resolver.resolve(host, port, af)
266 connector = _Connector(
267 addrinfo,
268 functools.partial(
269 self._create_stream,
270 max_buffer_size,
271 source_ip=source_ip,
272 source_port=source_port,
273 ),
274 )
275 af, addr, stream = await connector.start(connect_timeout=timeout)
276 # TODO: For better performance we could cache the (af, addr)
277 # information here and re-use it on subsequent connections to
278 # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
279 if ssl_options is not None:
280 if timeout is not None:
281 stream = await gen.with_timeout(
282 timeout,
283 stream.start_tls(
284 False, ssl_options=ssl_options, server_hostname=host
285 ),
286 )
287 else:
288 stream = await stream.start_tls(
289 False, ssl_options=ssl_options, server_hostname=host
290 )
291 return stream
293 def _create_stream(
294 self,
295 max_buffer_size: int,
296 af: socket.AddressFamily,
297 addr: Tuple,
298 source_ip: Optional[str] = None,
299 source_port: Optional[int] = None,
300 ) -> Tuple[IOStream, "Future[IOStream]"]:
301 # Always connect in plaintext; we'll convert to ssl if necessary
302 # after one connection has completed.
303 source_port_bind = source_port if isinstance(source_port, int) else 0
304 source_ip_bind = source_ip
305 if source_port_bind and not source_ip:
306 # User required a specific port, but did not specify
307 # a certain source IP, will bind to the default loopback.
308 source_ip_bind = "::1" if af == socket.AF_INET6 else "127.0.0.1"
309 # Trying to use the same address family as the requested af socket:
310 # - 127.0.0.1 for IPv4
311 # - ::1 for IPv6
312 socket_obj = socket.socket(af)
313 if source_port_bind or source_ip_bind:
314 # If the user requires binding also to a specific IP/port.
315 try:
316 socket_obj.bind((source_ip_bind, source_port_bind))
317 except socket.error:
318 socket_obj.close()
319 # Fail loudly if unable to use the IP/port.
320 raise
321 try:
322 stream = IOStream(socket_obj, max_buffer_size=max_buffer_size)
323 except socket.error as e:
324 fu = Future() # type: Future[IOStream]
325 fu.set_exception(e)
326 return stream, fu
327 else:
328 return stream, stream.connect(addr)