Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client.py: 31%

478 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-26 06:16 +0000

1"""HTTP Client for asyncio.""" 

2 

3import asyncio 

4import base64 

5import dataclasses 

6import hashlib 

7import json 

8import os 

9import sys 

10import traceback 

11import warnings 

12from contextlib import suppress 

13from types import SimpleNamespace, TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Awaitable, 

18 Callable, 

19 Collection, 

20 Coroutine, 

21 Final, 

22 FrozenSet, 

23 Generator, 

24 Generic, 

25 Iterable, 

26 List, 

27 Mapping, 

28 Optional, 

29 Set, 

30 Tuple, 

31 Type, 

32 TypeVar, 

33 Union, 

34 final, 

35) 

36 

37from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr 

38from yarl import URL 

39 

40from . import hdrs, http, payload 

41from .abc import AbstractCookieJar 

42from .client_exceptions import ( 

43 ClientConnectionError, 

44 ClientConnectorCertificateError, 

45 ClientConnectorError, 

46 ClientConnectorSSLError, 

47 ClientError, 

48 ClientHttpProxyError, 

49 ClientOSError, 

50 ClientPayloadError, 

51 ClientProxyConnectionError, 

52 ClientResponseError, 

53 ClientSSLError, 

54 ConnectionTimeoutError, 

55 ContentTypeError, 

56 InvalidURL, 

57 ServerConnectionError, 

58 ServerDisconnectedError, 

59 ServerFingerprintMismatch, 

60 ServerTimeoutError, 

61 SocketTimeoutError, 

62 TooManyRedirects, 

63 WSServerHandshakeError, 

64) 

65from .client_reqrep import ( 

66 SSL_ALLOWED_TYPES, 

67 ClientRequest, 

68 ClientResponse, 

69 Fingerprint, 

70 RequestInfo, 

71) 

72from .client_ws import ( 

73 DEFAULT_WS_CLIENT_TIMEOUT, 

74 ClientWebSocketResponse, 

75 ClientWSTimeout, 

76) 

77from .connector import BaseConnector, NamedPipeConnector, TCPConnector, UnixConnector 

78from .cookiejar import CookieJar 

79from .helpers import ( 

80 _SENTINEL, 

81 BasicAuth, 

82 TimeoutHandle, 

83 ceil_timeout, 

84 get_env_proxy_for_url, 

85 method_must_be_empty_body, 

86 sentinel, 

87 strip_auth_from_url, 

88) 

89from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter 

90from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse 

91from .streams import FlowControlDataQueue 

92from .tracing import Trace, TraceConfig 

93from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, StrOrURL 

94 

95__all__ = ( 

96 # client_exceptions 

97 "ClientConnectionError", 

98 "ClientConnectorCertificateError", 

99 "ClientConnectorError", 

100 "ClientConnectorSSLError", 

101 "ClientError", 

102 "ClientHttpProxyError", 

103 "ClientOSError", 

104 "ClientPayloadError", 

105 "ClientProxyConnectionError", 

106 "ClientResponseError", 

107 "ClientSSLError", 

108 "ConnectionTimeoutError", 

109 "ContentTypeError", 

110 "InvalidURL", 

111 "ServerConnectionError", 

112 "ServerDisconnectedError", 

113 "ServerFingerprintMismatch", 

114 "ServerTimeoutError", 

115 "SocketTimeoutError", 

116 "TooManyRedirects", 

117 "WSServerHandshakeError", 

118 # client_reqrep 

119 "ClientRequest", 

120 "ClientResponse", 

121 "Fingerprint", 

122 "RequestInfo", 

123 # connector 

124 "BaseConnector", 

125 "TCPConnector", 

126 "UnixConnector", 

127 "NamedPipeConnector", 

128 # client_ws 

129 "ClientWebSocketResponse", 

130 # client 

131 "ClientSession", 

132 "ClientTimeout", 

133 "request", 

134) 

135 

136 

137if TYPE_CHECKING: 

138 from ssl import SSLContext 

139else: 

140 SSLContext = None 

141 

142 

@dataclasses.dataclass(frozen=True)
class ClientTimeout:
    """Immutable set of time limits applied to a client request.

    Every limit defaults to ``None``.  ``ceil_threshold`` is handed to the
    timeout helpers together with the limit it accompanies.
    """

    # Positional order is part of the constructor signature; keep it stable.
    total: Optional[float] = None
    connect: Optional[float] = None
    sock_read: Optional[float] = None
    sock_connect: Optional[float] = None
    ceil_threshold: float = 5

    # Finer-grained limits that are sketched here but not implemented:
    # pool_queue_timeout: Optional[float] = None
    # dns_resolution_timeout: Optional[float] = None
    # socket_connect_timeout: Optional[float] = None
    # connection_acquiring_timeout: Optional[float] = None
    # new_connection_timeout: Optional[float] = None
    # http_header_timeout: Optional[float] = None
    # response_body_timeout: Optional[float] = None

    # A per-request timeout is obtained by either
    # - constructing a brand new instance that overrides the default, or
    # - deriving one with dataclasses.replace(), see
    #   https://docs.python.org/3/library/dataclasses.html#dataclasses.replace

163 

164 

# Sessions created without an explicit timeout get a five-minute total limit.
DEFAULT_TIMEOUT: Final[ClientTimeout] = ClientTimeout(total=5 * 60)

