Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/resolver.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

108 statements  

1import asyncio 

2import socket 

3import weakref 

4from typing import Any, Optional 

5 

6from .abc import AbstractResolver, ResolveResult 

7 

8__all__ = ("ThreadedResolver", "AsyncResolver", "DefaultResolver") 

9 

10 

# ``aiodns`` is an optional dependency.  ``aiodns_default`` ends up True only
# when the import succeeds *and* the installed DNSResolver exposes
# ``getaddrinfo``; in every other case it keeps the False set here.
aiodns_default = False
try:
    import aiodns

    aiodns_default = hasattr(aiodns.DNSResolver, "getaddrinfo")
except ImportError:
    aiodns = None  # type: ignore[assignment]

18 

19 

# Flags stored on every ResolveResult built in this module.
_NUMERIC_SOCKET_FLAGS = socket.AI_NUMERICHOST | socket.AI_NUMERICSERV
# Flags passed to getnameinfo() so host and service come back numeric.
_NAME_SOCKET_FLAGS = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
# AI_ADDRCONFIG, masked with AI_MASK when the socket module defines it.
_AI_ADDRCONFIG = (
    socket.AI_ADDRCONFIG & socket.AI_MASK
    if hasattr(socket, "AI_MASK")
    else socket.AI_ADDRCONFIG
)

25 

26 

class ThreadedResolver(AbstractResolver):
    """Threaded resolver.

    Uses an Executor for synchronous getaddrinfo() calls.
    concurrent.futures.ThreadPoolExecutor is used by default.
    """

    def __init__(self) -> None:
        """Bind the resolver to the currently running event loop.

        Raises:
            RuntimeError: If there is no running event loop.
        """
        self._loop = asyncio.get_running_loop()

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> list[ResolveResult]:
        """Resolve *host* through the event loop's ``getaddrinfo()``.

        Args:
            host: Host name to look up.
            port: Port passed to ``getaddrinfo()``.
            family: Address family to restrict the lookup to.

        Returns:
            One ``ResolveResult`` per usable address, in the order returned
            by ``getaddrinfo()``.  IPv6 entries whose socket address is not
            a full 4-tuple are skipped.
        """
        infos = await self._loop.getaddrinfo(
            host,
            port,
            type=socket.SOCK_STREAM,
            family=family,
            flags=_AI_ADDRCONFIG,
        )

        hosts: list[ResolveResult] = []
        # Distinct loop names keep the ``family``/``port`` arguments intact.
        for info_family, _, proto, _, address in infos:
            if info_family == socket.AF_INET6:
                if len(address) < 4:
                    # IPv6 is not supported by Python build,
                    # or IPv6 is not enabled in the host.
                    # The check is ``< 4`` because index 3 (scope id) is
                    # read right below.
                    continue
                if address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    resolved_host, service = await self._loop.getnameinfo(
                        address, _NAME_SOCKET_FLAGS
                    )
                    resolved_port = int(service)
                else:
                    resolved_host, resolved_port = address[:2]
            else:  # IPv4
                assert info_family == socket.AF_INET
                resolved_host, resolved_port = address  # type: ignore[misc]
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=resolved_port,
                    family=info_family,
                    proto=proto,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        return hosts

    async def close(self) -> None:
        """Do nothing; this resolver holds no resources of its own."""
        pass

83 

84 

