Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

496 statements  

1"""HTTP Client for asyncio.""" 

2 

3import asyncio 

4import base64 

5import hashlib 

6import json 

7import os 

8import sys 

9import traceback 

10import warnings 

11from contextlib import suppress 

12from types import SimpleNamespace, TracebackType 

13from typing import ( 

14 TYPE_CHECKING, 

15 Any, 

16 Awaitable, 

17 Callable, 

18 Coroutine, 

19 Final, 

20 FrozenSet, 

21 Generator, 

22 Generic, 

23 Iterable, 

24 List, 

25 Mapping, 

26 Optional, 

27 Set, 

28 Tuple, 

29 Type, 

30 TypeVar, 

31 Union, 

32) 

33 

34import attr 

35from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr 

36from yarl import URL 

37 

38from . import hdrs, http, payload 

39from .abc import AbstractCookieJar 

40from .client_exceptions import ( 

41 ClientConnectionError as ClientConnectionError, 

42 ClientConnectorCertificateError as ClientConnectorCertificateError, 

43 ClientConnectorError as ClientConnectorError, 

44 ClientConnectorSSLError as ClientConnectorSSLError, 

45 ClientError as ClientError, 

46 ClientHttpProxyError as ClientHttpProxyError, 

47 ClientOSError as ClientOSError, 

48 ClientPayloadError as ClientPayloadError, 

49 ClientProxyConnectionError as ClientProxyConnectionError, 

50 ClientResponseError as ClientResponseError, 

51 ClientSSLError as ClientSSLError, 

52 ContentTypeError as ContentTypeError, 

53 InvalidURL as InvalidURL, 

54 ServerConnectionError as ServerConnectionError, 

55 ServerDisconnectedError as ServerDisconnectedError, 

56 ServerFingerprintMismatch as ServerFingerprintMismatch, 

57 ServerTimeoutError as ServerTimeoutError, 

58 TooManyRedirects as TooManyRedirects, 

59 WSServerHandshakeError as WSServerHandshakeError, 

60) 

61from .client_reqrep import ( 

62 ClientRequest as ClientRequest, 

63 ClientResponse as ClientResponse, 

64 Fingerprint as Fingerprint, 

65 RequestInfo as RequestInfo, 

66 _merge_ssl_params, 

67) 

68from .client_ws import ClientWebSocketResponse as ClientWebSocketResponse 

69from .connector import ( 

70 BaseConnector as BaseConnector, 

71 NamedPipeConnector as NamedPipeConnector, 

72 TCPConnector as TCPConnector, 

73 UnixConnector as UnixConnector, 

74) 

75from .cookiejar import CookieJar 

76from .helpers import ( 

77 _SENTINEL, 

78 DEBUG, 

79 BasicAuth, 

80 TimeoutHandle, 

81 ceil_timeout, 

82 get_env_proxy_for_url, 

83 get_running_loop, 

84 method_must_be_empty_body, 

85 sentinel, 

86 strip_auth_from_url, 

87) 

88from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter 

89from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse 

90from .streams import FlowControlDataQueue 

91from .tracing import Trace, TraceConfig 

92from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, StrOrURL 

93 

94__all__ = ( 

95 # client_exceptions 

96 "ClientConnectionError", 

97 "ClientConnectorCertificateError", 

98 "ClientConnectorError", 

99 "ClientConnectorSSLError", 

100 "ClientError", 

101 "ClientHttpProxyError", 

102 "ClientOSError", 

103 "ClientPayloadError", 

104 "ClientProxyConnectionError", 

105 "ClientResponseError", 

106 "ClientSSLError", 

107 "ContentTypeError", 

108 "InvalidURL", 

109 "ServerConnectionError", 

110 "ServerDisconnectedError", 

111 "ServerFingerprintMismatch", 

112 "ServerTimeoutError", 

113 "TooManyRedirects", 

114 "WSServerHandshakeError", 

115 # client_reqrep 

116 "ClientRequest", 

117 "ClientResponse", 

118 "Fingerprint", 

119 "RequestInfo", 

120 # connector 

121 "BaseConnector", 

122 "TCPConnector", 

123 "UnixConnector", 

124 "NamedPipeConnector", 

125 # client_ws 

126 "ClientWebSocketResponse", 

127 # client 

128 "ClientSession", 

129 "ClientTimeout", 

130 "request", 

131) 

132 

133 

134if TYPE_CHECKING: 

135 from ssl import SSLContext 

136else: 

137 SSLContext = None 

138 

139 

140@attr.s(auto_attribs=True, frozen=True, slots=True) 

141class ClientTimeout: 

142 total: Optional[float] = None 

143 connect: Optional[float] = None 

144 sock_read: Optional[float] = None 

145 sock_connect: Optional[float] = None 

146 ceil_threshold: float = 5 

147 

148 # pool_queue_timeout: Optional[float] = None 

149 # dns_resolution_timeout: Optional[float] = None 

150 # socket_connect_timeout: Optional[float] = None 

151 # connection_acquiring_timeout: Optional[float] = None 

152 # new_connection_timeout: Optional[float] = None 

153 # http_header_timeout: Optional[float] = None 

154 # response_body_timeout: Optional[float] = None 

155 

156 # to create a timeout specific for a single request, either 

157 # - create a completely new one to overwrite the default 

158 # - or use http://www.attrs.org/en/stable/api.html#attr.evolve 

159 # to overwrite the defaults 

160 

161 

162# 5 Minute default read timeout 

163DEFAULT_TIMEOUT: Final[ClientTimeout] = ClientTimeout(total=5 * 60) 

164 

165_RetType = TypeVar("_RetType") 

166_CharsetResolver = Callable[[ClientResponse, bytes], str] 

167 

168 

169class ClientSession: 

170 """First-class interface for making HTTP requests.""" 

