Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/helpers.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

540 statements  

1"""Various helper functions""" 

2 

3import asyncio 

4import base64 

5import binascii 

6import contextlib 

7import dataclasses 

8import datetime 

9import enum 

10import functools 

11import inspect 

12import netrc 

13import os 

14import platform 

15import re 

16import sys 

17import time 

18import warnings 

19import weakref 

20from collections import namedtuple 

21from contextlib import suppress 

22from email.parser import HeaderParser 

23from email.utils import parsedate 

24from http.cookies import SimpleCookie 

25from math import ceil 

26from pathlib import Path 

27from types import MappingProxyType, TracebackType 

28from typing import ( 

29 TYPE_CHECKING, 

30 Any, 

31 Callable, 

32 ContextManager, 

33 Dict, 

34 Generic, 

35 Iterable, 

36 Iterator, 

37 List, 

38 Mapping, 

39 Optional, 

40 Protocol, 

41 Tuple, 

42 Type, 

43 TypeVar, 

44 Union, 

45 final, 

46 get_args, 

47 overload, 

48) 

49from urllib.parse import quote 

50from urllib.request import getproxies, proxy_bypass 

51 

52from multidict import CIMultiDict, MultiDict, MultiDictProxy, MultiMapping 

53from propcache.api import under_cached_property as reify 

54from yarl import URL 

55 

56from . import hdrs 

57from .log import client_logger 

58from .typedefs import PathLike # noqa 

59 

60if sys.version_info >= (3, 11): 

61 import asyncio as async_timeout 

62else: 

63 import async_timeout 

64 

# Select the decorator used for the immutable dataclasses in this module.
# Type checkers are shown the plain ``dataclass`` decorator; at runtime a
# ``functools.partial`` with ``frozen=True`` is used instead.
if TYPE_CHECKING:
    from dataclasses import dataclass as frozen_dataclass_decorator
elif sys.version_info < (3, 10):
    # ``slots=True`` is only passed on Python 3.10 and newer (branch below).
    frozen_dataclass_decorator = functools.partial(dataclasses.dataclass, frozen=True)
else:
    frozen_dataclass_decorator = functools.partial(
        dataclasses.dataclass, frozen=True, slots=True
    )

73 

__all__ = ("BasicAuth", "ChainMapProxy", "ETag", "frozen_dataclass_decorator", "reify")

# True when running on Python 3.10 or newer.
PY_310 = sys.version_info >= (3, 10)

# Length limit that CookieMixin.set_cookie() checks cookies against when
# DEBUG is enabled.
COOKIE_MAX_LENGTH = 4096

# Generic type variables used in signatures throughout this module.
_T = TypeVar("_T")
_S = TypeVar("_S")

# Single-member enum providing a "not set" marker that is distinct from None.
_SENTINEL = enum.Enum("_SENTINEL", "sentinel")
sentinel = _SENTINEL.sentinel

# True when the AIOHTTP_NO_EXTENSIONS environment variable is set to a
# non-empty value.
NO_EXTENSIONS = bool(os.environ.get("AIOHTTP_NO_EXTENSIONS"))

# Status codes 1xx, 204 and 304.
# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.1
EMPTY_BODY_STATUS_CODES = frozenset((204, 304, *range(100, 200)))
# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.1
# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.2
EMPTY_BODY_METHODS = hdrs.METH_HEAD_ALL

# Debug mode: on when the interpreter runs in dev mode, or when
# PYTHONASYNCIODEBUG is set to a non-empty value and environment variables
# are not being ignored.
DEBUG = sys.flags.dev_mode or (
    not sys.flags.ignore_environment and bool(os.environ.get("PYTHONASYNCIODEBUG"))
)

97 

98 

# Character classes used to validate header tokens.

# Every 7-bit US-ASCII character.
CHAR = {chr(i) for i in range(0, 128)}
# Control characters: octets 0-31 plus DEL (127).
CTL = {chr(i) for i in range(0, 32)} | {
    chr(127),
}
# Separator characters; note that horizontal tab (chr(9)) is also in CTL.
SEPARATORS = {
    "(",
    ")",
    "<",
    ">",
    "@",
    ",",
    ";",
    ":",
    "\\",
    '"',
    "/",
    "[",
    "]",
    "?",
    "=",
    "{",
    "}",
    " ",
    chr(9),
}
# A token is any CHAR that is neither a CTL nor a separator.
# Set difference is required here: horizontal tab belongs to CHAR, CTL and
# SEPARATORS at once, so a symmetric difference (``^``) of the three sets
# would put it back into TOKEN.
TOKEN = CHAR - CTL - SEPARATORS


# Matches "application/json" as well as any "<type>/<subtype>+json" media type.
json_re = re.compile(r"(?:application/|[\w.-]+/[\w.+-]+?\+)json$", re.IGNORECASE)

128 

129 

