Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client.py: 32%
467 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1"""HTTP Client for asyncio."""
3import asyncio
4import base64
5import dataclasses
6import hashlib
7import json
8import os
9import sys
10import traceback
11import warnings
12from contextlib import suppress
13from types import SimpleNamespace, TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Awaitable,
18 Callable,
19 Coroutine,
20 FrozenSet,
21 Generator,
22 Generic,
23 Iterable,
24 List,
25 Mapping,
26 Optional,
27 Set,
28 Tuple,
29 Type,
30 TypeVar,
31 Union,
32)
34from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
35from typing_extensions import Final, final
36from yarl import URL
38from . import hdrs, http, payload
39from .abc import AbstractCookieJar
40from .client_exceptions import (
41 ClientConnectionError,
42 ClientConnectorCertificateError,
43 ClientConnectorError,
44 ClientConnectorSSLError,
45 ClientError,
46 ClientHttpProxyError,
47 ClientOSError,
48 ClientPayloadError,
49 ClientProxyConnectionError,
50 ClientResponseError,
51 ClientSSLError,
52 ContentTypeError,
53 InvalidURL,
54 ServerConnectionError,
55 ServerDisconnectedError,
56 ServerFingerprintMismatch,
57 ServerTimeoutError,
58 TooManyRedirects,
59 WSServerHandshakeError,
60)
61from .client_reqrep import (
62 SSL_ALLOWED_TYPES,
63 ClientRequest,
64 ClientResponse,
65 Fingerprint,
66 RequestInfo,
67)
68from .client_ws import (
69 DEFAULT_WS_CLIENT_TIMEOUT,
70 ClientWebSocketResponse,
71 ClientWSTimeout,
72)
73from .connector import BaseConnector, NamedPipeConnector, TCPConnector, UnixConnector
74from .cookiejar import CookieJar
75from .helpers import (
76 _SENTINEL,
77 BasicAuth,
78 TimeoutHandle,
79 ceil_timeout,
80 get_env_proxy_for_url,
81 sentinel,
82 strip_auth_from_url,
83)
84from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter
85from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse
86from .streams import FlowControlDataQueue
87from .tracing import Trace, TraceConfig
88from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, StrOrURL
# Public API of this module.  Most names are re-exported from sibling
# modules so that users can import everything from one place.
__all__ = (
    # re-exported from .client_exceptions
    "ClientConnectionError",
    "ClientConnectorCertificateError",
    "ClientConnectorError",
    "ClientConnectorSSLError",
    "ClientError",
    "ClientHttpProxyError",
    "ClientOSError",
    "ClientPayloadError",
    "ClientProxyConnectionError",
    "ClientResponseError",
    "ClientSSLError",
    "ContentTypeError",
    "InvalidURL",
    "ServerConnectionError",
    "ServerDisconnectedError",
    "ServerFingerprintMismatch",
    "ServerTimeoutError",
    "TooManyRedirects",
    "WSServerHandshakeError",
    # re-exported from .client_reqrep
    "ClientRequest",
    "ClientResponse",
    "Fingerprint",
    "RequestInfo",
    # re-exported from .connector
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    # re-exported from .client_ws
    "ClientWebSocketResponse",
    # defined in this module
    "ClientSession",
    "ClientTimeout",
    "request",
)


# ``SSLContext`` is only needed for annotations; at runtime the name is
# bound to ``None`` so the ``ssl`` module is not imported here.
if TYPE_CHECKING:
    from ssl import SSLContext
else:
    SSLContext = None
@dataclasses.dataclass(frozen=True)
class ClientTimeout:
    """Immutable bundle of timeout settings for client requests.

    Every limit defaults to ``None`` except ``ceil_threshold``, which
    defaults to 5.  Instances are frozen; to derive a per-request variant
    either build a new instance or use ``dataclasses.replace()``
    (https://docs.python.org/3/library/dataclasses.html#dataclasses.replace).
    """

    total: Optional[float] = None
    connect: Optional[float] = None
    sock_read: Optional[float] = None
    sock_connect: Optional[float] = None
    ceil_threshold: float = 5

    # Candidate finer-grained timeouts, not implemented yet:
    # pool_queue_timeout: Optional[float] = None
    # dns_resolution_timeout: Optional[float] = None
    # socket_connect_timeout: Optional[float] = None
    # connection_acquiring_timeout: Optional[float] = None
    # new_connection_timeout: Optional[float] = None
    # http_header_timeout: Optional[float] = None
    # response_body_timeout: Optional[float] = None
# Default session timeout: five minutes for the whole request
# (the ``total`` field); every other limit keeps its dataclass default.
DEFAULT_TIMEOUT: Final[ClientTimeout] = ClientTimeout(total=5 * 60)

