Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client.py: 51%

475 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

1"""HTTP Client for asyncio.""" 

2 

3import asyncio 

4import base64 

5import hashlib 

6import json 

7import os 

8import sys 

9import traceback 

10import warnings 

11from contextlib import suppress 

12from types import SimpleNamespace, TracebackType 

13from typing import ( 

14 Any, 

15 Awaitable, 

16 Callable, 

17 Coroutine, 

18 FrozenSet, 

19 Generator, 

20 Generic, 

21 Iterable, 

22 List, 

23 Mapping, 

24 Optional, 

25 Set, 

26 Tuple, 

27 Type, 

28 TypeVar, 

29 Union, 

30) 

31 

32import attr 

33from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr 

34from yarl import URL 

35 

36from . import hdrs, http, payload 

37from .abc import AbstractCookieJar 

38from .client_exceptions import ( 

39 ClientConnectionError as ClientConnectionError, 

40 ClientConnectorCertificateError as ClientConnectorCertificateError, 

41 ClientConnectorError as ClientConnectorError, 

42 ClientConnectorSSLError as ClientConnectorSSLError, 

43 ClientError as ClientError, 

44 ClientHttpProxyError as ClientHttpProxyError, 

45 ClientOSError as ClientOSError, 

46 ClientPayloadError as ClientPayloadError, 

47 ClientProxyConnectionError as ClientProxyConnectionError, 

48 ClientResponseError as ClientResponseError, 

49 ClientSSLError as ClientSSLError, 

50 ContentTypeError as ContentTypeError, 

51 InvalidURL as InvalidURL, 

52 ServerConnectionError as ServerConnectionError, 

53 ServerDisconnectedError as ServerDisconnectedError, 

54 ServerFingerprintMismatch as ServerFingerprintMismatch, 

55 ServerTimeoutError as ServerTimeoutError, 

56 TooManyRedirects as TooManyRedirects, 

57 WSServerHandshakeError as WSServerHandshakeError, 

58) 

59from .client_reqrep import ( 

60 ClientRequest as ClientRequest, 

61 ClientResponse as ClientResponse, 

62 Fingerprint as Fingerprint, 

63 RequestInfo as RequestInfo, 

64 _merge_ssl_params, 

65) 

66from .client_ws import ClientWebSocketResponse as ClientWebSocketResponse 

67from .connector import ( 

68 BaseConnector as BaseConnector, 

69 NamedPipeConnector as NamedPipeConnector, 

70 TCPConnector as TCPConnector, 

71 UnixConnector as UnixConnector, 

72) 

73from .cookiejar import CookieJar 

74from .helpers import ( 

75 DEBUG, 

76 PY_36, 

77 BasicAuth, 

78 TimeoutHandle, 

79 ceil_timeout, 

80 get_env_proxy_for_url, 

81 get_running_loop, 

82 sentinel, 

83 strip_auth_from_url, 

84) 

85from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter 

86from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse 

87from .streams import FlowControlDataQueue 

88from .tracing import Trace, TraceConfig 

89from .typedefs import Final, JSONEncoder, LooseCookies, LooseHeaders, StrOrURL 

90 

91__all__ = ( 

92 # client_exceptions 

93 "ClientConnectionError", 

94 "ClientConnectorCertificateError", 

95 "ClientConnectorError", 

96 "ClientConnectorSSLError", 

97 "ClientError", 

98 "ClientHttpProxyError", 

99 "ClientOSError", 

100 "ClientPayloadError", 

101 "ClientProxyConnectionError", 

102 "ClientResponseError", 

103 "ClientSSLError", 

104 "ContentTypeError", 

105 "InvalidURL", 

106 "ServerConnectionError", 

107 "ServerDisconnectedError", 

108 "ServerFingerprintMismatch", 

109 "ServerTimeoutError", 

110 "TooManyRedirects", 

111 "WSServerHandshakeError", 

112 # client_reqrep 

113 "ClientRequest", 

114 "ClientResponse", 

115 "Fingerprint", 

116 "RequestInfo", 

117 # connector 

118 "BaseConnector", 

119 "TCPConnector", 

120 "UnixConnector", 

121 "NamedPipeConnector", 

122 # client_ws 

123 "ClientWebSocketResponse", 

124 # client 

125 "ClientSession", 

126 "ClientTimeout", 

127 "request", 

128) 

129 

130 

131try: 

132 from ssl import SSLContext 

133except ImportError: # pragma: no cover 

134 SSLContext = object # type: ignore[misc,assignment] 

135 

136 

137@attr.s(auto_attribs=True, frozen=True, slots=True) 

138class ClientTimeout: 

139 total: Optional[float] = None 

140 connect: Optional[float] = None 

141 sock_read: Optional[float] = None 

142 sock_connect: Optional[float] = None 

143 

144 # pool_queue_timeout: Optional[float] = None 

145 # dns_resolution_timeout: Optional[float] = None 

146 # socket_connect_timeout: Optional[float] = None 

147 # connection_acquiring_timeout: Optional[float] = None 

148 # new_connection_timeout: Optional[float] = None 

149 # http_header_timeout: Optional[float] = None 

150 # response_body_timeout: Optional[float] = None 

151 

152 # to create a timeout specific for a single request, either 

153 # - create a completely new one to overwrite the default 

154 # - or use http://www.attrs.org/en/stable/api.html#attr.evolve 

155 # to overwrite the defaults 

156 

157 

158# 5 Minute default read timeout 