class BasicAuth(namedtuple("BasicAuth", ["login", "password", "encoding"])):
    """HTTP Basic authentication credentials.

    An immutable ``(login, password, encoding)`` triple that can be parsed
    from, and rendered as, an ``Authorization`` header value.
    """

    def __new__(
        cls, login: str, password: str = "", encoding: str = "latin1"
    ) -> "BasicAuth":
        """Validate the credentials and build the tuple.

        :raises ValueError: if ``login`` or ``password`` is ``None``, or if
                            ``login`` contains a colon.
        """
        if login is None:
            raise ValueError("None is not allowed as login value")
        if password is None:
            raise ValueError("None is not allowed as password value")
        # The colon separates login from password on the wire, so it cannot
        # be part of the login itself.
        if ":" in login:
            raise ValueError('A ":" is not allowed in login (RFC 1945#section-11.1)')

        return super().__new__(cls, login, password, encoding)

    @classmethod
    def decode(cls, auth_header: str, encoding: str = "latin1") -> "BasicAuth":  # type: ignore[misc]
        """Build a BasicAuth from the value of an Authorization header.

        :raises ValueError: if the header has no scheme/credentials pair, the
                            scheme is not ``basic``, the payload is not valid
                            base64, or the decoded payload has no colon.
        """
        try:
            scheme, b64_payload = auth_header.split(" ", 1)
        except ValueError:
            raise ValueError("Could not parse authorization header.")

        if scheme.lower() != "basic":
            raise ValueError("Unknown authorization method %s" % scheme)

        try:
            raw_credentials = base64.b64decode(
                b64_payload.encode("ascii"), validate=True
            )
            credentials = raw_credentials.decode(encoding)
        except binascii.Error:
            raise ValueError("Invalid base64 encoding.")

        try:
            # RFC 2617 (https://www.ietf.org/rfc/rfc2617.txt): the colon is
            # mandatory, while either side of it may be empty.
            login, password = credentials.split(":", 1)
        except ValueError:
            raise ValueError("Invalid credentials.")

        return cls(login, password, encoding=encoding)

    @classmethod
    def from_url(cls, url: URL, *, encoding: str = "latin1") -> Optional["BasicAuth"]:  # type: ignore[misc]
        """Build a BasicAuth from the user info of ``url``.

        Returns ``None`` when the URL carries neither a user nor a password.

        :raises TypeError: if ``url`` is not a ``yarl.URL``.
        """
        if not isinstance(url, URL):
            raise TypeError("url should be yarl.URL instance")
        # Look at raw_user/raw_password first: yarl has most likely parsed
        # them from the netloc already and cached the result.
        has_user_info = url.raw_user is not None or url.raw_password is not None
        if not has_user_info:
            return None
        return cls(url.user or "", url.password or "", encoding=encoding)

    def encode(self) -> str:
        """Render the credentials as an Authorization header value."""
        pair = f"{self.login}:{self.password}"
        token = base64.b64encode(pair.encode(self.encoding))
        return "Basic %s" % token.decode(self.encoding)

191 

192 

def strip_auth_from_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:
    """Split ``url`` into a credential-free URL and its BasicAuth, if any.

    When the URL has neither user nor password it is returned unchanged
    together with ``None``.
    """
    # Look at raw_user/raw_password first: yarl has most likely parsed
    # them from the netloc already and cached the result.
    has_user_info = url.raw_user is not None or url.raw_password is not None
    if not has_user_info:
        return url, None
    bare_url = url.with_user(None)
    credentials = BasicAuth(url.user or "", url.password or "")
    return bare_url, credentials

200 

201 

def netrc_from_env() -> Optional[netrc.netrc]:
    """Load the netrc file selected by the environment.

    The path comes from the ``NETRC`` environment variable when it is set;
    otherwise the default file in the user's home directory is used
    (``_netrc`` on Windows, ``.netrc`` elsewhere).

    Returns None if the file cannot be located, read or parsed.
    """
    env_path = os.environ.get("NETRC")

    if env_path is None:
        try:
            home = Path.home()
        except RuntimeError as exc:
            # pathlib may raise RuntimeError when it cannot resolve home
            client_logger.debug(
                "Could not resolve home directory when "
                "trying to look for .netrc file: %s",
                exc,
            )
            return None

        default_name = "_netrc" if platform.system() == "Windows" else ".netrc"
        netrc_path = home / default_name
    else:
        netrc_path = Path(env_path)

    try:
        return netrc.netrc(str(netrc_path))
    except netrc.NetrcParseError as exc:
        client_logger.warning("Could not parse .netrc file: %s", exc)
    except OSError as exc:
        # The file could not be read (missing, permissions, etc.).
        file_present = False
        with contextlib.suppress(OSError):
            file_present = netrc_path.is_file()
        # Stay quiet unless the environment explicitly asked for this file
        # or the default file does appear to exist.
        if env_path or file_present:
            client_logger.warning("Could not read .netrc file: %s", exc)

    return None

245 

246 

# Immutable record describing one proxy taken from the environment.
@frozen_dataclass_decorator
class ProxyInfo:  # type: ignore[misc]
    # URL of the proxy; proxies_from_env() stores it with credentials stripped.
    proxy: URL
    # Credentials for the proxy, or None when none are available.
    proxy_auth: Optional[BasicAuth]

251 

252 

