Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/resolver.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

120 statements  

1import asyncio 

2import socket 

3import weakref 

4from typing import Any, Final, Optional 

5 

6from .abc import AbstractResolver, ResolveResult 

7 

8__all__ = ("ThreadedResolver", "AsyncResolver", "DefaultResolver") 

9 

10 

11try: 

12 import aiodns 

13 

14 aiodns_default = hasattr(aiodns.DNSResolver, "getaddrinfo") 

15except ImportError: # pragma: no cover 

16 aiodns = None # type: ignore[assignment] 

17 aiodns_default = False 

18 

19 

20_NUMERIC_SOCKET_FLAGS = socket.AI_NUMERICHOST | socket.AI_NUMERICSERV 

21_NAME_SOCKET_FLAGS = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV 

22_AI_ADDRCONFIG = socket.AI_ADDRCONFIG 

23if hasattr(socket, "AI_MASK"): 

24 _AI_ADDRCONFIG &= socket.AI_MASK 

25 

26 

27class ThreadedResolver(AbstractResolver): 

28 """Threaded resolver. 

29 

30 Uses an Executor for synchronous getaddrinfo() calls. 

31 concurrent.futures.ThreadPoolExecutor is used by default. 

32 """ 

33 

34 def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None: 

35 self._loop = loop or asyncio.get_running_loop() 

36 

37 async def resolve( 

38 self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET 

39 ) -> list[ResolveResult]: 

40 infos = await self._loop.getaddrinfo( 

41 host, 

42 port, 

43 type=socket.SOCK_STREAM, 

44 family=family, 

45 flags=_AI_ADDRCONFIG, 

46 ) 

47 

48 hosts: list[ResolveResult] = [] 

49 for family, _, proto, _, address in infos: 

50 if family == socket.AF_INET6: 

51 if len(address) < 3: 

52 # IPv6 is not supported by Python build, 

53 # or IPv6 is not enabled in the host 

54 continue 

55 if address[3]: 

56 # This is essential for link-local IPv6 addresses. 

57 # LL IPv6 is a VERY rare case. Strictly speaking, we should use 

58 # getnameinfo() unconditionally, but performance makes sense. 

59 resolved_host, _port = await self._loop.getnameinfo( 

60 address, _NAME_SOCKET_FLAGS 

61 ) 

62 port = int(_port) 

63 else: 

64 resolved_host, port = address[:2] 

65 else: # IPv4 

66 assert family == socket.AF_INET 

67 resolved_host, port = address # type: ignore[misc] 

68 hosts.append( 

69 ResolveResult( 

70 hostname=host, 

71 host=resolved_host, 

72 port=port, 

73 family=family, 

74 proto=proto, 

75 flags=_NUMERIC_SOCKET_FLAGS, 

76 ) 

77 ) 

78 

79 return hosts 

80 

81 async def close(self) -> None: 

82 pass 

83 

84 

85class AsyncResolver(AbstractResolver): 

86 """Use the `aiodns` package to make asynchronous DNS lookups""" 

87 

88 def __init__( 

89 self, 

90 loop: asyncio.AbstractEventLoop | None = None, 

91 *args: Any, 

92 **kwargs: Any, 

93 ) -> None: 

94 if aiodns is None: 

95 raise RuntimeError("Resolver requires aiodns library") 

96 

97 self._loop = loop or asyncio.get_running_loop() 

98 self._manager: _DNSResolverManager | None = None 

99 # If custom args are provided, create a dedicated resolver instance 

100 # This means each AsyncResolver with custom args gets its own 

101 # aiodns.DNSResolver instance 

102 if args or kwargs: 

103 self._resolver = aiodns.DNSResolver(*args, **kwargs) 

104 return 

105 # Use the shared resolver from the manager for default arguments 

106 self._manager = _DNSResolverManager() 

107 self._resolver = self._manager.get_resolver(self, self._loop) 

108 

109 if not hasattr(self._resolver, "gethostbyname"): 

110 # aiodns 1.1 is not available, fallback to DNSResolver.query 

111 self.resolve = self._resolve_with_query # type: ignore 

112 

113 async def resolve( 

114 self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET 

115 ) -> list[ResolveResult]: 

116 try: 

117 resp = await self._resolver.getaddrinfo( 

118 host, 

119 port=port, 

120 type=socket.SOCK_STREAM, 

121 family=family, 

122 flags=_AI_ADDRCONFIG, 

123 ) 

124 except aiodns.error.DNSError as exc: 

125 msg = exc.args[1] if len(exc.args) >= 1 else "DNS lookup failed" 

126 raise OSError(None, msg) from exc 

127 hosts: list[ResolveResult] = [] 

128 for node in resp.nodes: 

129 address: tuple[bytes, int] | tuple[bytes, int, int, int] = node.addr 

130 if node.family == socket.AF_INET6: 

131 if len(address) > 3 and address[3]: 

132 # This is essential for link-local IPv6 addresses. 

133 # LL IPv6 is a VERY rare case. Strictly speaking, we should use 