171 

172 ATTRS = frozenset( 

173 [ 

174 "_base_url", 

175 "_source_traceback", 

176 "_connector", 

177 "requote_redirect_url", 

178 "_loop", 

179 "_cookie_jar", 

180 "_connector_owner", 

181 "_default_auth", 

182 "_version", 

183 "_json_serialize", 

184 "_requote_redirect_url", 

185 "_timeout", 

186 "_raise_for_status", 

187 "_auto_decompress", 

188 "_trust_env", 

189 "_default_headers", 

190 "_skip_auto_headers", 

191 "_request_class", 

192 "_response_class", 

193 "_ws_response_class", 

194 "_trace_configs", 

195 "_read_bufsize", 

196 "_max_line_size", 

197 "_max_field_size", 

198 "_resolve_charset", 

199 ] 

200 ) 

201 

202 _source_traceback: Optional[traceback.StackSummary] = None 

203 _connector: Optional[BaseConnector] = None 

204 

205 def __init__( 

206 self, 

207 base_url: Optional[StrOrURL] = None, 

208 *, 

209 connector: Optional[BaseConnector] = None, 

210 loop: Optional[asyncio.AbstractEventLoop] = None, 

211 cookies: Optional[LooseCookies] = None, 

212 headers: Optional[LooseHeaders] = None, 

213 skip_auto_headers: Optional[Iterable[str]] = None, 

214 auth: Optional[BasicAuth] = None, 

215 json_serialize: JSONEncoder = json.dumps, 

216 request_class: Type[ClientRequest] = ClientRequest, 

217 response_class: Type[ClientResponse] = ClientResponse, 

218 ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse, 

219 version: HttpVersion = http.HttpVersion11, 

220 cookie_jar: Optional[AbstractCookieJar] = None, 

221 connector_owner: bool = True, 

222 raise_for_status: Union[ 

223 bool, Callable[[ClientResponse], Awaitable[None]] 

224 ] = False, 

225 read_timeout: Union[float, _SENTINEL] = sentinel, 

226 conn_timeout: Optional[float] = None, 

227 timeout: Union[object, ClientTimeout] = sentinel, 

228 auto_decompress: bool = True, 

229 trust_env: bool = False, 

230 requote_redirect_url: bool = True, 

231 trace_configs: Optional[List[TraceConfig]] = None, 

232 read_bufsize: int = 2**16, 

233 max_line_size: int = 8190, 

234 max_field_size: int = 8190, 

235 fallback_charset_resolver: _CharsetResolver = lambda r, b: "utf-8", 

236 ) -> None: 

237 # We initialise _connector to None immediately, as it's referenced in __del__() 

238 # and could cause issues if an exception occurs during initialisation. 

239 self._connector: Optional[BaseConnector] = None 

240 if timeout is sentinel or timeout is None: 

241 self._timeout = DEFAULT_TIMEOUT 

242 if read_timeout is not sentinel: 

243 warnings.warn( 

244 "read_timeout is deprecated, " "use timeout argument instead", 

245 DeprecationWarning, 

246 stacklevel=2, 

247 ) 

248 self._timeout = attr.evolve(self._timeout, total=read_timeout) 

249 if conn_timeout is not None: 

250 self._timeout = attr.evolve(self._timeout, connect=conn_timeout) 

251 warnings.warn( 

252 "conn_timeout is deprecated, " "use timeout argument instead", 

253 DeprecationWarning, 

254 stacklevel=2, 

255 ) 

256 else: 

257 if not isinstance(timeout, ClientTimeout): 

258 raise ValueError( 

259 f"timeout parameter cannot be of {type(timeout)} type, " 

260 "please use 'timeout=ClientTimeout(...)'", 

261 ) 

262 self._timeout = timeout 

263 if read_timeout is not sentinel: 

264 raise ValueError( 

265 "read_timeout and timeout parameters " 

266 "conflict, please setup " 

267 "timeout.read" 

268 ) 

269 if conn_timeout is not None: 

270 raise ValueError( 

271 "conn_timeout and timeout parameters " 

272 "conflict, please setup " 

273 "timeout.connect" 

274 ) 

275 if loop is None: 

276 if connector is not None: 

277 loop = connector._loop 

278 

279 loop = get_running_loop(loop) 

280 

281 if base_url is None or isinstance(base_url, URL): 

282 self._base_url: Optional[URL] = base_url 

283 else: 

284 self._base_url = URL(base_url) 

285 assert ( 

286 self._base_url.origin() == self._base_url 

287 ), "Only absolute URLs without path part are supported" 

288 

289 if connector is None: 

290 connector = TCPConnector(loop=loop) 

291 

292 if connector._loop is not loop: 

293 raise RuntimeError("Session and connector has to use same event loop") 

294 

295 self._loop = loop 

296 

297 if loop.get_debug(): 

298 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

299 

300 if cookie_jar is None: 

301 cookie_jar = CookieJar(loop=loop) 

302 self._cookie_jar = cookie_jar 

303 

304 if cookies is not None: 

305 self._cookie_jar.update_cookies(cookies) 

306 

307 self._connector = connector 

308 self._connector_owner = connector_owner 

309 self._default_auth = auth 

310 self._version = version 

311 self._json_serialize = json_serialize 

312 self._raise_for_status = raise_for_status 

313 self._auto_decompress = auto_decompress 

314 self._trust_env = trust_env 

315 self._requote_redirect_url = requote_redirect_url 

316 self._read_bufsize = read_bufsize 

317 self._max_line_size = max_line_size 

318 self._max_field_size = max_field_size 

319 

320 # Convert to list of tuples 

321 if headers: 

322 real_headers: CIMultiDict[str] = CIMultiDict(headers) 

323 else: 

324 real_headers = CIMultiDict() 

325 self._default_headers: CIMultiDict[str] = real_headers 

