Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/helpers.py: 45%

519 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1"""Various helper functions""" 

2 

3import asyncio 

4import base64 

5import binascii 

6import contextlib 

7import datetime 

8import enum 

9import functools 

10import inspect 

11import netrc 

12import os 

13import platform 

14import re 

15import sys 

16import time 

17import warnings 

18import weakref 

19from collections import namedtuple 

20from contextlib import suppress 

21from email.parser import HeaderParser 

22from email.utils import parsedate 

23from math import ceil 

24from pathlib import Path 

25from types import TracebackType 

26from typing import ( 

27 Any, 

28 Callable, 

29 ContextManager, 

30 Dict, 

31 Generator, 

32 Generic, 

33 Iterable, 

34 Iterator, 

35 List, 

36 Mapping, 

37 Optional, 

38 Pattern, 

39 Protocol, 

40 Tuple, 

41 Type, 

42 TypeVar, 

43 Union, 

44 get_args, 

45 overload, 

46) 

47from urllib.parse import quote 

48from urllib.request import getproxies, proxy_bypass 

49 

50import attr 

51from multidict import MultiDict, MultiDictProxy, MultiMapping 

52from yarl import URL 

53 

54from . import hdrs 

55from .log import client_logger, internal_logger 

56 

57if sys.version_info >= (3, 11): 

58 import asyncio as async_timeout 

59else: 

60 import async_timeout 

61 

62__all__ = ("BasicAuth", "ChainMapProxy", "ETag") 

63 

64IS_MACOS = platform.system() == "Darwin" 

65IS_WINDOWS = platform.system() == "Windows" 

66 

67PY_310 = sys.version_info >= (3, 10) 

68PY_311 = sys.version_info >= (3, 11) 

69 

70 

71_T = TypeVar("_T") 

72_S = TypeVar("_S") 

73 

74_SENTINEL = enum.Enum("_SENTINEL", "sentinel") 

75sentinel = _SENTINEL.sentinel 

76 

77NO_EXTENSIONS = bool(os.environ.get("AIOHTTP_NO_EXTENSIONS")) 

78 

79DEBUG = sys.flags.dev_mode or ( 

80 not sys.flags.ignore_environment and bool(os.environ.get("PYTHONASYNCIODEBUG")) 

81) 

82 

83 

84CHAR = {chr(i) for i in range(0, 128)} 

85CTL = {chr(i) for i in range(0, 32)} | { 

86 chr(127), 

87} 

88SEPARATORS = { 

89 "(", 

90 ")", 

91 "<", 

92 ">", 

93 "@", 

94 ",", 

95 ";", 

96 ":", 

97 "\\", 

98 '"', 

99 "/", 

100 "[", 

101 "]", 

102 "?", 

103 "=", 

104 "{", 

105 "}", 

106 " ", 

107 chr(9), 

108} 

109TOKEN = CHAR ^ CTL ^ SEPARATORS 

110 

111 

112class noop: 

113 def __await__(self) -> Generator[None, None, None]: 

114 yield 

115 

116 

117class BasicAuth(namedtuple("BasicAuth", ["login", "password", "encoding"])): 

118 """Http basic authentication helper.""" 

119 

120 def __new__( 

121 cls, login: str, password: str = "", encoding: str = "latin1" 

122 ) -> "BasicAuth": 

123 if login is None: 

124 raise ValueError("None is not allowed as login value") 

125 

126 if password is None: 

127 raise ValueError("None is not allowed as password value") 

128 

129 if ":" in login: 

130 raise ValueError('A ":" is not allowed in login (RFC 1945#section-11.1)') 

131 

132 return super().__new__(cls, login, password, encoding) 

133 

134 @classmethod 

135 def decode(cls, auth_header: str, encoding: str = "latin1") -> "BasicAuth": 

136 """Create a BasicAuth object from an Authorization HTTP header.""" 

137 try: 

138 auth_type, encoded_credentials = auth_header.split(" ", 1) 

139 except ValueError: 

140 raise ValueError("Could not parse authorization header.") 

141 

142 if auth_type.lower() != "basic": 

143 raise ValueError("Unknown authorization method %s" % auth_type) 

144 

145 try: 

146 decoded = base64.b64decode( 

147 encoded_credentials.encode("ascii"), validate=True 

148 ).decode(encoding) 

149 except binascii.Error: 

150 raise ValueError("Invalid base64 encoding.") 