# Methods listed as idempotent in
# https://www.rfc-editor.org/rfc/rfc9110#section-9.2.2
IDEMPOTENT_METHODS = frozenset(("GET", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE"))

_RetType = TypeVar("_RetType")
# Callable that maps (response, body bytes) to a charset name.
_CharsetResolver = Callable[[ClientResponse, bytes], str]

173 

174 

175@final 

176class ClientSession: 

177 """First-class interface for making HTTP requests.""" 

178 

179 __slots__ = ( 

180 "_base_url", 

181 "_source_traceback", 

182 "_connector", 

183 "_loop", 

184 "_cookie_jar", 

185 "_connector_owner", 

186 "_default_auth", 

187 "_version", 

188 "_json_serialize", 

189 "_requote_redirect_url", 

190 "_timeout", 

191 "_raise_for_status", 

192 "_auto_decompress", 

193 "_trust_env", 

194 "_default_headers", 

195 "_skip_auto_headers", 

196 "_request_class", 

197 "_response_class", 

198 "_ws_response_class", 

199 "_trace_configs", 

200 "_read_bufsize", 

201 "_max_line_size", 

202 "_max_field_size", 

203 "_resolve_charset", 

204 ) 

205 

def __init__(
    self,
    base_url: Optional[StrOrURL] = None,
    *,
    connector: Optional[BaseConnector] = None,
    cookies: Optional[LooseCookies] = None,
    headers: Optional[LooseHeaders] = None,
    skip_auto_headers: Optional[Iterable[str]] = None,
    auth: Optional[BasicAuth] = None,
    json_serialize: JSONEncoder = json.dumps,
    request_class: Type[ClientRequest] = ClientRequest,
    response_class: Type[ClientResponse] = ClientResponse,
    ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse,
    version: HttpVersion = http.HttpVersion11,
    cookie_jar: Optional[AbstractCookieJar] = None,
    connector_owner: bool = True,
    raise_for_status: Union[
        bool, Callable[[ClientResponse], Awaitable[None]]
    ] = False,
    timeout: Union[_SENTINEL, ClientTimeout, None] = sentinel,
    auto_decompress: bool = True,
    trust_env: bool = False,
    requote_redirect_url: bool = True,
    trace_configs: Optional[List[TraceConfig]] = None,
    read_bufsize: int = 2**16,
    max_line_size: int = 8190,
    max_field_size: int = 8190,
    fallback_charset_resolver: _CharsetResolver = lambda r, b: "utf-8",
) -> None:
    """Store the session-wide defaults used by every request.

    Must run while an event loop is running, because the loop is taken
    from ``asyncio.get_running_loop()``.

    Raises:
        AssertionError: ``base_url`` is not equal to its own origin.
        RuntimeError: ``connector`` is bound to a different event loop.
        ValueError: ``timeout`` is neither ``None``/sentinel nor a
            ``ClientTimeout``.
    """
    if base_url is not None and not isinstance(base_url, URL):
        self._base_url: Optional[URL] = URL(base_url)
        assert (
            self._base_url.origin() == self._base_url
        ), "Only absolute URLs without path part are supported"
    else:
        self._base_url = base_url

    loop = asyncio.get_running_loop()
    if connector is None:
        connector = TCPConnector()

    # __del__ reads the next three attributes, so they are assigned
    # before any of the validation below gets a chance to raise.
    self._connector: Optional[BaseConnector] = connector
    self._loop = loop
    self._source_traceback: Optional[traceback.StackSummary] = (
        traceback.extract_stack(sys._getframe(1)) if loop.get_debug() else None
    )

    if connector._loop is not loop:
        raise RuntimeError("Session and connector have to use same event loop")

    self._cookie_jar = CookieJar() if cookie_jar is None else cookie_jar
    if cookies is not None:
        self._cookie_jar.update_cookies(cookies)

    self._connector_owner = connector_owner
    self._default_auth = auth
    self._version = version
    self._json_serialize = json_serialize

    # Both "not given" and an explicit None select the module default.
    if timeout is sentinel or timeout is None:
        timeout = DEFAULT_TIMEOUT
    if not isinstance(timeout, ClientTimeout):
        raise ValueError(
            f"timeout parameter cannot be of {type(timeout)} type, "
            "please use 'timeout=ClientTimeout(...)'",
        )
    self._timeout = timeout

    self._raise_for_status = raise_for_status
    self._auto_decompress = auto_decompress
    self._trust_env = trust_env
    self._requote_redirect_url = requote_redirect_url
    self._read_bufsize = read_bufsize
    self._max_line_size = max_line_size
    self._max_field_size = max_field_size

    # Default headers are always held as a case-insensitive multidict.
    self._default_headers: CIMultiDict[str] = (
        CIMultiDict(headers) if headers else CIMultiDict()
    )
    self._skip_auto_headers = (
        frozenset()
        if skip_auto_headers is None
        else frozenset(istr(name) for name in skip_auto_headers)
    )

    self._request_class = request_class
    self._response_class = response_class
    self._ws_response_class = ws_response_class

    self._trace_configs = trace_configs or []
    for config in self._trace_configs:
        config.freeze()

    self._resolve_charset = fallback_charset_resolver

309 

def __init_subclass__(cls: Type["ClientSession"]) -> None:
    """Refuse every attempt to derive a class from ClientSession.

    Raises:
        TypeError: always; the message names the offending subclass.
    """
    template = "Inheritance class {} from ClientSession " "is forbidden"
    raise TypeError(template.format(cls.__name__))

315 

def __del__(self, _warnings: Any = warnings) -> None:
    """Report a session that is garbage-collected while still open.

    Emits a ``ResourceWarning`` and passes a context dict to the loop's
    exception handler.  Nothing happens for a closed session, nor for an
    instance whose ``__init__`` failed before the connector was stored.

    Args:
        _warnings: object providing ``warn()``; bound as a default so it
            remains reachable from the finalizer.
    """
    try:
        is_closed = self.closed
    except AttributeError:
        # __init__ can raise before ``_connector`` is assigned (failed
        # base_url check, no running event loop, connector construction).
        # Such an instance owns nothing, so there is nothing to report and
        # the finalizer must not raise.
        return
    if is_closed:
        return

    _warnings.warn(
        f"Unclosed client session {self!r}",
        ResourceWarning,
        source=self,
    )
    context = {"client_session": self, "message": "Unclosed client session"}
    if self._source_traceback is not None:
        context["source_traceback"] = self._source_traceback
    self._loop.call_exception_handler(context)

327 

def request(
    self, method: str, url: StrOrURL, **kwargs: Any
) -> "_RequestContextManager":
    """Start an HTTP request using an arbitrary method.

    All keyword arguments are forwarded to ``_request`` unchanged; the
    resulting coroutine is wrapped in a ``_RequestContextManager``.
    """
    pending = self._request(method, url, **kwargs)
    return _RequestContextManager(pending)

333 

def _build_url(self, str_or_url: StrOrURL) -> URL:
    """Turn the caller's URL into the URL that is actually requested.

    Without a session base URL the parsed value is returned as is.  With
    one, the value is asserted to be non-absolute with a path starting
    with ``/`` and is then joined onto the base.
    """
    url = URL(str_or_url)
    base = self._base_url
    if base is not None:
        assert not url.is_absolute() and url.path.startswith("/")
        return base.join(url)
    return url

341 

async def _request(
    self,
    method: str,
    str_or_url: StrOrURL,
    *,
    params: Optional[Mapping[str, str]] = None,
    data: Any = None,
    json: Any = None,
    cookies: Optional[LooseCookies] = None,
    headers: Optional[LooseHeaders] = None,
    skip_auto_headers: Optional[Iterable[str]] = None,
    auth: Optional[BasicAuth] = None,
    allow_redirects: bool = True,
    max_redirects: int = 10,
    compress: Optional[str] = None,
    chunked: Optional[bool] = None,
    expect100: bool = False,
    raise_for_status: Union[
        None, bool, Callable[[ClientResponse], Awaitable[None]]
    ] = None,
    read_until_eof: bool = True,
    proxy: Optional[StrOrURL] = None,
    proxy_auth: Optional[BasicAuth] = None,
    timeout: Union[ClientTimeout, _SENTINEL, None] = sentinel,
    ssl: Union[SSLContext, bool, Fingerprint] = True,
    server_hostname: Optional[str] = None,
    proxy_headers: Optional[LooseHeaders] = None,
    trace_request_ctx: Optional[SimpleNamespace] = None,
    read_bufsize: Optional[int] = None,
    auto_decompress: Optional[bool] = None,
    max_line_size: Optional[int] = None,
    max_field_size: Optional[int] = None,
) -> ClientResponse:
    """Send one request, follow redirects, and return the started response.

    Per-request arguments left at ``None`` (``read_bufsize``,
    ``auto_decompress``, ``max_line_size``, ``max_field_size``,
    ``raise_for_status``) and a ``timeout`` left at the sentinel or ``None``
    fall back to the values stored on the session.

    Raises:
        RuntimeError: the session is closed.
        TypeError: ``ssl`` is not an instance of ``SSL_ALLOWED_TYPES``.
        ValueError: ``data`` and ``json`` are both given; ``auth`` is
            combined with URL credentials or an explicit Authorization
            header; a redirect points to a scheme other than http/https.
        InvalidURL: ``str_or_url``, ``proxy`` or a redirect target cannot
            be parsed.
        ConnectionTimeoutError: acquiring the connection timed out.
        TooManyRedirects: the redirect count reached ``max_redirects``.
        ClientOSError: an ``OSError`` that is not already a ``ClientError``
            was raised while sending or starting the response.
    """
    # NOTE: timeout clamps existing connect and read timeouts.  We cannot
    # set the default to None because we need to detect if the user wants
    # to use the existing timeouts by setting timeout to None.

    if self.closed:
        raise RuntimeError("Session is closed")

    if not isinstance(ssl, SSL_ALLOWED_TYPES):
        raise TypeError(
            "ssl should be SSLContext, Fingerprint, or bool, "
            "got {!r} instead.".format(ssl)
        )

    # ``json`` is just a convenience spelling of a JSON-encoded ``data``.
    if data is not None and json is not None:
        raise ValueError(
            "data and json parameters can not be used at the same time"
        )
    elif json is not None:
        data = payload.JsonPayload(json, dumps=self._json_serialize)

    redirects = 0
    history = []
    version = self._version
    params = params or {}

    # Merge with default headers and transform to CIMultiDict
    headers = self._prepare_headers(headers)
    proxy_headers = self._prepare_headers(proxy_headers)

    try:
        url = self._build_url(str_or_url)
    except ValueError as e:
        raise InvalidURL(str_or_url) from e

    # Session-level skip list plus whatever this call adds to it.
    skip_headers = set(self._skip_auto_headers)
    if skip_auto_headers is not None:
        for i in skip_auto_headers:
            skip_headers.add(istr(i))

    if proxy is not None:
        try:
            proxy = URL(proxy)
        except ValueError as e:
            raise InvalidURL(proxy) from e

    if timeout is sentinel or timeout is None:
        real_timeout: ClientTimeout = self._timeout
    else:
        real_timeout = timeout
    # timeout is cumulative for all request operations
    # (request, redirects, responses, data consuming)
    tm = TimeoutHandle(
        self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold
    )
    # May be None (it is tested for None further down).
    handle = tm.start()

    if read_bufsize is None:
        read_bufsize = self._read_bufsize

    if auto_decompress is None:
        auto_decompress = self._auto_decompress

    if max_line_size is None:
        max_line_size = self._max_line_size

    if max_field_size is None:
        max_field_size = self._max_field_size

    # One Trace per configured TraceConfig, each with its own context.
    traces = [
        Trace(
            self,
            trace_config,
            trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx),
        )
        for trace_config in self._trace_configs
    ]

    for trace in traces:
        await trace.send_request_start(method, url.update_query(params), headers)

    timer = tm.timer()
    try:
        with timer:
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-retrying-requests
            # Only idempotent methods get the single retry below.
            retry_persistent_connection = method in IDEMPOTENT_METHODS
            # Each pass sends one request; redirects and the one-off retry
            # ``continue``, a final response ``break``s.
            while True:
                url, auth_from_url = strip_auth_from_url(url)
                if auth and auth_from_url:
                    raise ValueError(
                        "Cannot combine AUTH argument with "
                        "credentials encoded in URL"
                    )

                # Precedence: explicit argument, URL credentials, session
                # default.
                if auth is None:
                    auth = auth_from_url
                if auth is None:
                    auth = self._default_auth
                # It would be confusing if we support explicit
                # Authorization header with auth argument
                if auth is not None and hdrs.AUTHORIZATION in headers:
                    raise ValueError(
                        "Cannot combine AUTHORIZATION header "
                        "with AUTH argument or credentials "
                        "encoded in URL"
                    )

                all_cookies = self._cookie_jar.filter_cookies(url)

                # Per-request cookies go through a throwaway jar so they are
                # filtered against the URL like the session cookies.
                if cookies is not None:
                    tmp_cookie_jar = CookieJar()
                    tmp_cookie_jar.update_cookies(cookies)
                    req_cookies = tmp_cookie_jar.filter_cookies(url)
                    if req_cookies:
                        all_cookies.load(req_cookies)

                if proxy is not None:
                    proxy = URL(proxy)
                elif self._trust_env:
                    # A LookupError leaves proxy/proxy_auth untouched.
                    with suppress(LookupError):
                        proxy, proxy_auth = get_env_proxy_for_url(url)

                req = self._request_class(
                    method,
                    url,
                    params=params,
                    headers=headers,
                    skip_auto_headers=skip_headers,
                    data=data,
                    cookies=all_cookies,
                    auth=auth,
                    version=version,
                    compress=compress,
                    chunked=chunked,
                    expect100=expect100,
                    loop=self._loop,
                    response_class=self._response_class,
                    proxy=proxy,
                    proxy_auth=proxy_auth,
                    timer=timer,
                    session=self,
                    ssl=ssl,
                    server_hostname=server_hostname,
                    proxy_headers=proxy_headers,
                    traces=traces,
                    trust_env=self.trust_env,
                )

                # connection timeout
                try:
                    async with ceil_timeout(
                        real_timeout.connect,
                        ceil_threshold=real_timeout.ceil_threshold,
                    ):
                        assert self._connector is not None
                        conn = await self._connector.connect(
                            req, traces=traces, timeout=real_timeout
                        )
                except asyncio.TimeoutError as exc:
                    raise ConnectionTimeoutError(
                        f"Connection timeout to host {url}"
                    ) from exc

                assert conn.transport is not None

                assert conn.protocol is not None
                conn.protocol.set_response_params(
                    timer=timer,
                    skip_payload=method_must_be_empty_body(method),
                    read_until_eof=read_until_eof,
                    auto_decompress=auto_decompress,
                    read_timeout=real_timeout.sock_read,
                    read_bufsize=read_bufsize,
                    timeout_ceil_threshold=self._connector._timeout_ceil_threshold,
                    max_line_size=max_line_size,
                    max_field_size=max_field_size,
                )

                try:
                    # Inner handlers close the response and then the
                    # connection before the error reaches the outer handlers.
                    try:
                        resp = await req.send(conn)
                        try:
                            await resp.start(conn)
                        except BaseException:
                            resp.close()
                            raise
                    except BaseException:
                        conn.close()
                        raise
                except (ClientOSError, ServerDisconnectedError):
                    # Retry at most once, and only for idempotent methods.
                    if retry_persistent_connection:
                        retry_persistent_connection = False
                        continue
                    raise
                except ClientError:
                    raise
                except OSError as exc:
                    # A timeout without errno is re-raised as is; any other
                    # OSError is wrapped in ClientOSError.
                    if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                        raise
                    raise ClientOSError(*exc.args) from exc

                self._cookie_jar.update_cookies(resp.cookies, resp.url)

                # redirects
                if resp.status in (301, 302, 303, 307, 308) and allow_redirects:
                    for trace in traces:
                        await trace.send_request_redirect(
                            method, url.update_query(params), headers, resp
                        )

                    redirects += 1
                    history.append(resp)
                    # max_redirects == 0 disables the limit (falsy check).
                    if max_redirects and redirects >= max_redirects:
                        resp.close()
                        raise TooManyRedirects(
                            history[0].request_info, tuple(history)
                        )

                    # For 301 and 302, mimic IE, now changed in RFC
                    # https://github.com/kennethreitz/requests/pull/269
                    if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or (
                        resp.status in (301, 302) and resp.method == hdrs.METH_POST
                    ):
                        method = hdrs.METH_GET
                        data = None
                        if headers.get(hdrs.CONTENT_LENGTH):
                            headers.pop(hdrs.CONTENT_LENGTH)

                    r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(
                        hdrs.URI
                    )
                    if r_url is None:
                        # see github.com/aio-libs/aiohttp/issues/2022
                        break
                    else:
                        # reading from correct redirection
                        # response is forbidden
                        resp.release()

                    try:
                        parsed_url = URL(
                            r_url, encoded=not self._requote_redirect_url
                        )

                    except ValueError as e:
                        raise InvalidURL(r_url) from e

                    scheme = parsed_url.scheme
                    if scheme not in ("http", "https", ""):
                        resp.close()
                        raise ValueError("Can redirect only to http or https")
                    elif not scheme:
                        # Scheme-less target: resolve against the current URL.
                        parsed_url = url.join(parsed_url)

                    is_same_host_https_redirect = (
                        url.host == parsed_url.host
                        and parsed_url.scheme == "https"
                        and url.scheme == "http"
                    )

                    # Credentials are dropped when the origin changes, except
                    # for an http -> https upgrade on the same host.
                    if (
                        url.origin() != parsed_url.origin()
                        and not is_same_host_https_redirect
                    ):
                        auth = None
                        headers.pop(hdrs.AUTHORIZATION, None)

                    url = parsed_url
                    # The original query parameters are not re-applied to the
                    # redirect target.
                    params = {}
                    resp.release()
                    continue

                break

        # check response status
        if raise_for_status is None:
            raise_for_status = self._raise_for_status

        if raise_for_status is None:
            pass
        elif callable(raise_for_status):
            await raise_for_status(resp)
        elif raise_for_status:
            resp.raise_for_status()

        # register connection
        # The timeout handle is cancelled via a connection callback when the
        # response still holds a connection, otherwise right away.
        if handle is not None:
            if resp.connection is not None:
                resp.connection.add_callback(handle.cancel)
            else:
                handle.cancel()

        resp._history = tuple(history)

        for trace in traces:
            await trace.send_request_end(
                method, url.update_query(params), headers, resp
            )
        return resp

    except BaseException as e:
        # cleanup timer
        tm.close()
        if handle:
            handle.cancel()
            handle = None

        # Tracers are told about the failure before it propagates.
        for trace in traces:
            await trace.send_request_exception(
                method, url.update_query(params), headers, e
            )
        raise

686 

def ws_connect(
    self,
    url: StrOrURL,
    *,
    method: str = hdrs.METH_GET,
    protocols: Collection[str] = (),
    timeout: Union[ClientWSTimeout, float, _SENTINEL, None] = sentinel,
    receive_timeout: Optional[float] = None,
    autoclose: bool = True,
    autoping: bool = True,
    heartbeat: Optional[float] = None,
    auth: Optional[BasicAuth] = None,
    origin: Optional[str] = None,
    params: Optional[Mapping[str, str]] = None,
    headers: Optional[LooseHeaders] = None,
    proxy: Optional[StrOrURL] = None,
    proxy_auth: Optional[BasicAuth] = None,
    ssl: Union[SSLContext, bool, Fingerprint] = True,
    server_hostname: Optional[str] = None,
    proxy_headers: Optional[LooseHeaders] = None,
    compress: int = 0,
    max_msg_size: int = 4 * 1024 * 1024,
) -> "_WSRequestContextManager":
    """Begin a websocket handshake.

    Every argument is passed through to ``_ws_connect``; the resulting
    coroutine is wrapped in a ``_WSRequestContextManager``.
    """
    handshake = self._ws_connect(
        url,
        method=method,
        protocols=protocols,
        timeout=timeout,
        receive_timeout=receive_timeout,
        autoclose=autoclose,
        autoping=autoping,
        heartbeat=heartbeat,
        auth=auth,
        origin=origin,
        params=params,
        headers=headers,
        proxy=proxy,
        proxy_auth=proxy_auth,
        ssl=ssl,
        server_hostname=server_hostname,
        proxy_headers=proxy_headers,
        compress=compress,
        max_msg_size=max_msg_size,
    )
    return _WSRequestContextManager(handshake)

734 

async def _ws_connect(
    self,
    url: StrOrURL,
    *,
    method: str = hdrs.METH_GET,
    protocols: Collection[str] = (),
    timeout: Union[ClientWSTimeout, float, _SENTINEL, None] = sentinel,
    receive_timeout: Optional[float] = None,
    autoclose: bool = True,
    autoping: bool = True,
    heartbeat: Optional[float] = None,
    auth: Optional[BasicAuth] = None,
    origin: Optional[str] = None,
    params: Optional[Mapping[str, str]] = None,
    headers: Optional[LooseHeaders] = None,
    proxy: Optional[StrOrURL] = None,
    proxy_auth: Optional[BasicAuth] = None,
    ssl: Union[SSLContext, bool, Fingerprint] = True,
    server_hostname: Optional[str] = None,
    proxy_headers: Optional[LooseHeaders] = None,
    compress: int = 0,
    max_msg_size: int = 4 * 1024 * 1024,
) -> ClientWebSocketResponse:
    """Run the websocket opening handshake and build the response object.

    Raises:
        TypeError: ``ssl`` is not an instance of ``SSL_ALLOWED_TYPES``.
        WSServerHandshakeError: the status is not 101, the Upgrade or
            Connection header is wrong, the accept key does not match the
            key that was sent, or the extensions header cannot be parsed.
    """
    # Resolve the timeout object; a bare number is the deprecated spelling
    # of ``ClientWSTimeout(ws_close=...)``.
    if timeout is sentinel or timeout is None:
        ws_timeout = DEFAULT_WS_CLIENT_TIMEOUT
    elif isinstance(timeout, ClientWSTimeout):
        ws_timeout = timeout
    else:
        warnings.warn(
            "parameter 'timeout' of type 'float' "
            "is deprecated, please use "
            "'timeout=ClientWSTimeout(ws_close=...)'",
            DeprecationWarning,
            stacklevel=2,
        )
        ws_timeout = ClientWSTimeout(ws_close=timeout)

    if receive_timeout is not None:
        warnings.warn(
            "float parameter 'receive_timeout' "
            "is deprecated, please use parameter "
            "'timeout=ClientWSTimeout(ws_receive=...)'",
            DeprecationWarning,
            stacklevel=2,
        )
        ws_timeout = dataclasses.replace(ws_timeout, ws_receive=receive_timeout)

    real_headers: CIMultiDict[str] = (
        CIMultiDict() if headers is None else CIMultiDict(headers)
    )

    # Handshake headers the caller may pre-set; only missing ones are added.
    real_headers.setdefault(hdrs.UPGRADE, "websocket")
    real_headers.setdefault(hdrs.CONNECTION, "Upgrade")
    real_headers.setdefault(hdrs.SEC_WEBSOCKET_VERSION, "13")

    # The key is always freshly generated and overrides any caller value.
    sec_key = base64.b64encode(os.urandom(16))
    real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode()

    if protocols:
        real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols)
    if origin is not None:
        real_headers[hdrs.ORIGIN] = origin
    if compress:
        real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = ws_ext_gen(compress=compress)

    if not isinstance(ssl, SSL_ALLOWED_TYPES):
        raise TypeError(
            "ssl should be SSLContext, Fingerprint, or bool, "
            "got {!r} instead.".format(ssl)
        )

    # send request
    resp = await self.request(
        method,
        url,
        params=params,
        headers=real_headers,
        read_until_eof=False,
        auth=auth,
        proxy=proxy,
        proxy_auth=proxy_auth,
        ssl=ssl,
        server_hostname=server_hostname,
        proxy_headers=proxy_headers,
    )

    def handshake_error(message):
        # Every handshake failure carries the same response details.
        return WSServerHandshakeError(
            resp.request_info,
            resp.history,
            message=message,
            status=resp.status,
            headers=resp.headers,
        )

    try:
        # check handshake
        if resp.status != 101:
            raise handshake_error("Invalid response status")

        if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket":
            raise handshake_error("Invalid upgrade header")

        if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade":
            raise handshake_error("Invalid connection header")

        # key calculation
        expected_accept = base64.b64encode(
            hashlib.sha1(sec_key + WS_KEY).digest()
        ).decode()
        if resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "") != expected_accept:
            raise handshake_error("Invalid challenge response")

        # websocket protocol: first server-listed protocol that we offered
        protocol = None
        if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers:
            offered = (
                item.strip()
                for item in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
            )
            protocol = next((item for item in offered if item in protocols), None)

        # websocket compress
        notakeover = False
        if compress:
            compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
            if compress_hdrs:
                try:
                    compress, notakeover = ws_ext_parse(compress_hdrs)
                except WSHandshakeError as exc:
                    raise handshake_error(exc.args[0]) from exc
            else:
                # No extensions header in the response: compression is off.
                compress = 0
                notakeover = False

        conn = resp.connection
        assert conn is not None
        conn_proto = conn.protocol
        assert conn_proto is not None
        transport = conn.transport
        assert transport is not None

        reader: FlowControlDataQueue[WSMessage] = FlowControlDataQueue(
            conn_proto, 2**16, loop=self._loop
        )
        conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
        writer = WebSocketWriter(
            conn_proto,
            transport,
            use_mask=True,
            compress=compress,
            notakeover=notakeover,
        )
    except BaseException:
        # Any failure after the request was sent closes the response.
        resp.close()
        raise
    else:
        return self._ws_response_class(
            reader,
            writer,
            protocol,
            resp,
            ws_timeout,
            autoclose,
            autoping,
            self._loop,
            heartbeat=heartbeat,
            compress=compress,
            client_notakeover=notakeover,
        )

