Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client.py: 31%
478 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1"""HTTP Client for asyncio."""
3import asyncio
4import base64
5import dataclasses
6import hashlib
7import json
8import os
9import sys
10import traceback
11import warnings
12from contextlib import suppress
13from types import SimpleNamespace, TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Awaitable,
18 Callable,
19 Collection,
20 Coroutine,
21 Final,
22 FrozenSet,
23 Generator,
24 Generic,
25 Iterable,
26 List,
27 Mapping,
28 Optional,
29 Set,
30 Tuple,
31 Type,
32 TypeVar,
33 Union,
34 final,
35)
37from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
38from yarl import URL
40from . import hdrs, http, payload
41from .abc import AbstractCookieJar
42from .client_exceptions import (
43 ClientConnectionError,
44 ClientConnectorCertificateError,
45 ClientConnectorError,
46 ClientConnectorSSLError,
47 ClientError,
48 ClientHttpProxyError,
49 ClientOSError,
50 ClientPayloadError,
51 ClientProxyConnectionError,
52 ClientResponseError,
53 ClientSSLError,
54 ConnectionTimeoutError,
55 ContentTypeError,
56 InvalidURL,
57 ServerConnectionError,
58 ServerDisconnectedError,
59 ServerFingerprintMismatch,
60 ServerTimeoutError,
61 SocketTimeoutError,
62 TooManyRedirects,
63 WSServerHandshakeError,
64)
65from .client_reqrep import (
66 SSL_ALLOWED_TYPES,
67 ClientRequest,
68 ClientResponse,
69 Fingerprint,
70 RequestInfo,
71)
72from .client_ws import (
73 DEFAULT_WS_CLIENT_TIMEOUT,
74 ClientWebSocketResponse,
75 ClientWSTimeout,
76)
77from .connector import BaseConnector, NamedPipeConnector, TCPConnector, UnixConnector
78from .cookiejar import CookieJar
79from .helpers import (
80 _SENTINEL,
81 BasicAuth,
82 TimeoutHandle,
83 ceil_timeout,
84 get_env_proxy_for_url,
85 method_must_be_empty_body,
86 sentinel,
87 strip_auth_from_url,
88)
89from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter
90from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse
91from .streams import FlowControlDataQueue
92from .tracing import Trace, TraceConfig
93from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, StrOrURL
# Public API of this module.  The entries are grouped by the module that
# defines each name; the order is preserved as-is.
__all__ = (
    # --- re-exported from .client_exceptions ---
    "ClientConnectionError",
    "ClientConnectorCertificateError",
    "ClientConnectorError",
    "ClientConnectorSSLError",
    "ClientError",
    "ClientHttpProxyError",
    "ClientOSError",
    "ClientPayloadError",
    "ClientProxyConnectionError",
    "ClientResponseError",
    "ClientSSLError",
    "ConnectionTimeoutError",
    "ContentTypeError",
    "InvalidURL",
    "ServerConnectionError",
    "ServerDisconnectedError",
    "ServerFingerprintMismatch",
    "ServerTimeoutError",
    "SocketTimeoutError",
    "TooManyRedirects",
    "WSServerHandshakeError",
    # --- re-exported from .client_reqrep ---
    "ClientRequest",
    "ClientResponse",
    "Fingerprint",
    "RequestInfo",
    # --- re-exported from .connector ---
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    # --- re-exported from .client_ws ---
    "ClientWebSocketResponse",
    # --- defined in this module ---
    "ClientSession",
    "ClientTimeout",
    "request",
)
# The real ``ssl.SSLContext`` is imported for static type checkers only; at
# runtime the name is bound to ``None`` so annotations that mention it still
# resolve.
if not TYPE_CHECKING:
    SSLContext = None
else:
    from ssl import SSLContext
@dataclasses.dataclass(frozen=True)
class ClientTimeout:
    """Immutable bundle of timeout settings for a client request.

    Every limit defaults to ``None``; ``ceil_threshold`` defaults to ``5``.
    Instances are frozen, so derive a per-request variant either by building
    a new ``ClientTimeout`` or via ``dataclasses.replace``
    (https://docs.python.org/3/library/dataclasses.html#dataclasses.replace).
    """

    total: Optional[float] = None
    connect: Optional[float] = None
    sock_read: Optional[float] = None
    sock_connect: Optional[float] = None
    ceil_threshold: float = 5

    # Candidate fields that are not implemented (kept as design notes):
    #   pool_queue_timeout: Optional[float] = None
    #   dns_resolution_timeout: Optional[float] = None
    #   socket_connect_timeout: Optional[float] = None
    #   connection_acquiring_timeout: Optional[float] = None
    #   new_connection_timeout: Optional[float] = None
    #   http_header_timeout: Optional[float] = None
    #   response_body_timeout: Optional[float] = None
# Session-wide default: a five minute (300 second) ``total`` timeout.
DEFAULT_TIMEOUT: Final[ClientTimeout] = ClientTimeout(total=300)

