Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/resolver.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import socket
3import weakref
4from typing import Any, List, Optional, Tuple, Type, Union
6from .abc import AbstractResolver, ResolveResult
# Public names of this module.
__all__ = ("ThreadedResolver", "AsyncResolver", "DefaultResolver")


# aiodns is optional. ``aiodns_default`` is only true when the installed
# aiodns exposes ``DNSResolver.getaddrinfo``.
try:
    import aiodns

    aiodns_default = hasattr(aiodns.DNSResolver, "getaddrinfo")
except ImportError:
    aiodns, aiodns_default = None, False  # type: ignore[assignment]


# Flags attached to every ResolveResult produced by the resolvers below.
_NUMERIC_SOCKET_FLAGS = socket.AI_NUMERICHOST | socket.AI_NUMERICSERV
# Flags passed to getnameinfo() when resolving scoped IPv6 addresses.
_NAME_SOCKET_FLAGS = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
# AI_ADDRCONFIG, restricted to AI_MASK on platforms that define that mask.
_AI_ADDRCONFIG = (
    socket.AI_ADDRCONFIG & socket.AI_MASK
    if hasattr(socket, "AI_MASK")
    else socket.AI_ADDRCONFIG
)
class ThreadedResolver(AbstractResolver):
    """Threaded resolver.

    Uses an Executor for synchronous getaddrinfo() calls.
    concurrent.futures.ThreadPoolExecutor is used by default.
    """

    def __init__(self) -> None:
        # Captures the currently running loop; asyncio.get_running_loop()
        # raises RuntimeError when no loop is running.
        self._loop = asyncio.get_running_loop()

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> List[ResolveResult]:
        """Resolve *host* through the event loop's getaddrinfo().

        Args:
            host: Host name to look up.
            port: Port passed to getaddrinfo().
            family: Address family passed to getaddrinfo().

        Returns:
            One ResolveResult per usable address. The list may be empty
            when every returned entry is skipped.

        Errors raised by ``loop.getaddrinfo()`` / ``loop.getnameinfo()``
        are not caught here and propagate to the caller.
        """
        infos = await self._loop.getaddrinfo(
            host,
            port,
            type=socket.SOCK_STREAM,
            family=family,
            flags=_AI_ADDRCONFIG,
        )

        hosts: List[ResolveResult] = []
        # Loop-local names are used so the ``family``/``port`` parameters
        # are not silently rebound while iterating.
        for addr_family, _, proto, _, address in infos:
            if addr_family == socket.AF_INET6:
                if len(address) < 4:
                    # IPv6 is not supported by Python build,
                    # or IPv6 is not enabled in the host.
                    # A full IPv6 sockaddr has four items; anything shorter
                    # cannot be indexed at [3] below, so it is skipped.
                    continue
                if address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    resolved_host, raw_port = await self._loop.getnameinfo(
                        address, _NAME_SOCKET_FLAGS
                    )
                    resolved_port = int(raw_port)
                else:
                    resolved_host, resolved_port = address[:2]
            else:  # IPv4
                assert addr_family == socket.AF_INET
                resolved_host, resolved_port = address  # type: ignore[misc]
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=resolved_port,
                    family=addr_family,
                    proto=proto,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        return hosts

    async def close(self) -> None:
        """Do nothing; this resolver holds no resources of its own."""
        pass