936 

def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]":
    """Return a new CIMultiDict: session defaults overlaid with ``headers``.

    The first occurrence of a name in ``headers`` replaces whatever the
    defaults held for it; further occurrences of that same name are
    appended, so repeated request headers are kept.
    """
    merged = CIMultiDict(self._default_headers)
    if not headers:
        return merged

    if not isinstance(headers, (MultiDictProxy, MultiDict)):
        headers = CIMultiDict(headers)

    seen: Set[str] = set()
    for name, value in headers.items():
        if name in seen:
            merged.add(name, value)
            continue
        merged[name] = value
        seen.add(name)
    return merged

952 

def get(
    self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
) -> "_RequestContextManager":
    """Issue a GET request; redirects are followed unless disabled."""
    pending = self._request(
        hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs
    )
    return _RequestContextManager(pending)

960 

def options(
    self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
) -> "_RequestContextManager":
    """Issue an OPTIONS request; redirects are followed unless disabled."""
    pending = self._request(
        hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs
    )
    return _RequestContextManager(pending)

970 

def head(
    self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any
) -> "_RequestContextManager":
    """Issue a HEAD request; redirects are not followed unless enabled."""
    pending = self._request(
        hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs
    )
    return _RequestContextManager(pending)

980 

def post(
    self, url: StrOrURL, *, data: Any = None, **kwargs: Any
) -> "_RequestContextManager":
    """Issue a POST request carrying ``data`` as its body."""
    pending = self._request(hdrs.METH_POST, url, data=data, **kwargs)
    return _RequestContextManager(pending)