326 if skip_auto_headers is not None: 

327 self._skip_auto_headers = frozenset(istr(i) for i in skip_auto_headers) 

328 else: 

329 self._skip_auto_headers = frozenset() 

330 

331 self._request_class = request_class 

332 self._response_class = response_class 

333 self._ws_response_class = ws_response_class 

334 

335 self._trace_configs = trace_configs or [] 

336 for trace_config in self._trace_configs: 

337 trace_config.freeze() 

338 

339 self._resolve_charset = fallback_charset_resolver 

340 

341 def __init_subclass__(cls: Type["ClientSession"]) -> None: 

342 warnings.warn( 

343 "Inheritance class {} from ClientSession " 

344 "is discouraged".format(cls.__name__), 

345 DeprecationWarning, 

346 stacklevel=2, 

347 ) 

348 

349 if DEBUG: 

350 

351 def __setattr__(self, name: str, val: Any) -> None: 

352 if name not in self.ATTRS: 

353 warnings.warn( 

354 "Setting custom ClientSession.{} attribute " 

355 "is discouraged".format(name), 

356 DeprecationWarning, 

357 stacklevel=2, 

358 ) 

359 super().__setattr__(name, val) 

360 

361 def __del__(self, _warnings: Any = warnings) -> None: 

362 if not self.closed: 

363 kwargs = {"source": self} 

364 _warnings.warn( 

365 f"Unclosed client session {self!r}", ResourceWarning, **kwargs 

366 ) 

367 context = {"client_session": self, "message": "Unclosed client session"} 

368 if self._source_traceback is not None: 

369 context["source_traceback"] = self._source_traceback 

370 self._loop.call_exception_handler(context) 

371 

372 def request( 

373 self, method: str, url: StrOrURL, **kwargs: Any 

374 ) -> "_RequestContextManager": 

375 """Perform HTTP request.""" 

376 return _RequestContextManager(self._request(method, url, **kwargs)) 

377 

378 def _build_url(self, str_or_url: StrOrURL) -> URL: 

379 url = URL(str_or_url) 

380 if self._base_url is None: 

381 return url 

382 else: 

383 assert not url.is_absolute() and url.path.startswith("/") 

384 return self._base_url.join(url) 

385 

386 async def _request( 

387 self, 

388 method: str, 

389 str_or_url: StrOrURL, 

390 *, 

391 params: Optional[Mapping[str, str]] = None, 

392 data: Any = None, 

393 json: Any = None, 

394 cookies: Optional[LooseCookies] = None, 

395 headers: Optional[LooseHeaders] = None, 

396 skip_auto_headers: Optional[Iterable[str]] = None, 

397 auth: Optional[BasicAuth] = None, 

398 allow_redirects: bool = True, 

399 max_redirects: int = 10, 

400 compress: Optional[str] = None, 

401 chunked: Optional[bool] = None, 

402 expect100: bool = False, 

403 raise_for_status: Union[ 

404 None, bool, Callable[[ClientResponse], Awaitable[None]] 

405 ] = None, 

406 read_until_eof: bool = True, 

407 proxy: Optional[StrOrURL] = None, 

408 proxy_auth: Optional[BasicAuth] = None, 

409 timeout: Union[ClientTimeout, _SENTINEL] = sentinel, 

410 verify_ssl: Optional[bool] = None, 

411 fingerprint: Optional[bytes] = None, 

412 ssl_context: Optional[SSLContext] = None, 

413 ssl: Union[SSLContext, bool, Fingerprint] = True, 

414 server_hostname: Optional[str] = None, 

415 proxy_headers: Optional[LooseHeaders] = None, 

416 trace_request_ctx: Optional[SimpleNamespace] = None, 

417 read_bufsize: Optional[int] = None, 

418 auto_decompress: Optional[bool] = None, 

419 max_line_size: Optional[int] = None, 

420 max_field_size: Optional[int] = None, 

421 ) -> ClientResponse: 

422 

423 # NOTE: timeout clamps existing connect and read timeouts. We cannot 

424 # set the default to None because we need to detect if the user wants 

425 # to use the existing timeouts by setting timeout to None. 

426 

427 if self.closed: 

428 raise RuntimeError("Session is closed") 

429 

430 ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) 

431 

432 if data is not None and json is not None: 

433 raise ValueError( 

434 "data and json parameters can not be used at the same time" 

435 ) 

436 elif json is not None: 

437 data = payload.JsonPayload(json, dumps=self._json_serialize) 

438 

439 if not isinstance(chunked, bool) and chunked is not None: 

440 warnings.warn("Chunk size is deprecated #1615", DeprecationWarning) 

441 

442 redirects = 0 

443 history = [] 

444 version = self._version 

445 params = params or {} 

446 

447 # Merge with default headers and transform to CIMultiDict 

448 headers = self._prepare_headers(headers) 

449 proxy_headers = self._prepare_headers(proxy_headers) 

450 

451 try: 

452 url = self._build_url(str_or_url) 

453 except ValueError as e: 

454 raise InvalidURL(str_or_url) from e 

455 

456 skip_headers = set(self._skip_auto_headers) 

457 if skip_auto_headers is not None: 

458 for i in skip_auto_headers: 

459 skip_headers.add(istr(i)) 

460 

461 if proxy is not None: 

462 try: 

463 proxy = URL(proxy) 

464 except ValueError as e: 

465 raise InvalidURL(proxy) from e 

466 

467 if timeout is sentinel: 

468 real_timeout: ClientTimeout = self._timeout 

469 else: 

470 if not isinstance(timeout, ClientTimeout): 

471 real_timeout = ClientTimeout(total=timeout) 

472 else: 

473 real_timeout = timeout 

474 # timeout is cumulative for all request operations 

475 # (request, redirects, responses, data consuming) 