151 

152 try: 

153 # RFC 2617 HTTP Authentication 

154 # https://www.ietf.org/rfc/rfc2617.txt 

155 # the colon must be present, but the username and password may be 

156 # otherwise blank. 

157 username, password = decoded.split(":", 1) 

158 except ValueError: 

159 raise ValueError("Invalid credentials.") 

160 

161 return cls(username, password, encoding=encoding) 

162 

163 @classmethod 

164 def from_url(cls, url: URL, *, encoding: str = "latin1") -> Optional["BasicAuth"]: 

165 """Create BasicAuth from url.""" 

166 if not isinstance(url, URL): 

167 raise TypeError("url should be yarl.URL instance") 

168 if url.user is None: 

169 return None 

170 return cls(url.user, url.password or "", encoding=encoding) 

171 

172 def encode(self) -> str: 

173 """Encode credentials.""" 

174 creds = (f"{self.login}:{self.password}").encode(self.encoding) 

175 return "Basic %s" % base64.b64encode(creds).decode(self.encoding) 

176 

177 

178def strip_auth_from_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]: 

179 auth = BasicAuth.from_url(url) 

180 if auth is None: 

181 return url, None 

182 else: 

183 return url.with_user(None), auth 

184 

185 

186def netrc_from_env() -> Optional[netrc.netrc]: 

187 """Load netrc from file. 

188 

189 Attempt to load it from the path specified by the env-var 

190 NETRC or in the default location in the user's home directory. 

191 

192 Returns None if it couldn't be found or fails to parse. 

193 """ 

194 netrc_env = os.environ.get("NETRC") 

195 

196 if netrc_env is not None: 

197 netrc_path = Path(netrc_env) 

198 else: 

199 try: 

200 home_dir = Path.home() 

201 except RuntimeError as e: # pragma: no cover 

202 # if pathlib can't resolve home, it may raise a RuntimeError 

203 client_logger.debug( 

204 "Could not resolve home directory when " 

205 "trying to look for .netrc file: %s", 

206 e, 

207 ) 

208 return None 

209 

210 netrc_path = home_dir / ("_netrc" if IS_WINDOWS else ".netrc") 

211 

212 try: 

213 return netrc.netrc(str(netrc_path)) 

214 except netrc.NetrcParseError as e: 

215 client_logger.warning("Could not parse .netrc file: %s", e) 

216 except OSError as e: 

217 netrc_exists = False 

218 with contextlib.suppress(OSError): 

219 netrc_exists = netrc_path.is_file() 

220 # we couldn't read the file (doesn't exist, permissions, etc.) 

221 if netrc_env or netrc_exists: 

222 # only warn if the environment wanted us to load it, 

223 # or it appears like the default file does actually exist 

224 client_logger.warning("Could not read .netrc file: %s", e) 

225 

226 return None 

227 

228 

229@attr.s(auto_attribs=True, frozen=True, slots=True) 

230class ProxyInfo: 

231 proxy: URL 

232 proxy_auth: Optional[BasicAuth] 

233 

234 

235def basicauth_from_netrc(netrc_obj: Optional[netrc.netrc], host: str) -> BasicAuth: 

236 """ 

237 Return :py:class:`~aiohttp.BasicAuth` credentials for ``host`` from ``netrc_obj``. 

238 

239 :raises LookupError: if ``netrc_obj`` is :py:data:`None` or if no 

240 entry is found for the ``host``. 

241 """ 

242 if netrc_obj is None: 

243 raise LookupError("No .netrc file found") 

244 auth_from_netrc = netrc_obj.authenticators(host) 

245 

246 if auth_from_netrc is None: 

247 raise LookupError(f"No entry for {host!s} found in the `.netrc` file.") 

248 login, account, password = auth_from_netrc 

249 

250 # TODO(PY311): username = login or account 

251 # Up to python 3.10, account could be None if not specified, 

252 # and login will be empty string if not specified. From 3.11, 

253 # login and account will be empty string if not specified. 

254 username = login if (login or account is None) else account 

255 

256 # TODO(PY311): Remove this, as password will be empty string 

257 # if not specified 

258 if password is None: 

259 password = "" 

260 

261 return BasicAuth(username, password) 

262 

263 

264def proxies_from_env() -> Dict[str, ProxyInfo]: 