988 

def put(
    self, url: StrOrURL, *, data: Any = None, **kwargs: Any
) -> "_RequestContextManager":
    """Issue a PUT request carrying ``data`` as its body."""
    pending = self._request(hdrs.METH_PUT, url, data=data, **kwargs)
    return _RequestContextManager(pending)

996 

def patch(
    self, url: StrOrURL, *, data: Any = None, **kwargs: Any
) -> "_RequestContextManager":
    """Issue a PATCH request carrying ``data`` as its body."""
    pending = self._request(hdrs.METH_PATCH, url, data=data, **kwargs)
    return _RequestContextManager(pending)

1004 

def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager":
    """Issue a DELETE request."""
    pending = self._request(hdrs.METH_DELETE, url, **kwargs)
    return _RequestContextManager(pending)

1008 

async def close(self) -> None:
    """Shut the session down and let go of its connector.

    The connector itself is closed only when the session owns it.  Calling
    this on an already closed session does nothing.
    """
    if self.closed:
        return
    connector = self._connector
    if connector is not None and self._connector_owner:
        await connector.close()
    self._connector = None

1018 

@property
def closed(self) -> bool:
    """Whether the session is closed (read-only).

    True when the connector reference has been dropped; otherwise the
    connector's own ``closed`` value.
    """
    connector = self._connector
    if connector is None:
        return True
    return connector.closed