476 tm = TimeoutHandle( 

477 self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold 

478 ) 

479 handle = tm.start() 

480 

481 if read_bufsize is None: 

482 read_bufsize = self._read_bufsize 

483 

484 if auto_decompress is None: 

485 auto_decompress = self._auto_decompress 

486 

487 if max_line_size is None: 

488 max_line_size = self._max_line_size 

489 

490 if max_field_size is None: 

491 max_field_size = self._max_field_size 

492 

493 traces = [ 

494 Trace( 

495 self, 

496 trace_config, 

497 trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx), 

498 ) 

499 for trace_config in self._trace_configs 

500 ] 

501 

502 for trace in traces: 

503 await trace.send_request_start(method, url.update_query(params), headers) 

504 

505 timer = tm.timer() 

506 try: 

507 with timer: 

508 while True: 

509 url, auth_from_url = strip_auth_from_url(url) 

510 if auth and auth_from_url: 

511 raise ValueError( 

512 "Cannot combine AUTH argument with " 

513 "credentials encoded in URL" 

514 ) 

515 

516 if auth is None: 

517 auth = auth_from_url 

518 if auth is None: 

519 auth = self._default_auth 

520 # It would be confusing if we support explicit 

521 # Authorization header with auth argument 

522 if ( 

523 headers is not None 

524 and auth is not None 

525 and hdrs.AUTHORIZATION in headers 

526 ): 

527 raise ValueError( 

528 "Cannot combine AUTHORIZATION header " 

529 "with AUTH argument or credentials " 

530 "encoded in URL" 

531 ) 

532 

533 all_cookies = self._cookie_jar.filter_cookies(url) 

534 

535 if cookies is not None: 

536 tmp_cookie_jar = CookieJar() 

537 tmp_cookie_jar.update_cookies(cookies) 

538 req_cookies = tmp_cookie_jar.filter_cookies(url) 

539 if req_cookies: 

540 all_cookies.load(req_cookies) 

541 

542 if proxy is not None: 

543 proxy = URL(proxy) 

544 elif self._trust_env: 

545 with suppress(LookupError): 

546 proxy, proxy_auth = get_env_proxy_for_url(url) 

547 

548 req = self._request_class( 

549 method, 

550 url, 

551 params=params, 

552 headers=headers, 

553 skip_auto_headers=skip_headers, 

554 data=data, 

555 cookies=all_cookies, 

556 auth=auth, 

557 version=version, 

558 compress=compress, 

559 chunked=chunked, 

560 expect100=expect100, 

561 loop=self._loop, 

562 response_class=self._response_class, 

563 proxy=proxy, 

564 proxy_auth=proxy_auth, 

565 timer=timer, 

566 session=self, 

567 ssl=ssl if ssl is not None else True, 

568 server_hostname=server_hostname, 

569 proxy_headers=proxy_headers, 

570 traces=traces, 

571 trust_env=self.trust_env, 

572 ) 

573 

574 # connection timeout 

575 try: 

576 async with ceil_timeout( 

577 real_timeout.connect, 

578 ceil_threshold=real_timeout.ceil_threshold, 

579 ): 

580 assert self._connector is not None 

581 conn = await self._connector.connect( 

582 req, traces=traces, timeout=real_timeout 

583 ) 

584 except asyncio.TimeoutError as exc: 

585 raise ServerTimeoutError( 

586 "Connection timeout " "to host {}".format(url) 

587 ) from exc 

588 

589 assert conn.transport is not None 

590 

591 assert conn.protocol is not None 

592 conn.protocol.set_response_params( 

593 timer=timer, 

594 skip_payload=method_must_be_empty_body(method), 

595 read_until_eof=read_until_eof, 

596 auto_decompress=auto_decompress, 

597 read_timeout=real_timeout.sock_read, 

598 read_bufsize=read_bufsize, 

599 timeout_ceil_threshold=self._connector._timeout_ceil_threshold, 

600 max_line_size=max_line_size, 

601 max_field_size=max_field_size, 

602 ) 

603 

604 try: 

605 try: 

606 resp = await req.send(conn) 

607 try: 

608 await resp.start(conn) 

609 except BaseException: 

610 resp.close() 

611 raise 

612 except BaseException: 

613 conn.close() 

614 raise 

615 except ClientError: 

616 raise 

617 except OSError as exc: 

618 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

619 raise 

620 raise ClientOSError(*exc.args) from exc 

621 

622 self._cookie_jar.update_cookies(resp.cookies, resp.url) 

623 

624 # redirects 

625 if resp.status in (301, 302, 303, 307, 308) and allow_redirects: 

626 

627 for trace in traces: 

628 await trace.send_request_redirect( 

629 method, url.update_query(params), headers, resp 

630 ) 

631 

632 redirects += 1 

633 history.append(resp) 

634 if max_redirects and redirects >= max_redirects: 

635 resp.close() 

636 raise TooManyRedirects( 

637 history[0].request_info, tuple(history) 

638 ) 

639 

640 # For 301 and 302, mimic IE, now changed in RFC 

641 # https://github.com/kennethreitz/requests/pull/269 

642 if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or ( 

643 resp.status in (301, 302) and resp.method == hdrs.METH_POST 

644 ): 

645 method = hdrs.METH_GET 

646 data = None 

647 if headers.get(hdrs.CONTENT_LENGTH): 

648 headers.pop(hdrs.CONTENT_LENGTH) 

649 

650 r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get( 

651 hdrs.URI 

652 ) 

653 if r_url is None: 

654 # see github.com/aio-libs/aiohttp/issues/2022 

655 break 

656 else: 

657 # reading from correct redirection 

658 # response is forbidden 

659 resp.release() 

660 

661 try: 

662 parsed_url = URL( 

663 r_url, encoded=not self._requote_redirect_url 

664 ) 