# Result type produced by the coroutine wrapped in a request context manager.
_RetType = TypeVar("_RetType")
@final
class ClientSession:
    """First-class interface for making HTTP requests.

    A session bundles a connector, a cookie jar, default headers/auth and
    timeout settings, and exposes one method per HTTP verb plus
    ``ws_connect()`` for websockets.  It must be created while an event
    loop is running and cannot be subclassed.
    """

    __slots__ = (
        "_base_url",
        "_source_traceback",
        "_connector",
        "_loop",
        "_cookie_jar",
        "_connector_owner",
        "_default_auth",
        "_version",
        "_json_serialize",
        "_requote_redirect_url",
        "_timeout",
        "_raise_for_status",
        "_auto_decompress",
        "_trust_env",
        "_default_headers",
        "_skip_auto_headers",
        "_request_class",
        "_response_class",
        "_ws_response_class",
        "_trace_configs",
        "_read_bufsize",
        "_max_line_size",
        "_max_field_size",
    )

    def __init__(
        self,
        base_url: Optional[StrOrURL] = None,
        *,
        connector: Optional[BaseConnector] = None,
        cookies: Optional[LooseCookies] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Optional[Iterable[str]] = None,
        auth: Optional[BasicAuth] = None,
        json_serialize: JSONEncoder = json.dumps,
        request_class: Type[ClientRequest] = ClientRequest,
        response_class: Type[ClientResponse] = ClientResponse,
        ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse,
        version: HttpVersion = http.HttpVersion11,
        cookie_jar: Optional[AbstractCookieJar] = None,
        connector_owner: bool = True,
        raise_for_status: Union[
            bool, Callable[[ClientResponse], Awaitable[None]]
        ] = False,
        timeout: Union[_SENTINEL, ClientTimeout, None] = sentinel,
        auto_decompress: bool = True,
        trust_env: bool = False,
        requote_redirect_url: bool = True,
        trace_configs: Optional[List[TraceConfig]] = None,
        read_bufsize: int = 2**16,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
    ) -> None:
        """Create a session bound to the currently running event loop.

        ``base_url``, when given, must be an origin-only URL (no path part);
        request URLs are then joined onto it.  A ``TCPConnector`` and a
        ``CookieJar`` are created when ``connector`` / ``cookie_jar`` are
        omitted.  ``timeout`` left as the sentinel or passed as ``None``
        selects ``DEFAULT_TIMEOUT``.  Every ``TraceConfig`` in
        ``trace_configs`` is frozen.

        Raises:
            AssertionError: ``base_url`` is not equal to its own origin.
            RuntimeError: no event loop is running, or ``connector`` belongs
                to a different loop than the running one.
        """
        # Put the session into the "closed" state before anything can raise.
        # __del__ reads ``closed`` -> ``_connector``; with __slots__ an
        # unassigned slot raises AttributeError, which would surface as a
        # spurious "Exception ignored in __del__" whenever construction
        # fails early (bad base_url, no running loop).
        self._connector: Optional[BaseConnector] = None

        if base_url is None or isinstance(base_url, URL):
            self._base_url: Optional[URL] = base_url
        else:
            self._base_url = URL(base_url)
            assert (
                self._base_url.origin() == self._base_url
            ), "Only absolute URLs without path part are supported"

        loop = asyncio.get_running_loop()

        if connector is None:
            connector = TCPConnector()

        # Initialize these three attrs before raising any further exception,
        # they are used in __del__
        self._connector = connector
        self._loop = loop
        if loop.get_debug():
            self._source_traceback: Optional[
                traceback.StackSummary
            ] = traceback.extract_stack(sys._getframe(1))
        else:
            self._source_traceback = None

        if connector._loop is not loop:
            raise RuntimeError("Session and connector have to use same event loop")

        if cookie_jar is None:
            cookie_jar = CookieJar()
        self._cookie_jar = cookie_jar

        if cookies is not None:
            self._cookie_jar.update_cookies(cookies)

        self._connector_owner = connector_owner
        self._default_auth = auth
        self._version = version
        self._json_serialize = json_serialize
        if timeout is sentinel or timeout is None:
            self._timeout = DEFAULT_TIMEOUT
        else:
            self._timeout = timeout
        self._raise_for_status = raise_for_status
        self._auto_decompress = auto_decompress
        self._trust_env = trust_env
        self._requote_redirect_url = requote_redirect_url
        self._read_bufsize = read_bufsize
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size

        # Normalize the default headers into a case-insensitive multidict
        if headers:
            real_headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            real_headers = CIMultiDict()
        self._default_headers: CIMultiDict[str] = real_headers
        if skip_auto_headers is not None:
            self._skip_auto_headers = frozenset(istr(i) for i in skip_auto_headers)
        else:
            self._skip_auto_headers = frozenset()

        self._request_class = request_class
        self._response_class = response_class
        self._ws_response_class = ws_response_class

        self._trace_configs = trace_configs or []
        for trace_config in self._trace_configs:
            trace_config.freeze()

    def __init_subclass__(cls: Type["ClientSession"]) -> None:
        """Reject any attempt to subclass ``ClientSession``."""
        raise TypeError(
            "Inheritance class {} from ClientSession "
            "is forbidden".format(cls.__name__)
        )

    def __del__(self, _warnings: Any = warnings) -> None:
        """Warn about a session that is garbage-collected while still open.

        ``_warnings`` is bound as a default argument so the module stays
        reachable from here even if module globals have been cleared.
        """
        if not self.closed:
            _warnings.warn(
                f"Unclosed client session {self!r}",
                ResourceWarning,
                source=self,
            )
            context = {"client_session": self, "message": "Unclosed client session"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def request(
        self, method: str, url: StrOrURL, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP request."""
        return _RequestContextManager(self._request(method, url, **kwargs))

    def _build_url(self, str_or_url: StrOrURL) -> URL:
        """Return the URL to request, joined onto the base URL if one is set.

        With a base URL configured, ``str_or_url`` must be a relative URL
        whose path starts with ``/`` (checked with an assertion).
        """
        url = URL(str_or_url)
        if self._base_url is None:
            return url
        else:
            assert not url.is_absolute() and url.path.startswith("/")
            return self._base_url.join(url)

    async def _request(
        self,
        method: str,
        str_or_url: StrOrURL,
        *,
        params: Optional[Mapping[str, str]] = None,
        data: Any = None,
        json: Any = None,
        cookies: Optional[LooseCookies] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Optional[Iterable[str]] = None,
        auth: Optional[BasicAuth] = None,
        allow_redirects: bool = True,
        max_redirects: int = 10,
        compress: Optional[str] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        raise_for_status: Union[
            None, bool, Callable[[ClientResponse], Awaitable[None]]
        ] = None,
        read_until_eof: bool = True,
        proxy: Optional[StrOrURL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timeout: Union[ClientTimeout, _SENTINEL, None] = sentinel,
        ssl: Optional[Union[SSLContext, bool, Fingerprint]] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        trace_request_ctx: Optional[SimpleNamespace] = None,
        read_bufsize: Optional[int] = None,
        auto_decompress: Optional[bool] = None,
        max_line_size: Optional[int] = None,
        max_field_size: Optional[int] = None,
    ) -> ClientResponse:
        """Send one request, follow redirects, and return the started response.

        Per-request arguments left as ``None`` (or the sentinel for
        ``timeout``) fall back to the session-level settings.  Trace signals
        are emitted at request start, on each redirect, at the end and on
        any exception.

        Raises:
            RuntimeError: the session is closed.
            TypeError: ``ssl`` is not one of ``SSL_ALLOWED_TYPES``.
            ValueError: ``data`` and ``json`` are both given; credentials are
                given both explicitly and in the URL or an Authorization
                header; or a redirect targets a non-HTTP(S) scheme.
            InvalidURL: the request, proxy or redirect URL cannot be parsed.
            ServerTimeoutError: acquiring a connection timed out.
            ClientOSError: an ``OSError`` occurred while sending the request
                or starting the response.
            TooManyRedirects: the redirect count reached ``max_redirects``.
        """
        # NOTE: timeout clamps existing connect and read timeouts.  We cannot
        # set the default to None because we need to detect if the user wants
        # to use the existing timeouts by setting timeout to None.

        if self.closed:
            raise RuntimeError("Session is closed")

        if not isinstance(ssl, SSL_ALLOWED_TYPES):
            raise TypeError(
                "ssl should be SSLContext, bool, Fingerprint, "
                "or None, got {!r} instead.".format(ssl)
            )

        if data is not None and json is not None:
            raise ValueError(
                "data and json parameters can not be used at the same time"
            )
        elif json is not None:
            data = payload.JsonPayload(json, dumps=self._json_serialize)

        redirects = 0
        history = []
        version = self._version
        params = params or {}

        # Merge with default headers and transform to CIMultiDict
        headers = self._prepare_headers(headers)
        proxy_headers = self._prepare_headers(proxy_headers)

        try:
            url = self._build_url(str_or_url)
        except ValueError as e:
            raise InvalidURL(str_or_url) from e

        skip_headers = set(self._skip_auto_headers)
        if skip_auto_headers is not None:
            for i in skip_auto_headers:
                skip_headers.add(istr(i))

        if proxy is not None:
            try:
                proxy = URL(proxy)
            except ValueError as e:
                raise InvalidURL(proxy) from e

        if timeout is sentinel or timeout is None:
            real_timeout: ClientTimeout = self._timeout
        else:
            real_timeout = timeout
        # timeout is cumulative for all request operations
        # (request, redirects, responses, data consuming)
        tm = TimeoutHandle(
            self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold
        )
        handle = tm.start()

        if read_bufsize is None:
            read_bufsize = self._read_bufsize

        if auto_decompress is None:
            auto_decompress = self._auto_decompress

        if max_line_size is None:
            max_line_size = self._max_line_size

        if max_field_size is None:
            max_field_size = self._max_field_size

        # One Trace per configured TraceConfig, each with its own context
        traces = [
            Trace(
                self,
                trace_config,
                trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx),
            )
            for trace_config in self._trace_configs
        ]

        for trace in traces:
            await trace.send_request_start(method, url.update_query(params), headers)

        timer = tm.timer()
        try:
            with timer:
                # Each iteration performs one request; redirects ``continue``
                # the loop with an updated url/method, everything else breaks.
                while True:
                    url, auth_from_url = strip_auth_from_url(url)
                    if auth and auth_from_url:
                        raise ValueError(
                            "Cannot combine AUTH argument with "
                            "credentials encoded in URL"
                        )

                    # Precedence: explicit auth, URL credentials, session default
                    if auth is None:
                        auth = auth_from_url
                    if auth is None:
                        auth = self._default_auth
                    # It would be confusing if we support explicit
                    # Authorization header with auth argument
                    if (
                        headers is not None
                        and auth is not None
                        and hdrs.AUTHORIZATION in headers
                    ):
                        raise ValueError(
                            "Cannot combine AUTHORIZATION header "
                            "with AUTH argument or credentials "
                            "encoded in URL"
                        )

                    all_cookies = self._cookie_jar.filter_cookies(url)

                    if cookies is not None:
                        # Filter per-request cookies through a throwaway jar
                        # so the session jar is not modified by them
                        tmp_cookie_jar = CookieJar()
                        tmp_cookie_jar.update_cookies(cookies)
                        req_cookies = tmp_cookie_jar.filter_cookies(url)
                        if req_cookies:
                            all_cookies.load(req_cookies)

                    if proxy is not None:
                        proxy = URL(proxy)
                    elif self._trust_env:
                        with suppress(LookupError):
                            proxy, proxy_auth = get_env_proxy_for_url(url)

                    req = self._request_class(
                        method,
                        url,
                        params=params,
                        headers=headers,
                        skip_auto_headers=skip_headers,
                        data=data,
                        cookies=all_cookies,
                        auth=auth,
                        version=version,
                        compress=compress,
                        chunked=chunked,
                        expect100=expect100,
                        loop=self._loop,
                        response_class=self._response_class,
                        proxy=proxy,
                        proxy_auth=proxy_auth,
                        timer=timer,
                        session=self,
                        ssl=ssl,
                        proxy_headers=proxy_headers,
                        traces=traces,
                        trust_env=self.trust_env,
                    )

                    # connection timeout
                    try:
                        async with ceil_timeout(
                            real_timeout.connect,
                            ceil_threshold=real_timeout.ceil_threshold,
                        ):
                            assert self._connector is not None
                            conn = await self._connector.connect(
                                req, traces=traces, timeout=real_timeout
                            )
                    except asyncio.TimeoutError as exc:
                        raise ServerTimeoutError(
                            f"Connection timeout to host {url}"
                        ) from exc

                    assert conn.transport is not None

                    assert conn.protocol is not None
                    conn.protocol.set_response_params(
                        timer=timer,
                        skip_payload=method.upper() == "HEAD",
                        read_until_eof=read_until_eof,
                        auto_decompress=auto_decompress,
                        read_timeout=real_timeout.sock_read,
                        read_bufsize=read_bufsize,
                        timeout_ceil_threshold=self._connector._timeout_ceil_threshold,
                        max_line_size=max_line_size,
                        max_field_size=max_field_size,
                    )

                    try:
                        try:
                            resp = await req.send(conn)
                            try:
                                await resp.start(conn)
                            except BaseException:
                                resp.close()
                                raise
                        except BaseException:
                            conn.close()
                            raise
                    except ClientError:
                        raise
                    except OSError as exc:
                        # A plain timeout (no errno) propagates unchanged;
                        # other OS errors are wrapped in ClientOSError.
                        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                            raise
                        raise ClientOSError(*exc.args) from exc

                    self._cookie_jar.update_cookies(resp.cookies, resp.url)

                    # redirects
                    if resp.status in (301, 302, 303, 307, 308) and allow_redirects:
                        for trace in traces:
                            await trace.send_request_redirect(
                                method, url.update_query(params), headers, resp
                            )

                        redirects += 1
                        history.append(resp)
                        if max_redirects and redirects >= max_redirects:
                            resp.close()
                            raise TooManyRedirects(
                                history[0].request_info, tuple(history)
                            )

                        # For 301 and 302, mimic IE, now changed in RFC
                        # https://github.com/kennethreitz/requests/pull/269
                        if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or (
                            resp.status in (301, 302) and resp.method == hdrs.METH_POST
                        ):
                            method = hdrs.METH_GET
                            data = None
                            if headers.get(hdrs.CONTENT_LENGTH):
                                headers.pop(hdrs.CONTENT_LENGTH)

                        r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(
                            hdrs.URI
                        )
                        if r_url is None:
                            # see github.com/aio-libs/aiohttp/issues/2022
                            break
                        else:
                            # reading from correct redirection
                            # response is forbidden
                            resp.release()

                        try:
                            parsed_url = URL(
                                r_url, encoded=not self._requote_redirect_url
                            )

                        except ValueError as e:
                            raise InvalidURL(r_url) from e

                        scheme = parsed_url.scheme
                        if scheme not in ("http", "https", ""):
                            resp.close()
                            raise ValueError("Can redirect only to http or https")
                        elif not scheme:
                            # Relative redirect: resolve against current URL
                            parsed_url = url.join(parsed_url)

                        is_same_host_https_redirect = (
                            url.host == parsed_url.host
                            and parsed_url.scheme == "https"
                            and url.scheme == "http"
                        )

                        # Drop credentials when the redirect leaves the
                        # origin, except for an http -> https upgrade on
                        # the same host.
                        if (
                            url.origin() != parsed_url.origin()
                            and not is_same_host_https_redirect
                        ):
                            auth = None
                            headers.pop(hdrs.AUTHORIZATION, None)

                        url = parsed_url
                        params = {}
                        resp.release()
                        continue

                    break

            # check response status
            if raise_for_status is None:
                raise_for_status = self._raise_for_status

            if raise_for_status is None:
                pass
            elif callable(raise_for_status):
                await raise_for_status(resp)
            elif raise_for_status:
                resp.raise_for_status()

            # register connection
            if handle is not None:
                if resp.connection is not None:
                    resp.connection.add_callback(handle.cancel)
                else:
                    handle.cancel()

            resp._history = tuple(history)

            for trace in traces:
                await trace.send_request_end(
                    method, url.update_query(params), headers, resp
                )
            return resp

        except BaseException as e:
            # cleanup timer
            tm.close()
            if handle:
                handle.cancel()
                handle = None

            for trace in traces:
                await trace.send_request_exception(
                    method, url.update_query(params), headers, e
                )
            raise

    def ws_connect(
        self,
        url: StrOrURL,
        *,
        method: str = hdrs.METH_GET,
        protocols: Iterable[str] = (),
        timeout: Union[ClientWSTimeout, float, _SENTINEL, None] = sentinel,
        receive_timeout: Optional[float] = None,
        autoclose: bool = True,
        autoping: bool = True,
        heartbeat: Optional[float] = None,
        auth: Optional[BasicAuth] = None,
        origin: Optional[str] = None,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        proxy: Optional[StrOrURL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        ssl: Union[SSLContext, bool, None, Fingerprint] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        compress: int = 0,
        max_msg_size: int = 4 * 1024 * 1024,
    ) -> "_WSRequestContextManager":
        """Initiate websocket connection."""
        return _WSRequestContextManager(
            self._ws_connect(
                url,
                method=method,
                protocols=protocols,
                timeout=timeout,
                receive_timeout=receive_timeout,
                autoclose=autoclose,
                autoping=autoping,
                heartbeat=heartbeat,
                auth=auth,
                origin=origin,
                params=params,
                headers=headers,
                proxy=proxy,
                proxy_auth=proxy_auth,
                ssl=ssl,
                proxy_headers=proxy_headers,
                compress=compress,
                max_msg_size=max_msg_size,
            )
        )

    async def _ws_connect(
        self,
        url: StrOrURL,
        *,
        method: str = hdrs.METH_GET,
        protocols: Iterable[str] = (),
        timeout: Union[ClientWSTimeout, float, _SENTINEL, None] = sentinel,
        receive_timeout: Optional[float] = None,
        autoclose: bool = True,
        autoping: bool = True,
        heartbeat: Optional[float] = None,
        auth: Optional[BasicAuth] = None,
        origin: Optional[str] = None,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        proxy: Optional[StrOrURL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        ssl: Union[SSLContext, bool, None, Fingerprint] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        compress: int = 0,
        max_msg_size: int = 4 * 1024 * 1024,
    ) -> ClientWebSocketResponse:
        """Perform the websocket opening handshake and wrap the connection.

        Sends the upgrade request through ``self.request()``, validates the
        server's answer (status 101, Upgrade/Connection headers and the
        Sec-WebSocket-Accept challenge), negotiates sub-protocol and
        compression, then returns an instance of the session's
        ``ws_response_class``.  The response is closed if anything fails
        after the request was sent.

        A float ``timeout`` and the ``receive_timeout`` parameter are
        deprecated in favour of ``ClientWSTimeout``.

        Raises:
            TypeError: ``ssl`` is not one of ``SSL_ALLOWED_TYPES``.
            WSServerHandshakeError: the server's handshake response is invalid.
        """
        if timeout is sentinel or timeout is None:
            ws_timeout = DEFAULT_WS_CLIENT_TIMEOUT
        else:
            if isinstance(timeout, ClientWSTimeout):
                ws_timeout = timeout
            else:
                warnings.warn(
                    "parameter 'timeout' of type 'float' "
                    "is deprecated, please use "
                    "'timeout=ClientWSTimeout(ws_close=...)'",
                    DeprecationWarning,
                    stacklevel=2,
                )
                ws_timeout = ClientWSTimeout(ws_close=timeout)

        if receive_timeout is not None:
            warnings.warn(
                "float parameter 'receive_timeout' "
                "is deprecated, please use parameter "
                "'timeout=ClientWSTimeout(ws_receive=...)'",
                DeprecationWarning,
                stacklevel=2,
            )
            ws_timeout = dataclasses.replace(ws_timeout, ws_receive=receive_timeout)

        if headers is None:
            real_headers: CIMultiDict[str] = CIMultiDict()
        else:
            real_headers = CIMultiDict(headers)

        # Handshake headers; values supplied by the caller take precedence
        default_headers = {
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "Upgrade",
            hdrs.SEC_WEBSOCKET_VERSION: "13",
        }

        for key, value in default_headers.items():
            real_headers.setdefault(key, value)

        # Random 16-byte nonce; the server must echo a digest derived from it
        sec_key = base64.b64encode(os.urandom(16))
        real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode()

        if protocols:
            real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols)
        if origin is not None:
            real_headers[hdrs.ORIGIN] = origin
        if compress:
            extstr = ws_ext_gen(compress=compress)
            real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr

        if not isinstance(ssl, SSL_ALLOWED_TYPES):
            raise TypeError(
                "ssl should be SSLContext, bool, Fingerprint, "
                "or None, got {!r} instead.".format(ssl)
            )

        # send request
        resp = await self.request(
            method,
            url,
            params=params,
            headers=real_headers,
            read_until_eof=False,
            auth=auth,
            proxy=proxy,
            proxy_auth=proxy_auth,
            ssl=ssl,
            proxy_headers=proxy_headers,
        )

        try:
            # check handshake
            if resp.status != 101:
                raise WSServerHandshakeError(
                    resp.request_info,
                    resp.history,
                    message="Invalid response status",
                    status=resp.status,
                    headers=resp.headers,
                )

            if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket":
                raise WSServerHandshakeError(
                    resp.request_info,
                    resp.history,
                    message="Invalid upgrade header",
                    status=resp.status,
                    headers=resp.headers,
                )

            if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade":
                raise WSServerHandshakeError(
                    resp.request_info,
                    resp.history,
                    message="Invalid connection header",
                    status=resp.status,
                    headers=resp.headers,
                )

            # key calculation
            r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "")
            match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode()
            if r_key != match:
                raise WSServerHandshakeError(
                    resp.request_info,
                    resp.history,
                    message="Invalid challenge response",
                    status=resp.status,
                    headers=resp.headers,
                )

            # websocket protocol: first server-offered one that we requested
            protocol = None
            if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers:
                resp_protocols = [
                    proto.strip()
                    for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
                ]

                for proto in resp_protocols:
                    if proto in protocols:
                        protocol = proto
                        break

            # websocket compress
            notakeover = False
            if compress:
                compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
                if compress_hdrs:
                    try:
                        compress, notakeover = ws_ext_parse(compress_hdrs)
                    except WSHandshakeError as exc:
                        raise WSServerHandshakeError(
                            resp.request_info,
                            resp.history,
                            message=exc.args[0],
                            status=resp.status,
                            headers=resp.headers,
                        ) from exc
                else:
                    # Server sent no extensions header: disable compression
                    compress = 0
                    notakeover = False

            conn = resp.connection
            assert conn is not None
            conn_proto = conn.protocol
            assert conn_proto is not None
            transport = conn.transport
            assert transport is not None
            reader: FlowControlDataQueue[WSMessage] = FlowControlDataQueue(
                conn_proto, 2**16, loop=self._loop
            )
            conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
            writer = WebSocketWriter(
                conn_proto,
                transport,
                use_mask=True,
                compress=compress,
                notakeover=notakeover,
            )
        except BaseException:
            resp.close()
            raise
        else:
            return self._ws_response_class(
                reader,
                writer,
                protocol,
                resp,
                ws_timeout,
                autoclose,
                autoping,
                self._loop,
                heartbeat=heartbeat,
                compress=compress,
                client_notakeover=notakeover,
            )

    def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]":
        """Add default headers and transform it to CIMultiDict"""
        # Start from a copy of the session defaults
        result = CIMultiDict(self._default_headers)
        if headers:
            if not isinstance(headers, (MultiDictProxy, MultiDict)):
                headers = CIMultiDict(headers)
            added_names: Set[str] = set()
            for key, value in headers.items():
                # The first occurrence of a name replaces the default value;
                # later occurrences of the same name are appended.
                if key in added_names:
                    result.add(key, value)
                else:
                    result[key] = value
                    added_names.add(key)
        return result

    def get(
        self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP GET request."""
        return _RequestContextManager(
            self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs)
        )

    def options(
        self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP OPTIONS request."""
        return _RequestContextManager(
            self._request(
                hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs
            )
        )

    def head(
        self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP HEAD request."""
        return _RequestContextManager(
            self._request(
                hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs
            )
        )

    def post(
        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP POST request."""
        return _RequestContextManager(
            self._request(hdrs.METH_POST, url, data=data, **kwargs)
        )

    def put(
        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP PUT request."""
        return _RequestContextManager(
            self._request(hdrs.METH_PUT, url, data=data, **kwargs)
        )

    def patch(
        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP PATCH request."""
        return _RequestContextManager(
            self._request(hdrs.METH_PATCH, url, data=data, **kwargs)
        )

    def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager":
        """Perform HTTP DELETE request."""
        return _RequestContextManager(self._request(hdrs.METH_DELETE, url, **kwargs))

    async def close(self) -> None:
        """Close underlying connector.

        Release all acquired resources.  The connector itself is closed
        only when the session owns it; either way the session drops its
        reference and becomes closed.
        """
        if not self.closed:
            if self._connector is not None and self._connector_owner:
                await self._connector.close()
            self._connector = None

    @property
    def closed(self) -> bool:
        """Is client session closed.

        A readonly property.  True when the session has no connector or
        the connector reports itself closed.
        """
        return self._connector is None or self._connector.closed

    @property
    def connector(self) -> Optional[BaseConnector]:
        """Connector instance used for the session."""
        return self._connector

    @property
    def cookie_jar(self) -> AbstractCookieJar:
        """The session cookies."""
        return self._cookie_jar

    @property
    def version(self) -> Tuple[int, int]:
        """The session HTTP protocol version."""
        return self._version

    @property
    def requote_redirect_url(self) -> bool:
        """Do URL requoting on redirection handling."""
        return self._requote_redirect_url

    @property
    def timeout(self) -> ClientTimeout:
        """Timeout for the session."""
        return self._timeout

    @property
    def headers(self) -> "CIMultiDict[str]":
        """The default headers of the client session."""
        return self._default_headers

    @property
    def skip_auto_headers(self) -> FrozenSet[istr]:
        """Headers for which autogeneration should be skipped"""
        return self._skip_auto_headers

    @property
    def auth(self) -> Optional[BasicAuth]:
        """An object that represents HTTP Basic Authorization"""
        return self._default_auth

    @property
    def json_serialize(self) -> JSONEncoder:
        """Json serializer callable"""
        return self._json_serialize

    @property
    def connector_owner(self) -> bool:
        """Should connector be closed on session closing"""
        return self._connector_owner

    @property
    def raise_for_status(
        self,
    ) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]:
        """Should `ClientResponse.raise_for_status()` be called for each response."""
        return self._raise_for_status

    @property
    def auto_decompress(self) -> bool:
        """Should the body response be automatically decompressed."""
        return self._auto_decompress

    @property
    def trust_env(self) -> bool:
        """
        Should proxies information from environment or netrc be trusted.

        Information is from HTTP_PROXY / HTTPS_PROXY environment variables
        or ~/.netrc file if present.
        """
        return self._trust_env

    @property
    def trace_configs(self) -> List[TraceConfig]:
        """A list of TraceConfig instances used for client tracing"""
        return self._trace_configs

    def detach(self) -> None:
        """Detach connector from session without closing the former.

        Session is switched to closed state anyway.
        """
        self._connector = None

    async def __aenter__(self) -> "ClientSession":
        """Enter ``async with``: the session itself is the managed value."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave ``async with``: close the session; exceptions propagate."""
        await self.close()
class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]):
    """Wrap a request coroutine so it can be awaited or used in ``async with``.

    The coroutine protocol (``send`` / ``throw`` / ``close`` / ``__await__``)
    is delegated to the wrapped coroutine.  ``__aenter__`` awaits it and
    stores the result in ``_resp``; subclasses supply ``__aexit__`` to
    dispose of that result.
    """

    __slots__ = ("_coro", "_resp")

    def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None:
        self._coro = coro

    def send(self, arg: None) -> "asyncio.Future[Any]":
        """Resume the wrapped coroutine and return what it yields."""
        return self._coro.send(arg)

    def throw(self, arg: Any, *args: Any) -> "asyncio.Future[Any]":  # type: ignore[override]
        """Raise an exception inside the wrapped coroutine.

        The value yielded by the coroutine after handling the exception is
        returned to the caller; dropping it would hand the driver ``None``
        instead of the awaitable the coroutine is now waiting on.  Extra
        positional arguments are forwarded so the ``(type, value, traceback)``
        calling form is accepted as well as a single exception instance.
        """
        return self._coro.throw(arg, *args)

    def close(self) -> None:
        """Close the wrapped coroutine."""
        return self._coro.close()

    def __await__(self) -> Generator[Any, None, _RetType]:
        ret = self._coro.__await__()
        return ret

    def __iter__(self) -> Generator[Any, None, _RetType]:
        return self.__await__()

    async def __aenter__(self) -> _RetType:
        """Await the wrapped coroutine and remember its result."""
        self._resp = await self._coro
        return self._resp
class _RequestContextManager(_BaseRequestContextManager[ClientResponse]):
    """Context manager for an HTTP request; releases the response on exit."""

    __slots__ = ()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # The response is released, not closed, whether or not an exception
        # is in flight: the exception may come from user code that has
        # nothing to do with the connection's health.  Callers who want the
        # connection closed must close it explicitly; otherwise connection
        # error handling decides whether to close or recycle it.
        response = self._resp
        response.release()
class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]):
    """Context manager for a websocket connection; closes it on exit."""

    __slots__ = ()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Close the websocket regardless of how the block was left.
        websocket = self._resp
        await websocket.close()