1026 

@property
def connector(self) -> Optional[BaseConnector]:
    """The connector this session currently holds, or ``None``."""
    return self._connector

1031 

@property
def cookie_jar(self) -> AbstractCookieJar:
    """The cookie jar holding this session's cookies."""
    return self._cookie_jar

1036 

@property
def version(self) -> Tuple[int, int]:
    """The HTTP protocol version this session sends requests with."""
    return self._version

1041 

@property
def requote_redirect_url(self) -> bool:
    """Whether redirect target URLs are requoted before being followed."""
    return self._requote_redirect_url

1046 

@property
def timeout(self) -> ClientTimeout:
    """The session-wide default timeout settings."""
    return self._timeout

1051 

@property
def headers(self) -> "CIMultiDict[str]":
    """The default headers of this session (the stored object, not a copy)."""
    return self._default_headers

1056 

1057 @property 

1058 def skip_auto_headers(self) -> FrozenSet[istr]: 

1059 """Headers for which autogeneration should be skipped""" 

1060 return self._skip_auto_headers 

1061 

1062 @property 

1063 def auth(self) -> Optional[BasicAuth]: 

1064 """An object that represents HTTP Basic Authorization""" 

1065 return self._default_auth 

1066 

1067 @property 

1068 def json_serialize(self) -> JSONEncoder: 