665 

666 except ValueError as e: 

667 raise InvalidURL(r_url) from e 

668 

669 scheme = parsed_url.scheme 

670 if scheme not in ("http", "https", ""): 

671 resp.close() 

672 raise ValueError("Can redirect only to http or https") 

673 elif not scheme: 

674 parsed_url = url.join(parsed_url) 

675 

676 if url.origin() != parsed_url.origin(): 

677 auth = None 

678 headers.pop(hdrs.AUTHORIZATION, None) 

679 

680 url = parsed_url 

681 params = {} 

682 resp.release() 

683 continue 

684 

685 break 

686 

687 # check response status 

688 if raise_for_status is None: 

689 raise_for_status = self._raise_for_status 

690 

691 if raise_for_status is None: 

692 pass 

693 elif callable(raise_for_status): 

694 await raise_for_status(resp) 

695 elif raise_for_status: 

696 resp.raise_for_status() 

697 

698 # register connection 

699 if handle is not None: 

700 if resp.connection is not None: 

701 resp.connection.add_callback(handle.cancel) 

702 else: 

703 handle.cancel() 

704 

705 resp._history = tuple(history) 

706 

707 for trace in traces: 

708 await trace.send_request_end( 

709 method, url.update_query(params), headers, resp 

710 ) 

711 return resp 

712 

713 except BaseException as e: 

714 # cleanup timer 

715 tm.close() 

716 if handle: 

717 handle.cancel() 

718 handle = None 

719 

720 for trace in traces: 

721 await trace.send_request_exception( 

722 method, url.update_query(params), headers, e 

723 ) 

724 raise 

725 

726 def ws_connect( 

727 self, 

728 url: StrOrURL, 

729 *, 

730 method: str = hdrs.METH_GET, 

731 protocols: Iterable[str] = (), 

732 timeout: float = 10.0, 

733 receive_timeout: Optional[float] = None, 

734 autoclose: bool = True, 

735 autoping: bool = True, 

736 heartbeat: Optional[float] = None, 

737 auth: Optional[BasicAuth] = None, 

738 origin: Optional[str] = None, 

739 params: Optional[Mapping[str, str]] = None, 

740 headers: Optional[LooseHeaders] = None, 

741 proxy: Optional[StrOrURL] = None, 

742 proxy_auth: Optional[BasicAuth] = None, 

743 ssl: Union[SSLContext, bool, None, Fingerprint] = True, 

744 verify_ssl: Optional[bool] = None, 

745 fingerprint: Optional[bytes] = None, 

746 ssl_context: Optional[SSLContext] = None, 

747 proxy_headers: Optional[LooseHeaders] = None, 

748 compress: int = 0, 

749 max_msg_size: int = 4 * 1024 * 1024, 

750 ) -> "_WSRequestContextManager": 

751 """Initiate websocket connection.""" 

752 return _WSRequestContextManager( 

753 self._ws_connect( 

754 url, 

755 method=method, 

756 protocols=protocols, 

757 timeout=timeout, 

758 receive_timeout=receive_timeout, 

759 autoclose=autoclose, 

760 autoping=autoping, 

761 heartbeat=heartbeat, 

762 auth=auth, 

763 origin=origin, 

764 params=params, 

765 headers=headers, 

766 proxy=proxy, 

767 proxy_auth=proxy_auth, 

768 ssl=ssl, 

769 verify_ssl=verify_ssl, 

770 fingerprint=fingerprint, 

771 ssl_context=ssl_context, 

772 proxy_headers=proxy_headers, 

773 compress=compress, 

774 max_msg_size=max_msg_size, 

775 ) 

776 ) 

777 

778 async def _ws_connect( 

779 self, 

780 url: StrOrURL, 

781 *, 

782 method: str = hdrs.METH_GET, 

783 protocols: Iterable[str] = (), 

784 timeout: float = 10.0, 

785 receive_timeout: Optional[float] = None, 

786 autoclose: bool = True, 

787 autoping: bool = True, 

788 heartbeat: Optional[float] = None, 

789 auth: Optional[BasicAuth] = None, 

790 origin: Optional[str] = None, 

791 params: Optional[Mapping[str, str]] = None, 

792 headers: Optional[LooseHeaders] = None, 

793 proxy: Optional[StrOrURL] = None, 

794 proxy_auth: Optional[BasicAuth] = None, 

795 ssl: Optional[Union[SSLContext, bool, Fingerprint]] = True, 

796 verify_ssl: Optional[bool] = None, 

797 fingerprint: Optional[bytes] = None, 

798 ssl_context: Optional[SSLContext] = None, 

799 proxy_headers: Optional[LooseHeaders] = None, 

800 compress: int = 0, 

801 max_msg_size: int = 4 * 1024 * 1024, 

802 ) -> ClientWebSocketResponse: 

803 

804 if headers is None: 

805 real_headers: CIMultiDict[str] = CIMultiDict() 

806 else: 

807 real_headers = CIMultiDict(headers) 

808 

809 default_headers = { 

810 hdrs.UPGRADE: "websocket", 

811 hdrs.CONNECTION: "Upgrade", 

812 hdrs.SEC_WEBSOCKET_VERSION: "13", 

813 } 

814 

815 for key, value in default_headers.items(): 

816 real_headers.setdefault(key, value) 

817 

818 sec_key = base64.b64encode(os.urandom(16)) 

819 real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode() 

820 

821 if protocols: 

822 real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols) 

823 if origin is not None: 

824 real_headers[hdrs.ORIGIN] = origin 

825 if compress: 

826 extstr = ws_ext_gen(compress=compress) 

827 real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr 

828 

829 # For the sake of backward compatibility, if user passes in None, convert it to True 

830 if ssl is None: 

831 ssl = True 

832 ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) 

833 

834 # send request 

