Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/resolver.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

108 statements  

1import asyncio 

2import socket 

3import weakref 

4from typing import Any, List, Optional, Tuple, Type, Union 

5 

6from .abc import AbstractResolver, ResolveResult 

7 

8__all__ = ("ThreadedResolver", "AsyncResolver", "DefaultResolver") 

9 

10 

11try: 

12 import aiodns 

13 

14 aiodns_default = hasattr(aiodns.DNSResolver, "getaddrinfo") 

15except ImportError: 

16 aiodns = None # type: ignore[assignment] 

17 aiodns_default = False 

18 

19 

# Flags recorded on every ResolveResult produced by the resolvers in this
# module (passed as ``flags=`` when building each result).
_NUMERIC_SOCKET_FLAGS = socket.AI_NUMERICHOST | socket.AI_NUMERICSERV

# Flags handed to getnameinfo() when resolving scoped IPv6 addresses.
_NAME_SOCKET_FLAGS = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV

# Flag passed to getaddrinfo(); restricted to socket.AI_MASK on platforms
# whose socket module defines that attribute.
_AI_ADDRCONFIG = socket.AI_ADDRCONFIG
try:
    _AI_ADDRCONFIG &= socket.AI_MASK
except AttributeError:
    # This platform's socket module has no AI_MASK; keep the flag unchanged.
    pass

25 

26 

class ThreadedResolver(AbstractResolver):
    """Threaded resolver.

    Uses an Executor for synchronous getaddrinfo() calls.
    concurrent.futures.ThreadPoolExecutor is used by default.
    """

    def __init__(self) -> None:
        # Captures the currently running loop; asyncio.get_running_loop()
        # raises RuntimeError when called outside of a running event loop.
        self._loop = asyncio.get_running_loop()

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> List[ResolveResult]:
        """Resolve *host* through the event loop's getaddrinfo().

        Args:
            host: Host name to look up; echoed back as ``hostname`` in
                every result.
            port: Port forwarded to getaddrinfo().
            family: Address family forwarded to getaddrinfo().

        Returns:
            One ResolveResult per usable address returned by getaddrinfo().
            IPv6 entries whose sockaddr is not a full 4-tuple are skipped.
        """
        infos = await self._loop.getaddrinfo(
            host,
            port,
            type=socket.SOCK_STREAM,
            family=family,
            flags=_AI_ADDRCONFIG,
        )

        hosts: List[ResolveResult] = []
        for family, _, proto, _, address in infos:
            if family == socket.AF_INET6:
                # The scope id is read from address[3] below, so anything
                # shorter than a 4-tuple cannot be handled here.
                if len(address) < 4:
                    # IPv6 is not supported by Python build,
                    # or IPv6 is not enabled in the host
                    continue
                if address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    resolved_host, _port = await self._loop.getnameinfo(
                        address, _NAME_SOCKET_FLAGS
                    )
                    port = int(_port)
                else:
                    resolved_host, port = address[:2]
            else:  # IPv4
                assert family == socket.AF_INET
                resolved_host, port = address  # type: ignore[misc]
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=port,
                    family=family,
                    proto=proto,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        return hosts

    async def close(self) -> None:
        """Do nothing; this resolver holds nothing that needs releasing."""
        pass

83 

84 