265 proxy_urls = { 

266 k: URL(v) 

267 for k, v in getproxies().items() 

268 if k in ("http", "https", "ws", "wss") 

269 } 

270 netrc_obj = netrc_from_env() 

271 stripped = {k: strip_auth_from_url(v) for k, v in proxy_urls.items()} 

272 ret = {} 

273 for proto, val in stripped.items(): 

274 proxy, auth = val 

275 if proxy.scheme in ("https", "wss"): 

276 client_logger.warning( 

277 "%s proxies %s are not supported, ignoring", proxy.scheme.upper(), proxy 

278 ) 

279 continue 

280 if netrc_obj and auth is None: 

281 if proxy.host is not None: 

282 try: 

283 auth = basicauth_from_netrc(netrc_obj, proxy.host) 

284 except LookupError: 

285 auth = None 

286 ret[proto] = ProxyInfo(proxy, auth) 

287 return ret 

288 

289 

290def current_task( 

291 loop: Optional[asyncio.AbstractEventLoop] = None, 

292) -> "Optional[asyncio.Task[Any]]": 

293 return asyncio.current_task(loop=loop) 

294 

295 

296def get_running_loop( 

297 loop: Optional[asyncio.AbstractEventLoop] = None, 

298) -> asyncio.AbstractEventLoop: 

299 if loop is None: 

300 loop = asyncio.get_event_loop() 

301 if not loop.is_running(): 

302 warnings.warn( 

303 "The object should be created within an async function", 

304 DeprecationWarning, 

305 stacklevel=3, 

306 ) 

307 if loop.get_debug(): 

308 internal_logger.warning( 

309 "The object should be created within an async function", stack_info=True 

310 ) 

311 return loop 

312 

313 

314def isasyncgenfunction(obj: Any) -> bool: 

315 func = getattr(inspect, "isasyncgenfunction", None) 

316 if func is not None: 

317 return func(obj) # type: ignore[no-any-return] 

318 else: 

319 return False 

320 

321 

322def get_env_proxy_for_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]: 

323 """Get a permitted proxy for the given URL from the env.""" 

324 if url.host is not None and proxy_bypass(url.host): 

325 raise LookupError(f"Proxying is disallowed for `{url.host!r}`") 

326 

327 proxies_in_env = proxies_from_env() 

328 try: 

329 proxy_info = proxies_in_env[url.scheme] 

330 except KeyError: 

331 raise LookupError(f"No proxies found for `{url!s}` in the env") 

332 else: 

333 return proxy_info.proxy, proxy_info.proxy_auth 

334 

335 

336@attr.s(auto_attribs=True, frozen=True, slots=True) 

337class MimeType: 

338 type: str 

339 subtype: str 

340 suffix: str 

341 parameters: "MultiDictProxy[str]" 

342 

343 

344@functools.lru_cache(maxsize=56) 

345def parse_mimetype(mimetype: str) -> MimeType: 

346 """Parses a MIME type into its components. 

347 

348 mimetype is a MIME type string. 

349 

350 Returns a MimeType object. 

351 

352 Example: 

353 

354 >>> parse_mimetype('text/html; charset=utf-8') 

355 MimeType(type='text', subtype='html', suffix='', 

356 parameters={'charset': 'utf-8'}) 

357 

358 """ 

359 if not mimetype: 

360 return MimeType( 

361 type="", subtype="", suffix="", parameters=MultiDictProxy(MultiDict()) 

362 ) 

363 

364 parts = mimetype.split(";") 

365 params: MultiDict[str] = MultiDict() 

366 for item in parts[1:]: 

367 if not item: 

368 continue 

369 key, _, value = item.partition("=") 

370 params.add(key.lower().strip(), value.strip(' "')) 

371 

372 fulltype = parts[0].strip().lower() 

373 if fulltype == "*": 

374 fulltype = "*/*" 

375 

376 mtype, _, stype = fulltype.partition("/") 

377 stype, _, suffix = stype.partition("+") 

378 

379 return MimeType( 

380 type=mtype, subtype=stype, suffix=suffix, parameters=MultiDictProxy(params) 

381 ) 

382 

383 

384def guess_filename(obj: Any, default: Optional[str] = None) -> Optional[str]: 

385 name = getattr(obj, "name", None) 

386 if name and isinstance(name, str) and name[0] != "<" and name[-1] != ">": 

387 return Path(name).name 

388 return default 

389 

390 