159DEFAULT_TIMEOUT: Final[ClientTimeout] = ClientTimeout(total=5 * 60) 

160 

161_RetType = TypeVar("_RetType") 

162 

163 

164class ClientSession: 

165 """First-class interface for making HTTP requests.""" 

166 

167 ATTRS = frozenset( 

168 [ 

169 "_base_url", 

170 "_source_traceback", 

171 "_connector", 

172 "requote_redirect_url", 

173 "_loop", 

174 "_cookie_jar", 

175 "_connector_owner", 

176 "_default_auth", 

177 "_version", 

178 "_json_serialize", 

179 "_requote_redirect_url", 

180 "_timeout", 

181 "_raise_for_status", 

182 "_auto_decompress", 

183 "_trust_env", 

184 "_default_headers", 

185 "_skip_auto_headers", 

186 "_request_class", 

187 "_response_class", 

188 "_ws_response_class", 

189 "_trace_configs", 

190 "_read_bufsize", 

191 ] 

192 ) 

193 

194 _source_traceback = None # type: Optional[traceback.StackSummary] 

195 _connector = None # type: Optional[BaseConnector] 

196 

197 def __init__( 

198 self, 

199 base_url: Optional[StrOrURL] = None, 

200 *, 

201 connector: Optional[BaseConnector] = None, 

202 loop: Optional[asyncio.AbstractEventLoop] = None, 

203 cookies: Optional[LooseCookies] = None, 

204 headers: Optional[LooseHeaders] = None, 

205 skip_auto_headers: Optional[Iterable[str]] = None, 

206 auth: Optional[BasicAuth] = None, 

207 json_serialize: JSONEncoder = json.dumps, 

208 request_class: Type[ClientRequest] = ClientRequest, 

209 response_class: Type[ClientResponse] = ClientResponse, 

210 ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse, 

211 version: HttpVersion = http.HttpVersion11, 

212 cookie_jar: Optional[AbstractCookieJar] = None, 

213 connector_owner: bool = True, 

214 raise_for_status: bool = False, 

215 read_timeout: Union[float, object] = sentinel, 

216 conn_timeout: Optional[float] = None, 

217 timeout: Union[object, ClientTimeout] = sentinel, 

218 auto_decompress: bool = True, 

219 trust_env: bool = False, 

220 requote_redirect_url: bool = True, 

221 trace_configs: Optional[List[TraceConfig]] = None, 

222 read_bufsize: int = 2**16, 

223 ) -> None: 

224 if loop is None: 

225 if connector is not None: 

226 loop = connector._loop 

227 

228 loop = get_running_loop(loop) 

229 

230 if base_url is None or isinstance(base_url, URL): 

231 self._base_url: Optional[URL] = base_url 

232 else: 

233 self._base_url = URL(base_url) 

234 assert ( 

235 self._base_url.origin() == self._base_url 

236 ), "Only absolute URLs without path part are supported" 

237 

238 if connector is None: 

239 connector = TCPConnector(loop=loop) 

240 

241 if connector._loop is not loop: 

242 raise RuntimeError("Session and connector has to use same event loop") 

243 

244 self._loop = loop 

245 

246 if loop.get_debug(): 

247 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

248 

249 if cookie_jar is None: 

250 cookie_jar = CookieJar(loop=loop) 

251 self._cookie_jar = cookie_jar 

252 

253 if cookies is not None: 

254 self._cookie_jar.update_cookies(cookies) 

255 

256 self._connector = connector 

257 self._connector_owner = connector_owner 

258 self._default_auth = auth 

259 self._version = version 

260 self._json_serialize = json_serialize 

261 if timeout is sentinel: 

262 self._timeout = DEFAULT_TIMEOUT 

263 if read_timeout is not sentinel: 

264 warnings.warn( 

265 "read_timeout is deprecated, " "use timeout argument instead", 

266 DeprecationWarning, 

267 stacklevel=2, 

268 ) 

269 self._timeout = attr.evolve(self._timeout, total=read_timeout) 

270 if conn_timeout is not None: 

271 self._timeout = attr.evolve(self._timeout, connect=conn_timeout) 

272 warnings.warn( 

273 "conn_timeout is deprecated, " "use timeout argument instead", 

274 DeprecationWarning, 

275 stacklevel=2, 

276 ) 

277 else: 

278 self._timeout = timeout # type: ignore[assignment] 

279 if read_timeout is not sentinel: 

280 raise ValueError( 

281 "read_timeout and timeout parameters " 

282 "conflict, please setup " 

283 "timeout.read" 

284 ) 

285 if conn_timeout is not None: 

286 raise ValueError( 

287 "conn_timeout and timeout parameters " 

288 "conflict, please setup " 

289 "timeout.connect" 

290 ) 

291 self._raise_for_status = raise_for_status 

292 self._auto_decompress = auto_decompress 

293 self._trust_env = trust_env 

294 self._requote_redirect_url = requote_redirect_url 

295 self._read_bufsize = read_bufsize 

296 

297 # Convert to list of tuples 

298 if headers: 

299 real_headers: CIMultiDict[str] = CIMultiDict(headers) 

300 else: 

301 real_headers = CIMultiDict() 

302 self._default_headers: CIMultiDict[str] = real_headers 

303 if skip_auto_headers is not None: 

304 self._skip_auto_headers = frozenset(istr(i) for i in skip_auto_headers) 

305 else: 

306 self._skip_auto_headers = frozenset() 

307 

308 self._request_class = request_class 

309 self._response_class = response_class 