def basicauth_from_netrc(netrc_obj: Optional[netrc.netrc], host: str) -> BasicAuth:
    """
    Look up ``host`` in ``netrc_obj`` and return its :py:class:`~aiohttp.BasicAuth`.

    :raises LookupError: if ``netrc_obj`` is :py:data:`None` or if no
                         entry is found for the ``host``.
    """
    if netrc_obj is None:
        raise LookupError("No .netrc file found")

    entry = netrc_obj.authenticators(host)
    if entry is None:
        raise LookupError(f"No entry for {host!s} found in the `.netrc` file.")
    login, account, password = entry

    # TODO(PY311): username = login or account
    # Up to Python 3.10 an unspecified account is None and an unspecified
    # login is an empty string; from 3.11 both are empty strings.
    if login or account is None:
        username = login
    else:
        username = account

    # TODO(PY311): Remove this, as password will be an empty string
    # if not specified
    if password is None:
        password = ""  # type: ignore[unreachable]

    return BasicAuth(username, password)

280 

281 

def proxies_from_env() -> Dict[str, ProxyInfo]:
    """Collect the proxies configured in the environment.

    Only the ``http``, ``https``, ``ws`` and ``wss`` entries reported by
    ``getproxies()`` are considered. Proxies whose own URL scheme is
    ``https`` or ``wss`` are skipped with a warning. Credentials embedded in
    a proxy URL take precedence; otherwise the netrc file is consulted.
    """
    accepted_protocols = ("http", "https", "ws", "wss")
    proxy_urls = {
        proto: URL(raw_url)
        for proto, raw_url in getproxies().items()
        if proto in accepted_protocols
    }
    netrc_obj = netrc_from_env()
    stripped = {proto: strip_auth_from_url(u) for proto, u in proxy_urls.items()}

    result = {}
    for proto, (proxy, auth) in stripped.items():
        if proxy.scheme in ("https", "wss"):
            client_logger.warning(
                "%s proxies %s are not supported, ignoring", proxy.scheme.upper(), proxy
            )
            continue
        if netrc_obj and auth is None and proxy.host is not None:
            # A missing netrc entry simply leaves the proxy unauthenticated.
            with suppress(LookupError):
                auth = basicauth_from_netrc(netrc_obj, proxy.host)
        result[proto] = ProxyInfo(proxy, auth)
    return result

306 

307 

def get_env_proxy_for_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:
    """Return the environment proxy (and its auth) to use for ``url``.

    :raises LookupError: if the host is on the proxy bypass list, or if no
                         proxy is configured for the URL's scheme.
    """
    if url.host is not None and proxy_bypass(url.host):
        raise LookupError(f"Proxying is disallowed for `{url.host!r}`")

    configured = proxies_from_env()
    try:
        chosen = configured[url.scheme]
    except KeyError:
        raise LookupError(f"No proxies found for `{url!s}` in the env")
    return chosen.proxy, chosen.proxy_auth

320 

321 

# Immutable result of parse_mimetype().
@frozen_dataclass_decorator
class MimeType:
    # Part before the "/" (e.g. "text" in "text/html").
    type: str
    # Part after the "/" and before any "+" suffix.
    subtype: str
    # Part of the subtype after "+", or "" when there is none.
    suffix: str
    # Read-only view of the ";"-separated parameters.
    parameters: "MultiDictProxy[str]"

328 

329 

@functools.lru_cache(maxsize=56)
def parse_mimetype(mimetype: str) -> MimeType:
    """Split a MIME type string into its components.

    The type, subtype and suffix are lower-cased; a bare ``*`` is treated
    as ``*/*``. An empty input yields a MimeType with empty fields.

    Example:

    >>> parse_mimetype('text/html; charset=utf-8')
    MimeType(type='text', subtype='html', suffix='',
             parameters={'charset': 'utf-8'})

    """
    if not mimetype:
        return MimeType(
            type="", subtype="", suffix="", parameters=MultiDictProxy(MultiDict())
        )

    raw_fulltype, *raw_params = mimetype.split(";")

    params: MultiDict[str] = MultiDict()
    for raw_param in raw_params:
        if raw_param:
            name, _, raw_value = raw_param.partition("=")
            # Values lose surrounding spaces and double quotes.
            params.add(name.lower().strip(), raw_value.strip(' "'))

    fulltype = raw_fulltype.strip().lower()
    if fulltype == "*":
        fulltype = "*/*"

    main_type, _, remainder = fulltype.partition("/")
    sub_type, _, suffix = remainder.partition("+")

    return MimeType(
        type=main_type,
        subtype=sub_type,
        suffix=suffix,
        parameters=MultiDictProxy(params),
    )

368 

369 

@functools.lru_cache(maxsize=56)
def parse_content_type(raw: str) -> Tuple[str, MappingProxyType[str, str]]:
    """Parse the value of a Content-Type header.

    Returns a ``(content_type, parameters)`` tuple where ``parameters`` is a
    read-only MappingProxyType. Results are cached per raw header value.
    """
    message = HeaderParser().parsestr(f"Content-Type: {raw}")
    parsed_type = message.get_content_type()
    all_params = message.get_params(())
    # The first entry repeats the content type itself, so it is skipped.
    parameters = dict(all_params[1:])
    return parsed_type, MappingProxyType(parameters)

382 

383 

def guess_filename(obj: Any, default: Optional[str] = None) -> Optional[str]:
    """Return the base name of ``obj.name``, or ``default``.

    ``default`` is returned when ``obj`` has no usable ``name`` attribute:
    it is missing, empty, not a string, starts with ``<`` or ends with ``>``.
    """
    candidate = getattr(obj, "name", None)
    if not candidate or not isinstance(candidate, str):
        return default
    if candidate[0] == "<" or candidate[-1] == ">":
        return default
    return Path(candidate).name