391not_qtext_re = re.compile(r"[^\041\043-\133\135-\176]") 

392QCONTENT = {chr(i) for i in range(0x20, 0x7F)} | {"\t"} 

393 

394 

395def quoted_string(content: str) -> str: 

396 """Return 7-bit content as quoted-string. 

397 

398 Format content into a quoted-string as defined in RFC5322 for 

399 Internet Message Format. Notice that this is not the 8-bit HTTP 

400 format, but the 7-bit email format. Content must be in usascii or 

401 a ValueError is raised. 

402 """ 

403 if not (QCONTENT > set(content)): 

404 raise ValueError(f"bad content for quoted-string {content!r}") 

405 return not_qtext_re.sub(lambda x: "\\" + x.group(0), content) 

406 

407 

408def content_disposition_header( 

409 disptype: str, quote_fields: bool = True, _charset: str = "utf-8", **params: str 

410) -> str: 

411 """Sets ``Content-Disposition`` header for MIME. 

412 

413 This is the MIME payload Content-Disposition header from RFC 2183 

414 and RFC 7579 section 4.2, not the HTTP Content-Disposition from 

415 RFC 6266. 

416 

417 disptype is a disposition type: inline, attachment, form-data. 

418 Should be valid extension token (see RFC 2183) 

419 

420 quote_fields performs value quoting to 7-bit MIME headers 

421 according to RFC 7578. Set to quote_fields to False if recipient 

422 can take 8-bit file names and field values. 

423 

424 _charset specifies the charset to use when quote_fields is True. 

425 

426 params is a dict with disposition params. 

427 """ 

428 if not disptype or not (TOKEN > set(disptype)): 

429 raise ValueError("bad content disposition type {!r}" "".format(disptype)) 

430 

431 value = disptype 

432 if params: 

433 lparams = [] 

434 for key, val in params.items(): 

435 if not key or not (TOKEN > set(key)): 

436 raise ValueError( 

437 "bad content disposition parameter" " {!r}={!r}".format(key, val) 

438 ) 

439 if quote_fields: 

440 if key.lower() == "filename": 

441 qval = quote(val, "", encoding=_charset) 

442 lparams.append((key, '"%s"' % qval)) 

443 else: 

444 try: 

445 qval = quoted_string(val) 

446 except ValueError: 

447 qval = "".join( 

448 (_charset, "''", quote(val, "", encoding=_charset)) 

449 ) 

450 lparams.append((key + "*", qval)) 

451 else: 

452 lparams.append((key, '"%s"' % qval)) 

453 else: 

454 qval = val.replace("\\", "\\\\").replace('"', '\\"') 

455 lparams.append((key, '"%s"' % qval)) 

456 sparams = "; ".join("=".join(pair) for pair in lparams) 

457 value = "; ".join((value, sparams)) 

458 return value 

459 

460 

461class _TSelf(Protocol, Generic[_T]): 

462 _cache: Dict[str, _T] 

463 

464 

465class reify(Generic[_T]): 

466 """Use as a class method decorator. 

467 

468 It operates almost exactly like 

469 the Python `@property` decorator, but it puts the result of the 

470 method it decorates into the instance dict after the first call, 

471 effectively replacing the function it decorates with an instance 

472 variable. It is, in Python parlance, a data descriptor. 

473 """ 

474 

475 def __init__(self, wrapped: Callable[..., _T]) -> None: 

476 self.wrapped = wrapped 

477 self.__doc__ = wrapped.__doc__ 

478 self.name = wrapped.__name__ 

479 

480 def __get__(self, inst: _TSelf[_T], owner: Optional[Type[Any]] = None) -> _T: 

481 try: 

482 try: 

483 return inst._cache[self.name] 

484 except KeyError: 

485 val = self.wrapped(inst) 

486 inst._cache[self.name] = val 

487 return val 

488 except AttributeError: 

489 if inst is None: 

490 return self 

491 raise 

492 

493 def __set__(self, inst: _TSelf[_T], value: _T) -> None: 

494 raise AttributeError("reified property is read-only") 

495 

496 

497reify_py = reify 

498 

499try: 

500 from ._helpers import reify as reify_c 

501 

502 if not NO_EXTENSIONS: 

503 reify = reify_c # type: ignore[misc,assignment] 

504except ImportError: 

505 pass 

506 

507_ipv4_pattern = ( 

508 r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}" 

509 r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$" 

510) 