# Methods treated as idempotent when deciding whether a request may be retried.
# https://www.rfc-editor.org/rfc/rfc9110#section-9.2.2
IDEMPOTENT_METHODS = frozenset(("GET", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE"))

# Result type produced by the coroutine wrapped in a request context manager.
_RetType = TypeVar("_RetType")
# Callable taking a response and a bytes argument and returning a charset name.
_CharsetResolver = Callable[[ClientResponse, bytes], str]
@final
class ClientSession:
    """First-class interface for making HTTP requests.

    A session bundles a connector, a cookie jar, default headers, default
    auth and default timeouts, and applies them to every request issued
    through it.  Subclassing is rejected (see ``__init_subclass__``).
    """

    __slots__ = (
        "_base_url",
        "_source_traceback",
        "_connector",
        "_loop",
        "_cookie_jar",
        "_connector_owner",
        "_default_auth",
        "_version",
        "_json_serialize",
        "_requote_redirect_url",
        "_timeout",
        "_raise_for_status",
        "_auto_decompress",
        "_trust_env",
        "_default_headers",
        "_skip_auto_headers",
        "_request_class",
        "_response_class",
        "_ws_response_class",
        "_trace_configs",
        "_read_bufsize",
        "_max_line_size",
        "_max_field_size",
        "_resolve_charset",
    )

    def __init__(
        self,
        base_url: Optional[StrOrURL] = None,
        *,
        connector: Optional[BaseConnector] = None,
        cookies: Optional[LooseCookies] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Optional[Iterable[str]] = None,
        auth: Optional[BasicAuth] = None,
        json_serialize: JSONEncoder = json.dumps,
        request_class: Type[ClientRequest] = ClientRequest,
        response_class: Type[ClientResponse] = ClientResponse,
        ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse,
        version: HttpVersion = http.HttpVersion11,
        cookie_jar: Optional[AbstractCookieJar] = None,
        connector_owner: bool = True,
        raise_for_status: Union[
            bool, Callable[[ClientResponse], Awaitable[None]]
        ] = False,
        timeout: Union[_SENTINEL, ClientTimeout, None] = sentinel,
        auto_decompress: bool = True,
        trust_env: bool = False,
        requote_redirect_url: bool = True,
        trace_configs: Optional[List[TraceConfig]] = None,
        read_bufsize: int = 2**16,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        fallback_charset_resolver: _CharsetResolver = lambda r, b: "utf-8",
    ) -> None:
        """Create a session bound to the currently running event loop.

        Must be called while an event loop is running, because the loop is
        obtained with ``asyncio.get_running_loop()``.

        Raises:
            RuntimeError: no event loop is running, or ``connector`` belongs
                to a different loop.
            ValueError: ``timeout`` is neither ``None``/sentinel nor a
                ``ClientTimeout`` instance.
        """
        # ``__del__`` reads ``_connector`` through ``closed``.  The class uses
        # ``__slots__``, so an unassigned slot raises AttributeError.  Bind it
        # before anything below can raise (base_url check, missing running
        # loop, connector construction) so a half-built session is simply
        # reported as closed instead of failing inside ``__del__``.
        self._connector: Optional[BaseConnector] = None

        if base_url is None or isinstance(base_url, URL):
            self._base_url: Optional[URL] = base_url
        else:
            self._base_url = URL(base_url)
            assert (
                self._base_url.origin() == self._base_url
            ), "Only absolute URLs without path part are supported"

        loop = asyncio.get_running_loop()

        if connector is None:
            connector = TCPConnector()

        # Initialize these three attrs before raising any further exception,
        # they are used in __del__ once a connector is attached.
        self._connector = connector
        self._loop = loop
        if loop.get_debug():
            self._source_traceback: Optional[
                traceback.StackSummary
            ] = traceback.extract_stack(sys._getframe(1))
        else:
            self._source_traceback = None

        if connector._loop is not loop:
            raise RuntimeError("Session and connector have to use same event loop")

        if cookie_jar is None:
            cookie_jar = CookieJar()
        self._cookie_jar = cookie_jar

        if cookies is not None:
            self._cookie_jar.update_cookies(cookies)

        self._connector_owner = connector_owner
        self._default_auth = auth
        self._version = version
        self._json_serialize = json_serialize
        if timeout is sentinel or timeout is None:
            timeout = DEFAULT_TIMEOUT
        if not isinstance(timeout, ClientTimeout):
            raise ValueError(
                f"timeout parameter cannot be of {type(timeout)} type, "
                "please use 'timeout=ClientTimeout(...)'",
            )
        self._timeout = timeout
        self._raise_for_status = raise_for_status
        self._auto_decompress = auto_decompress
        self._trust_env = trust_env
        self._requote_redirect_url = requote_redirect_url
        self._read_bufsize = read_bufsize
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size

        # Normalise the default headers into a case-insensitive multidict
        if headers:
            real_headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            real_headers = CIMultiDict()
        self._default_headers: CIMultiDict[str] = real_headers
        if skip_auto_headers is not None:
            self._skip_auto_headers = frozenset(istr(i) for i in skip_auto_headers)
        else:
            self._skip_auto_headers = frozenset()

        self._request_class = request_class
        self._response_class = response_class
        self._ws_response_class = ws_response_class

        self._trace_configs = trace_configs or []
        for trace_config in self._trace_configs:
            trace_config.freeze()

        self._resolve_charset = fallback_charset_resolver

    def __init_subclass__(cls: Type["ClientSession"]) -> None:
        """Reject any attempt to subclass ``ClientSession``."""
        raise TypeError(
            f"Inheritance class {cls.__name__} from ClientSession is forbidden"
        )

    def __del__(self, _warnings: Any = warnings) -> None:
        """Warn and notify the loop's exception handler if left unclosed."""
        if not self.closed:
            _warnings.warn(
                f"Unclosed client session {self!r}",
                ResourceWarning,
                source=self,
            )
            context = {"client_session": self, "message": "Unclosed client session"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def request(
        self, method: str, url: StrOrURL, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP request."""
        return _RequestContextManager(self._request(method, url, **kwargs))

    def _build_url(self, str_or_url: StrOrURL) -> URL:
        """Return the request URL, joined onto the base URL when one is set.

        With a base URL configured the argument must be a relative URL whose
        path starts with ``/`` (enforced with ``assert``).
        """
        url = URL(str_or_url)
        if self._base_url is None:
            return url
        else:
            assert not url.is_absolute() and url.path.startswith("/")
            return self._base_url.join(url)

    async def _request(
        self,
        method: str,
        str_or_url: StrOrURL,
        *,
        params: Optional[Mapping[str, str]] = None,
        data: Any = None,
        json: Any = None,
        cookies: Optional[LooseCookies] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Optional[Iterable[str]] = None,
        auth: Optional[BasicAuth] = None,
        allow_redirects: bool = True,
        max_redirects: int = 10,
        compress: Optional[str] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        raise_for_status: Union[
            None, bool, Callable[[ClientResponse], Awaitable[None]]
        ] = None,
        read_until_eof: bool = True,
        proxy: Optional[StrOrURL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timeout: Union[ClientTimeout, _SENTINEL, None] = sentinel,
        ssl: Union[SSLContext, bool, Fingerprint] = True,
        server_hostname: Optional[str] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        trace_request_ctx: Optional[SimpleNamespace] = None,
        read_bufsize: Optional[int] = None,
        auto_decompress: Optional[bool] = None,
        max_line_size: Optional[int] = None,
        max_field_size: Optional[int] = None,
    ) -> ClientResponse:
        """Send one request, following redirects, and return the response.

        Per-request arguments left as ``None`` (or the sentinel for
        ``timeout``) fall back to the session-level values.

        Raises:
            RuntimeError: the session is closed.
            TypeError: ``ssl`` is not one of ``SSL_ALLOWED_TYPES``.
            ValueError: ``data`` and ``json`` are both given; auth is given
                both explicitly and in the URL or an Authorization header;
                a redirect points at a non-http(s) scheme.
            InvalidURL: the request, proxy or redirect URL cannot be parsed.
            ConnectionTimeoutError: acquiring a connection timed out.
            TooManyRedirects: ``max_redirects`` was reached.
            ClientOSError: an ``OSError`` occurred while sending/starting.
        """
        # NOTE: timeout clamps existing connect and read timeouts.  We cannot
        # set the default to None because we need to detect if the user wants
        # to use the existing timeouts by setting timeout to None.

        if self.closed:
            raise RuntimeError("Session is closed")

        if not isinstance(ssl, SSL_ALLOWED_TYPES):
            raise TypeError(
                "ssl should be SSLContext, Fingerprint, or bool, "
                f"got {ssl!r} instead."
            )

        if data is not None and json is not None:
            raise ValueError(
                "data and json parameters can not be used at the same time"
            )
        elif json is not None:
            data = payload.JsonPayload(json, dumps=self._json_serialize)

        redirects = 0
        history = []
        version = self._version
        params = params or {}

        # Merge with default headers and transform to CIMultiDict
        headers = self._prepare_headers(headers)
        proxy_headers = self._prepare_headers(proxy_headers)

        try:
            url = self._build_url(str_or_url)
        except ValueError as e:
            raise InvalidURL(str_or_url) from e

        skip_headers = set(self._skip_auto_headers)
        if skip_auto_headers is not None:
            for i in skip_auto_headers:
                skip_headers.add(istr(i))

        if proxy is not None:
            try:
                proxy = URL(proxy)
            except ValueError as e:
                raise InvalidURL(proxy) from e

        if timeout is sentinel or timeout is None:
            real_timeout: ClientTimeout = self._timeout
        else:
            real_timeout = timeout
        # timeout is cumulative for all request operations
        # (request, redirects, responses, data consuming)
        tm = TimeoutHandle(
            self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold
        )
        handle = tm.start()

        if read_bufsize is None:
            read_bufsize = self._read_bufsize

        if auto_decompress is None:
            auto_decompress = self._auto_decompress

        if max_line_size is None:
            max_line_size = self._max_line_size

        if max_field_size is None:
            max_field_size = self._max_field_size

        traces = [
            Trace(
                self,
                trace_config,
                trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx),
            )
            for trace_config in self._trace_configs
        ]

        for trace in traces:
            await trace.send_request_start(method, url.update_query(params), headers)

        timer = tm.timer()
        try:
            with timer:
                # https://www.rfc-editor.org/rfc/rfc9112.html#name-retrying-requests
                # Only one automatic retry is attempted, and only for
                # idempotent methods (the flag is cleared on first use).
                retry_persistent_connection = method in IDEMPOTENT_METHODS
                while True:
                    url, auth_from_url = strip_auth_from_url(url)
                    if auth and auth_from_url:
                        raise ValueError(
                            "Cannot combine AUTH argument with "
                            "credentials encoded in URL"
                        )

                    if auth is None:
                        auth = auth_from_url
                    if auth is None:
                        auth = self._default_auth
                    # It would be confusing if we support explicit
                    # Authorization header with auth argument
                    if auth is not None and hdrs.AUTHORIZATION in headers:
                        raise ValueError(
                            "Cannot combine AUTHORIZATION header "
                            "with AUTH argument or credentials "
                            "encoded in URL"
                        )

                    all_cookies = self._cookie_jar.filter_cookies(url)

                    if cookies is not None:
                        # Filter per-request cookies through a throwaway jar
                        # so only those applicable to ``url`` are sent.
                        tmp_cookie_jar = CookieJar()
                        tmp_cookie_jar.update_cookies(cookies)
                        req_cookies = tmp_cookie_jar.filter_cookies(url)
                        if req_cookies:
                            all_cookies.load(req_cookies)

                    if proxy is not None:
                        proxy = URL(proxy)
                    elif self._trust_env:
                        with suppress(LookupError):
                            proxy, proxy_auth = get_env_proxy_for_url(url)

                    req = self._request_class(
                        method,
                        url,
                        params=params,
                        headers=headers,
                        skip_auto_headers=skip_headers,
                        data=data,
                        cookies=all_cookies,
                        auth=auth,
                        version=version,
                        compress=compress,
                        chunked=chunked,
                        expect100=expect100,
                        loop=self._loop,
                        response_class=self._response_class,
                        proxy=proxy,
                        proxy_auth=proxy_auth,
                        timer=timer,
                        session=self,
                        ssl=ssl,
                        server_hostname=server_hostname,
                        proxy_headers=proxy_headers,
                        traces=traces,
                        trust_env=self.trust_env,
                    )

                    # connection timeout
                    try:
                        async with ceil_timeout(
                            real_timeout.connect,
                            ceil_threshold=real_timeout.ceil_threshold,
                        ):
                            assert self._connector is not None
                            conn = await self._connector.connect(
                                req, traces=traces, timeout=real_timeout
                            )
                    except asyncio.TimeoutError as exc:
                        raise ConnectionTimeoutError(
                            f"Connection timeout to host {url}"
                        ) from exc

                    assert conn.transport is not None

                    assert conn.protocol is not None
                    conn.protocol.set_response_params(
                        timer=timer,
                        skip_payload=method_must_be_empty_body(method),
                        read_until_eof=read_until_eof,
                        auto_decompress=auto_decompress,
                        read_timeout=real_timeout.sock_read,
                        read_bufsize=read_bufsize,
                        timeout_ceil_threshold=self._connector._timeout_ceil_threshold,
                        max_line_size=max_line_size,
                        max_field_size=max_field_size,
                    )

                    try:
                        try:
                            resp = await req.send(conn)
                            try:
                                await resp.start(conn)
                            except BaseException:
                                resp.close()
                                raise
                        except BaseException:
                            conn.close()
                            raise
                    except (ClientOSError, ServerDisconnectedError):
                        if retry_persistent_connection:
                            retry_persistent_connection = False
                            continue
                        raise
                    except ClientError:
                        raise
                    except OSError as exc:
                        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                            raise
                        raise ClientOSError(*exc.args) from exc

                    self._cookie_jar.update_cookies(resp.cookies, resp.url)

                    # redirects
                    if resp.status in (301, 302, 303, 307, 308) and allow_redirects:
                        for trace in traces:
                            await trace.send_request_redirect(
                                method, url.update_query(params), headers, resp
                            )

                        redirects += 1
                        history.append(resp)
                        if max_redirects and redirects >= max_redirects:
                            resp.close()
                            raise TooManyRedirects(
                                history[0].request_info, tuple(history)
                            )

                        # For 301 and 302, mimic IE, now changed in RFC
                        # https://github.com/kennethreitz/requests/pull/269
                        if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or (
                            resp.status in (301, 302) and resp.method == hdrs.METH_POST
                        ):
                            method = hdrs.METH_GET
                            data = None
                            if headers.get(hdrs.CONTENT_LENGTH):
                                headers.pop(hdrs.CONTENT_LENGTH)

                        r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(
                            hdrs.URI
                        )
                        if r_url is None:
                            # see github.com/aio-libs/aiohttp/issues/2022
                            break
                        else:
                            # reading from correct redirection
                            # response is forbidden
                            resp.release()

                        try:
                            parsed_url = URL(
                                r_url, encoded=not self._requote_redirect_url
                            )

                        except ValueError as e:
                            raise InvalidURL(r_url) from e

                        scheme = parsed_url.scheme
                        if scheme not in ("http", "https", ""):
                            resp.close()
                            raise ValueError("Can redirect only to http or https")
                        elif not scheme:
                            parsed_url = url.join(parsed_url)

                        is_same_host_https_redirect = (
                            url.host == parsed_url.host
                            and parsed_url.scheme == "https"
                            and url.scheme == "http"
                        )

                        # Drop credentials when the redirect leaves the
                        # origin, except for a same-host http -> https upgrade.
                        if (
                            url.origin() != parsed_url.origin()
                            and not is_same_host_https_redirect
                        ):
                            auth = None
                            headers.pop(hdrs.AUTHORIZATION, None)

                        url = parsed_url
                        params = {}
                        resp.release()
                        continue

                    break

            # check response status
            if raise_for_status is None:
                raise_for_status = self._raise_for_status

            if raise_for_status is None:
                pass
            elif callable(raise_for_status):
                await raise_for_status(resp)
            elif raise_for_status:
                resp.raise_for_status()

            # register connection
            if handle is not None:
                if resp.connection is not None:
                    resp.connection.add_callback(handle.cancel)
                else:
                    handle.cancel()

            resp._history = tuple(history)

            for trace in traces:
                await trace.send_request_end(
                    method, url.update_query(params), headers, resp
                )
            return resp

        except BaseException as e:
            # cleanup timer
            tm.close()
            if handle:
                handle.cancel()
                handle = None

            for trace in traces:
                await trace.send_request_exception(
                    method, url.update_query(params), headers, e
                )
            raise

    def ws_connect(
        self,
        url: StrOrURL,
        *,
        method: str = hdrs.METH_GET,
        protocols: Collection[str] = (),
        timeout: Union[ClientWSTimeout, float, _SENTINEL, None] = sentinel,
        receive_timeout: Optional[float] = None,
        autoclose: bool = True,
        autoping: bool = True,
        heartbeat: Optional[float] = None,
        auth: Optional[BasicAuth] = None,
        origin: Optional[str] = None,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        proxy: Optional[StrOrURL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        ssl: Union[SSLContext, bool, Fingerprint] = True,
        server_hostname: Optional[str] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        compress: int = 0,
        max_msg_size: int = 4 * 1024 * 1024,
    ) -> "_WSRequestContextManager":
        """Initiate websocket connection."""
        return _WSRequestContextManager(
            self._ws_connect(
                url,
                method=method,
                protocols=protocols,
                timeout=timeout,
                receive_timeout=receive_timeout,
                autoclose=autoclose,
                autoping=autoping,
                heartbeat=heartbeat,
                auth=auth,
                origin=origin,
                params=params,
                headers=headers,
                proxy=proxy,
                proxy_auth=proxy_auth,
                ssl=ssl,
                server_hostname=server_hostname,
                proxy_headers=proxy_headers,
                compress=compress,
                max_msg_size=max_msg_size,
            )
        )

    async def _ws_connect(
        self,
        url: StrOrURL,
        *,
        method: str = hdrs.METH_GET,
        protocols: Collection[str] = (),
        timeout: Union[ClientWSTimeout, float, _SENTINEL, None] = sentinel,
        receive_timeout: Optional[float] = None,
        autoclose: bool = True,
        autoping: bool = True,
        heartbeat: Optional[float] = None,
        auth: Optional[BasicAuth] = None,
        origin: Optional[str] = None,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        proxy: Optional[StrOrURL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        ssl: Union[SSLContext, bool, Fingerprint] = True,
        server_hostname: Optional[str] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        compress: int = 0,
        max_msg_size: int = 4 * 1024 * 1024,
    ) -> ClientWebSocketResponse:
        """Perform the websocket handshake and build the websocket response.

        A non-``ClientWSTimeout`` ``timeout`` and any ``receive_timeout`` are
        accepted with a ``DeprecationWarning`` and folded into a
        ``ClientWSTimeout``.

        Raises:
            TypeError: ``ssl`` is not one of ``SSL_ALLOWED_TYPES``.
            WSServerHandshakeError: the server reply fails any handshake
                check (status, Upgrade/Connection headers, accept key,
                extension header).
        """
        if timeout is sentinel or timeout is None:
            ws_timeout = DEFAULT_WS_CLIENT_TIMEOUT
        else:
            if isinstance(timeout, ClientWSTimeout):
                ws_timeout = timeout
            else:
                warnings.warn(
                    "parameter 'timeout' of type 'float' "
                    "is deprecated, please use "
                    "'timeout=ClientWSTimeout(ws_close=...)'",
                    DeprecationWarning,
                    stacklevel=2,
                )
                ws_timeout = ClientWSTimeout(ws_close=timeout)

        if receive_timeout is not None:
            warnings.warn(
                "float parameter 'receive_timeout' "
                "is deprecated, please use parameter "
                "'timeout=ClientWSTimeout(ws_receive=...)'",
                DeprecationWarning,
                stacklevel=2,
            )
            ws_timeout = dataclasses.replace(ws_timeout, ws_receive=receive_timeout)

        if headers is None:
            real_headers: CIMultiDict[str] = CIMultiDict()
        else:
            real_headers = CIMultiDict(headers)

        default_headers = {
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "Upgrade",
            hdrs.SEC_WEBSOCKET_VERSION: "13",
        }

        # Caller-supplied values win over the handshake defaults.
        for key, value in default_headers.items():
            real_headers.setdefault(key, value)

        sec_key = base64.b64encode(os.urandom(16))
        real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode()

        if protocols:
            real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols)
        if origin is not None:
            real_headers[hdrs.ORIGIN] = origin
        if compress:
            extstr = ws_ext_gen(compress=compress)
            real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr

        if not isinstance(ssl, SSL_ALLOWED_TYPES):
            raise TypeError(
                "ssl should be SSLContext, Fingerprint, or bool, "
                f"got {ssl!r} instead."
            )

        # send request
        resp = await self.request(
            method,
            url,
            params=params,
            headers=real_headers,
            read_until_eof=False,
            auth=auth,
            proxy=proxy,
            proxy_auth=proxy_auth,
            ssl=ssl,
            server_hostname=server_hostname,
            proxy_headers=proxy_headers,
        )

        try:
            # check handshake
            if resp.status != 101:
                raise WSServerHandshakeError(
                    resp.request_info,
                    resp.history,
                    message="Invalid response status",
                    status=resp.status,
                    headers=resp.headers,
                )

            if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket":
                raise WSServerHandshakeError(
                    resp.request_info,
                    resp.history,
                    message="Invalid upgrade header",
                    status=resp.status,
                    headers=resp.headers,
                )

            if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade":
                raise WSServerHandshakeError(
                    resp.request_info,
                    resp.history,
                    message="Invalid connection header",
                    status=resp.status,
                    headers=resp.headers,
                )

            # key calculation
            r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "")
            match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode()
            if r_key != match:
                raise WSServerHandshakeError(
                    resp.request_info,
                    resp.history,
                    message="Invalid challenge response",
                    status=resp.status,
                    headers=resp.headers,
                )

            # websocket protocol: first server-listed protocol we also offered
            protocol = None
            if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers:
                resp_protocols = [
                    proto.strip()
                    for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
                ]

                for proto in resp_protocols:
                    if proto in protocols:
                        protocol = proto
                        break

            # websocket compress
            notakeover = False
            if compress:
                compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
                if compress_hdrs:
                    try:
                        compress, notakeover = ws_ext_parse(compress_hdrs)
                    except WSHandshakeError as exc:
                        raise WSServerHandshakeError(
                            resp.request_info,
                            resp.history,
                            message=exc.args[0],
                            status=resp.status,
                            headers=resp.headers,
                        ) from exc
                else:
                    # Server sent no extension header: disable compression.
                    compress = 0
                    notakeover = False

            conn = resp.connection
            assert conn is not None
            conn_proto = conn.protocol
            assert conn_proto is not None
            transport = conn.transport
            assert transport is not None
            reader: FlowControlDataQueue[WSMessage] = FlowControlDataQueue(
                conn_proto, 2**16, loop=self._loop
            )
            conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
            writer = WebSocketWriter(
                conn_proto,
                transport,
                use_mask=True,
                compress=compress,
                notakeover=notakeover,
            )
        except BaseException:
            resp.close()
            raise
        else:
            return self._ws_response_class(
                reader,
                writer,
                protocol,
                resp,
                ws_timeout,
                autoclose,
                autoping,
                self._loop,
                heartbeat=heartbeat,
                compress=compress,
                client_notakeover=notakeover,
            )

    def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]":
        """Add default headers and transform it to CIMultiDict"""
        # Start from a copy of the session defaults
        result = CIMultiDict(self._default_headers)
        if headers:
            if not isinstance(headers, (MultiDictProxy, MultiDict)):
                headers = CIMultiDict(headers)
            # First occurrence of a key replaces the default; repeated
            # occurrences of the same key are appended.
            added_names: Set[str] = set()
            for key, value in headers.items():
                if key in added_names:
                    result.add(key, value)
                else:
                    result[key] = value
                    added_names.add(key)
        return result

    def get(
        self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP GET request."""
        return _RequestContextManager(
            self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs)
        )

    def options(
        self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP OPTIONS request."""
        return _RequestContextManager(
            self._request(
                hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs
            )
        )

    def head(
        self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP HEAD request."""
        return _RequestContextManager(
            self._request(
                hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs
            )
        )

    def post(
        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP POST request."""
        return _RequestContextManager(
            self._request(hdrs.METH_POST, url, data=data, **kwargs)
        )

    def put(
        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP PUT request."""
        return _RequestContextManager(
            self._request(hdrs.METH_PUT, url, data=data, **kwargs)
        )

    def patch(
        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
    ) -> "_RequestContextManager":
        """Perform HTTP PATCH request."""
        return _RequestContextManager(
            self._request(hdrs.METH_PATCH, url, data=data, **kwargs)
        )

    def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager":
        """Perform HTTP DELETE request."""
        return _RequestContextManager(self._request(hdrs.METH_DELETE, url, **kwargs))

    async def close(self) -> None:
        """Close underlying connector.

        Release all acquired resources.
        """
        if not self.closed:
            # The connector is only closed when this session owns it, but the
            # session always drops its reference.
            if self._connector is not None and self._connector_owner:
                await self._connector.close()
            self._connector = None

    @property
    def closed(self) -> bool:
        """Is client session closed.

        A readonly property.
        """
        return self._connector is None or self._connector.closed

    @property
    def connector(self) -> Optional[BaseConnector]:
        """Connector instance used for the session."""
        return self._connector

    @property
    def cookie_jar(self) -> AbstractCookieJar:
        """The session cookies."""
        return self._cookie_jar

    @property
    def version(self) -> Tuple[int, int]:
        """The session HTTP protocol version."""
        return self._version

    @property
    def requote_redirect_url(self) -> bool:
        """Do URL requoting on redirection handling."""
        return self._requote_redirect_url

    @property
    def timeout(self) -> ClientTimeout:
        """Timeout for the session."""
        return self._timeout

    @property
    def headers(self) -> "CIMultiDict[str]":
        """The default headers of the client session."""
        return self._default_headers

    @property
    def skip_auto_headers(self) -> FrozenSet[istr]:
        """Headers for which autogeneration should be skipped"""
        return self._skip_auto_headers

    @property
    def auth(self) -> Optional[BasicAuth]:
        """An object that represents HTTP Basic Authorization"""
        return self._default_auth

    @property
    def json_serialize(self) -> JSONEncoder:
        """Json serializer callable"""
        return self._json_serialize

    @property
    def connector_owner(self) -> bool:
        """Should connector be closed on session closing"""
        return self._connector_owner

    @property
    def raise_for_status(
        self,
    ) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]:
        """Should `ClientResponse.raise_for_status()` be called for each response."""
        return self._raise_for_status

    @property
    def auto_decompress(self) -> bool:
        """Should the body response be automatically decompressed."""
        return self._auto_decompress

    @property
    def trust_env(self) -> bool:
        """
        Should proxies information from environment or netrc be trusted.

        Information is from HTTP_PROXY / HTTPS_PROXY environment variables
        or ~/.netrc file if present.
        """
        return self._trust_env

    @property
    def trace_configs(self) -> List[TraceConfig]:
        """A list of TraceConfig instances used for client tracing"""
        return self._trace_configs

    def detach(self) -> None:
        """Detach connector from session without closing the former.

        Session is switched to closed state anyway.
        """
        self._connector = None

    async def __aenter__(self) -> "ClientSession":
        """Enter the async context; returns the session itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Close the session on leaving the async context."""
        await self.close()
class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]):
    """Wrap a coroutine so it can be awaited or used with ``async with``.

    The coroutine protocol methods delegate straight to the wrapped
    coroutine; ``__aenter__`` awaits it and keeps the result in ``_resp``.
    """

    __slots__ = ("_coro", "_resp")

    def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None:
        self._coro = coro

    # -- coroutine protocol: plain delegation to the wrapped coroutine --

    def send(self, arg: None) -> "asyncio.Future[Any]":
        return self._coro.send(arg)

    def throw(self, *args: Any, **kwargs: Any) -> "asyncio.Future[Any]":
        return self._coro.throw(*args, **kwargs)

    def close(self) -> None:
        return self._coro.close()

    def __await__(self) -> Generator[Any, None, _RetType]:
        return self._coro.__await__()

    def __iter__(self) -> Generator[Any, None, _RetType]:
        return self.__await__()

    # -- async context manager entry --

    async def __aenter__(self) -> _RetType:
        result = await self._coro
        self._resp = result
        return result
class _RequestContextManager(_BaseRequestContextManager[ClientResponse]):
    """Context manager for a plain HTTP request; releases the response on exit."""

    __slots__ = ()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Release the response and wait for it to finish closing."""
        # The exception arguments are deliberately not consulted: an error
        # may come from user code that has nothing to do with the state of
        # the connection. Callers who want the connection closed must do so
        # explicitly; otherwise connection error handling is expected to
        # close/recycle the connection as required.
        response = self._resp
        response.release()
        await response.wait_for_close()
class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]):
    """Context manager for a websocket request; closes the websocket on exit."""

    __slots__ = ()

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Close the websocket response; the exception arguments are unused."""
        websocket = self._resp
        await websocket.close()
class _SessionRequestContextManager:
    """Async context manager tying a request coroutine to its session.

    Entering awaits the request coroutine and yields the response. Leaving
    closes the response and then the session. The session is also closed
    when the request coroutine itself fails.
    """

    __slots__ = ("_coro", "_resp", "_session")

    def __init__(
        self,
        coro: Coroutine["asyncio.Future[Any]", None, ClientResponse],
        session: ClientSession,
    ) -> None:
        """Store the pending request coroutine and the session to close later."""
        self._coro = coro
        self._resp: Optional[ClientResponse] = None
        self._session = session

    async def __aenter__(self) -> ClientResponse:
        """Await the request; close the session if the request raises."""
        try:
            self._resp = await self._coro
        except BaseException:
            # ``__aexit__`` is not invoked when ``__aenter__`` raises, so the
            # session has to be closed here before re-raising.
            await self._session.close()
            raise
        else:
            return self._resp

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Close the response, then always close the session."""
        try:
            assert self._resp is not None
            self._resp.close()
        finally:
            # Close the session even if the response could not be closed,
            # so the session is never left open by a failing exit path.
            await self._session.close()
def request(
    method: str,
    url: StrOrURL,
    *,
    params: Optional[Mapping[str, str]] = None,
    data: Any = None,
    json: Any = None,
    headers: Optional[LooseHeaders] = None,
    skip_auto_headers: Optional[Iterable[str]] = None,
    auth: Optional[BasicAuth] = None,
    allow_redirects: bool = True,
    max_redirects: int = 10,
    compress: Optional[str] = None,
    chunked: Optional[bool] = None,
    expect100: bool = False,
    raise_for_status: Optional[bool] = None,
    read_until_eof: bool = True,
    proxy: Optional[StrOrURL] = None,
    proxy_auth: Optional[BasicAuth] = None,
    timeout: Union[ClientTimeout, _SENTINEL] = sentinel,
    cookies: Optional[LooseCookies] = None,
    version: HttpVersion = http.HttpVersion11,
    connector: Optional[BaseConnector] = None,
    read_bufsize: Optional[int] = None,
    max_line_size: int = 8190,
    max_field_size: int = 8190,
) -> _SessionRequestContextManager:
    """Construct and send a single request through a throwaway session.

    A new ClientSession is created for this one call. The returned object
    is an async context manager that yields the response and closes both
    the response and the session on exit.

    method - HTTP method
    url - request url
    params - (optional) Dictionary or bytes to be sent in the query
      string of the new request
    data - (optional) Dictionary, bytes, or file-like object to
      send in the body of the request
    json - (optional) Any json compatible python object
    headers - (optional) Dictionary of HTTP Headers to send with
      the request
    cookies - (optional) Dict object to send with the request;
      handed to the session
    auth - (optional) aiohttp.helpers.BasicAuth named tuple
      representing HTTP Basic Auth
    allow_redirects - (optional) If set to False, do not follow
      redirects
    version - Request HTTP version; handed to the session.
    compress - Enable compression of the request body
      (deflate encoding).
    chunked - Enable chunked transfer encoding.
    expect100 - Expect 100-continue response from server.
    connector - BaseConnector sub-class instance to support
      connection pooling. When omitted, a TCPConnector(force_close=True)
      is created and owned by the temporary session.
    read_until_eof - Read response until eof if response
      does not have Content-Length header.
    timeout - Optional ClientTimeout settings structure, 5min
      total timeout by default; handed to the session.
    skip_auto_headers, max_redirects, raise_for_status, proxy,
    proxy_auth, read_bufsize, max_line_size, max_field_size -
      forwarded unchanged to the session's request call.

    Usage::
      >>> import aiohttp
      >>> async with aiohttp.request('GET', 'http://python.org/') as resp:
      ...    print(resp)
      ...    data = await resp.read()
      <ClientResponse(https://www.python.org/) [200 OK]>
    """
    # The session owns (and therefore closes) the connector only when this
    # function had to create one itself.
    owns_connector = connector is None
    if owns_connector:
        connector = TCPConnector(force_close=True)

    session = ClientSession(
        cookies=cookies,
        version=version,
        timeout=timeout,
        connector=connector,
        connector_owner=owns_connector,
    )

    # Build the (not yet awaited) request coroutine; the context manager
    # awaits it on entry.
    pending = session._request(
        method,
        url,
        params=params,
        data=data,
        json=json,
        headers=headers,
        skip_auto_headers=skip_auto_headers,
        auth=auth,
        allow_redirects=allow_redirects,
        max_redirects=max_redirects,
        compress=compress,
        chunked=chunked,
        expect100=expect100,
        raise_for_status=raise_for_status,
        read_until_eof=read_until_eof,
        proxy=proxy,
        proxy_auth=proxy_auth,
        read_bufsize=read_bufsize,
        max_line_size=max_line_size,
        max_field_size=max_field_size,
    )
    return _SessionRequestContextManager(pending, session)