389 

390 

# Matches every character that must be backslash-escaped inside a
# quoted-string, i.e. anything other than "!", "#"-"[" and "]"-"~".
not_qtext_re = re.compile(r"[^\041\043-\133\135-\176]")
# Characters accepted in quoted-string content: printable ASCII plus tab.
QCONTENT = {chr(i) for i in range(0x20, 0x7F)} | {"\t"}


def quoted_string(content: str) -> str:
    """Escape 7-bit content for use inside a quoted-string.

    This follows the quoted-string of RFC5322 (Internet Message Format),
    i.e. the 7-bit email format rather than the 8-bit HTTP one. The
    surrounding double quotes are not added here.

    :raises ValueError: if ``content`` contains a character outside QCONTENT.
    """
    if not set(content) < QCONTENT:
        raise ValueError(f"bad content for quoted-string {content!r}")
    # Prefix each character that is not plain qtext with a backslash.
    return not_qtext_re.sub(r"\\\g<0>", content)

406 

407 

def content_disposition_header(
    disptype: str,
    quote_fields: bool = True,
    _charset: str = "utf-8",
    params: Optional[Dict[str, str]] = None,
) -> str:
    """Build the value of a MIME ``Content-Disposition`` header.

    This is the MIME payload Content-Disposition header from RFC 2183
    and RFC 7578 section 4.2, not the HTTP Content-Disposition from
    RFC 6266.

    disptype is a disposition type: inline, attachment, form-data.
    It should be a valid extension token (see RFC 2183).

    quote_fields enables value quoting for 7-bit MIME headers
    according to RFC 7578. Set quote_fields to False if the recipient
    can take 8-bit file names and field values.

    _charset specifies the charset to use when quote_fields is True.

    params is a dict with disposition params.

    Raises ValueError if disptype or a parameter name is empty or is not
    made up of token characters only.
    """
    if not disptype or not (TOKEN > set(disptype)):
        raise ValueError(f"bad content disposition type {disptype!r}")

    if not params:
        return disptype

    rendered = []
    for key, val in params.items():
        if not key or not (TOKEN > set(key)):
            raise ValueError(f"bad content disposition parameter {key!r}={val!r}")

        if not quote_fields:
            # Only escape what would break the surrounding double quotes.
            escaped = val.replace("\\", "\\\\").replace('"', '\\"')
            rendered.append(f'{key}="{escaped}"')
        elif key.lower() == "filename":
            # File names are always percent-encoded.
            encoded = quote(val, "", encoding=_charset)
            rendered.append(f'{key}="{encoded}"')
        else:
            try:
                escaped = quoted_string(val)
            except ValueError:
                # Not representable as a quoted-string: emit the extended
                # ``key*=charset''percent-encoded`` form instead.
                encoded = quote(val, "", encoding=_charset)
                rendered.append(f"{key}*={_charset}''{encoded}")
            else:
                rendered.append(f'{key}="{escaped}"')

    return "; ".join((disptype, "; ".join(rendered)))

460 

461 

def is_expected_content_type(
    response_content_type: str, expected_content_type: str
) -> bool:
    """Tell whether a received content type can be processed as the expected one.

    Both arguments should be given without parameters. When
    ``application/json`` is expected, the received type is matched against
    ``json_re``; otherwise a plain substring check is used.
    """
    if expected_content_type != "application/json":
        return expected_content_type in response_content_type
    return json_re.match(response_content_type) is not None

472 

473 

def is_ip_address(host: Optional[str]) -> bool:
    """Heuristically decide whether ``host`` looks like an IP address.

    The check is only meant to make sure a host is not a domain name;
    it does not validate the address.
    """
    if not host:
        return False
    # Anything containing a colon is taken to be an IPv6 address.
    if ":" in host:
        return True
    # An IPv4 address consists of digits once the dots are removed.
    return host.replace(".", "").isdigit()

485 

486 

# One-entry cache: the last whole second formatted and the resulting string.
_cached_current_datetime: Optional[int] = None
_cached_formatted_datetime = ""


def rfc822_formatted_time() -> str:
    """Return the current time formatted as an HTTP date in GMT.

    The string is rebuilt only when the current whole second differs from
    the one cached at module level.
    """
    global _cached_current_datetime, _cached_formatted_datetime

    current_second = int(time.time())
    if current_second == _cached_current_datetime:
        return _cached_formatted_datetime

    # Weekday and month names for HTTP date/time formatting are always
    # English, whatever the locale.
    weekday_names = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    month_names = (
        "",  # Dummy so that month numbers can be used 1-based
        "Jan",
        "Feb",
        "Mar",
        "Apr",
        "May",
        "Jun",
        "Jul",
        "Aug",
        "Sep",
        "Oct",
        "Nov",
        "Dec",
    )

    tm = time.gmtime(current_second)
    _cached_formatted_datetime = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
        weekday_names[tm.tm_wday],
        tm.tm_mday,
        month_names[tm.tm_mon],
        tm.tm_year,
        tm.tm_hour,
        tm.tm_min,
        tm.tm_sec,
    )
    _cached_current_datetime = current_second
    return _cached_formatted_datetime