1069 """Json serializer callable""" 

1070 return self._json_serialize 

1071 

1072 @property 

1073 def connector_owner(self) -> bool: 

1074 """Should connector be closed on session closing""" 

1075 return self._connector_owner 

1076 

1077 @property 

1078 def raise_for_status( 

1079 self, 

1080 ) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]: 

1081 """Should `ClientResponse.raise_for_status()` be called for each response.""" 

1082 return self._raise_for_status 

1083 

1084 @property 

1085 def auto_decompress(self) -> bool: 

1086 """Should the body response be automatically decompressed.""" 

1087 return self._auto_decompress 

1088 

1089 @property 

1090 def trust_env(self) -> bool: 

1091 """ 

1092 Should proxies information from environment or netrc be trusted. 

1093 

1094 Information is from HTTP_PROXY / HTTPS_PROXY environment variables 

1095 or ~/.netrc file if present. 

1096 """ 

1097 return self._trust_env 

1098 

1099 @property 

1100 def trace_configs(self) -> List[TraceConfig]: 

1101 """A list of TraceConfig instances used for client tracing""" 

1102 return self._trace_configs 

1103 

1104 def detach(self) -> None: 

1105 """Detach connector from session without closing the former. 

1106 

1107 Session is switched to closed state anyway. 

1108 """ 