310 self._ws_response_class = ws_response_class 

311 

312 self._trace_configs = trace_configs or [] 

313 for trace_config in self._trace_configs: 

314 trace_config.freeze() 

315 

316 def __init_subclass__(cls: Type["ClientSession"]) -> None: 

317 warnings.warn( 

318 "Inheritance class {} from ClientSession " 

319 "is discouraged".format(cls.__name__), 

320 DeprecationWarning, 

321 stacklevel=2, 

322 ) 

323 

324 if DEBUG: 

325 

326 def __setattr__(self, name: str, val: Any) -> None: 

327 if name not in self.ATTRS: 

328 warnings.warn( 

329 "Setting custom ClientSession.{} attribute " 

330 "is discouraged".format(name), 

331 DeprecationWarning, 

332 stacklevel=2, 

333 ) 

334 super().__setattr__(name, val) 

335 

336 def __del__(self, _warnings: Any = warnings) -> None: 

337 if not self.closed: 

338 if PY_36: 

339 kwargs = {"source": self} 

340 else: 

341 kwargs = {} 

342 _warnings.warn( 

343 f"Unclosed client session {self!r}", ResourceWarning, **kwargs 

344 ) 

345 context = {"client_session": self, "message": "Unclosed client session"} 

346 if self._source_traceback is not None: 

347 context["source_traceback"] = self._source_traceback 

348 self._loop.call_exception_handler(context) 

349 

350 def request( 

351 self, method: str, url: StrOrURL, **kwargs: Any 

352 ) -> "_RequestContextManager": 

353 """Perform HTTP request.""" 

354 return _RequestContextManager(self._request(method, url, **kwargs)) 

355 

356 def _build_url(self, str_or_url: StrOrURL) -> URL: 

357 url = URL(str_or_url) 

358 if self._base_url is None: 

359 return url 

360 else: 

361 assert not url.is_absolute() and url.path.startswith("/") 

362 return self._base_url.join(url) 

363 

364 async def _request( 

365 self, 

366 method: str, 

367 str_or_url: StrOrURL, 

368 *, 

369 params: Optional[Mapping[str, str]] = None, 

370 data: Any = None, 

371 json: Any = None, 

372 cookies: Optional[LooseCookies] = None, 

373 headers: Optional[LooseHeaders] = None, 

374 skip_auto_headers: Optional[Iterable[str]] = None, 

375 auth: Optional[BasicAuth] = None, 

376 allow_redirects: bool = True, 

377 max_redirects: int = 10, 

378 compress: Optional[str] = None, 

379 chunked: Optional[bool] = None, 

380 expect100: bool = False, 

381 raise_for_status: Optional[bool] = None, 

382 read_until_eof: bool = True, 

383 proxy: Optional[StrOrURL] = None, 

384 proxy_auth: Optional[BasicAuth] = None, 

385 timeout: Union[ClientTimeout, object] = sentinel, 

386 verify_ssl: Optional[bool] = None, 

387 fingerprint: Optional[bytes] = None, 

388 ssl_context: Optional[SSLContext] = None, 

389 ssl: Optional[Union[SSLContext, bool, Fingerprint]] = None, 

390 proxy_headers: Optional[LooseHeaders] = None, 

391 trace_request_ctx: Optional[SimpleNamespace] = None, 

392 read_bufsize: Optional[int] = None, 

393 ) -> ClientResponse: 

394 

395 # NOTE: timeout clamps existing connect and read timeouts. We cannot 

396 # set the default to None because we need to detect if the user wants 

397 # to use the existing timeouts by setting timeout to None. 

398 

399 if self.closed: 

400 raise RuntimeError("Session is closed") 

401 

402 ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) 

403 

404 if data is not None and json is not None: 

405 raise ValueError( 

406 "data and json parameters can not be used at the same time" 

407 ) 

408 elif json is not None: 

409 data = payload.JsonPayload(json, dumps=self._json_serialize) 

410 

411 if not isinstance(chunked, bool) and chunked is not None: 

412 warnings.warn("Chunk size is deprecated #1615", DeprecationWarning) 

413 

414 redirects = 0 

415 history = [] 

416 version = self._version 

417 

418 # Merge with default headers and transform to CIMultiDict 

419 headers = self._prepare_headers(headers) 

420 proxy_headers = self._prepare_headers(proxy_headers) 

421 

422 try: 

423 url = self._build_url(str_or_url) 

424 except ValueError as e: 

425 raise InvalidURL(str_or_url) from e 

426 

427 skip_headers = set(self._skip_auto_headers) 

428 if skip_auto_headers is not None: 

429 for i in skip_auto_headers: 

430 skip_headers.add(istr(i)) 

431 

432 if proxy is not None: 

433 try: 

434 proxy = URL(proxy) 

435 except ValueError as e: 

436 raise InvalidURL(proxy) from e 

437 

438 if timeout is sentinel: 

439 real_timeout: ClientTimeout = self._timeout 

440 else: 

441 if not isinstance(timeout, ClientTimeout): 

442 real_timeout = ClientTimeout(total=timeout) # type: ignore[arg-type] 

443 else: 

444 real_timeout = timeout 

445 # timeout is cumulative for all request operations 

446 # (request, redirects, responses, data consuming) 

447 tm = TimeoutHandle(self._loop, real_timeout.total) 

448 handle = tm.start() 

449 

450 if read_bufsize is None: 

451 read_bufsize = self._read_bufsize 

452 

453 traces = [ 

454 Trace( 

455 self, 

456 trace_config, 

457 trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx), 

458 ) 