529 

530 

531def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None: 

532 ref, name = info 

533 ob = ref() 

534 if ob is not None: 

535 with suppress(Exception): 

536 getattr(ob, name)() 

537 

538 

def weakref_handle(
    ob: object,
    name: str,
    timeout: Optional[float],
    loop: asyncio.AbstractEventLoop,
    timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
    """Schedule ``ob.<name>()`` after ``timeout`` seconds, holding ``ob`` weakly.

    Returns the timer handle, or None when ``timeout`` is None or not
    positive. Timeouts at or above ``timeout_ceil_threshold`` have their
    deadline rounded up to a whole second.
    """
    if timeout is None or not timeout > 0:
        return None

    deadline = loop.time() + timeout
    if timeout >= timeout_ceil_threshold:
        deadline = ceil(deadline)

    # Only a weak reference is handed to the loop.
    return loop.call_at(deadline, _weakref_handle, (weakref.ref(ob), name))

553 

554 

def call_later(
    cb: Callable[[], Any],
    timeout: Optional[float],
    loop: asyncio.AbstractEventLoop,
    timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
    """Schedule ``cb`` on ``loop`` after ``timeout`` seconds.

    Returns the timer handle, or None when ``timeout`` is None or not
    positive. The deadline is computed by calculate_timeout_when().
    """
    if timeout is not None and not timeout <= 0:
        deadline = calculate_timeout_when(loop.time(), timeout, timeout_ceil_threshold)
        return loop.call_at(deadline, cb)
    return None

566 

567 

def calculate_timeout_when(
    loop_time: float,
    timeout: float,
    timeout_ceiling_threshold: float,
) -> float:
    """Compute the loop time at which a timeout should fire.

    Timeouts strictly greater than ``timeout_ceiling_threshold`` are
    rounded up to a whole second; shorter ones are returned exactly.
    """
    deadline = loop_time + timeout
    return ceil(deadline) if timeout > timeout_ceiling_threshold else deadline

578 

579 

class TimeoutHandle:
    """Collects callbacks to run once a timeout elapses."""

    __slots__ = ("_timeout", "_loop", "_ceil_threshold", "_callbacks")

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        timeout: Optional[float],
        ceil_threshold: float = 5,
    ) -> None:
        self._timeout = timeout
        self._loop = loop
        self._ceil_threshold = ceil_threshold
        # Registered (callback, args, kwargs) triples, in registration order.
        self._callbacks: List[
            Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]
        ] = []

    def register(
        self, callback: Callable[..., None], *args: Any, **kwargs: Any
    ) -> None:
        """Remember ``callback`` and the arguments to call it with."""
        self._callbacks.append((callback, args, kwargs))

    def close(self) -> None:
        """Forget every registered callback."""
        self._callbacks.clear()

    def start(self) -> Optional[asyncio.TimerHandle]:
        """Schedule this handle on the loop.

        Returns the timer handle, or None when the timeout is None or not
        positive. Timeouts at or above the ceil threshold have their
        deadline rounded up to a whole second.
        """
        timeout = self._timeout
        if timeout is None or not timeout > 0:
            return None
        deadline = self._loop.time() + timeout
        if timeout >= self._ceil_threshold:
            deadline = ceil(deadline)
        return self._loop.call_at(deadline, self.__call__)

    def timer(self) -> "BaseTimerContext":
        """Return a timer context tied to this handle.

        A TimerContext registered with this handle is returned when the
        timeout is positive; otherwise a TimerNoop is returned.
        """
        if self._timeout is None or not self._timeout > 0:
            return TimerNoop()
        timer_ctx = TimerContext(self._loop)
        self.register(timer_ctx.timeout)
        return timer_ctx

    def __call__(self) -> None:
        """Run all registered callbacks, then forget them.

        An ``Exception`` raised by one callback is suppressed so that the
        remaining callbacks still run.
        """
        for callback, args, kwargs in self._callbacks:
            with suppress(Exception):
                callback(*args, **kwargs)

        self._callbacks.clear()

630 

631 

class BaseTimerContext(ContextManager["BaseTimerContext"]):
    """Common base class of the timer context managers in this module."""

    __slots__ = ()

    def assert_timeout(self) -> None:
        """Raise TimeoutError if timeout has been exceeded.

        The base implementation does nothing.
        """

638 

639 

class TimerNoop(BaseTimerContext):
    """Timer context that applies no timeout."""

    __slots__ = ()

    def __enter__(self) -> BaseTimerContext:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Nothing to clean up; exceptions propagate unchanged.
        return None

654 

655 