511_ipv6_pattern = ( 

512 r"^(?:(?:(?:[A-F0-9]{1,4}:){6}|(?=(?:[A-F0-9]{0,4}:){0,6}" 

513 r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}$)(([0-9A-F]{1,4}:){0,5}|:)" 

514 r"((:[0-9A-F]{1,4}){1,5}:|:)|::(?:[A-F0-9]{1,4}:){5})" 

515 r"(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}" 

516 r"(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|(?:[A-F0-9]{1,4}:){7}" 

517 r"[A-F0-9]{1,4}|(?=(?:[A-F0-9]{0,4}:){0,7}[A-F0-9]{0,4}$)" 

518 r"(([0-9A-F]{1,4}:){1,7}|:)((:[0-9A-F]{1,4}){1,7}|:)|(?:[A-F0-9]{1,4}:){7}" 

519 r":|:(:[A-F0-9]{1,4}){7})$" 

520) 

521_ipv4_regex = re.compile(_ipv4_pattern) 

522_ipv6_regex = re.compile(_ipv6_pattern, flags=re.IGNORECASE) 

523_ipv4_regexb = re.compile(_ipv4_pattern.encode("ascii")) 

524_ipv6_regexb = re.compile(_ipv6_pattern.encode("ascii"), flags=re.IGNORECASE) 

525 

526 

527def _is_ip_address( 

528 regex: Pattern[str], regexb: Pattern[bytes], host: Optional[Union[str, bytes]] 

529) -> bool: 

530 if host is None: 

531 return False 

532 if isinstance(host, str): 

533 return bool(regex.match(host)) 

534 elif isinstance(host, (bytes, bytearray, memoryview)): 

535 return bool(regexb.match(host)) 

536 else: 

537 raise TypeError(f"{host} [{type(host)}] is not a str or bytes") 

538 

539 

540is_ipv4_address = functools.partial(_is_ip_address, _ipv4_regex, _ipv4_regexb) 

541is_ipv6_address = functools.partial(_is_ip_address, _ipv6_regex, _ipv6_regexb) 

542 

543 

544def is_ip_address(host: Optional[Union[str, bytes, bytearray, memoryview]]) -> bool: 

545 return is_ipv4_address(host) or is_ipv6_address(host) 

546 

547 

548_cached_current_datetime: Optional[int] = None 

549_cached_formatted_datetime = "" 

550 

551 

552def rfc822_formatted_time() -> str: 

553 global _cached_current_datetime 

554 global _cached_formatted_datetime 

555 

556 now = int(time.time()) 

557 if now != _cached_current_datetime: 

558 # Weekday and month names for HTTP date/time formatting; 

559 # always English! 

560 # Tuples are constants stored in codeobject! 

561 _weekdayname = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun") 

562 _monthname = ( 

563 "", # Dummy so we can use 1-based month numbers 

564 "Jan", 

565 "Feb", 

566 "Mar", 

567 "Apr", 

568 "May", 

569 "Jun", 

570 "Jul", 

571 "Aug", 

572 "Sep", 

573 "Oct", 

574 "Nov", 

575 "Dec", 

576 ) 

577 

578 year, month, day, hh, mm, ss, wd, *tail = time.gmtime(now) 

579 _cached_formatted_datetime = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( 

580 _weekdayname[wd], 

581 day, 

582 _monthname[month], 

583 year, 

584 hh, 

585 mm, 

586 ss, 

587 ) 

588 _cached_current_datetime = now 

589 return _cached_formatted_datetime 

590 

591 

592def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None: 

593 ref, name = info 

594 ob = ref() 

595 if ob is not None: 

596 with suppress(Exception): 

597 getattr(ob, name)() 

598 

599 

600def weakref_handle( 

601 ob: object, 

602 name: str, 

603 timeout: float, 

604 loop: asyncio.AbstractEventLoop, 

605 timeout_ceil_threshold: float = 5, 

606) -> Optional[asyncio.TimerHandle]: 

607 if timeout is not None and timeout > 0: 

608 when = loop.time() + timeout 

609 if timeout >= timeout_ceil_threshold: 

610 when = ceil(when) 

611 

612 return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name)) 

613 return None 

614 

615 

616def call_later( 

617 cb: Callable[[], Any], 

618 timeout: float, 

619 loop: asyncio.AbstractEventLoop, 

620 timeout_ceil_threshold: float = 5, 

621) -> Optional[asyncio.TimerHandle]: 