class _SessionRequestContextManager:
    """Context manager that owns both a response and the session behind it.

    On entry the request coroutine is awaited; if it fails the session is
    closed before the exception propagates.  On exit the response is closed
    and then the session is closed.
    """

    __slots__ = ("_coro", "_resp", "_session")

    def __init__(
        self,
        coro: Coroutine["asyncio.Future[Any]", None, ClientResponse],
        session: ClientSession,
    ) -> None:
        self._coro = coro
        self._resp: Optional[ClientResponse] = None
        self._session = session

    async def __aenter__(self) -> ClientResponse:
        try:
            self._resp = await self._coro
        except BaseException:
            # The request never produced a response; do not leak the session.
            await self._session.close()
            raise
        else:
            return self._resp

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        assert self._resp is not None
        try:
            self._resp.close()
        finally:
            # Close the session even if closing the response raised.
            await self._session.close()
def request(
    method: str,
    url: StrOrURL,
    *,
    params: Optional[Mapping[str, str]] = None,
    data: Any = None,
    json: Any = None,
    headers: Optional[LooseHeaders] = None,
    skip_auto_headers: Optional[Iterable[str]] = None,
    auth: Optional[BasicAuth] = None,
    allow_redirects: bool = True,
    max_redirects: int = 10,
    compress: Optional[str] = None,
    chunked: Optional[bool] = None,
    expect100: bool = False,
    raise_for_status: Optional[bool] = None,
    read_until_eof: bool = True,
    proxy: Optional[StrOrURL] = None,
    proxy_auth: Optional[BasicAuth] = None,
    timeout: Union[ClientTimeout, _SENTINEL] = sentinel,
    cookies: Optional[LooseCookies] = None,
    version: HttpVersion = http.HttpVersion11,
    connector: Optional[BaseConnector] = None,
    read_bufsize: Optional[int] = None,
    max_line_size: int = 8190,
    max_field_size: int = 8190,
) -> _SessionRequestContextManager:
    """Construct and send a single request through a throwaway session.

    A new ClientSession is created for this one call.  The returned
    object is an async context manager: entering it yields the
    response, leaving it closes the response and the session.

    method - HTTP method
    url - request url
    params - (optional) Mapping to be sent in the query string of the
       new request
    data - (optional) Dictionary, bytes, or file-like object to
       send in the body of the request
    json - (optional) Any json compatible python object
    headers - (optional) HTTP Headers to send with the request
    skip_auto_headers - (optional) Iterable of header names, passed
       through to the request
    auth - (optional) aiohttp.helpers.BasicAuth named tuple
       representing HTTP Basic Auth
    allow_redirects - (optional) If set to False, do not follow
       redirects
    max_redirects - Redirect limit passed through to the request,
       10 by default.
    compress - (optional) Compression setting passed through to the
       request.
    chunked - (optional) Chunked transfer encoding setting passed
       through to the request.
    expect100 - Expect 100-continue response from server.
    raise_for_status - (optional) Passed through to the request.
    read_until_eof - Read response until eof if response
       does not have Content-Length header.
    proxy - (optional) Proxy url passed through to the request.
    proxy_auth - (optional) BasicAuth passed through to the request
       for the proxy.
    timeout - Optional ClientTimeout settings structure, given to the
       session; when omitted the session default applies.
    cookies - (optional) Cookies given to the session.
    version - Request HTTP version, given to the session.
    connector - BaseConnector sub-class instance to support
       connection pooling.  When omitted, a TCPConnector with
       force_close=True is created and owned by the session; a
       connector passed in is not owned by the session.
    read_bufsize - (optional) Passed through to the request.
    max_line_size - Passed through to the request, 8190 by default.
    max_field_size - Passed through to the request, 8190 by default.

    Usage::
      >>> import aiohttp
      >>> async with aiohttp.request('GET', 'http://python.org/') as resp:
      ...    print(resp)
      ...    data = await resp.read()
      <ClientResponse(https://www.python.org/) [200 OK]>
    """
    # The session owns the connector only when it is created here.
    connector_owner = connector is None
    if connector_owner:
        connector = TCPConnector(force_close=True)

    session = ClientSession(
        cookies=cookies,
        version=version,
        timeout=timeout,
        connector=connector,
        connector_owner=connector_owner,
    )

    pending_request = session._request(
        method,
        url,
        params=params,
        data=data,
        json=json,
        headers=headers,
        skip_auto_headers=skip_auto_headers,
        auth=auth,
        allow_redirects=allow_redirects,
        max_redirects=max_redirects,
        compress=compress,
        chunked=chunked,
        expect100=expect100,
        raise_for_status=raise_for_status,
        read_until_eof=read_until_eof,
        proxy=proxy,
        proxy_auth=proxy_auth,
        read_bufsize=read_bufsize,
        max_line_size=max_line_size,
        max_field_size=max_field_size,
    )
    return _SessionRequestContextManager(pending_request, session)