Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/resolver.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import socket
3import weakref
4from typing import Any, Optional
6from .abc import AbstractResolver, ResolveResult
8__all__ = ("ThreadedResolver", "AsyncResolver", "DefaultResolver")
# aiodns is an optional dependency. ``aiodns_default`` records whether the
# installed version exposes ``DNSResolver.getaddrinfo``, which decides whether
# the aiodns-backed resolver may be selected as the default.
try:
    import aiodns

    aiodns_default = hasattr(aiodns.DNSResolver, "getaddrinfo")
except ImportError:
    # Package missing: leave a ``None`` placeholder so later code can test
    # ``aiodns is None`` instead of re-attempting the import.
    aiodns, aiodns_default = None, False  # type: ignore[assignment]
# Flags stored on every result: the host/port are already numeric.
_NUMERIC_SOCKET_FLAGS = socket.AI_NUMERICHOST | socket.AI_NUMERICSERV
# Flags passed to getnameinfo() so that it returns numeric host and service.
_NAME_SOCKET_FLAGS = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
# AI_ADDRCONFIG, masked with AI_MASK on platforms whose socket module
# defines it.
_AI_ADDRCONFIG = (
    socket.AI_ADDRCONFIG & socket.AI_MASK
    if hasattr(socket, "AI_MASK")
    else socket.AI_ADDRCONFIG
)
class ThreadedResolver(AbstractResolver):
    """Threaded resolver.

    Uses an Executor for synchronous getaddrinfo() calls.
    concurrent.futures.ThreadPoolExecutor is used by default.
    """

    def __init__(self) -> None:
        # Binds to the currently running loop; get_running_loop() raises
        # RuntimeError when no loop is running.
        self._loop = asyncio.get_running_loop()

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> list[ResolveResult]:
        """Resolve *host* through the event loop's ``getaddrinfo()``.

        Args:
            host: Host name to look up.
            port: Port passed to ``getaddrinfo()``.
            family: Address family to request.

        Returns:
            One ``ResolveResult`` per usable address returned by the lookup.
            IPv6 entries whose socket address is truncated (no scope id
            field) are skipped.
        """
        infos = await self._loop.getaddrinfo(
            host,
            port,
            type=socket.SOCK_STREAM,
            family=family,
            flags=_AI_ADDRCONFIG,
        )

        hosts: list[ResolveResult] = []
        # Distinct loop names keep the ``family``/``port`` parameters intact.
        for addr_family, _, proto, _, address in infos:
            if addr_family == socket.AF_INET6:
                if len(address) < 4:
                    # IPv6 is not supported by Python build,
                    # or IPv6 is not enabled in the host.
                    # The scope id lives at index 3, so anything shorter
                    # than a 4-tuple cannot be inspected below.
                    continue
                if address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    resolved_host, service = await self._loop.getnameinfo(
                        address, _NAME_SOCKET_FLAGS
                    )
                    resolved_port = int(service)
                else:
                    resolved_host, resolved_port = address[:2]
            else:  # IPv4
                assert addr_family == socket.AF_INET
                resolved_host, resolved_port = address  # type: ignore[misc]
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=resolved_port,
                    family=addr_family,
                    proto=proto,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        return hosts

    async def close(self) -> None:
        """Do nothing; this resolver holds no resources of its own."""
        pass