835 resp = await self.request( 

836 method, 

837 url, 

838 params=params, 

839 headers=real_headers, 

840 read_until_eof=False, 

841 auth=auth, 

842 proxy=proxy, 

843 proxy_auth=proxy_auth, 

844 ssl=ssl, 

845 proxy_headers=proxy_headers, 

846 ) 

847 

848 try: 

849 # check handshake 

850 if resp.status != 101: 

851 raise WSServerHandshakeError( 

852 resp.request_info, 

853 resp.history, 

854 message="Invalid response status", 

855 status=resp.status, 

856 headers=resp.headers, 

857 ) 

858 

859 if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket": 

860 raise WSServerHandshakeError( 

861 resp.request_info, 

862 resp.history, 

863 message="Invalid upgrade header", 

864 status=resp.status, 

865 headers=resp.headers, 

866 ) 

867 

868 if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade": 

869 raise WSServerHandshakeError( 

870 resp.request_info, 

871 resp.history, 

872 message="Invalid connection header", 

873 status=resp.status, 

874 headers=resp.headers, 

875 ) 

876 

877 # key calculation 

878 r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "") 

879 match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode() 

880 if r_key != match: 

881 raise WSServerHandshakeError( 

882 resp.request_info, 

883 resp.history, 

884 message="Invalid challenge response", 

885 status=resp.status, 

886 headers=resp.headers, 

887 ) 

888 

889 # websocket protocol 

890 protocol = None 

891 if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers: 

892 resp_protocols = [ 

893 proto.strip() 

894 for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",") 

895 ] 

896 

897 for proto in resp_protocols: 

898 if proto in protocols: 

899 protocol = proto 

900 break 

901 

902 # websocket compress 

903 notakeover = False 

904 if compress: 

905 compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS) 

906 if compress_hdrs: 

907 try: 

908 compress, notakeover = ws_ext_parse(compress_hdrs) 

909 except WSHandshakeError as exc: 

910 raise WSServerHandshakeError( 

911 resp.request_info, 

912 resp.history, 

913 message=exc.args[0], 

914 status=resp.status, 

915 headers=resp.headers, 

916 ) from exc 

917 else: 

918 compress = 0 

919 notakeover = False 

920 

921 conn = resp.connection 

922 assert conn is not None 

923 conn_proto = conn.protocol 

924 assert conn_proto is not None 

925 transport = conn.transport 

926 assert transport is not None 

927 reader: FlowControlDataQueue[WSMessage] = FlowControlDataQueue( 

928 conn_proto, 2**16, loop=self._loop 

929 ) 

930 conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader) 

931 writer = WebSocketWriter( 

932 conn_proto, 

933 transport, 

934 use_mask=True, 

935 compress=compress, 

936 notakeover=notakeover, 

937 ) 

938 except BaseException: 

939 resp.close() 

940 raise 

941 else: 

942 return self._ws_response_class( 

943 reader, 

944 writer, 

945 protocol, 

946 resp, 

947 timeout, 

948 autoclose, 

949 autoping, 

950 self._loop, 

951 receive_timeout=receive_timeout, 

952 heartbeat=heartbeat, 

953 compress=compress, 

954 client_notakeover=notakeover, 

955 ) 

956 

957 def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]": 

958 """Add default headers and transform it to CIMultiDict""" 

959 # Convert headers to MultiDict 

960 result = CIMultiDict(self._default_headers) 

961 if headers: 

962 if not isinstance(headers, (MultiDictProxy, MultiDict)): 

963 headers = CIMultiDict(headers) 

964 added_names: Set[str] = set() 

965 for key, value in headers.items(): 

966 if key in added_names: 

967 result.add(key, value) 

968 else: 

969 result[key] = value 

970 added_names.add(key) 

971 return result 

972 

973 def get( 

974 self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any 

975 ) -> "_RequestContextManager": 

976 """Perform HTTP GET request.""" 

977 return _RequestContextManager( 

978 self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs) 

979 ) 

980 

981 def options( 

982 self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any 

983 ) -> "_RequestContextManager": 

984 """Perform HTTP OPTIONS request.""" 

985 return _RequestContextManager( 

986 self._request( 

987 hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs 

988 ) 

989 ) 

990 

991 def head( 

992 self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any 

993 ) -> "_RequestContextManager": 

994 """Perform HTTP HEAD request.""" 

995 return _RequestContextManager( 

996 self._request( 

997 hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs 

998 ) 

999 ) 

1000 

1001 def post( 

1002 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 

1003 ) -> "_RequestContextManager": 

1004 """Perform HTTP POST request.""" 

1005 return _RequestContextManager( 

1006 self._request(hdrs.METH_POST, url, data=data, **kwargs) 

1007 ) 

1008 

1009 def put( 

1010 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 

1011 ) -> "_RequestContextManager": 

1012 """Perform HTTP PUT request.""" 

1013 return _RequestContextManager( 

1014 self._request(hdrs.METH_PUT, url, data=data, **kwargs) 

1015 ) 

1016 

1017 def patch( 

1018 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 

1019 ) -> "_RequestContextManager": 

1020 """Perform HTTP PATCH request.""" 

1021 return _RequestContextManager( 

1022 self._request(hdrs.METH_PATCH, url, data=data, **kwargs) 

1023 ) 

1024 

1025 def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager": 

1026 """Perform HTTP DELETE request.""" 

1027 return _RequestContextManager(self._request(hdrs.METH_DELETE, url, **kwargs)) 

1028 

1029 async def close(self) -> None: 

1030 """Close underlying connector. 

1031 

1032 Release all acquired resources. 

1033 """ 

1034 if not self.closed: 

1035 if self._connector is not None and self._connector_owner: 

1036 await self._connector.close() 

1037 self._connector = None 

1038 

1039 @property 