459 for trace_config in self._trace_configs 

460 ] 

461 

462 for trace in traces: 

463 await trace.send_request_start(method, url.update_query(params), headers) 

464 

465 timer = tm.timer() 

466 try: 

467 with timer: 

468 while True: 

469 url, auth_from_url = strip_auth_from_url(url) 

470 if auth and auth_from_url: 

471 raise ValueError( 

472 "Cannot combine AUTH argument with " 

473 "credentials encoded in URL" 

474 ) 

475 

476 if auth is None: 

477 auth = auth_from_url 

478 if auth is None: 

479 auth = self._default_auth 

480 # It would be confusing if we support explicit 

481 # Authorization header with auth argument 

482 if ( 

483 headers is not None 

484 and auth is not None 

485 and hdrs.AUTHORIZATION in headers 

486 ): 

487 raise ValueError( 

488 "Cannot combine AUTHORIZATION header " 

489 "with AUTH argument or credentials " 

490 "encoded in URL" 

491 ) 

492 

493 all_cookies = self._cookie_jar.filter_cookies(url) 

494 

495 if cookies is not None: 

496 tmp_cookie_jar = CookieJar() 

497 tmp_cookie_jar.update_cookies(cookies) 

498 req_cookies = tmp_cookie_jar.filter_cookies(url) 

499 if req_cookies: 

500 all_cookies.load(req_cookies) 

501 

502 if proxy is not None: 

503 proxy = URL(proxy) 

504 elif self._trust_env: 

505 with suppress(LookupError): 

506 proxy, proxy_auth = get_env_proxy_for_url(url) 

507 

508 req = self._request_class( 

509 method, 

510 url, 

511 params=params, 

512 headers=headers, 

513 skip_auto_headers=skip_headers, 

514 data=data, 

515 cookies=all_cookies, 

516 auth=auth, 

517 version=version, 

518 compress=compress, 

519 chunked=chunked, 

520 expect100=expect100, 

521 loop=self._loop, 

522 response_class=self._response_class, 

523 proxy=proxy, 

524 proxy_auth=proxy_auth, 

525 timer=timer, 

526 session=self, 

527 ssl=ssl, 

528 proxy_headers=proxy_headers, 

529 traces=traces, 

530 ) 

531 

532 # connection timeout 

533 try: 

534 async with ceil_timeout(real_timeout.connect): 

535 assert self._connector is not None 

536 conn = await self._connector.connect( 

537 req, traces=traces, timeout=real_timeout 

538 ) 

539 except asyncio.TimeoutError as exc: 

540 raise ServerTimeoutError( 

541 "Connection timeout " "to host {}".format(url) 

542 ) from exc 

543 

544 assert conn.transport is not None 

545 

546 assert conn.protocol is not None 

547 conn.protocol.set_response_params( 

548 timer=timer, 

549 skip_payload=method.upper() == "HEAD", 

550 read_until_eof=read_until_eof, 

551 auto_decompress=self._auto_decompress, 

552 read_timeout=real_timeout.sock_read, 

553 read_bufsize=read_bufsize, 

554 ) 

555 

556 try: 

557 try: 

558 resp = await req.send(conn) 

559 try: 

560 await resp.start(conn) 

561 except BaseException: 

562 resp.close() 

563 raise 

564 except BaseException: 

565 conn.close() 

566 raise 

567 except ClientError: 

568 raise 

569 except OSError as exc: 

570 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

571 raise 

572 raise ClientOSError(*exc.args) from exc 

573 

574 self._cookie_jar.update_cookies(resp.cookies, resp.url) 

575 

576 # redirects 

577 if resp.status in (301, 302, 303, 307, 308) and allow_redirects: 

578 

579 for trace in traces: 

580 await trace.send_request_redirect( 

581 method, url.update_query(params), headers, resp 

582 ) 

583 

584 redirects += 1 

585 history.append(resp) 

586 if max_redirects and redirects >= max_redirects: 

587 resp.close() 

588 raise TooManyRedirects( 

589 history[0].request_info, tuple(history) 

590 ) 

591 

592 # For 301 and 302, mimic IE, now changed in RFC 

593 # https://github.com/kennethreitz/requests/pull/269 

594 if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or ( 

595 resp.status in (301, 302) and resp.method == hdrs.METH_POST 

596 ): 

597 method = hdrs.METH_GET 

598 data = None 

599 if headers.get(hdrs.CONTENT_LENGTH): 

600 headers.pop(hdrs.CONTENT_LENGTH) 

601 

602 r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get( 

603 hdrs.URI 

604 ) 

605 if r_url is None: 

606 # see github.com/aio-libs/aiohttp/issues/2022 

607 break 

608 else: 

609 # reading from correct redirection 

610 # response is forbidden 

611 resp.release() 

612 

613 try: 

614 parsed_url = URL( 

615 r_url, encoded=not self._requote_redirect_url 

616 ) 

617 

618 except ValueError as e: 

619 raise InvalidURL(r_url) from e 

620 

621 scheme = parsed_url.scheme 

622 if scheme not in ("http", "https", ""): 

623 resp.close() 

624 raise ValueError("Can redirect only to http or https") 

625 elif not scheme: 

626 parsed_url = url.join(parsed_url) 

627 

628 if url.origin() != parsed_url.origin(): 

629 auth = None 

630 headers.pop(hdrs.AUTHORIZATION, None) 

631 

632 url = parsed_url 

633 params = None 

634 resp.release() 