class AsyncResolver(AbstractResolver):
    """Use the `aiodns` package to make asynchronous DNS lookups"""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Create the resolver.

        Args:
            *args: Positional arguments forwarded to ``aiodns.DNSResolver``.
            **kwargs: Keyword arguments forwarded to ``aiodns.DNSResolver``.

        Raises:
            RuntimeError: If the ``aiodns`` package is not importable.
        """
        if aiodns is None:
            raise RuntimeError("Resolver requires aiodns library")

        self._loop = asyncio.get_running_loop()
        self._manager: _DNSResolverManager | None = None
        # If custom args are provided, create a dedicated resolver instance
        # This means each AsyncResolver with custom args gets its own
        # aiodns.DNSResolver instance
        if args or kwargs:
            self._resolver = aiodns.DNSResolver(*args, **kwargs)
            return
        # Use the shared resolver from the manager for default arguments
        self._manager = _DNSResolverManager()
        self._resolver = self._manager.get_resolver(self, self._loop)

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> list[ResolveResult]:
        """Resolve *host* using the aiodns resolver.

        Args:
            host: Host name to look up.
            port: Port passed to the lookup.
            family: Address family to request.

        Returns:
            One ``ResolveResult`` per node in the aiodns response.

        Raises:
            OSError: If aiodns reports a ``DNSError`` or the response
                contains no nodes.
        """
        try:
            resp = await self._resolver.getaddrinfo(
                host,
                port=port,
                type=socket.SOCK_STREAM,
                family=family,
                flags=_AI_ADDRCONFIG,
            )
        except aiodns.error.DNSError as exc:
            # args[1] carries the message; only read it when it is actually
            # present so a one-argument DNSError still becomes an OSError
            # rather than an IndexError.
            msg = exc.args[1] if len(exc.args) > 1 else "DNS lookup failed"
            raise OSError(None, msg) from exc
        hosts: list[ResolveResult] = []
        for node in resp.nodes:
            address: tuple[bytes, int] | tuple[bytes, int, int, int] = node.addr
            if node.family == socket.AF_INET6:
                if len(address) > 3 and address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    result = await self._resolver.getnameinfo(
                        (address[0].decode("ascii"), *address[1:]),
                        _NAME_SOCKET_FLAGS,
                    )
                    resolved_host = result.node
                else:
                    resolved_host = address[0].decode("ascii")
                    port = address[1]
            else:  # IPv4
                assert node.family == socket.AF_INET
                resolved_host = address[0].decode("ascii")
                port = address[1]
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=port,
                    family=node.family,
                    proto=0,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        if not hosts:
            raise OSError(None, "DNS lookup failed")

        return hosts

    async def close(self) -> None:
        """Release the shared resolver, or cancel the dedicated one."""
        if self._manager:
            # Release the resolver from the manager if using the shared resolver
            self._manager.release_resolver(self, self._loop)
            self._manager = None  # Clear reference to manager
            self._resolver = None  # type: ignore[assignment]  # Clear reference to resolver
            return
        # Otherwise cancel our dedicated resolver
        if self._resolver is not None:
            self._resolver.cancel()
            self._resolver = None  # type: ignore[assignment]  # Clear reference
class _DNSResolverManager:
    """Process-wide registry of shared ``aiodns.DNSResolver`` objects.

    One default-argument resolver is kept per event loop, together with the
    set of ``AsyncResolver`` clients currently using it.
    """

    _instance: Optional["_DNSResolverManager"] = None

    def __new__(cls) -> "_DNSResolverManager":
        # Every construction returns the same object; state is set up once.
        existing = cls._instance
        if existing is not None:
            return existing
        cls._instance = super().__new__(cls)
        cls._instance._init()
        return cls._instance

    def _init(self) -> None:
        # Weak keys: an entry must not keep its event loop alive.
        self._loop_data: weakref.WeakKeyDictionary[
            asyncio.AbstractEventLoop,
            tuple[aiodns.DNSResolver, weakref.WeakSet[AsyncResolver]],
        ] = weakref.WeakKeyDictionary()

    def get_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> "aiodns.DNSResolver":
        """Return the shared resolver for *loop*, creating it on first use.

        Args:
            client: The AsyncResolver asking for the resolver; it is recorded
                so the resolver's usage can be tracked.
            loop: The event loop the resolver belongs to.
        """
        entry = self._loop_data.get(loop)
        if entry is None:
            # First client on this loop: build the resolver and an empty
            # weak set of users, then remember both.
            resolver = aiodns.DNSResolver(loop=loop)
            client_set: weakref.WeakSet[AsyncResolver] = weakref.WeakSet()
            self._loop_data[loop] = (resolver, client_set)
        else:
            resolver, client_set = entry

        client_set.add(client)
        return resolver

    def release_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> None:
        """Drop *client* from *loop*'s users; cancel the resolver if it was the last.

        Args:
            client: The AsyncResolver being closed.
            loop: The event loop whose shared resolver the client was using.
        """
        entry = self._loop_data.get(loop)
        if entry is None:
            # Nothing registered for this loop.
            return
        resolver, client_set = entry
        client_set.discard(client)
        if client_set:
            # Other clients still depend on the shared resolver.
            return
        if resolver is not None:
            resolver.cancel()
        del self._loop_data[loop]
_DefaultType = type[AsyncResolver | ThreadedResolver]

# The aiodns-backed resolver is the default only when a suitable aiodns was
# detected at import time; otherwise the thread-based resolver is used.
DefaultResolver: _DefaultType
if aiodns_default:
    DefaultResolver = AsyncResolver
else:
    DefaultResolver = ThreadedResolver