1040 def closed(self) -> bool: 

1041 """Is client session closed. 

1042 

1043 A readonly property. 

1044 """ 

1045 return self._connector is None or self._connector.closed 

1046 

1047 @property 

1048 def connector(self) -> Optional[BaseConnector]: 

1049 """Connector instance used for the session.""" 

1050 return self._connector 

1051 

1052 @property 

1053 def cookie_jar(self) -> AbstractCookieJar: 

1054 """The session cookies.""" 

1055 return self._cookie_jar 

1056 

1057 @property 

1058 def version(self) -> Tuple[int, int]: 

1059 """The session HTTP protocol version.""" 

1060 return self._version 

1061 

1062 @property 

1063 def requote_redirect_url(self) -> bool: 

1064 """Do URL requoting on redirection handling.""" 

1065 return self._requote_redirect_url 

1066 

1067 @requote_redirect_url.setter 

1068 def requote_redirect_url(self, val: bool) -> None: 

1069 """Do URL requoting on redirection handling.""" 

1070 warnings.warn( 

1071 "session.requote_redirect_url modification " "is deprecated #2778", 

1072 DeprecationWarning, 

1073 stacklevel=2, 

1074 ) 

1075 self._requote_redirect_url = val 

1076 

1077 @property 

1078 def loop(self) -> asyncio.AbstractEventLoop: 

1079 """Session's loop.""" 

1080 warnings.warn( 

1081 "client.loop property is deprecated", DeprecationWarning, stacklevel=2 

1082 ) 

1083 return self._loop 

1084 

1085 @property 

1086 def timeout(self) -> ClientTimeout: 

1087 """Timeout for the session.""" 

1088 return self._timeout 

1089 

1090 @property 

1091 def headers(self) -> "CIMultiDict[str]": 

1092 """The default headers of the client session.""" 

1093 return self._default_headers 

1094 

1095 @property 

1096 def skip_auto_headers(self) -> FrozenSet[istr]: 

1097 """Headers for which autogeneration should be skipped""" 

1098 return self._skip_auto_headers 

1099 

1100 @property 

1101 def auth(self) -> Optional[BasicAuth]: 

1102 """An object that represents HTTP Basic Authorization""" 

1103 return self._default_auth 

1104 

1105 @property 

1106 def json_serialize(self) -> JSONEncoder: 

1107 """Json serializer callable""" 

1108 return self._json_serialize 

1109 

1110 @property 

1111 def connector_owner(self) -> bool: 

1112 """Should connector be closed on session closing""" 

1113 return self._connector_owner 

1114 

1115 @property 

1116 def raise_for_status( 

1117 self, 

1118 ) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]: 

1119 """Should `ClientResponse.raise_for_status()` be called for each response.""" 

1120 return self._raise_for_status 

1121 

1122 @property 

1123 def auto_decompress(self) -> bool: 

1124 """Should the body response be automatically decompressed.""" 

1125 return self._auto_decompress 

1126 

1127 @property 

1128 def trust_env(self) -> bool: 

1129 """ 

1130 Should proxies information from environment or netrc be trusted. 

1131 

1132 Information is from HTTP_PROXY / HTTPS_PROXY environment variables 

1133 or ~/.netrc file if present. 

1134 """ 

1135 return self._trust_env 

1136 

1137 @property 

1138 def trace_configs(self) -> List[TraceConfig]: 

1139 """A list of TraceConfig instances used for client tracing""" 

1140 return self._trace_configs 

1141 

1142 def detach(self) -> None: 

1143 """Detach connector from session without closing the former. 

1144 

1145 Session is switched to closed state anyway. 

1146 """ 

1147 self._connector = None 

1148 

1149 def __enter__(self) -> None: 

1150 raise TypeError("Use async with instead") 

1151 

1152 def __exit__( 

1153 self, 

1154 exc_type: Optional[Type[BaseException]], 

1155 exc_val: Optional[BaseException], 

1156 exc_tb: Optional[TracebackType], 

1157 ) -> None: 

1158 # __exit__ should exist in pair with __enter__ but never executed 

1159 pass # pragma: no cover 

1160 

1161 async def __aenter__(self) -> "ClientSession": 

1162 return self 

1163 

1164 async def __aexit__( 

1165 self, 

1166 exc_type: Optional[Type[BaseException]], 

1167 exc_val: Optional[BaseException], 

1168 exc_tb: Optional[TracebackType], 

1169 ) -> None: 

1170 await self.close() 

1171 

1172 

1173class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]): 

1174 

1175 __slots__ = ("_coro", "_resp") 

1176 

1177 def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None: 

1178 self._coro = coro 

1179 

1180 def send(self, arg: None) -> "asyncio.Future[Any]": 

1181 return self._coro.send(arg) 

1182 

1183 def throw(self, *args: Any, **kwargs: Any) -> "asyncio.Future[Any]": 

1184 return self._coro.throw(*args, **kwargs) 

1185 

1186 def close(self) -> None: 

1187 return self._coro.close() 

1188 

1189 def __await__(self) -> Generator[Any, None, _RetType]: 

1190 ret = self._coro.__await__() 

1191 return ret 

1192 

1193 def __iter__(self) -> Generator[Any, None, _RetType]: 

1194 return self.__await__() 

1195 

1196 async def __aenter__(self) -> _RetType: 

1197 self._resp = await self._coro 

1198 return self._resp 

1199 

1200 

1201class _RequestContextManager(_BaseRequestContextManager[ClientResponse]): 

1202 __slots__ = () 

1203 

1204 async def __aexit__( 

1205 self, 

1206 exc_type: Optional[Type[BaseException]], 

1207 exc: Optional[BaseException], 

1208 tb: Optional[TracebackType], 

1209 ) -> None: 

1210 # We're basing behavior on the exception as it can be caused by 