635 continue 

636 

637 break 

638 

639 # check response status 

640 if raise_for_status is None: 

641 raise_for_status = self._raise_for_status 

642 if raise_for_status: 

643 resp.raise_for_status() 

644 

645 # register connection 

646 if handle is not None: 

647 if resp.connection is not None: 

648 resp.connection.add_callback(handle.cancel) 

649 else: 

650 handle.cancel() 

651 

652 resp._history = tuple(history) 

653 

654 for trace in traces: 

655 await trace.send_request_end( 

656 method, url.update_query(params), headers, resp 

657 ) 

658 return resp 

659 

660 except BaseException as e: 

661 # cleanup timer 

662 tm.close() 

663 if handle: 

664 handle.cancel() 

665 handle = None 

666 

667 for trace in traces: 

668 await trace.send_request_exception( 

669 method, url.update_query(params), headers, e 

670 ) 

671 raise 

672 

673 def ws_connect( 

674 self, 

675 url: StrOrURL, 

676 *, 

677 method: str = hdrs.METH_GET, 

678 protocols: Iterable[str] = (), 

679 timeout: float = 10.0, 

680 receive_timeout: Optional[float] = None, 

681 autoclose: bool = True, 

682 autoping: bool = True, 

683 heartbeat: Optional[float] = None, 

684 auth: Optional[BasicAuth] = None, 

685 origin: Optional[str] = None, 

686 params: Optional[Mapping[str, str]] = None, 

687 headers: Optional[LooseHeaders] = None, 

688 proxy: Optional[StrOrURL] = None, 

689 proxy_auth: Optional[BasicAuth] = None, 

690 ssl: Union[SSLContext, bool, None, Fingerprint] = None, 

691 verify_ssl: Optional[bool] = None, 

692 fingerprint: Optional[bytes] = None, 

693 ssl_context: Optional[SSLContext] = None, 

694 proxy_headers: Optional[LooseHeaders] = None, 

695 compress: int = 0, 

696 max_msg_size: int = 4 * 1024 * 1024, 

697 ) -> "_WSRequestContextManager": 

698 """Initiate websocket connection.""" 

699 return _WSRequestContextManager( 

700 self._ws_connect( 

701 url, 

702 method=method, 

703 protocols=protocols, 

704 timeout=timeout, 

705 receive_timeout=receive_timeout, 

706 autoclose=autoclose, 

707 autoping=autoping, 

708 heartbeat=heartbeat, 

709 auth=auth, 

710 origin=origin, 

711 params=params, 

712 headers=headers, 

713 proxy=proxy, 

714 proxy_auth=proxy_auth, 

715 ssl=ssl, 

716 verify_ssl=verify_ssl, 

717 fingerprint=fingerprint, 

718 ssl_context=ssl_context, 

719 proxy_headers=proxy_headers, 

720 compress=compress, 

721 max_msg_size=max_msg_size, 

722 ) 

723 ) 

724 

725 async def _ws_connect( 

726 self, 

727 url: StrOrURL, 

728 *, 

729 method: str = hdrs.METH_GET, 

730 protocols: Iterable[str] = (), 

731 timeout: float = 10.0, 

732 receive_timeout: Optional[float] = None, 

733 autoclose: bool = True, 

734 autoping: bool = True, 

735 heartbeat: Optional[float] = None, 

736 auth: Optional[BasicAuth] = None, 

737 origin: Optional[str] = None, 

738 params: Optional[Mapping[str, str]] = None, 

739 headers: Optional[LooseHeaders] = None, 

740 proxy: Optional[StrOrURL] = None, 

741 proxy_auth: Optional[BasicAuth] = None, 

742 ssl: Union[SSLContext, bool, None, Fingerprint] = None, 

743 verify_ssl: Optional[bool] = None, 

744 fingerprint: Optional[bytes] = None, 

745 ssl_context: Optional[SSLContext] = None, 

746 proxy_headers: Optional[LooseHeaders] = None, 

747 compress: int = 0, 

748 max_msg_size: int = 4 * 1024 * 1024, 

749 ) -> ClientWebSocketResponse: 

750 

751 if headers is None: 

752 real_headers: CIMultiDict[str] = CIMultiDict() 

753 else: 

754 real_headers = CIMultiDict(headers) 

755 

756 default_headers = { 

757 hdrs.UPGRADE: "websocket", 

758 hdrs.CONNECTION: "upgrade", 

759 hdrs.SEC_WEBSOCKET_VERSION: "13", 

760 } 

761 

762 for key, value in default_headers.items(): 

763 real_headers.setdefault(key, value) 

764 

765 sec_key = base64.b64encode(os.urandom(16)) 

766 real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode() 

767 

768 if protocols: 

769 real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols) 

770 if origin is not None: 

771 real_headers[hdrs.ORIGIN] = origin 

772 if compress: 

773 extstr = ws_ext_gen(compress=compress) 

774 real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr 

775 

776 ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) 

777 

778 # send request 

779 resp = await self.request( 

780 method, 

781 url, 

782 params=params, 

783 headers=real_headers, 

784 read_until_eof=False, 

785 auth=auth, 

786 proxy=proxy, 

787 proxy_auth=proxy_auth, 

788 ssl=ssl, 

789 proxy_headers=proxy_headers, 

790 ) 

791 

792 try: 

793 # check handshake 

794 if resp.status != 101: 

795 raise WSServerHandshakeError( 

796 resp.request_info, 

797 resp.history, 

798 message="Invalid response status", 

799 status=resp.status, 

800 headers=resp.headers, 

801 ) 