class TimerContext(BaseTimerContext):
    """Low resolution timeout context manager.

    ``timeout()`` cancels every task currently inside the context, and
    ``__exit__`` converts the resulting ``CancelledError`` into
    ``asyncio.TimeoutError``.
    """

    __slots__ = ("_loop", "_tasks", "_cancelled", "_cancelling")

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        # Tasks that have entered the context and not yet left it.
        self._tasks: List[asyncio.Task[Any]] = []
        # Becomes True once timeout() has been called.
        self._cancelled = False
        # Result of task.cancelling() recorded on entry (Python 3.11+ only).
        self._cancelling = 0

    def assert_timeout(self) -> None:
        """Raise TimeoutError if timer has already been cancelled."""
        if self._cancelled:
            raise asyncio.TimeoutError from None

    def __enter__(self) -> BaseTimerContext:
        """Register the current task with this timer.

        Raises RuntimeError when called outside a task, and
        asyncio.TimeoutError when the timer has already fired.
        """
        task = asyncio.current_task(loop=self._loop)
        if task is None:
            raise RuntimeError("Timeout context manager should be used inside a task")

        if sys.version_info >= (3, 11):
            # Remember if the task was already cancelling
            # so when we __exit__ we can decide if we should
            # raise asyncio.TimeoutError or let the cancellation propagate
            self._cancelling = task.cancelling()

        if self._cancelled:
            raise asyncio.TimeoutError from None

        self._tasks.append(task)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """Unregister the task and translate a timeout-induced cancellation.

        Always returns None, so exceptions other than the translated
        cancellation propagate unchanged.
        """
        enter_task: Optional[asyncio.Task[Any]] = None
        if self._tasks:
            # Pop the most recently entered task.
            enter_task = self._tasks.pop()

        if exc_type is asyncio.CancelledError and self._cancelled:
            assert enter_task is not None
            # The timeout was hit, and the task was cancelled
            # so we need to uncancel the last task that entered the context manager
            # since the cancellation should not leak out of the context manager
            if sys.version_info >= (3, 11):
                # If the task was already cancelling don't raise
                # asyncio.TimeoutError and instead return None
                # to allow the cancellation to propagate
                if enter_task.uncancel() > self._cancelling:
                    return None
            raise asyncio.TimeoutError from exc_val
        return None

    def timeout(self) -> None:
        """Cancel every registered task and mark the timer as fired.

        Calls after the first one do not cancel anything again.
        """
        if not self._cancelled:
            # set() ensures a task that entered more than once is cancelled once.
            for task in set(self._tasks):
                task.cancel()

            self._cancelled = True

719 

720 

721def ceil_timeout( 

722 delay: Optional[float], ceil_threshold: float = 5 

723) -> async_timeout.Timeout: 

724 if delay is None or delay <= 0: 

725 return async_timeout.timeout(None) 

726 

727 loop = asyncio.get_running_loop() 

728 now = loop.time() 

729 when = now + delay 

730 if delay > ceil_threshold: 

731 when = ceil(when) 

732 return async_timeout.timeout_at(when) 

733 

734 

class HeadersMixin:
    """Mixin exposing parsed views of selected headers in ``_headers``."""

    _headers: MultiMapping[str]
    # Parsed Content-Type and its parameters, refreshed lazily.
    _content_type: Optional[str] = None
    _content_dict: Optional[Dict[str, str]] = None
    # Raw header value the cached fields were parsed from; the sentinel
    # means nothing has been parsed yet.
    _stored_content_type: Union[str, None, _SENTINEL] = sentinel

    def _parse_content_type(self, raw: Optional[str]) -> None:
        """Parse ``raw`` and cache the content type and its parameters."""
        self._stored_content_type = raw
        if raw is not None:
            parsed_type, params_proxy = parse_content_type(raw)
            self._content_type = parsed_type
            # Copy so that _content_dict stays mutable and can be updated.
            self._content_dict = params_proxy.copy()
            return
        # default value according to RFC 2616
        self._content_type = "application/octet-stream"
        self._content_dict = {}

    @property
    def content_type(self) -> str:
        """The value of content part for Content-Type HTTP header."""
        current_raw = self._headers.get(hdrs.CONTENT_TYPE)
        # Re-parse only when the header differs from what was parsed last.
        if self._stored_content_type != current_raw:
            self._parse_content_type(current_raw)
        assert self._content_type is not None
        return self._content_type

    @property
    def charset(self) -> Optional[str]:
        """The value of charset part for Content-Type HTTP header."""
        current_raw = self._headers.get(hdrs.CONTENT_TYPE)
        # Re-parse only when the header differs from what was parsed last.
        if self._stored_content_type != current_raw:
            self._parse_content_type(current_raw)
        assert self._content_dict is not None
        return self._content_dict.get("charset")

    @property
    def content_length(self) -> Optional[int]:
        """The value of Content-Length HTTP header."""
        raw_length = self._headers.get(hdrs.CONTENT_LENGTH)
        if raw_length is None:
            return None
        return int(raw_length)

778 

779 

def set_result(fut: "asyncio.Future[_T]", result: _T) -> None:
    """Set ``result`` on ``fut`` unless the future is already done."""
    if fut.done():
        return
    fut.set_result(result)

783 

784 

# Default for ``exc_cause`` meaning "no cause was supplied".
_EXC_SENTINEL = BaseException()


class ErrorableProtocol(Protocol):
    """Structural type for objects that provide ``set_exception()``."""

    def set_exception(
        self,
        exc: Union[Type[BaseException], BaseException],
        exc_cause: BaseException = ...,
    ) -> None: ...


def set_exception(
    fut: Union["asyncio.Future[_T]", ErrorableProtocol],
    exc: Union[Type[BaseException], BaseException],
    exc_cause: BaseException = _EXC_SENTINEL,
) -> None:
    """Set an exception on a future or future-like object.

    If the future is marked as complete, this function is a no-op.

    :param exc_cause: An exception that is a direct cause of ``exc``.
                      Only set if provided.
    """
    if asyncio.isfuture(fut) and fut.done():
        return

    # Attach the cause only when one was given and it is not ``exc`` itself.
    if exc_cause is not _EXC_SENTINEL and exc is not exc_cause:
        exc.__cause__ = exc_cause

    fut.set_exception(exc)