622 if timeout is not None and timeout > 0: 

623 when = loop.time() + timeout 

624 if timeout > timeout_ceil_threshold: 

625 when = ceil(when) 

626 return loop.call_at(when, cb) 

627 return None 

628 

629 

630class TimeoutHandle: 

631 """Timeout handle""" 

632 

633 def __init__( 

634 self, 

635 loop: asyncio.AbstractEventLoop, 

636 timeout: Optional[float], 

637 ceil_threshold: float = 5, 

638 ) -> None: 

639 self._timeout = timeout 

640 self._loop = loop 

641 self._ceil_threshold = ceil_threshold 

642 self._callbacks: List[ 

643 Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]] 

644 ] = [] 

645 

646 def register( 

647 self, callback: Callable[..., None], *args: Any, **kwargs: Any 

648 ) -> None: 

649 self._callbacks.append((callback, args, kwargs)) 

650 

651 def close(self) -> None: 

652 self._callbacks.clear() 

653 

654 def start(self) -> Optional[asyncio.Handle]: 

655 timeout = self._timeout 

656 if timeout is not None and timeout > 0: 

657 when = self._loop.time() + timeout 

658 if timeout >= self._ceil_threshold: 

659 when = ceil(when) 

660 return self._loop.call_at(when, self.__call__) 

661 else: 

662 return None 

663 

664 def timer(self) -> "BaseTimerContext": 

665 if self._timeout is not None and self._timeout > 0: 

666 timer = TimerContext(self._loop) 

667 self.register(timer.timeout) 

668 return timer 

669 else: 

670 return TimerNoop() 

671 

672 def __call__(self) -> None: 

673 for cb, args, kwargs in self._callbacks: 

674 with suppress(Exception): 

675 cb(*args, **kwargs) 

676 

677 self._callbacks.clear() 

678 

679 

680class BaseTimerContext(ContextManager["BaseTimerContext"]): 

681 def assert_timeout(self) -> None: 

682 """Raise TimeoutError if timeout has been exceeded.""" 

683 

684 

685class TimerNoop(BaseTimerContext): 

686 def __enter__(self) -> BaseTimerContext: 

687 return self 

688 

689 def __exit__( 

690 self, 

691 exc_type: Optional[Type[BaseException]], 

692 exc_val: Optional[BaseException], 

693 exc_tb: Optional[TracebackType], 

694 ) -> None: 

695 return 

696 

697 

698class TimerContext(BaseTimerContext): 

699 """Low resolution timeout context manager""" 

700 

701 def __init__(self, loop: asyncio.AbstractEventLoop) -> None: 

702 self._loop = loop 

703 self._tasks: List[asyncio.Task[Any]] = [] 

704 self._cancelled = False 

705 

706 def assert_timeout(self) -> None: 

707 """Raise TimeoutError if timer has already been cancelled.""" 

708 if self._cancelled: 

709 raise asyncio.TimeoutError from None 

710 

711 def __enter__(self) -> BaseTimerContext: 

712 task = current_task(loop=self._loop) 

713 

714 if task is None: 

715 raise RuntimeError( 

716 "Timeout context manager should be used " "inside a task" 

717 ) 

718 

719 if self._cancelled: 

720 raise asyncio.TimeoutError from None 

721 

722 self._tasks.append(task) 

723 return self 

724 

725 def __exit__( 

726 self, 

727 exc_type: Optional[Type[BaseException]], 

728 exc_val: Optional[BaseException], 

729 exc_tb: Optional[TracebackType], 

730 ) -> Optional[bool]: 

731 if self._tasks: 

732 self._tasks.pop() 

733 

734 if exc_type is asyncio.CancelledError and self._cancelled: 

735 raise asyncio.TimeoutError from None 

736 return None 

737 

738 def timeout(self) -> None: 

739 if not self._cancelled: 

740 for task in set(self._tasks): 

741 task.cancel() 

742 

743 self._cancelled = True 

744 

745 

746def ceil_timeout( 

747 delay: Optional[float], ceil_threshold: float = 5 

748) -> async_timeout.Timeout: 

749 if delay is None or delay <= 0: 

750 return async_timeout.timeout(None) 

751 

752 loop = get_running_loop() 

753 now = loop.time() 

754 when = now + delay 

755 if delay > ceil_threshold: 

756 when = ceil(when) 