802 

803 if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket": 

804 raise WSServerHandshakeError( 

805 resp.request_info, 

806 resp.history, 

807 message="Invalid upgrade header", 

808 status=resp.status, 

809 headers=resp.headers, 

810 ) 

811 

812 if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade": 

813 raise WSServerHandshakeError( 

814 resp.request_info, 

815 resp.history, 

816 message="Invalid connection header", 

817 status=resp.status, 

818 headers=resp.headers, 

819 ) 

820 

821 # key calculation 

822 r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "") 

823 match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode() 

824 if r_key != match: 

825 raise WSServerHandshakeError( 

826 resp.request_info, 

827 resp.history, 

828 message="Invalid challenge response", 

829 status=resp.status, 

830 headers=resp.headers, 

831 ) 

832 

833 # websocket protocol 

834 protocol = None 

835 if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers: 

836 resp_protocols = [ 

837 proto.strip() 

838 for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",") 

839 ] 

840 

841 for proto in resp_protocols: 

842 if proto in protocols: 

843 protocol = proto 

844 break 

845 

846 # websocket compress 

847 notakeover = False 

848 if compress: 

849 compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS) 

850 if compress_hdrs: 

851 try: 

852 compress, notakeover = ws_ext_parse(compress_hdrs) 

853 except WSHandshakeError as exc: 

854 raise WSServerHandshakeError( 

855 resp.request_info, 

856 resp.history, 

857 message=exc.args[0], 

858 status=resp.status, 

859 headers=resp.headers, 

860 ) from exc 

861 else: 

862 compress = 0 

863 notakeover = False 

864 

865 conn = resp.connection 

866 assert conn is not None 

867 conn_proto = conn.protocol 

868 assert conn_proto is not None 

869 transport = conn.transport 

870 assert transport is not None 

871 reader: FlowControlDataQueue[WSMessage] = FlowControlDataQueue( 

872 conn_proto, 2**16, loop=self._loop 

873 ) 

874 conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader) 

875 writer = WebSocketWriter( 

876 conn_proto, 

877 transport, 

878 use_mask=True, 

879 compress=compress, 

880 notakeover=notakeover, 

881 ) 

882 except BaseException: 

883 resp.close() 

884 raise 

885 else: 

886 return self._ws_response_class( 

887 reader, 

888 writer, 

889 protocol, 

890 resp, 

891 timeout, 

892 autoclose, 

893 autoping, 

894 self._loop, 

895 receive_timeout=receive_timeout, 

896 heartbeat=heartbeat, 

897 compress=compress, 

898 client_notakeover=notakeover, 

899 ) 

900 

901 def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]": 

902 """Add default headers and transform it to CIMultiDict""" 

903 # Convert headers to MultiDict 

904 result = CIMultiDict(self._default_headers) 

905 if headers: 

906 if not isinstance(headers, (MultiDictProxy, MultiDict)): 

907 headers = CIMultiDict(headers) 

908 added_names: Set[str] = set() 

909 for key, value in headers.items(): 

910 if key in added_names: 

911 result.add(key, value) 

912 else: 

913 result[key] = value 

914 added_names.add(key) 

915 return result 

916 

917 def get( 

918 self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any 

919 ) -> "_RequestContextManager": 

920 """Perform HTTP GET request.""" 

921 return _RequestContextManager( 

922 self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs) 

923 ) 

924 

925 def options( 

926 self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any 

927 ) -> "_RequestContextManager": 

928 """Perform HTTP OPTIONS request.""" 

929 return _RequestContextManager( 

930 self._request( 

931 hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs 

932 ) 

933 ) 

934 

935 def head( 

936 self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any 

937 ) -> "_RequestContextManager": 

938 """Perform HTTP HEAD request.""" 

939 return _RequestContextManager( 

940 self._request( 

941 hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs 

942 ) 

943 ) 

944 

945 def post( 

946 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 

947 ) -> "_RequestContextManager": 

948 """Perform HTTP POST request.""" 

949 return _RequestContextManager( 

950 self._request(hdrs.METH_POST, url, data=data, **kwargs) 

951 ) 

952 

953 def put( 

954 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 

955 ) -> "_RequestContextManager": 

956 """Perform HTTP PUT request.""" 

957 return _RequestContextManager( 

958 self._request(hdrs.METH_PUT, url, data=data, **kwargs) 

959 ) 

960 

961 def patch( 

962 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 

963 ) -> "_RequestContextManager": 

964 """Perform HTTP PATCH request.""" 

965 return _RequestContextManager( 

966 self._request(hdrs.METH_PATCH, url, data=data, **kwargs) 

967 ) 

968 

969 def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager": 

970 """Perform HTTP DELETE request.""" 

971 return _RequestContextManager(self._request(hdrs.METH_DELETE, url, **kwargs)) 

972 

973 async def close(self) -> None: 

974 """Close underlying connector. 

975 

976 Release all acquired resources. 

977 """ 

978 if not self.closed: 

979 if self._connector is not None and self._connector_owner: 

980 await self._connector.close() 

981 self._connector = None 

982 

983 @property 

984 def closed(self) -> bool: 

985 """Is client session closed. 

986 

987 A readonly property. 

988 """ 

989 return self._connector is None or self._connector.closed 

990 

991 @property 

992 def connector(self) -> Optional[BaseConnector]: 

993 """Connector instance used for the session.""" 

994 return self._connector 

995 

996 @property 