1109 self._connector = None 

1110 

1111 async def __aenter__(self) -> "ClientSession": 

1112 return self 

1113 

1114 async def __aexit__( 

1115 self, 

1116 exc_type: Optional[Type[BaseException]], 

1117 exc_val: Optional[BaseException], 

1118 exc_tb: Optional[TracebackType], 

1119 ) -> None: 

1120 await self.close() 

1121 

1122 

class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]):
    """Wrap a coroutine so it can be awaited or entered with ``async with``.

    The coroutine-protocol methods (``send``, ``throw``, ``close``,
    ``__await__``) all delegate to the wrapped coroutine. ``__aenter__``
    awaits that coroutine and keeps the result in ``_resp``. This base class
    defines no ``__aexit__``; a subclass has to supply one.
    """

    __slots__ = ("_coro", "_resp")

    def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None:
        # ``_resp`` is deliberately left unset until ``__aenter__`` runs.
        self._coro = coro

    def send(self, arg: None) -> "asyncio.Future[Any]":
        """Delegate ``send`` to the wrapped coroutine."""
        return self._coro.send(arg)

    def throw(self, *args: Any, **kwargs: Any) -> "asyncio.Future[Any]":
        """Delegate ``throw`` to the wrapped coroutine."""
        return self._coro.throw(*args, **kwargs)

    def close(self) -> None:
        """Delegate ``close`` to the wrapped coroutine."""
        return self._coro.close()

    def __await__(self) -> Generator[Any, None, _RetType]:
        """Return the wrapped coroutine's own await iterator."""
        return self._coro.__await__()

    def __iter__(self) -> Generator[Any, None, _RetType]:
        """Iterate exactly as ``__await__`` does."""
        return self.__await__()

    async def __aenter__(self) -> _RetType:
        """Await the wrapped coroutine, remember its result and return it."""
        result = await self._coro
        self._resp = result
        return result

1148 

1149 