757 return async_timeout.timeout_at(when) 

758 

759 

760class HeadersMixin: 

761 ATTRS = frozenset(["_content_type", "_content_dict", "_stored_content_type"]) 

762 

763 _headers: MultiMapping[str] 

764 

765 _content_type: Optional[str] = None 

766 _content_dict: Optional[Dict[str, str]] = None 

767 _stored_content_type: Union[str, None, _SENTINEL] = sentinel 

768 

769 def _parse_content_type(self, raw: Optional[str]) -> None: 

770 self._stored_content_type = raw 

771 if raw is None: 

772 # default value according to RFC 2616 

773 self._content_type = "application/octet-stream" 

774 self._content_dict = {} 

775 else: 

776 msg = HeaderParser().parsestr("Content-Type: " + raw) 

777 self._content_type = msg.get_content_type() 

778 params = msg.get_params(()) 

779 self._content_dict = dict(params[1:]) # First element is content type again 

780 

781 @property 

782 def content_type(self) -> str: 

783 """The value of content part for Content-Type HTTP header.""" 

784 raw = self._headers.get(hdrs.CONTENT_TYPE) 

785 if self._stored_content_type != raw: 

786 self._parse_content_type(raw) 

787 return self._content_type # type: ignore[return-value] 

788 

789 @property 

790 def charset(self) -> Optional[str]: 

791 """The value of charset part for Content-Type HTTP header.""" 

792 raw = self._headers.get(hdrs.CONTENT_TYPE) 

793 if self._stored_content_type != raw: 

794 self._parse_content_type(raw) 

795 return self._content_dict.get("charset") # type: ignore[union-attr] 

796 

797 @property 

798 def content_length(self) -> Optional[int]: 

799 """The value of Content-Length HTTP header.""" 

800 content_length = self._headers.get(hdrs.CONTENT_LENGTH) 

801 

802 if content_length is not None: 

803 return int(content_length) 

804 else: 

805 return None 

806 

807 

808def set_result(fut: "asyncio.Future[_T]", result: _T) -> None: 

809 if not fut.done(): 

810 fut.set_result(result) 

811 

812 

813def set_exception(fut: "asyncio.Future[_T]", exc: BaseException) -> None: 

814 if not fut.done(): 

815 fut.set_exception(exc) 

816 

817 

818@functools.total_ordering 

819class AppKey(Generic[_T]): 

820 """Keys for static typing support in Application.""" 

821 

822 __slots__ = ("_name", "_t", "__orig_class__") 

823 

824 # This may be set by Python when instantiating with a generic type. We need to 

825 # support this, in order to support types that are not concrete classes, 

826 # like Iterable, which can't be passed as the second parameter to __init__. 

827 __orig_class__: Type[object] 

828 

829 def __init__(self, name: str, t: Optional[Type[_T]] = None): 

830 # Prefix with module name to help deduplicate key names. 

831 frame = inspect.currentframe() 

832 while frame: 

833 if frame.f_code.co_name == "<module>": 

834 module: str = frame.f_globals["__name__"] 

835 break 

836 frame = frame.f_back 

837 

838 self._name = module + "." + name 

839 self._t = t 

840 

841 def __lt__(self, other: object) -> bool: 

842 if isinstance(other, AppKey): 

843 return self._name < other._name 

844 return True # Order AppKey above other types. 

845 

846 def __repr__(self) -> str: 

847 t = self._t 

848 if t is None: 

849 with suppress(AttributeError): 

850 # Set to type arg. 

851 t = get_args(self.__orig_class__)[0] 

852 

853 if t is None: 

854 t_repr = "<<Unknown>>" 

855 elif isinstance(t, type): 

856 if t.__module__ == "builtins": 

857 t_repr = t.__qualname__ 

858 else: 

859 t_repr = f"{t.__module__}.{t.__qualname__}" 

860 else: 

861 t_repr = repr(t) 

862 return f"<AppKey({self._name}, type={t_repr})>" 

863 

864 

865class ChainMapProxy(Mapping[Union[str, AppKey[Any]], Any]): 

866 __slots__ = ("_maps",) 

867 

868 def __init__(self, maps: Iterable[Mapping[Union[str, AppKey[Any]], Any]]) -> None: 

869 self._maps = tuple(maps) 

870 

871 def __init_subclass__(cls) -> None: 