997 def cookie_jar(self) -> AbstractCookieJar: 

998 """The session cookies.""" 

999 return self._cookie_jar 

1000 

1001 @property 

1002 def version(self) -> Tuple[int, int]: 

1003 """The session HTTP protocol version.""" 

1004 return self._version 

1005 

1006 @property 

1007 def requote_redirect_url(self) -> bool: 

1008 """Do URL requoting on redirection handling.""" 

1009 return self._requote_redirect_url 

1010 

1011 @requote_redirect_url.setter 

1012 def requote_redirect_url(self, val: bool) -> None: 

1013 """Do URL requoting on redirection handling.""" 

1014 warnings.warn( 

1015 "session.requote_redirect_url modification " "is deprecated #2778", 

1016 DeprecationWarning, 

1017 stacklevel=2, 

1018 ) 

1019 self._requote_redirect_url = val 

1020 

1021 @property 

1022 def loop(self) -> asyncio.AbstractEventLoop: 

1023 """Session's loop.""" 

1024 warnings.warn( 

1025 "client.loop property is deprecated", DeprecationWarning, stacklevel=2 

1026 ) 

1027 return self._loop 

1028 

1029 @property 

1030 def timeout(self) -> ClientTimeout: 

1031 """Timeout for the session.""" 

1032 return self._timeout 

1033 

1034 @property 

1035 def headers(self) -> "CIMultiDict[str]": 

1036 """The default headers of the client session.""" 

1037 return self._default_headers 

1038 

1039 @property 

1040 def skip_auto_headers(self) -> FrozenSet[istr]: 

1041 """Headers for which autogeneration should be skipped""" 

1042 return self._skip_auto_headers 

1043 

1044 @property 

1045 def auth(self) -> Optional[BasicAuth]: 

1046 """An object that represents HTTP Basic Authorization""" 

1047 return self._default_auth 

1048 

1049 @property 

1050 def json_serialize(self) -> JSONEncoder: 

1051 """Json serializer callable""" 

1052 return self._json_serialize 

1053 

1054 @property 

1055 def connector_owner(self) -> bool: 

1056 """Should connector be closed on session closing""" 

1057 return self._connector_owner 

1058 

1059 @property 

1060 def raise_for_status( 

1061 self, 

1062 ) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]: 

1063 """Should `ClientResponse.raise_for_status()` be called for each response.""" 

1064 return self._raise_for_status 

1065 

1066 @property 

1067 def auto_decompress(self) -> bool: 

1068 """Should the body response be automatically decompressed.""" 

1069 return self._auto_decompress 

1070 

1071 @property 

1072 def trust_env(self) -> bool: 

1073 """ 

1074 Should proxies information from environment or netrc be trusted. 

1075 

1076 Information is from HTTP_PROXY / HTTPS_PROXY environment variables 

1077 or ~/.netrc file if present. 

1078 """ 

1079 return self._trust_env 

1080 

1081 @property 

1082 def trace_configs(self) -> List[TraceConfig]: 

1083 """A list of TraceConfig instances used for client tracing""" 

1084 return self._trace_configs 

1085 

1086 def detach(self) -> None: 

1087 """Detach connector from session without closing the former. 

1088 

1089 Session is switched to closed state anyway. 

1090 """ 

1091 self._connector = None 

1092 

1093 def __enter__(self) -> None: 

1094 raise TypeError("Use async with instead") 

1095 

1096 def __exit__( 

1097 self, 

1098 exc_type: Optional[Type[BaseException]], 

1099 exc_val: Optional[BaseException], 

1100 exc_tb: Optional[TracebackType], 

1101 ) -> None: 

1102 # __exit__ should exist in pair with __enter__ but never executed 

1103 pass # pragma: no cover 

1104 

1105 async def __aenter__(self) -> "ClientSession": 

1106 return self 

1107 

1108 async def __aexit__( 

1109 self, 

1110 exc_type: Optional[Type[BaseException]], 

1111 exc_val: Optional[BaseException], 

1112 exc_tb: Optional[TracebackType], 

1113 ) -> None: 

1114 await self.close() 

1115 

1116 

1117class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]): 

1118 

1119 __slots__ = ("_coro", "_resp") 

1120 

1121 def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None: 

1122 self._coro = coro 

1123 

1124 def send(self, arg: None) -> "asyncio.Future[Any]": 

1125 return self._coro.send(arg) 

1126 

1127 def throw(self, arg: BaseException) -> None: # type: ignore[arg-type,override] 

1128 self._coro.throw(arg) 

1129 

1130 def close(self) -> None: 

1131 return self._coro.close() 

1132 

1133 def __await__(self) -> Generator[Any, None, _RetType]: 

1134 ret = self._coro.__await__() 

1135 return ret 

1136 

1137 def __iter__(self) -> Generator[Any, None, _RetType]: 

1138 return self.__await__() 

1139 

1140 async def __aenter__(self) -> _RetType: 

1141 self._resp = await self._coro 

1142 return self._resp 

1143 

1144 

1145class _RequestContextManager(_BaseRequestContextManager[ClientResponse]): 

1146 __slots__ = () 

1147 

1148 async def __aexit__( 

1149 self, 

1150 exc_type: Optional[Type[BaseException]], 

1151 exc: Optional[BaseException], 

1152 tb: Optional[TracebackType], 

1153 ) -> None: 

1154 # We're basing behavior on the exception as it can be caused by 

1155 # user code unrelated to the status of the connection. If you 

1156 # would like to close a connection you must do that 

