Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/tcpclient.py: 24%

139 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-10 06:20 +0000

1# 

2# Copyright 2014 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""A non-blocking TCP connection factory. 

17""" 

18 

19import functools 

20import socket 

21import numbers 

22import datetime 

23import ssl 

24 

25from tornado.concurrent import Future, future_add_done_callback 

26from tornado.ioloop import IOLoop 

27from tornado.iostream import IOStream 

28from tornado import gen 

29from tornado.netutil import Resolver 

30from tornado.gen import TimeoutError 

31 

32from typing import Any, Union, Dict, Tuple, List, Callable, Iterator, Optional, Set 

33 

34_INITIAL_CONNECT_TIMEOUT = 0.3 

35 

36 

class _Connector(object):
    """A stateless implementation of the "Happy Eyeballs" algorithm.

    "Happy Eyeballs" is documented in RFC6555 as the recommended practice
    for when both IPv4 and IPv6 addresses are available.

    In this implementation, we partition the addresses by family, and
    make the first connection attempt to whichever address was
    returned first by ``getaddrinfo``.  If that connection fails or
    times out, we begin a connection in parallel to the first address
    of the other family.  If there are additional failures we retry
    with other addresses, keeping one connection attempt per family
    in flight at a time.

    http://tools.ietf.org/html/rfc6555

    The outcome is delivered through ``self.future``, which resolves to an
    ``(af, addr, stream)`` tuple for the first attempt that succeeds, or
    fails with the last connection error (or a timeout error).
    """

    def __init__(
        self,
        addrinfo: List[Tuple],
        connect: Callable[
            [socket.AddressFamily, Tuple], Tuple[IOStream, "Future[IOStream]"]
        ],
    ) -> None:
        """Prepare a connection attempt over ``addrinfo``.

        :arg addrinfo: list of ``(family, address)`` pairs to try.
        :arg connect: callable invoked as ``connect(af, addr)``; it must
            return the stream being connected together with a future that
            resolves once that stream is connected.
        """
        self.io_loop = IOLoop.current()
        self.connect = connect

        self.future = (
            Future()
        )  # type: Future[Tuple[socket.AddressFamily, Any, IOStream]]
        # Handle of the timer that starts the secondary address queue.
        self.timeout = None  # type: Optional[object]
        # Handle of the timer that bounds the whole connection attempt.
        self.connect_timeout = None  # type: Optional[object]
        self.last_error = None  # type: Optional[Exception]
        # Number of addresses whose attempt has not completed yet.
        self.remaining = len(addrinfo)
        self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
        # Every stream handed out by ``connect``; closed once we finish.
        self.streams = set()  # type: Set[IOStream]

    @staticmethod
    def split(
        addrinfo: List[Tuple],
    ) -> Tuple[
        List[Tuple[socket.AddressFamily, Tuple]],
        List[Tuple[socket.AddressFamily, Tuple]],
    ]:
        """Partition the ``addrinfo`` list by address family.

        Returns two lists.  The first list contains the first entry from
        ``addrinfo`` and all others with the same family, and the
        second list contains all other addresses (normally one list will
        be AF_INET and the other AF_INET6, although non-standard resolvers
        may return additional families).

        An empty ``addrinfo`` yields two empty lists.
        """
        primary = []  # type: List[Tuple[socket.AddressFamily, Tuple]]
        secondary = []  # type: List[Tuple[socket.AddressFamily, Tuple]]
        if not addrinfo:
            # Nothing to partition.  Returning empty queues (rather than
            # failing on ``addrinfo[0]``) lets ``try_connect`` report the
            # failure through the future as "connection failed".
            return primary, secondary
        primary_af = addrinfo[0][0]
        for af, addr in addrinfo:
            if af == primary_af:
                primary.append((af, addr))
            else:
                secondary.append((af, addr))
        return primary, secondary

    def start(
        self,
        timeout: float = _INITIAL_CONNECT_TIMEOUT,
        connect_timeout: Optional[Union[float, datetime.timedelta]] = None,
    ) -> "Future[Tuple[socket.AddressFamily, Any, IOStream]]":
        """Begin connecting and return the future holding the outcome.

        :arg timeout: seconds to wait before an address from the secondary
            queue is tried in parallel with the primary one.
        :arg connect_timeout: optional deadline for the whole attempt, passed
            unchanged to ``IOLoop.add_timeout``.
        """
        self.try_connect(iter(self.primary_addrs))
        self.set_timeout(timeout)
        if connect_timeout is not None:
            self.set_connect_timeout(connect_timeout)
        return self.future

    def try_connect(self, addrs: Iterator[Tuple[socket.AddressFamily, Tuple]]) -> None:
        """Start a connection to the next address produced by ``addrs``.

        The same iterator is handed to `on_connect_done`, so a failure
        advances within the queue this attempt came from.
        """
        try:
            af, addr = next(addrs)
        except StopIteration:
            # We've reached the end of our queue, but the other queue
            # might still be working.  Send a final error on the future
            # only when both queues are finished.
            if self.remaining == 0 and not self.future.done():
                self.future.set_exception(
                    self.last_error or IOError("connection failed")
                )
            return
        stream, future = self.connect(af, addr)
        self.streams.add(stream)
        future_add_done_callback(
            future, functools.partial(self.on_connect_done, addrs, af, addr)
        )

    def on_connect_done(
        self,
        addrs: Iterator[Tuple[socket.AddressFamily, Tuple]],
        af: socket.AddressFamily,
        addr: Tuple,
        future: "Future[IOStream]",
    ) -> None:
        """Handle completion (success or failure) of one connection attempt."""
        self.remaining -= 1
        try:
            stream = future.result()
        except Exception as e:
            if self.future.done():
                return
            # Error: try again (but remember what happened so we have an
            # error to raise in the end)
            self.last_error = e
            self.try_connect(addrs)
            if self.timeout is not None:
                # If the first attempt failed, don't wait for the
                # timeout to try an address from the secondary queue.
                self.io_loop.remove_timeout(self.timeout)
                self.on_timeout()
            return
        self.clear_timeouts()
        if self.future.done():
            # This is a late arrival; just drop it.
            stream.close()
        else:
            # Keep the winning stream out of ``close_streams``' reach.
            self.streams.discard(stream)
            self.future.set_result((af, addr, stream))
            self.close_streams()

    def set_timeout(self, timeout: float) -> None:
        """Schedule `on_timeout` to run ``timeout`` seconds from now."""
        self.timeout = self.io_loop.add_timeout(
            self.io_loop.time() + timeout, self.on_timeout
        )

    def on_timeout(self) -> None:
        """Start working through the secondary queue, unless already done."""
        self.timeout = None
        if not self.future.done():
            self.try_connect(iter(self.secondary_addrs))

    def clear_timeout(self) -> None:
        """Cancel the secondary-queue timer if one is registered."""
        if self.timeout is not None:
            self.io_loop.remove_timeout(self.timeout)

    def set_connect_timeout(
        self, connect_timeout: Union[float, datetime.timedelta]
    ) -> None:
        """Schedule `on_connect_timeout` at the given deadline."""
        self.connect_timeout = self.io_loop.add_timeout(
            connect_timeout, self.on_connect_timeout
        )

    def on_connect_timeout(self) -> None:
        """Fail the pending future with `TimeoutError` and close all streams."""
        if not self.future.done():
            self.future.set_exception(TimeoutError())
        self.close_streams()

    def clear_timeouts(self) -> None:
        """Cancel both the secondary-queue timer and the overall deadline."""
        if self.timeout is not None:
            self.io_loop.remove_timeout(self.timeout)
        if self.connect_timeout is not None:
            self.io_loop.remove_timeout(self.connect_timeout)

    def close_streams(self) -> None:
        """Close every stream still tracked in ``self.streams``."""
        for stream in self.streams:
            stream.close()

196 

197 

class TCPClient(object):
    """A non-blocking TCP connection factory.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.
    """

    def __init__(self, resolver: Optional[Resolver] = None) -> None:
        """Create a client using ``resolver``, or a new `Resolver` if omitted.

        A resolver created here is owned by this client and is closed by
        `close`; a resolver supplied by the caller is left untouched.
        """
        if resolver is not None:
            self.resolver = resolver
            self._own_resolver = False
        else:
            self.resolver = Resolver()
            self._own_resolver = True

    def close(self) -> None:
        """Close the resolver if this client created it."""
        if self._own_resolver:
            self.resolver.close()

    async def connect(
        self,
        host: str,
        port: int,
        af: socket.AddressFamily = socket.AF_UNSPEC,
        ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
        max_buffer_size: Optional[int] = None,
        source_ip: Optional[str] = None,
        source_port: Optional[int] = None,
        timeout: Optional[Union[float, datetime.timedelta]] = None,
    ) -> IOStream:
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).

        Using the ``source_ip`` kwarg, one can specify the source
        IP address to use when establishing the connection.
        In case the user needs to resolve and
        use a specific interface, it has to be handled outside
        of Tornado as this depends very much on the platform.

        Similarly, when the user requires a certain source port, it can
        be specified using the ``source_port`` arg.

        ``timeout`` may be a number of seconds or a `datetime.timedelta`,
        both measured from the moment of the call.  It is converted once
        into a single deadline that covers name resolution, the TCP
        connection and, when ``ssl_options`` is given, the TLS handshake.
        Raises `TimeoutError` if that deadline passes first, and
        `TypeError` if ``timeout`` is of any other type.

        .. versionchanged:: 4.5
           Added the ``source_ip`` and ``source_port`` arguments.

        .. versionchanged:: 5.0
           Added the ``timeout`` argument.
        """
        if timeout is not None:
            # Turn the relative timeout into an absolute IOLoop time so the
            # same deadline can be shared by every step below.
            if isinstance(timeout, numbers.Real):
                timeout = IOLoop.current().time() + timeout
            elif isinstance(timeout, datetime.timedelta):
                timeout = IOLoop.current().time() + timeout.total_seconds()
            else:
                raise TypeError("Unsupported timeout %r" % timeout)
        if timeout is not None:
            addrinfo = await gen.with_timeout(
                timeout, self.resolver.resolve(host, port, af)
            )
        else:
            addrinfo = await self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo,
            functools.partial(
                self._create_stream,
                max_buffer_size,
                source_ip=source_ip,
                source_port=source_port,
            ),
        )
        af, addr, stream = await connector.start(connect_timeout=timeout)
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is not None:
            if timeout is not None:
                stream = await gen.with_timeout(
                    timeout,
                    stream.start_tls(
                        False, ssl_options=ssl_options, server_hostname=host
                    ),
                )
            else:
                stream = await stream.start_tls(
                    False, ssl_options=ssl_options, server_hostname=host
                )
        return stream

    def _create_stream(
        self,
        max_buffer_size: int,
        af: socket.AddressFamily,
        addr: Tuple,
        source_ip: Optional[str] = None,
        source_port: Optional[int] = None,
    ) -> Tuple[IOStream, "Future[IOStream]"]:
        """Open a socket of family ``af`` and start connecting it to ``addr``.

        Returns the new `.IOStream` together with the future returned by its
        ``connect`` call.  When ``source_ip`` and/or ``source_port`` are
        given, the socket is bound to them first.

        Raises ``socket.error`` if the socket cannot be bound or the stream
        cannot be created; the socket is closed before the error propagates.
        """
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        source_port_bind = source_port if isinstance(source_port, int) else 0
        source_ip_bind = source_ip
        if source_port_bind and not source_ip:
            # User required a specific port, but did not specify
            # a certain source IP, will bind to the default loopback.
            # Use the loopback address of the requested family:
            # - 127.0.0.1 for IPv4
            # - ::1 for IPv6
            source_ip_bind = "::1" if af == socket.AF_INET6 else "127.0.0.1"
        socket_obj = socket.socket(af)
        if source_port_bind or source_ip_bind:
            # If the user requires binding also to a specific IP/port.
            try:
                socket_obj.bind((source_ip_bind, source_port_bind))
            except socket.error:
                socket_obj.close()
                # Fail loudly if unable to use the IP/port.
                raise
        try:
            stream = IOStream(socket_obj, max_buffer_size=max_buffer_size)
        except socket.error:
            # No stream was created, so there is nothing to return alongside
            # a failed future.  Release the socket and surface the real
            # error instead of tripping over the unbound ``stream`` name.
            socket_obj.close()
            raise
        return stream, stream.connect(addr)