872 raise TypeError( 

873 "Inheritance class {} from ChainMapProxy " 

874 "is forbidden".format(cls.__name__) 

875 ) 

876 

877 @overload # type: ignore[override] 

878 def __getitem__(self, key: AppKey[_T]) -> _T: 

879 ... 

880 

881 @overload 

882 def __getitem__(self, key: str) -> Any: 

883 ... 

884 

885 def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any: 

886 for mapping in self._maps: 

887 try: 

888 return mapping[key] 

889 except KeyError: 

890 pass 

891 raise KeyError(key) 

892 

893 @overload # type: ignore[override] 

894 def get(self, key: AppKey[_T], default: _S) -> Union[_T, _S]: 

895 ... 

896 

897 @overload 

898 def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]: 

899 ... 

900 

901 @overload 

902 def get(self, key: str, default: Any = ...) -> Any: 

903 ... 

904 

905 def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any: 

906 try: 

907 return self[key] 

908 except KeyError: 

909 return default 

910 

911 def __len__(self) -> int: 

912 # reuses stored hash values if possible 

913 return len(set().union(*self._maps)) 

914 

915 def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]: 

916 d: Dict[Union[str, AppKey[Any]], Any] = {} 

917 for mapping in reversed(self._maps): 

918 # reuses stored hash values if possible 

919 d.update(mapping) 

920 return iter(d) 

921 

922 def __contains__(self, key: object) -> bool: 

923 return any(key in m for m in self._maps) 

924 

925 def __bool__(self) -> bool: 

926 return any(self._maps) 

927 

928 def __repr__(self) -> str: 

929 content = ", ".join(map(repr, self._maps)) 

930 return f"ChainMapProxy({content})" 

931 

932 

933# https://tools.ietf.org/html/rfc7232#section-2.3 

934_ETAGC = r"[!\x23-\x7E\x80-\xff]+" 

935_ETAGC_RE = re.compile(_ETAGC) 

936_QUOTED_ETAG = rf'(W/)?"({_ETAGC})"' 

937QUOTED_ETAG_RE = re.compile(_QUOTED_ETAG) 

938LIST_QUOTED_ETAG_RE = re.compile(rf"({_QUOTED_ETAG})(?:\s*,\s*|$)|(.)") 

939 

940ETAG_ANY = "*" 

941 

942 

943@attr.s(auto_attribs=True, frozen=True, slots=True) 

944class ETag: 

945 value: str 

946 is_weak: bool = False 

947 

948 

949def validate_etag_value(value: str) -> None: 

950 if value != ETAG_ANY and not _ETAGC_RE.fullmatch(value): 

951 raise ValueError( 

952 f"Value {value!r} is not a valid etag. Maybe it contains '\"'?" 

953 ) 

954 

955 

956def parse_http_date(date_str: Optional[str]) -> Optional[datetime.datetime]: 

957 """Process a date string, return a datetime object""" 

958 if date_str is not None: 

959 timetuple = parsedate(date_str) 

960 if timetuple is not None: 

961 with suppress(ValueError): 

962 return datetime.datetime(*timetuple[:6], tzinfo=datetime.timezone.utc) 

963 return None 

964 

965 

966def must_be_empty_body(method: str, code: int) -> bool: 

967 """Check if a request must return an empty body.""" 

968 return ( 

969 status_code_must_be_empty_body(code) 

970 or method_must_be_empty_body(method) 

971 or (200 <= code < 300 and method.upper() == hdrs.METH_CONNECT) 

972 ) 

973 

974 

975def method_must_be_empty_body(method: str) -> bool: 

976 """Check if a method must return an empty body.""" 

977 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.1 

978 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.2 

979 return method.upper() == hdrs.METH_HEAD 

980 

981 

982def status_code_must_be_empty_body(code: int) -> bool: 

983 """Check if a status code must return an empty body.""" 

984 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.1 

985 return code in {204, 304} or 100 <= code < 200 

986 

987 

988def should_remove_content_length(method: str, code: int) -> bool: 

989 """Check if a Content-Length header should be removed. 

990 

991 This should always be a subset of must_be_empty_body 

992 """ 

993 # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-8 

994 # https://www.rfc-editor.org/rfc/rfc9110.html#section-15.4.5-4 

995 return ( 

996 code in {204, 304} 

997 or 100 <= code < 200 

998 or (200 <= code < 300 and method.upper() == hdrs.METH_CONNECT) 

999 )