1157 # explicitly. Otherwise connection error handling should kick in 

1158 # and close/recycle the connection as required. 

1159 self._resp.release() 

1160 

1161 

1162class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]): 

1163 __slots__ = () 

1164 

1165 async def __aexit__( 

1166 self, 

1167 exc_type: Optional[Type[BaseException]], 

1168 exc: Optional[BaseException], 

1169 tb: Optional[TracebackType], 

1170 ) -> None: 

1171 await self._resp.close() 

1172 

1173 

1174class _SessionRequestContextManager: 

1175 

1176 __slots__ = ("_coro", "_resp", "_session") 

1177 

1178 def __init__( 

1179 self, 

1180 coro: Coroutine["asyncio.Future[Any]", None, ClientResponse], 

1181 session: ClientSession, 

1182 ) -> None: 

1183 self._coro = coro 

1184 self._resp: Optional[ClientResponse] = None 

1185 self._session = session 

1186 

1187 async def __aenter__(self) -> ClientResponse: 

1188 try: 

1189 self._resp = await self._coro 

1190 except BaseException: 

1191 await self._session.close() 

1192 raise 

1193 else: 

1194 return self._resp 

1195 

1196 async def __aexit__( 

1197 self, 

1198 exc_type: Optional[Type[BaseException]], 

1199 exc: Optional[BaseException], 

1200 tb: Optional[TracebackType], 

1201 ) -> None: 

1202 assert self._resp is not None 

1203 self._resp.close() 

1204 await self._session.close() 

1205 

1206 

1207def request( 

1208 method: str, 

1209 url: StrOrURL, 

1210 *, 

1211 params: Optional[Mapping[str, str]] = None, 

1212 data: Any = None, 

1213 json: Any = None, 

1214 headers: Optional[LooseHeaders] = None, 

1215 skip_auto_headers: Optional[Iterable[str]] = None, 

1216 auth: Optional[BasicAuth] = None, 

1217 allow_redirects: bool = True, 

1218 max_redirects: int = 10, 

1219 compress: Optional[str] = None, 

1220 chunked: Optional[bool] = None, 

1221 expect100: bool = False, 

1222 raise_for_status: Optional[bool] = None, 

1223 read_until_eof: bool = True, 

1224 proxy: Optional[StrOrURL] = None, 

1225 proxy_auth: Optional[BasicAuth] = None, 

1226 timeout: Union[ClientTimeout, object] = sentinel, 

1227 cookies: Optional[LooseCookies] = None, 

1228 version: HttpVersion = http.HttpVersion11, 

1229 connector: Optional[BaseConnector] = None, 

1230 read_bufsize: Optional[int] = None, 

1231 loop: Optional[asyncio.AbstractEventLoop] = None, 

1232) -> _SessionRequestContextManager: 

1233 """Constructs and sends a request. 

1234 

1235 Returns response object. 

1236 method - HTTP method 

1237 url - request url 

1238 params - (optional) Dictionary or bytes to be sent in the query 

1239 string of the new request 

1240 data - (optional) Dictionary, bytes, or file-like object to 

1241 send in the body of the request 

1242 json - (optional) Any json compatible python object 

1243 headers - (optional) Dictionary of HTTP Headers to send with 

1244 the request 

1245 cookies - (optional) Dict object to send with the request 

1246 auth - (optional) BasicAuth named tuple represent HTTP Basic Auth 

1247 auth - aiohttp.helpers.BasicAuth 

1248 allow_redirects - (optional) If set to False, do not follow 

1249 redirects 

1250 version - Request HTTP version. 

1251 compress - Set to True if request has to be compressed 

1252 with deflate encoding. 

1253 chunked - Set to chunk size for chunked transfer encoding. 

1254 expect100 - Expect 100-continue response from server. 

1255 connector - BaseConnector sub-class instance to support 

1256 connection pooling. 

1257 read_until_eof - Read response until eof if response 

1258 does not have Content-Length header. 

1259 loop - Optional event loop. 

1260 timeout - Optional ClientTimeout settings structure, 5min 

1261 total timeout by default. 

1262 Usage:: 

1263 >>> import aiohttp 

1264 >>> resp = await aiohttp.request('GET', 'http://python.org/') 

1265 >>> resp 

1266 <ClientResponse(python.org/) [200]> 

1267 >>> data = await resp.read() 

1268 """ 

1269 connector_owner = False 

1270 if connector is None: 

1271 connector_owner = True 

1272 connector = TCPConnector(loop=loop, force_close=True) 

1273 

1274 session = ClientSession( 

1275 loop=loop, 

1276 cookies=cookies, 

1277 version=version, 

1278 timeout=timeout, 

1279 connector=connector, 

1280 connector_owner=connector_owner, 

1281 ) 

1282 

1283 return _SessionRequestContextManager( 

1284 session._request( 

1285 method, 

1286 url, 

1287 params=params, 

1288 data=data, 

1289 json=json, 

1290 headers=headers, 

1291 skip_auto_headers=skip_auto_headers, 

1292 auth=auth, 

1293 allow_redirects=allow_redirects, 

1294 max_redirects=max_redirects, 

1295 compress=compress, 

1296 chunked=chunked, 

1297 expect100=expect100, 

1298 raise_for_status=raise_for_status, 

1299 read_until_eof=read_until_eof, 

1300 proxy=proxy, 

1301 proxy_auth=proxy_auth, 

1302 read_bufsize=read_bufsize, 

1303 ), 

1304 session, 

1305 )