1211 # user code unrelated to the status of the connection. If you 

1212 # would like to close a connection you must do that 

1213 # explicitly. Otherwise connection error handling should kick in 

1214 # and close/recycle the connection as required. 

1215 self._resp.release() 

1216 await self._resp.wait_for_close() 

1217 

1218 

1219class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]): 

1220 __slots__ = () 

1221 

1222 async def __aexit__( 

1223 self, 

1224 exc_type: Optional[Type[BaseException]], 

1225 exc: Optional[BaseException], 

1226 tb: Optional[TracebackType], 

1227 ) -> None: 

1228 await self._resp.close() 

1229 

1230 

1231class _SessionRequestContextManager: 

1232 

1233 __slots__ = ("_coro", "_resp", "_session") 

1234 

1235 def __init__( 

1236 self, 

1237 coro: Coroutine["asyncio.Future[Any]", None, ClientResponse], 

1238 session: ClientSession, 

1239 ) -> None: 

1240 self._coro = coro 

1241 self._resp: Optional[ClientResponse] = None 

1242 self._session = session 

1243 

1244 async def __aenter__(self) -> ClientResponse: 

1245 try: 

1246 self._resp = await self._coro 

1247 except BaseException: 

1248 await self._session.close() 

1249 raise 

1250 else: 

1251 return self._resp 

1252 

1253 async def __aexit__( 

1254 self, 

1255 exc_type: Optional[Type[BaseException]], 

1256 exc: Optional[BaseException], 

1257 tb: Optional[TracebackType], 

1258 ) -> None: 

1259 assert self._resp is not None 

1260 self._resp.close() 

1261 await self._session.close() 

1262 

1263 

1264def request( 

1265 method: str, 

1266 url: StrOrURL, 

1267 *, 

1268 params: Optional[Mapping[str, str]] = None, 

1269 data: Any = None, 

1270 json: Any = None, 

1271 headers: Optional[LooseHeaders] = None, 

1272 skip_auto_headers: Optional[Iterable[str]] = None, 

1273 auth: Optional[BasicAuth] = None, 

1274 allow_redirects: bool = True, 

1275 max_redirects: int = 10, 

1276 compress: Optional[str] = None, 

1277 chunked: Optional[bool] = None, 

1278 expect100: bool = False, 

1279 raise_for_status: Optional[bool] = None, 

1280 read_until_eof: bool = True, 

1281 proxy: Optional[StrOrURL] = None, 

1282 proxy_auth: Optional[BasicAuth] = None, 

1283 timeout: Union[ClientTimeout, object] = sentinel, 

1284 cookies: Optional[LooseCookies] = None, 

1285 version: HttpVersion = http.HttpVersion11, 

1286 connector: Optional[BaseConnector] = None, 

1287 read_bufsize: Optional[int] = None, 

1288 loop: Optional[asyncio.AbstractEventLoop] = None, 

1289 max_line_size: int = 8190, 

1290 max_field_size: int = 8190, 

1291) -> _SessionRequestContextManager: 

1292 """Constructs and sends a request. 

1293 

1294 Returns response object. 

1295 method - HTTP method 

1296 url - request url 

1297 params - (optional) Dictionary or bytes to be sent in the query 

1298 string of the new request 

1299 data - (optional) Dictionary, bytes, or file-like object to 

1300 send in the body of the request 

1301 json - (optional) Any json compatible python object 

1302 headers - (optional) Dictionary of HTTP Headers to send with 

1303 the request 

1304 cookies - (optional) Dict object to send with the request 

1305 auth - (optional) BasicAuth named tuple represent HTTP Basic Auth 

1306 auth - aiohttp.helpers.BasicAuth 

1307 allow_redirects - (optional) If set to False, do not follow 

1308 redirects 

1309 version - Request HTTP version. 

1310 compress - Set to True if request has to be compressed 

1311 with deflate encoding. 

1312 chunked - Set to chunk size for chunked transfer encoding. 

1313 expect100 - Expect 100-continue response from server. 

1314 connector - BaseConnector sub-class instance to support 

1315 connection pooling. 

1316 read_until_eof - Read response until eof if response 

1317 does not have Content-Length header. 

1318 loop - Optional event loop. 

1319 timeout - Optional ClientTimeout settings structure, 5min 

1320 total timeout by default. 

1321 Usage:: 

1322 >>> import aiohttp 

1323 >>> resp = await aiohttp.request('GET', 'http://python.org/') 

1324 >>> resp 

1325 <ClientResponse(python.org/) [200]> 

1326 >>> data = await resp.read() 

1327 """ 

1328 connector_owner = False 

1329 if connector is None: 

1330 connector_owner = True 

1331 connector = TCPConnector(loop=loop, force_close=True) 

1332 

1333 session = ClientSession( 

1334 loop=loop, 

1335 cookies=cookies, 

1336 version=version, 

1337 timeout=timeout, 

1338 connector=connector, 

1339 connector_owner=connector_owner, 

1340 ) 

1341 

1342 return _SessionRequestContextManager( 

1343 session._request( 

1344 method, 

1345 url, 

1346 params=params, 

1347 data=data, 

1348 json=json, 

1349 headers=headers, 

1350 skip_auto_headers=skip_auto_headers, 

1351 auth=auth, 

1352 allow_redirects=allow_redirects, 

1353 max_redirects=max_redirects, 

1354 compress=compress, 

1355 chunked=chunked, 

1356 expect100=expect100, 

1357 raise_for_status=raise_for_status, 

1358 read_until_eof=read_until_eof, 

1359 proxy=proxy, 

1360 proxy_auth=proxy_auth, 

1361 read_bufsize=read_bufsize, 

1362 max_line_size=max_line_size, 

1363 max_field_size=max_field_size, 

1364 ), 

1365 session, 

1366 )