134 # getnameinfo() unconditionally, but performance makes sense. 

135 result = await self._resolver.getnameinfo( 

136 (address[0].decode("ascii"), *address[1:]), 

137 _NAME_SOCKET_FLAGS, 

138 ) 

139 resolved_host = result.node 

140 else: 

141 resolved_host = address[0].decode("ascii") 

142 port = address[1] 

143 else: # IPv4 

144 assert node.family == socket.AF_INET 

145 resolved_host = address[0].decode("ascii") 

146 port = address[1] 

147 hosts.append( 

148 ResolveResult( 

149 hostname=host, 

150 host=resolved_host, 

151 port=port, 

152 family=node.family, 

153 proto=0, 

154 flags=_NUMERIC_SOCKET_FLAGS, 

155 ) 

156 ) 

157 

158 if not hosts: 

159 raise OSError(None, "DNS lookup failed") 

160 

161 return hosts 

162 

163 async def _resolve_with_query( 

164 self, host: str, port: int = 0, family: int = socket.AF_INET 

165 ) -> list[dict[str, Any]]: 

166 qtype: Final = "AAAA" if family == socket.AF_INET6 else "A" 

167 

168 try: 

169 resp = await self._resolver.query(host, qtype) 

170 except aiodns.error.DNSError as exc: 

171 msg = exc.args[1] if len(exc.args) >= 1 else "DNS lookup failed" 

172 raise OSError(None, msg) from exc 

173 

174 hosts = [] 

175 for rr in resp: 

176 hosts.append( 

177 { 

178 "hostname": host, 

179 "host": rr.host, 

180 "port": port, 

181 "family": family, 

182 "proto": 0, 

183 "flags": socket.AI_NUMERICHOST, 

184 } 

185 ) 

186 

187 if not hosts: 

188 raise OSError(None, "DNS lookup failed") 

189 

190 return hosts 

191 

192 async def close(self) -> None: 

193 if self._manager: 

194 # Release the resolver from the manager if using the shared resolver 

195 self._manager.release_resolver(self, self._loop) 

196 self._manager = None # Clear reference to manager 

197 self._resolver = None # type: ignore[assignment] # Clear reference to resolver 

198 return 

199 # Otherwise cancel our dedicated resolver 

200 if self._resolver is not None: 

201 self._resolver.cancel() 

202 self._resolver = None # type: ignore[assignment] # Clear reference 

203 

204 

205class _DNSResolverManager: 

206 """Manager for aiodns.DNSResolver objects. 

207 

208 This class manages shared aiodns.DNSResolver instances 

209 with no custom arguments across different event loops. 

210 """ 

211 

212 _instance: Optional["_DNSResolverManager"] = None 

213 

214 def __new__(cls) -> "_DNSResolverManager": 

215 if cls._instance is None: 

216 cls._instance = super().__new__(cls) 

217 cls._instance._init() 

218 return cls._instance 

219 

220 def _init(self) -> None: 

221 # Use WeakKeyDictionary to allow event loops to be garbage collected 

222 self._loop_data: weakref.WeakKeyDictionary[ 

223 asyncio.AbstractEventLoop, 

224 tuple[aiodns.DNSResolver, weakref.WeakSet[AsyncResolver]], 

225 ] = weakref.WeakKeyDictionary() 

226 

227 def get_resolver( 

228 self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop 

229 ) -> "aiodns.DNSResolver": 

230 """Get or create the shared aiodns.DNSResolver instance for a specific event loop. 

231 

232 Args: 

233 client: The AsyncResolver instance requesting the resolver. 

234 This is required to track resolver usage. 

235 loop: The event loop to use for the resolver. 

236 """ 

237 # Create a new resolver and client set for this loop if it doesn't exist 

238 if loop not in self._loop_data: 

239 resolver = aiodns.DNSResolver(loop=loop) 

240 client_set: weakref.WeakSet[AsyncResolver] = weakref.WeakSet() 

241 self._loop_data[loop] = (resolver, client_set) 

242 else: 

243 # Get the existing resolver and client set 

244 resolver, client_set = self._loop_data[loop] 

245 

246 # Register this client with the loop 

247 client_set.add(client) 

248 return resolver 

249 

250 def release_resolver( 

251 self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop 

252 ) -> None: 

253 """Release the resolver for an AsyncResolver client when it's closed. 

254 

255 Args: 

256 client: The AsyncResolver instance to release. 

257 loop: The event loop the resolver was using. 

258 """ 

259 # Remove client from its loop's tracking 

260 current_loop_data = self._loop_data.get(loop) 

261 if current_loop_data is None: 

262 return 

263 resolver, client_set = current_loop_data 

264 client_set.discard(client) 

265 # If no more clients for this loop, cancel and remove its resolver 

266 if not client_set: 

267 if resolver is not None: 

268 resolver.cancel() 

269 del self._loop_data[loop] 

270 

271 

272_DefaultType = type[AsyncResolver | ThreadedResolver] 

273DefaultResolver: _DefaultType = AsyncResolver if aiodns_default else ThreadedResolver