817 

818 

@functools.total_ordering
class AppKey(Generic[_T]):
    """Typed key for storing values in an Application with static typing support."""

    __slots__ = ("_name", "_t", "__orig_class__")

    # Python may set this when the class is instantiated with a generic type.
    # Supporting it allows types that are not concrete classes (such as
    # Iterable), which cannot be passed as the second parameter to __init__.
    __orig_class__: Type[object]

    # TODO(PY314): Change Type to TypeForm (this should resolve unreachable below).
    def __init__(self, name: str, t: Optional[Type[_T]] = None):
        # The key name is prefixed with the name of the module whose
        # top-level code is on the call stack, to help deduplicate key names.
        frame = inspect.currentframe()
        while frame and frame.f_code.co_name != "<module>":
            frame = frame.f_back
        if not frame:
            raise RuntimeError("Failed to get module name.")
        module: str = frame.f_globals["__name__"]

        self._name = module + "." + name
        self._t = t

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, AppKey):
            return True  # Order AppKey above other types.
        return self._name < other._name

    def __repr__(self) -> str:
        key_type = self._t
        if key_type is None:
            # Fall back to the generic type argument when one was given.
            try:
                key_type = get_args(self.__orig_class__)[0]
            except AttributeError:
                pass

        if key_type is None:
            type_text = "<<Unknown>>"
        elif not isinstance(key_type, type):
            type_text = repr(key_type)  # type: ignore[unreachable]
        elif key_type.__module__ == "builtins":
            type_text = key_type.__qualname__
        else:
            type_text = f"{key_type.__module__}.{key_type.__qualname__}"
        return f"<AppKey({self._name}, type={type_text})>"

868 

869 

@final
class ChainMapProxy(Mapping[Union[str, AppKey[Any]], Any]):
    """Read-only view over several mappings searched in order.

    A key is looked up in each mapping in turn and the first hit wins.
    Subclassing is forbidden.
    """

    __slots__ = ("_maps",)

    def __init__(self, maps: Iterable[Mapping[Union[str, AppKey[Any]], Any]]) -> None:
        self._maps = tuple(maps)

    def __init_subclass__(cls) -> None:
        raise TypeError(
            f"Inheritance class {cls.__name__} from ChainMapProxy is forbidden"
        )

    @overload  # type: ignore[override]
    def __getitem__(self, key: AppKey[_T]) -> _T: ...

    @overload
    def __getitem__(self, key: str) -> Any: ...  # type: ignore[misc]

    def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any:
        for candidate in self._maps:
            # A miss in one mapping just moves on to the next one.
            with suppress(KeyError):
                return candidate[key]
        raise KeyError(key)

    @overload  # type: ignore[override]
    def get(self, key: AppKey[_T], default: _S) -> Union[_T, _S]: ...

    @overload
    def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]: ...

    @overload
    def get(self, key: str, default: Any = ...) -> Any: ...  # type: ignore[misc]

    def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any:
        try:
            found = self[key]
        except KeyError:
            return default
        return found

    def __len__(self) -> int:
        # Number of distinct keys across all mappings;
        # reuses stored hash values if possible
        distinct_keys = set().union(*self._maps)
        return len(distinct_keys)

    def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]:
        merged: Dict[Union[str, AppKey[Any]], Any] = {}
        # Later mappings are merged first so earlier ones override them;
        # reuses stored hash values if possible
        for candidate in reversed(self._maps):
            merged.update(candidate)
        return iter(merged)

    def __contains__(self, key: object) -> bool:
        return any(key in candidate for candidate in self._maps)

    def __bool__(self) -> bool:
        # True as soon as one of the mappings is truthy.
        return any(self._maps)

    def __repr__(self) -> str:
        inner = ", ".join(map(repr, self._maps))
        return f"ChainMapProxy({inner})"

932 

933 