class _RequestContextManager(_BaseRequestContextManager[ClientResponse]):
    """``async with`` wrapper whose entered value is a ``ClientResponse``."""

    __slots__ = ()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Release the response and wait for it to finish closing."""
        # The exception arguments are intentionally ignored: an error raised
        # inside the ``async with`` body can come from user code that has
        # nothing to do with the state of the connection. The response is
        # therefore always released; anyone who wants the connection closed
        # must do that explicitly. Otherwise connection error handling should
        # kick in and close/recycle the connection as required.
        response = self._resp
        response.release()
        await response.wait_for_close()

1166 

1167 

class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]):
    """``async with`` wrapper whose entered value is a ``ClientWebSocketResponse``."""

    __slots__ = ()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Close the websocket response; the exception arguments are unused."""
        websocket = self._resp
        await websocket.close()

1178 

1179 

class _SessionRequestContextManager:
    """Async context manager that pairs a pending request with its session.

    Entering awaits the request coroutine and yields the response. The
    session is closed when that await fails and again when the context is
    left, so the session never outlives the ``async with`` block.
    """

    __slots__ = ("_coro", "_resp", "_session")

    def __init__(
        self,
        coro: Coroutine["asyncio.Future[Any]", None, ClientResponse],
        session: ClientSession,
    ) -> None:
        self._session = session
        self._coro = coro
        # Filled in by ``__aenter__`` once the request has produced a response.
        self._resp: Optional[ClientResponse] = None

    async def __aenter__(self) -> ClientResponse:
        """Await the request; close the session if it raises, then re-raise."""
        try:
            response = await self._coro
        except BaseException:
            # BaseException so that cancellation also closes the session.
            await self._session.close()
            raise
        self._resp = response
        return response

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Close the response first, then the session."""
        response = self._resp
        assert response is not None
        response.close()
        await self._session.close()

1210 

1211 

def request(
    method: str,
    url: StrOrURL,
    *,
    params: Optional[Mapping[str, str]] = None,
    data: Any = None,
    json: Any = None,
    headers: Optional[LooseHeaders] = None,
    skip_auto_headers: Optional[Iterable[str]] = None,
    auth: Optional[BasicAuth] = None,
    allow_redirects: bool = True,
    max_redirects: int = 10,
    compress: Optional[str] = None,
    chunked: Optional[bool] = None,
    expect100: bool = False,
    raise_for_status: Optional[bool] = None,
    read_until_eof: bool = True,
    proxy: Optional[StrOrURL] = None,
    proxy_auth: Optional[BasicAuth] = None,
    timeout: Union[ClientTimeout, _SENTINEL] = sentinel,
    cookies: Optional[LooseCookies] = None,
    version: HttpVersion = http.HttpVersion11,
    connector: Optional[BaseConnector] = None,
    read_bufsize: Optional[int] = None,
    max_line_size: int = 8190,
    max_field_size: int = 8190,
) -> _SessionRequestContextManager:
    """Send one HTTP request through a dedicated, short-lived session.

    A new ``ClientSession`` is built for this single request. The return
    value is a ``_SessionRequestContextManager``: entering it with
    ``async with`` yields the response, and leaving it closes the response
    and the session.

    method - HTTP method
    url - request url
    params - (optional) Mapping of values to be sent in the query
      string of the request
    data - (optional) Dictionary, bytes, or file-like object to
      send in the body of the request
    json - (optional) Any json compatible python object
    headers - (optional) HTTP headers to send with the request
    skip_auto_headers - (optional) Header names forwarded to the request
      as its ``skip_auto_headers`` argument
    auth - (optional) aiohttp.helpers.BasicAuth named tuple that
      represents HTTP Basic Auth
    allow_redirects - (optional) If set to False, do not follow
      redirects
    max_redirects - forwarded to the request unchanged (10 by default)
    compress - (optional) Compression setting for the request body,
      forwarded to the request unchanged
    chunked - (optional) Flag for chunked transfer encoding,
      forwarded to the request unchanged
    expect100 - Expect 100-continue response from server.
    raise_for_status - (optional) forwarded to the request unchanged
    read_until_eof - Read response until eof if response
      does not have Content-Length header.
    proxy, proxy_auth - (optional) forwarded to the request unchanged
    timeout - ClientTimeout settings structure, passed to the session
      (``sentinel`` when not given)
    cookies - (optional) Cookies passed to the session
    version - Request HTTP version, passed to the session.
    connector - BaseConnector sub-class instance to use. When omitted, a
      ``TCPConnector(force_close=True)`` is created and the session owns it;
      a connector supplied by the caller is not owned by the session.
    read_bufsize, max_line_size, max_field_size - forwarded to the
      request unchanged
    Usage::
      >>> import aiohttp
      >>> async with aiohttp.request('GET', 'http://python.org/') as resp:
      ...    print(resp)
      ...    data = await resp.read()
      <ClientResponse(https://www.python.org/) [200 OK]>
    """
    # Only a connector created right here belongs to the temporary session.
    connector_owner = connector is None
    if connector_owner:
        connector = TCPConnector(force_close=True)

    session = ClientSession(
        cookies=cookies,
        version=version,
        timeout=timeout,
        connector=connector,
        connector_owner=connector_owner,
    )

    pending = session._request(
        method,
        url,
        params=params,
        data=data,
        json=json,
        headers=headers,
        skip_auto_headers=skip_auto_headers,
        auth=auth,
        allow_redirects=allow_redirects,
        max_redirects=max_redirects,
        compress=compress,
        chunked=chunked,
        expect100=expect100,
        raise_for_status=raise_for_status,
        read_until_eof=read_until_eof,
        proxy=proxy,
        proxy_auth=proxy_auth,
        read_bufsize=read_bufsize,
        max_line_size=max_line_size,
        max_field_size=max_field_size,
    )
    return _SessionRequestContextManager(pending, session)