Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/resolver.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import socket
3import weakref
4from typing import Any, Final, Optional
6from .abc import AbstractResolver, ResolveResult
8__all__ = ("ThreadedResolver", "AsyncResolver", "DefaultResolver")
# aiodns is an optional dependency: when the import fails the module still
# loads, with ``aiodns`` bound to None and ``aiodns_default`` set to False.
try:
    import aiodns

    # True only when the installed aiodns exposes DNSResolver.getaddrinfo.
    aiodns_default = hasattr(aiodns.DNSResolver, "getaddrinfo")
except ImportError:  # pragma: no cover
    aiodns = None  # type: ignore[assignment]
    aiodns_default = False
# Flags recorded on each resolve result: host and service are numeric.
_NUMERIC_SOCKET_FLAGS = socket.AI_NUMERICHOST | socket.AI_NUMERICSERV
# Flags handed to getnameinfo(): request numeric host and service strings.
_NAME_SOCKET_FLAGS = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
# AI_ADDRCONFIG, masked with AI_MASK when the socket module defines it.
_AI_ADDRCONFIG = (
    socket.AI_ADDRCONFIG & socket.AI_MASK
    if hasattr(socket, "AI_MASK")
    else socket.AI_ADDRCONFIG
)
class ThreadedResolver(AbstractResolver):
    """Threaded resolver.

    Uses an Executor for synchronous getaddrinfo() calls.
    concurrent.futures.ThreadPoolExecutor is used by default.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None:
        """Bind the resolver to *loop*, or to the running loop when omitted."""
        self._loop = loop or asyncio.get_running_loop()

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> list[ResolveResult]:
        """Resolve *host* through the event loop's getaddrinfo().

        Args:
            host: Host name to look up.
            port: Port forwarded to getaddrinfo().
            family: Address family forwarded to getaddrinfo().

        Returns:
            One ResolveResult per usable address returned by getaddrinfo();
            the list is empty when every entry was skipped.
        """
        infos = await self._loop.getaddrinfo(
            host,
            port,
            type=socket.SOCK_STREAM,
            family=family,
            flags=_AI_ADDRCONFIG,
        )

        hosts: list[ResolveResult] = []
        # Distinct loop names keep the ``family``/``port`` parameters intact.
        for info_family, _, proto, _, address in infos:
            if info_family == socket.AF_INET6:
                if len(address) < 4:
                    # IPv6 is not supported by Python build,
                    # or IPv6 is not enabled in the host.
                    # The guard matches the address[3] access below, so a
                    # short tuple can never raise IndexError.
                    continue
                if address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    resolved_host, raw_port = await self._loop.getnameinfo(
                        address, _NAME_SOCKET_FLAGS
                    )
                    resolved_port = int(raw_port)
                else:
                    resolved_host, resolved_port = address[:2]
            else:  # IPv4
                assert info_family == socket.AF_INET
                resolved_host, resolved_port = address  # type: ignore[misc]
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=resolved_port,
                    family=info_family,
                    proto=proto,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        return hosts

    async def close(self) -> None:
        """Do nothing; this resolver holds no resources of its own."""
        pass
class AsyncResolver(AbstractResolver):
    """Use the `aiodns` package to make asynchronous DNS lookups"""

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Create a resolver bound to *loop* (or the running loop).

        Args:
            loop: Event loop to use; defaults to the running loop.
            *args: Positional arguments forwarded to aiodns.DNSResolver.
            **kwargs: Keyword arguments forwarded to aiodns.DNSResolver.

        Raises:
            RuntimeError: If the aiodns library could not be imported.
        """
        if aiodns is None:
            raise RuntimeError("Resolver requires aiodns library")

        self._loop = loop or asyncio.get_running_loop()
        self._manager: _DNSResolverManager | None = None
        # If custom args are provided, create a dedicated resolver instance
        # This means each AsyncResolver with custom args gets its own
        # aiodns.DNSResolver instance
        if args or kwargs:
            self._resolver = aiodns.DNSResolver(*args, **kwargs)
            # NOTE(review): returning here skips the gethostbyname fallback
            # check below for dedicated resolvers - confirm this is intended.
            return
        # Use the shared resolver from the manager for default arguments
        self._manager = _DNSResolverManager()
        self._resolver = self._manager.get_resolver(self, self._loop)

        if not hasattr(self._resolver, "gethostbyname"):
            # aiodns 1.1 is not available, fallback to DNSResolver.query
            self.resolve = self._resolve_with_query  # type: ignore

    @staticmethod
    def _dns_error_message(exc: BaseException) -> Any:
        """Return the message carried by a DNS error, or a generic fallback."""
        # args[1] only exists when the exception carries at least two
        # arguments; a one-argument exception must not raise IndexError.
        return exc.args[1] if len(exc.args) > 1 else "DNS lookup failed"

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> list[ResolveResult]:
        """Resolve *host* with the resolver's getaddrinfo().

        Args:
            host: Host name to look up.
            port: Port forwarded to getaddrinfo().
            family: Address family forwarded to getaddrinfo().

        Returns:
            A non-empty list with one ResolveResult per returned node.

        Raises:
            OSError: If the lookup fails or yields no addresses.
        """
        try:
            resp = await self._resolver.getaddrinfo(
                host,
                port=port,
                type=socket.SOCK_STREAM,
                family=family,
                flags=_AI_ADDRCONFIG,
            )
        except aiodns.error.DNSError as exc:
            raise OSError(None, self._dns_error_message(exc)) from exc
        hosts: list[ResolveResult] = []
        for node in resp.nodes:
            address: tuple[bytes, int] | tuple[bytes, int, int, int] = node.addr
            # Read the port from the returned address on every branch, so
            # the link-local branch never reuses a stale value.
            resolved_port = address[1]
            if node.family == socket.AF_INET6:
                if len(address) > 3 and address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    result = await self._resolver.getnameinfo(
                        (address[0].decode("ascii"), *address[1:]),
                        _NAME_SOCKET_FLAGS,
                    )
                    resolved_host = result.node
                else:
                    resolved_host = address[0].decode("ascii")
            else:  # IPv4
                assert node.family == socket.AF_INET
                resolved_host = address[0].decode("ascii")
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=resolved_port,
                    family=node.family,
                    proto=0,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        if not hosts:
            raise OSError(None, "DNS lookup failed")

        return hosts

    async def _resolve_with_query(
        self, host: str, port: int = 0, family: int = socket.AF_INET
    ) -> list[dict[str, Any]]:
        """Resolve *host* with DNSResolver.query (fallback path).

        Queries "AAAA" records when *family* is AF_INET6 and "A" records
        otherwise.

        Raises:
            OSError: If the query fails or returns no records.
        """
        qtype: Final = "AAAA" if family == socket.AF_INET6 else "A"

        try:
            resp = await self._resolver.query(host, qtype)
        except aiodns.error.DNSError as exc:
            raise OSError(None, self._dns_error_message(exc)) from exc

        hosts = [
            {
                "hostname": host,
                "host": rr.host,
                "port": port,
                "family": family,
                "proto": 0,
                "flags": socket.AI_NUMERICHOST,
            }
            for rr in resp
        ]

        if not hosts:
            raise OSError(None, "DNS lookup failed")

        return hosts

    async def close(self) -> None:
        """Release the shared resolver, or cancel the dedicated one."""
        if self._manager:
            # Release the resolver from the manager if using the shared resolver
            self._manager.release_resolver(self, self._loop)
            self._manager = None  # Clear reference to manager
            self._resolver = None  # type: ignore[assignment]  # Clear reference to resolver
            return
        # Otherwise cancel our dedicated resolver
        if self._resolver is not None:
            self._resolver.cancel()
        self._resolver = None  # type: ignore[assignment]  # Clear reference