class CookieMixin:
    """Mixin for handling cookies."""

    # Created lazily on first access or first set_cookie() call.
    _cookies: Optional[SimpleCookie] = None

    @property
    def cookies(self) -> SimpleCookie:
        """The cookie container, created on first access."""
        if self._cookies is None:
            self._cookies = SimpleCookie()
        return self._cookies

    def set_cookie(
        self,
        name: str,
        value: str,
        *,
        expires: Optional[str] = None,
        domain: Optional[str] = None,
        max_age: Optional[Union[int, str]] = None,
        path: str = "/",
        secure: Optional[bool] = None,
        httponly: Optional[bool] = None,
        samesite: Optional[str] = None,
        partitioned: Optional[bool] = None,
    ) -> None:
        """Set or update response cookie.

        Sets a new cookie or updates an existing one with the new value.
        Optional attributes are only written when they are not None, except
        that a missing ``max_age`` removes a stored ``max-age``, a missing
        ``expires`` removes a stored epoch expiry, and ``path`` is always set.

        When DEBUG is enabled, a UserWarning is emitted if the rendered
        cookie is longer than COOKIE_MAX_LENGTH.
        """
        if self._cookies is None:
            self._cookies = SimpleCookie()

        self._cookies[name] = value
        c = self._cookies[name]

        if expires is not None:
            c["expires"] = expires
        elif c.get("expires") == "Thu, 01 Jan 1970 00:00:00 GMT":
            # Drop the expiry that del_cookie() uses to expire a cookie.
            del c["expires"]

        if domain is not None:
            c["domain"] = domain

        if max_age is not None:
            c["max-age"] = str(max_age)
        elif "max-age" in c:
            del c["max-age"]

        c["path"] = path

        if secure is not None:
            c["secure"] = secure
        if httponly is not None:
            c["httponly"] = httponly
        if samesite is not None:
            c["samesite"] = samesite

        if partitioned is not None:
            c["partitioned"] = partitioned

        if DEBUG:
            cookie_length = len(c.output(header="")[1:])
            if cookie_length > COOKIE_MAX_LENGTH:
                # Name the offending cookie so the warning is actionable.
                warnings.warn(
                    f"The size of cookie {name!r} is too large, "
                    "it might get ignored by the client.",
                    UserWarning,
                    stacklevel=2,
                )

    def del_cookie(
        self,
        name: str,
        *,
        domain: Optional[str] = None,
        path: str = "/",
        secure: Optional[bool] = None,
        httponly: Optional[bool] = None,
        samesite: Optional[str] = None,
    ) -> None:
        """Delete cookie.

        Discards any stored cookie with this name, then stores a new empty
        one with ``max_age=0`` and an epoch ``expires`` date.
        """
        # TODO: do we need domain/path here?
        if self._cookies is not None:
            self._cookies.pop(name, None)
        self.set_cookie(
            name,
            "",
            max_age=0,
            expires="Thu, 01 Jan 1970 00:00:00 GMT",
            domain=domain,
            path=path,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
        )

1032 

1033 

def populate_with_cookies(headers: "CIMultiDict[str]", cookies: SimpleCookie) -> None:
    """Add one Set-Cookie header to ``headers`` for every cookie in ``cookies``."""
    for morsel in cookies.values():
        # The first character of the rendered cookie is dropped.
        headers.add(hdrs.SET_COOKIE, morsel.output(header="")[1:])

1038 

1039 

# https://tools.ietf.org/html/rfc7232#section-2.3
# One or more characters allowed inside an entity tag (no double quote).
_ETAGC = r"[!\x23-\x7E\x80-\xff]+"
_ETAGC_RE = re.compile(_ETAGC)
# A double-quoted entity tag with an optional weak ("W/") prefix.
_QUOTED_ETAG = rf'(W/)?"({_ETAGC})"'
QUOTED_ETAG_RE = re.compile(_QUOTED_ETAG)
# Matches one quoted entity tag followed by a comma separator or the end of
# the string; otherwise the last group captures a single other character.
LIST_QUOTED_ETAG_RE = re.compile(rf"({_QUOTED_ETAG})(?:\s*,\s*|$)|(.)")

# Wildcard value accepted by validate_etag_value() without further checks.
ETAG_ANY = "*"

1048 

1049 

# Immutable representation of an entity tag.
@frozen_dataclass_decorator
class ETag:
    # The entity tag text.
    value: str
    # Whether the tag is a weak validator; defaults to False.
    is_weak: bool = False

1054 

1055 

def validate_etag_value(value: str) -> None:
    """Check that ``value`` is usable as an entity tag.

    ``ETAG_ANY`` is always accepted; anything else must consist solely of
    characters matched by ``_ETAGC_RE``.

    :raises ValueError: if the value is not a valid etag.
    """
    if value == ETAG_ANY:
        return
    if _ETAGC_RE.fullmatch(value):
        return
    raise ValueError(
        f"Value {value!r} is not a valid etag. Maybe it contains '\"'?"
    )

1061 

1062 

def parse_http_date(date_str: Optional[str]) -> Optional[datetime.datetime]:
    """Convert a date string into a UTC datetime.

    Returns None when ``date_str`` is None, cannot be parsed, or does not
    describe a valid date.
    """
    if date_str is None:
        return None
    parsed = parsedate(date_str)
    if parsed is None:
        return None
    try:
        # Only year..second are used; the result is marked as UTC.
        return datetime.datetime(*parsed[:6], tzinfo=datetime.timezone.utc)
    except ValueError:
        return None

1071 

1072 

@functools.lru_cache
def must_be_empty_body(method: str, code: int) -> bool:
    """Tell whether a response to ``method`` with status ``code`` must have no body."""
    if code in EMPTY_BODY_STATUS_CODES:
        return True
    if method in EMPTY_BODY_METHODS:
        return True
    # A 2xx response to one of the CONNECT methods has no body either.
    return 200 <= code < 300 and method in hdrs.METH_CONNECT_ALL

1081 

1082 

def should_remove_content_length(method: str, code: int) -> bool:
    """Tell whether the Content-Length header should be removed.

    This should always be a subset of must_be_empty_body
    """
    # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-8
    # https://www.rfc-editor.org/rfc/rfc9110.html#section-15.4.5-4
    if code in EMPTY_BODY_STATUS_CODES:
        return True
    return 200 <= code < 300 and method in hdrs.METH_CONNECT_ALL