class AsyncResolver(AbstractResolver):
    """Use the `aiodns` package to make asynchronous DNS lookups.

    An instance created without arguments uses the per-loop resolver handed
    out by ``_DNSResolverManager``; an instance created with custom
    arguments owns a dedicated ``aiodns.DNSResolver``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Bind to the running loop and obtain an ``aiodns`` resolver.

        Args:
            *args: Positional arguments forwarded to ``aiodns.DNSResolver``.
            **kwargs: Keyword arguments forwarded to ``aiodns.DNSResolver``.

        Raises:
            RuntimeError: If ``aiodns`` is not installed, or if there is no
                running event loop.
        """
        if aiodns is None:
            raise RuntimeError("Resolver requires aiodns library")

        self._loop = asyncio.get_running_loop()
        self._manager: _DNSResolverManager | None = None
        # If custom args are provided, create a dedicated resolver instance.
        # This means each AsyncResolver with custom args gets its own
        # aiodns.DNSResolver instance.
        if args or kwargs:
            self._resolver = aiodns.DNSResolver(*args, **kwargs)
            return
        # Use the shared resolver from the manager for default arguments.
        self._manager = _DNSResolverManager()
        self._resolver = self._manager.get_resolver(self, self._loop)

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> list[ResolveResult]:
        """Resolve *host* with ``aiodns``.

        Args:
            host: Host name to look up.
            port: Port passed to the ``aiodns`` lookup.
            family: Address family to restrict the lookup to.

        Returns:
            One ``ResolveResult`` per node in the ``aiodns`` response.

        Raises:
            OSError: If ``aiodns`` reports a ``DNSError`` or the response
                contains no nodes.
        """
        try:
            resp = await self._resolver.getaddrinfo(
                host,
                port=port,
                type=socket.SOCK_STREAM,
                family=family,
                flags=_AI_ADDRCONFIG,
            )
        except aiodns.error.DNSError as exc:
            # args[1] only exists when there are at least two args; indexing
            # it for a single-argument error would raise IndexError and hide
            # the DNS failure.
            msg = exc.args[1] if len(exc.args) > 1 else "DNS lookup failed"
            raise OSError(None, msg) from exc

        hosts: list[ResolveResult] = []
        for node in resp.nodes:
            address: tuple[bytes, int] | tuple[bytes, int, int, int] = node.addr
            # Always take the port from this node's own address so that no
            # branch reuses the caller's argument or a previous node's value.
            resolved_port = address[1]
            if node.family == socket.AF_INET6:
                if len(address) > 3 and address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    result = await self._resolver.getnameinfo(
                        (address[0].decode("ascii"), *address[1:]),
                        _NAME_SOCKET_FLAGS,
                    )
                    resolved_host = result.node
                else:
                    resolved_host = address[0].decode("ascii")
            else:  # IPv4
                assert node.family == socket.AF_INET
                resolved_host = address[0].decode("ascii")
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=resolved_port,
                    family=node.family,
                    proto=0,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        if not hosts:
            raise OSError(None, "DNS lookup failed")

        return hosts

    async def close(self) -> None:
        """Release the shared resolver, or cancel the dedicated one.

        Calling this again after the references have been cleared is a no-op.
        """
        if self._manager is not None:
            # Release the resolver from the manager if using the shared resolver
            self._manager.release_resolver(self, self._loop)
            self._manager = None  # Clear reference to manager
            self._resolver = None  # type: ignore[assignment]  # Clear reference to resolver
            return
        # Otherwise cancel our dedicated resolver
        if self._resolver is not None:
            self._resolver.cancel()
            self._resolver = None  # type: ignore[assignment]  # Clear reference

165 

166 

class _DNSResolverManager:
    """Process-wide registry of shared aiodns.DNSResolver objects.

    One resolver without custom arguments is kept per event loop, together
    with the set of AsyncResolver clients currently using it.
    """

    _instance: Optional["_DNSResolverManager"] = None

    def __new__(cls) -> "_DNSResolverManager":
        """Return the single manager instance, creating it on first use."""
        if (manager := cls._instance) is None:
            manager = cls._instance = super().__new__(cls)
            manager._init()
        return manager

    def _init(self) -> None:
        """Create the empty per-loop registry."""
        # Weak keys let an event loop be garbage collected together with
        # its registry entry.
        self._loop_data: weakref.WeakKeyDictionary[
            asyncio.AbstractEventLoop,
            tuple[aiodns.DNSResolver, weakref.WeakSet[AsyncResolver]],
        ] = weakref.WeakKeyDictionary()

    def get_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> "aiodns.DNSResolver":
        """Return the shared resolver for *loop*, creating it if needed.

        Args:
            client: The AsyncResolver asking for the resolver; it is
                recorded as a user of the loop's resolver.
            loop: The event loop the resolver belongs to.
        """
        entry = self._loop_data.get(loop)
        if entry is None:
            # First request for this loop: build the resolver and an empty,
            # weakly-referenced set of its users.
            entry = (aiodns.DNSResolver(loop=loop), weakref.WeakSet())
            self._loop_data[loop] = entry

        shared, users = entry
        users.add(client)
        return shared

    def release_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> None:
        """Stop tracking *client*; drop the loop's resolver once unused.

        Args:
            client: The AsyncResolver that no longer needs the resolver.
            loop: The event loop the resolver was obtained for.
        """
        entry = self._loop_data.get(loop)
        if entry is None:
            # Nothing is registered for this loop.
            return

        shared, users = entry
        users.discard(client)
        if users:
            # Other clients still rely on the shared resolver.
            return

        # Last user gone: cancel the resolver and forget the loop.
        if shared is not None:
            shared.cancel()
        del self._loop_data[loop]

232 

233 

_DefaultType = type[AsyncResolver | ThreadedResolver]

# Use the aiodns-backed resolver only when aiodns_default is true;
# otherwise use the threaded resolver.
DefaultResolver: _DefaultType
if aiodns_default:
    DefaultResolver = AsyncResolver
else:
    DefaultResolver = ThreadedResolver