class _DNSResolverManager:
    """Singleton registry of shared aiodns.DNSResolver objects.

    Keeps one default-argument aiodns.DNSResolver per event loop, together
    with the set of AsyncResolver clients currently using it.
    """

    _instance: Optional["_DNSResolverManager"] = None

    def __new__(cls) -> "_DNSResolverManager":
        """Return the single manager, creating and initialising it once."""
        existing = cls._instance
        if existing is not None:
            return existing
        created = super().__new__(cls)
        cls._instance = created
        created._init()
        return created

    def _init(self) -> None:
        """Set up the per-loop table."""
        # Weak keys let an event loop be garbage collected along with its entry.
        self._loop_data: weakref.WeakKeyDictionary[
            asyncio.AbstractEventLoop,
            tuple[aiodns.DNSResolver, weakref.WeakSet[AsyncResolver]],
        ] = weakref.WeakKeyDictionary()

    def get_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> "aiodns.DNSResolver":
        """Return the shared resolver for *loop*, creating it when absent.

        Args:
            client: The AsyncResolver asking for the resolver; it is recorded
                so usage of the resolver can be tracked.
            loop: The event loop the resolver belongs to.
        """
        entry = self._loop_data.get(loop)
        if entry is None:
            # First client on this loop: build the resolver and its client set.
            entry = (aiodns.DNSResolver(loop=loop), weakref.WeakSet())
            self._loop_data[loop] = entry
        shared, clients = entry
        clients.add(client)
        return shared

    def release_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> None:
        """Drop *client* from *loop*'s resolver; cancel it when unused.

        Args:
            client: The AsyncResolver being closed.
            loop: The event loop the resolver was using.
        """
        entry = self._loop_data.get(loop)
        if entry is None:
            return
        shared, clients = entry
        clients.discard(client)
        if clients:
            # Other clients still rely on this resolver.
            return
        # Last client gone: cancel the resolver and forget the loop.
        if shared is not None:
            shared.cancel()
        del self._loop_data[loop]
# Type of the DefaultResolver alias: one of the two resolver classes.
_DefaultType = type[AsyncResolver | ThreadedResolver]

# AsyncResolver is chosen when aiodns_default is true, ThreadedResolver otherwise.
DefaultResolver: _DefaultType
if aiodns_default:
    DefaultResolver = AsyncResolver
else:
    DefaultResolver = ThreadedResolver