class AsyncResolver(AbstractResolver):
    """Use the `aiodns` package to make asynchronous DNS lookups"""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Create the resolver.

        Args:
            *args: Positional arguments forwarded to aiodns.DNSResolver.
            **kwargs: Keyword arguments forwarded to aiodns.DNSResolver.

        Raises:
            RuntimeError: If the aiodns package could not be imported.
        """
        if aiodns is None:
            raise RuntimeError("Resolver requires aiodns library")

        self._loop = asyncio.get_running_loop()
        self._manager: Optional[_DNSResolverManager] = None
        # If custom args are provided, create a dedicated resolver instance
        # This means each AsyncResolver with custom args gets its own
        # aiodns.DNSResolver instance
        if args or kwargs:
            self._resolver = aiodns.DNSResolver(*args, **kwargs)
            return
        # Use the shared resolver from the manager for default arguments
        self._manager = _DNSResolverManager()
        self._resolver = self._manager.get_resolver(self, self._loop)

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> List[ResolveResult]:
        """Resolve *host* with the underlying aiodns resolver.

        Args:
            host: Host name to look up; echoed back as ``hostname`` in
                every result.
            port: Port forwarded to the resolver's getaddrinfo().
            family: Address family forwarded to the resolver's getaddrinfo().

        Returns:
            A non-empty list with one ResolveResult per node in the response.

        Raises:
            OSError: If aiodns raises DNSError, or if the response contains
                no nodes.
        """
        try:
            resp = await self._resolver.getaddrinfo(
                host,
                port=port,
                type=socket.SOCK_STREAM,
                family=family,
                flags=_AI_ADDRCONFIG,
            )
        except aiodns.error.DNSError as exc:
            # args[1] is only safe to read when at least two args are present;
            # otherwise fall back to a generic message instead of letting an
            # IndexError mask the DNS failure.
            msg = exc.args[1] if len(exc.args) > 1 else "DNS lookup failed"
            raise OSError(None, msg) from exc
        hosts: List[ResolveResult] = []
        for node in resp.nodes:
            address: Union[Tuple[bytes, int], Tuple[bytes, int, int, int]] = node.addr
            family = node.family
            if family == socket.AF_INET6:
                if len(address) > 3 and address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    # NOTE: ``port`` is not reassigned on this path and keeps
                    # whatever value it currently holds.
                    result = await self._resolver.getnameinfo(
                        (address[0].decode("ascii"), *address[1:]),
                        _NAME_SOCKET_FLAGS,
                    )
                    resolved_host = result.node
                else:
                    resolved_host = address[0].decode("ascii")
                    port = address[1]
            else:  # IPv4
                assert family == socket.AF_INET
                resolved_host = address[0].decode("ascii")
                port = address[1]
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=port,
                    family=family,
                    proto=0,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        if not hosts:
            raise OSError(None, "DNS lookup failed")

        return hosts

    async def close(self) -> None:
        """Release the shared resolver, or cancel the dedicated one."""
        if self._manager:
            # Release the resolver from the manager if using the shared resolver
            self._manager.release_resolver(self, self._loop)
            self._manager = None  # Clear reference to manager
            self._resolver = None  # type: ignore[assignment]  # Clear reference to resolver
            return
        # Otherwise cancel our dedicated resolver
        if self._resolver is not None:
            self._resolver.cancel()
        self._resolver = None  # type: ignore[assignment]  # Clear reference

166 

167 

class _DNSResolverManager:
    """Singleton registry of shared aiodns.DNSResolver objects.

    Keeps one default-argument aiodns.DNSResolver per event loop together
    with the set of AsyncResolver clients currently using it.
    """

    _instance: Optional["_DNSResolverManager"] = None

    def __new__(cls) -> "_DNSResolverManager":
        # Build and initialise the single instance on first use only.
        if cls._instance is not None:
            return cls._instance
        cls._instance = super().__new__(cls)
        cls._instance._init()
        return cls._instance

    def _init(self) -> None:
        # Weak keys let an event loop (and its entry) be garbage collected.
        self._loop_data: weakref.WeakKeyDictionary[
            asyncio.AbstractEventLoop,
            tuple["aiodns.DNSResolver", weakref.WeakSet["AsyncResolver"]],
        ] = weakref.WeakKeyDictionary()

    def get_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> "aiodns.DNSResolver":
        """Return the shared aiodns.DNSResolver for *loop*, creating it if needed.

        Args:
            client: The AsyncResolver asking for the resolver; it is recorded
                so that usage of the shared resolver can be tracked.
            loop: The event loop the resolver belongs to.
        """
        if loop in self._loop_data:
            # Reuse what was already set up for this loop.
            shared, users = self._loop_data[loop]
        else:
            # First request for this loop: create the resolver and an empty
            # client set, then remember both.
            shared = aiodns.DNSResolver(loop=loop)
            users: weakref.WeakSet["AsyncResolver"] = weakref.WeakSet()
            self._loop_data[loop] = (shared, users)

        # Track the caller as a user of this loop's resolver.
        users.add(client)
        return shared

    def release_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> None:
        """Stop tracking *client*; drop the loop's resolver once unused.

        Args:
            client: The AsyncResolver that is being closed.
            loop: The event loop whose shared resolver the client used.
        """
        if loop not in self._loop_data:
            # Nothing registered for this loop.
            return
        shared, users = self._loop_data[loop]
        users.discard(client)
        if users:
            # Other clients still rely on this resolver.
            return
        # Last client gone: cancel the resolver and forget the loop.
        if shared is not None:
            shared.cancel()
        del self._loop_data[loop]

232 

233 

# Type of the module's default resolver: either resolver class.
_DefaultType = Type[Union[AsyncResolver, ThreadedResolver]]

# AsyncResolver is the default only when aiodns_default is true;
# otherwise the threaded resolver is used.
DefaultResolver: _DefaultType
if aiodns_default:
    DefaultResolver = AsyncResolver
else:
    DefaultResolver = ThreadedResolver