class AsyncResolver(AbstractResolver):
    """Use the `aiodns` package to make asynchronous DNS lookups.

    When constructed without arguments the instance uses the
    aiodns.DNSResolver handed out by _DNSResolverManager for the running
    loop; with arguments it creates its own aiodns.DNSResolver.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Create the resolver.

        Args:
            *args: Positional arguments forwarded to aiodns.DNSResolver.
            **kwargs: Keyword arguments forwarded to aiodns.DNSResolver.

        Raises:
            RuntimeError: If the aiodns library is not available.
        """
        if aiodns is None:
            raise RuntimeError("Resolver requires aiodns library")

        self._loop = asyncio.get_running_loop()
        self._manager: Optional[_DNSResolverManager] = None
        # If custom args are provided, create a dedicated resolver instance
        # This means each AsyncResolver with custom args gets its own
        # aiodns.DNSResolver instance
        if args or kwargs:
            self._resolver = aiodns.DNSResolver(*args, **kwargs)
            return
        # Use the shared resolver from the manager for default arguments
        self._manager = _DNSResolverManager()
        self._resolver = self._manager.get_resolver(self, self._loop)

    async def resolve(
        self, host: str, port: int = 0, family: socket.AddressFamily = socket.AF_INET
    ) -> List[ResolveResult]:
        """Resolve *host* with aiodns.

        Args:
            host: Host name to look up.
            port: Port passed to the aiodns getaddrinfo() call.
            family: Address family passed to the aiodns getaddrinfo() call.

        Returns:
            A non-empty list with one ResolveResult per returned node.

        Raises:
            OSError: If aiodns reports a DNSError, or if the lookup
                produced no hosts.
        """
        try:
            resp = await self._resolver.getaddrinfo(
                host,
                port=port,
                type=socket.SOCK_STREAM,
                family=family,
                flags=_AI_ADDRCONFIG,
            )
        except aiodns.error.DNSError as exc:
            # args[1] is only present when the error carries at least two
            # arguments; a single-argument error must not raise IndexError
            # here and hide the real DNS failure.
            msg = exc.args[1] if len(exc.args) > 1 else "DNS lookup failed"
            raise OSError(None, msg) from exc
        hosts: List[ResolveResult] = []
        for node in resp.nodes:
            address: Union[Tuple[bytes, int], Tuple[bytes, int, int, int]] = node.addr
            family = node.family
            if family == socket.AF_INET6:
                if len(address) > 3 and address[3]:
                    # This is essential for link-local IPv6 addresses.
                    # LL IPv6 is a VERY rare case. Strictly speaking, we should use
                    # getnameinfo() unconditionally, but performance makes sense.
                    # NOTE(review): ``port`` is not refreshed from address[1]
                    # on this path and keeps its previous value; confirm
                    # that this is intended.
                    result = await self._resolver.getnameinfo(
                        (address[0].decode("ascii"), *address[1:]),
                        _NAME_SOCKET_FLAGS,
                    )
                    resolved_host = result.node
                else:
                    resolved_host = address[0].decode("ascii")
                    port = address[1]
            else:  # IPv4
                assert family == socket.AF_INET
                resolved_host = address[0].decode("ascii")
                port = address[1]
            hosts.append(
                ResolveResult(
                    hostname=host,
                    host=resolved_host,
                    port=port,
                    family=family,
                    proto=0,
                    flags=_NUMERIC_SOCKET_FLAGS,
                )
            )

        if not hosts:
            raise OSError(None, "DNS lookup failed")

        return hosts

    async def close(self) -> None:
        """Release the shared resolver or cancel the dedicated one."""
        if self._manager:
            # Release the resolver from the manager if using the shared resolver
            self._manager.release_resolver(self, self._loop)
            self._manager = None  # Clear reference to manager
            self._resolver = None  # type: ignore[assignment]  # Clear reference to resolver
            return
        # Otherwise cancel our dedicated resolver
        if self._resolver is not None:
            self._resolver.cancel()
        self._resolver = None  # type: ignore[assignment]  # Clear reference
class _DNSResolverManager:
    """Singleton registry of shared aiodns.DNSResolver objects.

    Holds one argument-less aiodns.DNSResolver per event loop, together
    with the set of AsyncResolver clients currently using it.
    """

    _instance: Optional["_DNSResolverManager"] = None

    def __new__(cls) -> "_DNSResolverManager":
        existing = cls._instance
        if existing is not None:
            return existing
        # First construction: publish the instance, then set up its state.
        created = super().__new__(cls)
        cls._instance = created
        created._init()
        return created

    def _init(self) -> None:
        # Weak keys let an event loop be garbage collected even while it
        # still has an entry here.
        self._loop_data: weakref.WeakKeyDictionary[
            asyncio.AbstractEventLoop,
            tuple["aiodns.DNSResolver", weakref.WeakSet["AsyncResolver"]],
        ] = weakref.WeakKeyDictionary()

    def get_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> "aiodns.DNSResolver":
        """Return the shared aiodns.DNSResolver for *loop*, creating it if needed.

        Args:
            client: The AsyncResolver asking for the resolver; it is
                recorded as a user of the loop's resolver.
            loop: The event loop the resolver belongs to.
        """
        if loop in self._loop_data:
            # Reuse what was registered for this loop earlier.
            shared, users = self._loop_data[loop]
        else:
            # First request for this loop: build the resolver and user set.
            shared = aiodns.DNSResolver(loop=loop)
            users: weakref.WeakSet["AsyncResolver"] = weakref.WeakSet()
            self._loop_data[loop] = (shared, users)

        users.add(client)
        return shared

    def release_resolver(
        self, client: "AsyncResolver", loop: asyncio.AbstractEventLoop
    ) -> None:
        """Drop *client* from *loop*'s users; tear down the resolver when unused.

        Args:
            client: The AsyncResolver that no longer needs the resolver.
            loop: The event loop whose resolver the client was using.
        """
        if loop not in self._loop_data:
            return
        shared, users = self._loop_data[loop]
        users.discard(client)
        if users:
            # Other clients still rely on this resolver.
            return
        # Last user is gone: cancel the resolver and forget the loop.
        if shared is not None:
            shared.cancel()
        del self._loop_data[loop]
# Type of the default resolver: either of the two resolver classes.
_DefaultType = Type[Union[AsyncResolver, ThreadedResolver]]
# The aiodns-backed resolver is the default only when aiodns_default is true.
DefaultResolver: _DefaultType = ThreadedResolver if not aiodns_default else AsyncResolver