1import asyncio 
    2import codecs 
    3import contextlib 
    4import functools 
    5import io 
    6import re 
    7import sys 
    8import traceback 
    9import warnings 
    10from collections.abc import Mapping 
    11from hashlib import md5, sha1, sha256 
    12from http.cookies import Morsel, SimpleCookie 
    13from types import MappingProxyType, TracebackType 
    14from typing import ( 
    15    TYPE_CHECKING, 
    16    Any, 
    17    Callable, 
    18    Dict, 
    19    Iterable, 
    20    List, 
    21    Literal, 
    22    NamedTuple, 
    23    Optional, 
    24    Tuple, 
    25    Type, 
    26    Union, 
    27) 
    28 
    29import attr 
    30from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 
    31from yarl import URL 
    32 
    33from . import hdrs, helpers, http, multipart, payload 
    34from ._cookie_helpers import ( 
    35    parse_cookie_header, 
    36    parse_set_cookie_headers, 
    37    preserve_morsel_with_coded_value, 
    38) 
    39from .abc import AbstractStreamWriter 
    40from .client_exceptions import ( 
    41    ClientConnectionError, 
    42    ClientOSError, 
    43    ClientResponseError, 
    44    ContentTypeError, 
    45    InvalidURL, 
    46    ServerFingerprintMismatch, 
    47) 
    48from .compression_utils import HAS_BROTLI, HAS_ZSTD 
    49from .formdata import FormData 
    50from .helpers import ( 
    51    _SENTINEL, 
    52    BaseTimerContext, 
    53    BasicAuth, 
    54    HeadersMixin, 
    55    TimerNoop, 
    56    noop, 
    57    reify, 
    58    sentinel, 
    59    set_exception, 
    60    set_result, 
    61) 
    62from .http import ( 
    63    SERVER_SOFTWARE, 
    64    HttpVersion, 
    65    HttpVersion10, 
    66    HttpVersion11, 
    67    StreamWriter, 
    68) 
    69from .streams import StreamReader 
    70from .typedefs import ( 
    71    DEFAULT_JSON_DECODER, 
    72    JSONDecoder, 
    73    LooseCookies, 
    74    LooseHeaders, 
    75    Query, 
    76    RawHeaders, 
    77) 
    78 
    79if TYPE_CHECKING: 
    80    import ssl 
    81    from ssl import SSLContext 
    82else: 
    83    try: 
    84        import ssl 
    85        from ssl import SSLContext 
    86    except ImportError:  # pragma: no cover 
    87        ssl = None  # type: ignore[assignment] 
    88        SSLContext = object  # type: ignore[misc,assignment] 
    89 
    90 
    91__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 
    92 
    93 
    94if TYPE_CHECKING: 
    95    from .client import ClientSession 
    96    from .connector import Connection 
    97    from .tracing import Trace 
    98 
    99 
    100_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed") 
    101_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 
    102json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json") 
    103 
    104 
    105def _gen_default_accept_encoding() -> str: 
    106    encodings = [ 
    107        "gzip", 
    108        "deflate", 
    109    ] 
    110    if HAS_BROTLI: 
    111        encodings.append("br") 
    112    if HAS_ZSTD: 
    113        encodings.append("zstd") 
    114    return ", ".join(encodings) 
    115 
    116 
    117@attr.s(auto_attribs=True, frozen=True, slots=True) 
    118class ContentDisposition: 
    119    type: Optional[str] 
    120    parameters: "MappingProxyType[str, str]" 
    121    filename: Optional[str] 
    122 
    123 
    124class _RequestInfo(NamedTuple): 
    125    url: URL 
    126    method: str 
    127    headers: "CIMultiDictProxy[str]" 
    128    real_url: URL 
    129 
    130 
    131class RequestInfo(_RequestInfo): 
    132 
    133    def __new__( 
    134        cls, 
    135        url: URL, 
    136        method: str, 
    137        headers: "CIMultiDictProxy[str]", 
    138        real_url: Union[URL, _SENTINEL] = sentinel, 
    139    ) -> "RequestInfo": 
    140        """Create a new RequestInfo instance. 
    141 
    142        For backwards compatibility, the real_url parameter is optional. 
    143        """ 
    144        return tuple.__new__( 
    145            cls, (url, method, headers, url if real_url is sentinel else real_url) 
    146        ) 
    147 
    148 
    149class Fingerprint: 
    150    HASHFUNC_BY_DIGESTLEN = { 
    151        16: md5, 
    152        20: sha1, 
    153        32: sha256, 
    154    } 
    155 
    156    def __init__(self, fingerprint: bytes) -> None: 
    157        digestlen = len(fingerprint) 
    158        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen) 
    159        if not hashfunc: 
    160            raise ValueError("fingerprint has invalid length") 
    161        elif hashfunc is md5 or hashfunc is sha1: 
    162            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.") 
    163        self._hashfunc = hashfunc 
    164        self._fingerprint = fingerprint 
    165 
    166    @property 
    167    def fingerprint(self) -> bytes: 
    168        return self._fingerprint 
    169 
    170    def check(self, transport: asyncio.Transport) -> None: 
    171        if not transport.get_extra_info("sslcontext"): 
    172            return 
    173        sslobj = transport.get_extra_info("ssl_object") 
    174        cert = sslobj.getpeercert(binary_form=True) 
    175        got = self._hashfunc(cert).digest() 
    176        if got != self._fingerprint: 
    177            host, port, *_ = transport.get_extra_info("peername") 
    178            raise ServerFingerprintMismatch(self._fingerprint, got, host, port) 
    179 
    180 
    181if ssl is not None: 
    182    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None)) 
    183else:  # pragma: no cover 
    184    SSL_ALLOWED_TYPES = (bool, type(None)) 
    185 
    186 
    187def _merge_ssl_params( 
    188    ssl: Union["SSLContext", bool, Fingerprint], 
    189    verify_ssl: Optional[bool], 
    190    ssl_context: Optional["SSLContext"], 
    191    fingerprint: Optional[bytes], 
    192) -> Union["SSLContext", bool, Fingerprint]: 
    193    if ssl is None: 
    194        ssl = True  # Double check for backwards compatibility 
    195    if verify_ssl is not None and not verify_ssl: 
    196        warnings.warn( 
    197            "verify_ssl is deprecated, use ssl=False instead", 
    198            DeprecationWarning, 
    199            stacklevel=3, 
    200        ) 
    201        if ssl is not True: 
    202            raise ValueError( 
    203                "verify_ssl, ssl_context, fingerprint and ssl " 
    204                "parameters are mutually exclusive" 
    205            ) 
    206        else: 
    207            ssl = False 
    208    if ssl_context is not None: 
    209        warnings.warn( 
    210            "ssl_context is deprecated, use ssl=context instead", 
    211            DeprecationWarning, 
    212            stacklevel=3, 
    213        ) 
    214        if ssl is not True: 
    215            raise ValueError( 
    216                "verify_ssl, ssl_context, fingerprint and ssl " 
    217                "parameters are mutually exclusive" 
    218            ) 
    219        else: 
    220            ssl = ssl_context 
    221    if fingerprint is not None: 
    222        warnings.warn( 
    223            "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead", 
    224            DeprecationWarning, 
    225            stacklevel=3, 
    226        ) 
    227        if ssl is not True: 
    228            raise ValueError( 
    229                "verify_ssl, ssl_context, fingerprint and ssl " 
    230                "parameters are mutually exclusive" 
    231            ) 
    232        else: 
    233            ssl = Fingerprint(fingerprint) 
    234    if not isinstance(ssl, SSL_ALLOWED_TYPES): 
    235        raise TypeError( 
    236            "ssl should be SSLContext, bool, Fingerprint or None, " 
    237            "got {!r} instead.".format(ssl) 
    238        ) 
    239    return ssl 
    240 
    241 
    242_SSL_SCHEMES = frozenset(("https", "wss")) 
    243 
    244 
    245# ConnectionKey is a NamedTuple because it is used as a key in a dict 
    246# and a set in the connector. Since a NamedTuple is a tuple it uses 
    247# the fast native tuple __hash__ and __eq__ implementation in CPython. 
    248class ConnectionKey(NamedTuple): 
    249    # the key should contain an information about used proxy / TLS 
    250    # to prevent reusing wrong connections from a pool 
    251    host: str 
    252    port: Optional[int] 
    253    is_ssl: bool 
    254    ssl: Union[SSLContext, bool, Fingerprint] 
    255    proxy: Optional[URL] 
    256    proxy_auth: Optional[BasicAuth] 
    257    proxy_headers_hash: Optional[int]  # hash(CIMultiDict) 
    258 
    259 
    260def _is_expected_content_type( 
    261    response_content_type: str, expected_content_type: str 
    262) -> bool: 
    263    if expected_content_type == "application/json": 
    264        return json_re.match(response_content_type) is not None 
    265    return expected_content_type in response_content_type 
    266 
    267 
    268def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None: 
    269    """Warn if the payload is not closed. 
    270 
    271    Callers must check that the body is a Payload before calling this method. 
    272 
    273    Args: 
    274        payload: The payload to check 
    275        stacklevel: Stack level for the warning (default 2 for direct callers) 
    276    """ 
    277    if not payload.autoclose and not payload.consumed: 
    278        warnings.warn( 
    279            "The previous request body contains unclosed resources. " 
    280            "Use await request.update_body() instead of setting request.body " 
    281            "directly to properly close resources and avoid leaks.", 
    282            ResourceWarning, 
    283            stacklevel=stacklevel, 
    284        ) 
    285 
    286 
    287class ClientResponse(HeadersMixin): 
    288 
    289    # Some of these attributes are None when created, 
    290    # but will be set by the start() method. 
    291    # As the end user will likely never see the None values, we cheat the types below. 
    292    # from the Status-Line of the response 
    293    version: Optional[HttpVersion] = None  # HTTP-Version 
    294    status: int = None  # type: ignore[assignment] # Status-Code 
    295    reason: Optional[str] = None  # Reason-Phrase 
    296 
    297    content: StreamReader = None  # type: ignore[assignment] # Payload stream 
    298    _body: Optional[bytes] = None 
    299    _headers: CIMultiDictProxy[str] = None  # type: ignore[assignment] 
    300    _history: Tuple["ClientResponse", ...] = () 
    301    _raw_headers: RawHeaders = None  # type: ignore[assignment] 
    302 
    303    _connection: Optional["Connection"] = None  # current connection 
    304    _cookies: Optional[SimpleCookie] = None 
    305    _raw_cookie_headers: Optional[Tuple[str, ...]] = None 
    306    _continue: Optional["asyncio.Future[bool]"] = None 
    307    _source_traceback: Optional[traceback.StackSummary] = None 
    308    _session: Optional["ClientSession"] = None 
    309    # set up by ClientRequest after ClientResponse object creation 
    310    # post-init stage allows to not change ctor signature 
    311    _closed = True  # to allow __del__ for non-initialized properly response 
    312    _released = False 
    313    _in_context = False 
    314 
    315    _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8" 
    316 
    317    __writer: Optional["asyncio.Task[None]"] = None 
    318 
    319    def __init__( 
    320        self, 
    321        method: str, 
    322        url: URL, 
    323        *, 
    324        writer: "Optional[asyncio.Task[None]]", 
    325        continue100: Optional["asyncio.Future[bool]"], 
    326        timer: BaseTimerContext, 
    327        request_info: RequestInfo, 
    328        traces: List["Trace"], 
    329        loop: asyncio.AbstractEventLoop, 
    330        session: "ClientSession", 
    331    ) -> None: 
    332        # URL forbids subclasses, so a simple type check is enough. 
    333        assert type(url) is URL 
    334 
    335        self.method = method 
    336 
    337        self._real_url = url 
    338        self._url = url.with_fragment(None) if url.raw_fragment else url 
    339        if writer is not None: 
    340            self._writer = writer 
    341        if continue100 is not None: 
    342            self._continue = continue100 
    343        self._request_info = request_info 
    344        self._timer = timer if timer is not None else TimerNoop() 
    345        self._cache: Dict[str, Any] = {} 
    346        self._traces = traces 
    347        self._loop = loop 
    348        # Save reference to _resolve_charset, so that get_encoding() will still 
    349        # work after the response has finished reading the body. 
    350        # TODO: Fix session=None in tests (see ClientRequest.__init__). 
    351        if session is not None: 
    352            # store a reference to session #1985 
    353            self._session = session 
    354            self._resolve_charset = session._resolve_charset 
    355        if loop.get_debug(): 
    356            self._source_traceback = traceback.extract_stack(sys._getframe(1)) 
    357 
    358    def __reset_writer(self, _: object = None) -> None: 
    359        self.__writer = None 
    360 
    361    @property 
    362    def _writer(self) -> Optional["asyncio.Task[None]"]: 
    363        """The writer task for streaming data. 
    364 
    365        _writer is only provided for backwards compatibility 
    366        for subclasses that may need to access it. 
    367        """ 
    368        return self.__writer 
    369 
    370    @_writer.setter 
    371    def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None: 
    372        """Set the writer task for streaming data.""" 
    373        if self.__writer is not None: 
    374            self.__writer.remove_done_callback(self.__reset_writer) 
    375        self.__writer = writer 
    376        if writer is None: 
    377            return 
    378        if writer.done(): 
    379            # The writer is already done, so we can clear it immediately. 
    380            self.__writer = None 
    381        else: 
    382            writer.add_done_callback(self.__reset_writer) 
    383 
    384    @property 
    385    def cookies(self) -> SimpleCookie: 
    386        if self._cookies is None: 
    387            if self._raw_cookie_headers is not None: 
    388                # Parse cookies for response.cookies (SimpleCookie for backward compatibility) 
    389                cookies = SimpleCookie() 
    390                # Use parse_set_cookie_headers for more lenient parsing that handles 
    391                # malformed cookies better than SimpleCookie.load 
    392                cookies.update(parse_set_cookie_headers(self._raw_cookie_headers)) 
    393                self._cookies = cookies 
    394            else: 
    395                self._cookies = SimpleCookie() 
    396        return self._cookies 
    397 
    398    @cookies.setter 
    399    def cookies(self, cookies: SimpleCookie) -> None: 
    400        self._cookies = cookies 
    401        # Generate raw cookie headers from the SimpleCookie 
    402        if cookies: 
    403            self._raw_cookie_headers = tuple( 
    404                morsel.OutputString() for morsel in cookies.values() 
    405            ) 
    406        else: 
    407            self._raw_cookie_headers = None 
    408 
    409    @reify 
    410    def url(self) -> URL: 
    411        return self._url 
    412 
    413    @reify 
    414    def url_obj(self) -> URL: 
    415        warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2) 
    416        return self._url 
    417 
    418    @reify 
    419    def real_url(self) -> URL: 
    420        return self._real_url 
    421 
    422    @reify 
    423    def host(self) -> str: 
    424        assert self._url.host is not None 
    425        return self._url.host 
    426 
    427    @reify 
    428    def headers(self) -> "CIMultiDictProxy[str]": 
    429        return self._headers 
    430 
    431    @reify 
    432    def raw_headers(self) -> RawHeaders: 
    433        return self._raw_headers 
    434 
    435    @reify 
    436    def request_info(self) -> RequestInfo: 
    437        return self._request_info 
    438 
    439    @reify 
    440    def content_disposition(self) -> Optional[ContentDisposition]: 
    441        raw = self._headers.get(hdrs.CONTENT_DISPOSITION) 
    442        if raw is None: 
    443            return None 
    444        disposition_type, params_dct = multipart.parse_content_disposition(raw) 
    445        params = MappingProxyType(params_dct) 
    446        filename = multipart.content_disposition_filename(params) 
    447        return ContentDisposition(disposition_type, params, filename) 
    448 
    449    def __del__(self, _warnings: Any = warnings) -> None: 
    450        if self._closed: 
    451            return 
    452 
    453        if self._connection is not None: 
    454            self._connection.release() 
    455            self._cleanup_writer() 
    456 
    457            if self._loop.get_debug(): 
    458                kwargs = {"source": self} 
    459                _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs) 
    460                context = {"client_response": self, "message": "Unclosed response"} 
    461                if self._source_traceback: 
    462                    context["source_traceback"] = self._source_traceback 
    463                self._loop.call_exception_handler(context) 
    464 
    465    def __repr__(self) -> str: 
    466        out = io.StringIO() 
    467        ascii_encodable_url = str(self.url) 
    468        if self.reason: 
    469            ascii_encodable_reason = self.reason.encode( 
    470                "ascii", "backslashreplace" 
    471            ).decode("ascii") 
    472        else: 
    473            ascii_encodable_reason = "None" 
    474        print( 
    475            "<ClientResponse({}) [{} {}]>".format( 
    476                ascii_encodable_url, self.status, ascii_encodable_reason 
    477            ), 
    478            file=out, 
    479        ) 
    480        print(self.headers, file=out) 
    481        return out.getvalue() 
    482 
    483    @property 
    484    def connection(self) -> Optional["Connection"]: 
    485        return self._connection 
    486 
    487    @reify 
    488    def history(self) -> Tuple["ClientResponse", ...]: 
    489        """A sequence of of responses, if redirects occurred.""" 
    490        return self._history 
    491 
    492    @reify 
    493    def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]": 
    494        links_str = ", ".join(self.headers.getall("link", [])) 
    495 
    496        if not links_str: 
    497            return MultiDictProxy(MultiDict()) 
    498 
    499        links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict() 
    500 
    501        for val in re.split(r",(?=\s*<)", links_str): 
    502            match = re.match(r"\s*<(.*)>(.*)", val) 
    503            if match is None:  # pragma: no cover 
    504                # the check exists to suppress mypy error 
    505                continue 
    506            url, params_str = match.groups() 
    507            params = params_str.split(";")[1:] 
    508 
    509            link: MultiDict[Union[str, URL]] = MultiDict() 
    510 
    511            for param in params: 
    512                match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M) 
    513                if match is None:  # pragma: no cover 
    514                    # the check exists to suppress mypy error 
    515                    continue 
    516                key, _, value, _ = match.groups() 
    517 
    518                link.add(key, value) 
    519 
    520            key = link.get("rel", url) 
    521 
    522            link.add("url", self.url.join(URL(url))) 
    523 
    524            links.add(str(key), MultiDictProxy(link)) 
    525 
    526        return MultiDictProxy(links) 
    527 
    528    async def start(self, connection: "Connection") -> "ClientResponse": 
    529        """Start response processing.""" 
    530        self._closed = False 
    531        self._protocol = connection.protocol 
    532        self._connection = connection 
    533 
    534        with self._timer: 
    535            while True: 
    536                # read response 
    537                try: 
    538                    protocol = self._protocol 
    539                    message, payload = await protocol.read()  # type: ignore[union-attr] 
    540                except http.HttpProcessingError as exc: 
    541                    raise ClientResponseError( 
    542                        self.request_info, 
    543                        self.history, 
    544                        status=exc.code, 
    545                        message=exc.message, 
    546                        headers=exc.headers, 
    547                    ) from exc 
    548 
    549                if message.code < 100 or message.code > 199 or message.code == 101: 
    550                    break 
    551 
    552                if self._continue is not None: 
    553                    set_result(self._continue, True) 
    554                    self._continue = None 
    555 
    556        # payload eof handler 
    557        payload.on_eof(self._response_eof) 
    558 
    559        # response status 
    560        self.version = message.version 
    561        self.status = message.code 
    562        self.reason = message.reason 
    563 
    564        # headers 
    565        self._headers = message.headers  # type is CIMultiDictProxy 
    566        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes] 
    567 
    568        # payload 
    569        self.content = payload 
    570 
    571        # cookies 
    572        if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()): 
    573            # Store raw cookie headers for CookieJar 
    574            self._raw_cookie_headers = tuple(cookie_hdrs) 
    575        return self 
    576 
    577    def _response_eof(self) -> None: 
    578        if self._closed: 
    579            return 
    580 
    581        # protocol could be None because connection could be detached 
    582        protocol = self._connection and self._connection.protocol 
    583        if protocol is not None and protocol.upgraded: 
    584            return 
    585 
    586        self._closed = True 
    587        self._cleanup_writer() 
    588        self._release_connection() 
    589 
    590    @property 
    591    def closed(self) -> bool: 
    592        return self._closed 
    593 
    594    def close(self) -> None: 
    595        if not self._released: 
    596            self._notify_content() 
    597 
    598        self._closed = True 
    599        if self._loop is None or self._loop.is_closed(): 
    600            return 
    601 
    602        self._cleanup_writer() 
    603        if self._connection is not None: 
    604            self._connection.close() 
    605            self._connection = None 
    606 
    607    def release(self) -> Any: 
    608        if not self._released: 
    609            self._notify_content() 
    610 
    611        self._closed = True 
    612 
    613        self._cleanup_writer() 
    614        self._release_connection() 
    615        return noop() 
    616 
    617    @property 
    618    def ok(self) -> bool: 
    619        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not. 
    620 
    621        This is **not** a check for ``200 OK`` but a check that the response 
    622        status is under 400. 
    623        """ 
    624        return 400 > self.status 
    625 
    626    def raise_for_status(self) -> None: 
    627        if not self.ok: 
    628            # reason should always be not None for a started response 
    629            assert self.reason is not None 
    630 
    631            # If we're in a context we can rely on __aexit__() to release as the 
    632            # exception propagates. 
    633            if not self._in_context: 
    634                self.release() 
    635 
    636            raise ClientResponseError( 
    637                self.request_info, 
    638                self.history, 
    639                status=self.status, 
    640                message=self.reason, 
    641                headers=self.headers, 
    642            ) 
    643 
    644    def _release_connection(self) -> None: 
    645        if self._connection is not None: 
    646            if self.__writer is None: 
    647                self._connection.release() 
    648                self._connection = None 
    649            else: 
    650                self.__writer.add_done_callback(lambda f: self._release_connection()) 
    651 
    652    async def _wait_released(self) -> None: 
    653        if self.__writer is not None: 
    654            try: 
    655                await self.__writer 
    656            except asyncio.CancelledError: 
    657                if ( 
    658                    sys.version_info >= (3, 11) 
    659                    and (task := asyncio.current_task()) 
    660                    and task.cancelling() 
    661                ): 
    662                    raise 
    663        self._release_connection() 
    664 
    665    def _cleanup_writer(self) -> None: 
    666        if self.__writer is not None: 
    667            self.__writer.cancel() 
    668        self._session = None 
    669 
    670    def _notify_content(self) -> None: 
    671        content = self.content 
    672        if content and content.exception() is None: 
    673            set_exception(content, _CONNECTION_CLOSED_EXCEPTION) 
    674        self._released = True 
    675 
    676    async def wait_for_close(self) -> None: 
    677        if self.__writer is not None: 
    678            try: 
    679                await self.__writer 
    680            except asyncio.CancelledError: 
    681                if ( 
    682                    sys.version_info >= (3, 11) 
    683                    and (task := asyncio.current_task()) 
    684                    and task.cancelling() 
    685                ): 
    686                    raise 
    687        self.release() 
    688 
    689    async def read(self) -> bytes: 
    690        """Read response payload.""" 
    691        if self._body is None: 
    692            try: 
    693                self._body = await self.content.read() 
    694                for trace in self._traces: 
    695                    await trace.send_response_chunk_received( 
    696                        self.method, self.url, self._body 
    697                    ) 
    698            except BaseException: 
    699                self.close() 
    700                raise 
    701        elif self._released:  # Response explicitly released 
    702            raise ClientConnectionError("Connection closed") 
    703 
    704        protocol = self._connection and self._connection.protocol 
    705        if protocol is None or not protocol.upgraded: 
    706            await self._wait_released()  # Underlying connection released 
    707        return self._body 
    708 
    709    def get_encoding(self) -> str: 
    710        ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 
    711        mimetype = helpers.parse_mimetype(ctype) 
    712 
    713        encoding = mimetype.parameters.get("charset") 
    714        if encoding: 
    715            with contextlib.suppress(LookupError, ValueError): 
    716                return codecs.lookup(encoding).name 
    717 
    718        if mimetype.type == "application" and ( 
    719            mimetype.subtype == "json" or mimetype.subtype == "rdap" 
    720        ): 
    721            # RFC 7159 states that the default encoding is UTF-8. 
    722            # RFC 7483 defines application/rdap+json 
    723            return "utf-8" 
    724 
    725        if self._body is None: 
    726            raise RuntimeError( 
    727                "Cannot compute fallback encoding of a not yet read body" 
    728            ) 
    729 
    730        return self._resolve_charset(self, self._body) 
    731 
    732    async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str: 
    733        """Read response payload and decode.""" 
    734        if self._body is None: 
    735            await self.read() 
    736 
    737        if encoding is None: 
    738            encoding = self.get_encoding() 
    739 
    740        return self._body.decode(encoding, errors=errors)  # type: ignore[union-attr] 
    741 
    742    async def json( 
    743        self, 
    744        *, 
    745        encoding: Optional[str] = None, 
    746        loads: JSONDecoder = DEFAULT_JSON_DECODER, 
    747        content_type: Optional[str] = "application/json", 
    748    ) -> Any: 
    749        """Read and decodes JSON response.""" 
    750        if self._body is None: 
    751            await self.read() 
    752 
    753        if content_type: 
    754            ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 
    755            if not _is_expected_content_type(ctype, content_type): 
    756                raise ContentTypeError( 
    757                    self.request_info, 
    758                    self.history, 
    759                    status=self.status, 
    760                    message=( 
    761                        "Attempt to decode JSON with unexpected mimetype: %s" % ctype 
    762                    ), 
    763                    headers=self.headers, 
    764                ) 
    765 
    766        stripped = self._body.strip()  # type: ignore[union-attr] 
    767        if not stripped: 
    768            return None 
    769 
    770        if encoding is None: 
    771            encoding = self.get_encoding() 
    772 
    773        return loads(stripped.decode(encoding)) 
    774 
    775    async def __aenter__(self) -> "ClientResponse": 
    776        self._in_context = True 
    777        return self 
    778 
    779    async def __aexit__( 
    780        self, 
    781        exc_type: Optional[Type[BaseException]], 
    782        exc_val: Optional[BaseException], 
    783        exc_tb: Optional[TracebackType], 
    784    ) -> None: 
    785        self._in_context = False 
    786        # similar to _RequestContextManager, we do not need to check 
    787        # for exceptions, response object can close connection 
    788        # if state is broken 
    789        self.release() 
    790        await self.wait_for_close() 
    791 
    792 
    793class ClientRequest: 
    794    GET_METHODS = { 
    795        hdrs.METH_GET, 
    796        hdrs.METH_HEAD, 
    797        hdrs.METH_OPTIONS, 
    798        hdrs.METH_TRACE, 
    799    } 
    800    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT} 
    801    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE}) 
    802 
    803    DEFAULT_HEADERS = { 
    804        hdrs.ACCEPT: "*/*", 
    805        hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 
    806    } 
    807 
    808    # Type of body depends on PAYLOAD_REGISTRY, which is dynamic. 
    809    _body: Union[None, payload.Payload] = None 
    810    auth = None 
    811    response = None 
    812 
    813    __writer: Optional["asyncio.Task[None]"] = None  # async task for streaming data 
    814 
    815    # These class defaults help create_autospec() work correctly. 
    816    # If autospec is improved in future, maybe these can be removed. 
    817    url = URL() 
    818    method = "GET" 
    819 
    820    _continue = None  # waiter future for '100 Continue' response 
    821 
    822    _skip_auto_headers: Optional["CIMultiDict[None]"] = None 
    823 
    824    # N.B. 
    825    # Adding __del__ method with self._writer closing doesn't make sense 
    826    # because _writer is instance method, thus it keeps a reference to self. 
    827    # Until writer has finished finalizer will not be called. 
    828 
    829    def __init__( 
    830        self, 
    831        method: str, 
    832        url: URL, 
    833        *, 
    834        params: Query = None, 
    835        headers: Optional[LooseHeaders] = None, 
    836        skip_auto_headers: Optional[Iterable[str]] = None, 
    837        data: Any = None, 
    838        cookies: Optional[LooseCookies] = None, 
    839        auth: Optional[BasicAuth] = None, 
    840        version: http.HttpVersion = http.HttpVersion11, 
    841        compress: Union[str, bool, None] = None, 
    842        chunked: Optional[bool] = None, 
    843        expect100: bool = False, 
    844        loop: Optional[asyncio.AbstractEventLoop] = None, 
    845        response_class: Optional[Type["ClientResponse"]] = None, 
    846        proxy: Optional[URL] = None, 
    847        proxy_auth: Optional[BasicAuth] = None, 
    848        timer: Optional[BaseTimerContext] = None, 
    849        session: Optional["ClientSession"] = None, 
    850        ssl: Union[SSLContext, bool, Fingerprint] = True, 
    851        proxy_headers: Optional[LooseHeaders] = None, 
    852        traces: Optional[List["Trace"]] = None, 
    853        trust_env: bool = False, 
    854        server_hostname: Optional[str] = None, 
    855    ): 
    856        if loop is None: 
    857            loop = asyncio.get_event_loop() 
    858        if match := _CONTAINS_CONTROL_CHAR_RE.search(method): 
    859            raise ValueError( 
    860                f"Method cannot contain non-token characters {method!r} " 
    861                f"(found at least {match.group()!r})" 
    862            ) 
    863        # URL forbids subclasses, so a simple type check is enough. 
    864        assert type(url) is URL, url 
    865        if proxy is not None: 
    866            assert type(proxy) is URL, proxy 
    867        # FIXME: session is None in tests only, need to fix tests 
    868        # assert session is not None 
    869        if TYPE_CHECKING: 
    870            assert session is not None 
    871        self._session = session 
    872        if params: 
    873            url = url.extend_query(params) 
    874        self.original_url = url 
    875        self.url = url.with_fragment(None) if url.raw_fragment else url 
    876        self.method = method.upper() 
    877        self.chunked = chunked 
    878        self.compress = compress 
    879        self.loop = loop 
    880        self.length = None 
    881        if response_class is None: 
    882            real_response_class = ClientResponse 
    883        else: 
    884            real_response_class = response_class 
    885        self.response_class: Type[ClientResponse] = real_response_class 
    886        self._timer = timer if timer is not None else TimerNoop() 
    887        self._ssl = ssl if ssl is not None else True 
    888        self.server_hostname = server_hostname 
    889 
    890        if loop.get_debug(): 
    891            self._source_traceback = traceback.extract_stack(sys._getframe(1)) 
    892 
    893        self.update_version(version) 
    894        self.update_host(url) 
    895        self.update_headers(headers) 
    896        self.update_auto_headers(skip_auto_headers) 
    897        self.update_cookies(cookies) 
    898        self.update_content_encoding(data) 
    899        self.update_auth(auth, trust_env) 
    900        self.update_proxy(proxy, proxy_auth, proxy_headers) 
    901 
    902        self.update_body_from_data(data) 
    903        if data is not None or self.method not in self.GET_METHODS: 
    904            self.update_transfer_encoding() 
    905        self.update_expect_continue(expect100) 
    906        self._traces = [] if traces is None else traces 
    907 
    908    def __reset_writer(self, _: object = None) -> None: 
    909        self.__writer = None 
    910 
    911    def _get_content_length(self) -> Optional[int]: 
    912        """Extract and validate Content-Length header value. 
    913 
    914        Returns parsed Content-Length value or None if not set. 
    915        Raises ValueError if header exists but cannot be parsed as an integer. 
    916        """ 
    917        if hdrs.CONTENT_LENGTH not in self.headers: 
    918            return None 
    919 
    920        content_length_hdr = self.headers[hdrs.CONTENT_LENGTH] 
    921        try: 
    922            return int(content_length_hdr) 
    923        except ValueError: 
    924            raise ValueError( 
    925                f"Invalid Content-Length header: {content_length_hdr}" 
    926            ) from None 
    927 
    928    @property 
    929    def skip_auto_headers(self) -> CIMultiDict[None]: 
    930        return self._skip_auto_headers or CIMultiDict() 
    931 
    932    @property 
    933    def _writer(self) -> Optional["asyncio.Task[None]"]: 
    934        return self.__writer 
    935 
    936    @_writer.setter 
    937    def _writer(self, writer: "asyncio.Task[None]") -> None: 
    938        if self.__writer is not None: 
    939            self.__writer.remove_done_callback(self.__reset_writer) 
    940        self.__writer = writer 
    941        writer.add_done_callback(self.__reset_writer) 
    942 
    943    def is_ssl(self) -> bool: 
    944        return self.url.scheme in _SSL_SCHEMES 
    945 
    946    @property 
    947    def ssl(self) -> Union["SSLContext", bool, Fingerprint]: 
    948        return self._ssl 
    949 
    950    @property 
    951    def connection_key(self) -> ConnectionKey: 
    952        if proxy_headers := self.proxy_headers: 
    953            h: Optional[int] = hash(tuple(proxy_headers.items())) 
    954        else: 
    955            h = None 
    956        url = self.url 
    957        return tuple.__new__( 
    958            ConnectionKey, 
    959            ( 
    960                url.raw_host or "", 
    961                url.port, 
    962                url.scheme in _SSL_SCHEMES, 
    963                self._ssl, 
    964                self.proxy, 
    965                self.proxy_auth, 
    966                h, 
    967            ), 
    968        ) 
    969 
    970    @property 
    971    def host(self) -> str: 
    972        ret = self.url.raw_host 
    973        assert ret is not None 
    974        return ret 
    975 
    976    @property 
    977    def port(self) -> Optional[int]: 
    978        return self.url.port 
    979 
    980    @property 
    981    def body(self) -> Union[payload.Payload, Literal[b""]]: 
    982        """Request body.""" 
    983        # empty body is represented as bytes for backwards compatibility 
    984        return self._body or b"" 
    985 
    986    @body.setter 
    987    def body(self, value: Any) -> None: 
    988        """Set request body with warning for non-autoclose payloads. 
    989 
    990        WARNING: This setter must be called from within an event loop and is not 
    991        thread-safe. Setting body outside of an event loop may raise RuntimeError 
    992        when closing file-based payloads. 
    993 
    994        DEPRECATED: Direct assignment to body is deprecated and will be removed 
    995        in a future version. Use await update_body() instead for proper resource 
    996        management. 
    997        """ 
    998        # Close existing payload if present 
    999        if self._body is not None: 
    1000            # Warn if the payload needs manual closing 
    1001            # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload 
    1002            _warn_if_unclosed_payload(self._body, stacklevel=3) 
    1003            # NOTE: In the future, when we remove sync close support, 
    1004            # this setter will need to be removed and only the async 
    1005            # update_body() method will be available. For now, we call 
    1006            # _close() for backwards compatibility. 
    1007            self._body._close() 
    1008        self._update_body(value) 
    1009 
    1010    @property 
    1011    def request_info(self) -> RequestInfo: 
    1012        headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers) 
    1013        # These are created on every request, so we use a NamedTuple 
    1014        # for performance reasons. We don't use the RequestInfo.__new__ 
    1015        # method because it has a different signature which is provided 
    1016        # for backwards compatibility only. 
    1017        return tuple.__new__( 
    1018            RequestInfo, (self.url, self.method, headers, self.original_url) 
    1019        ) 
    1020 
    1021    @property 
    1022    def session(self) -> "ClientSession": 
    1023        """Return the ClientSession instance. 
    1024 
    1025        This property provides access to the ClientSession that initiated 
    1026        this request, allowing middleware to make additional requests 
    1027        using the same session. 
    1028        """ 
    1029        return self._session 
    1030 
    1031    def update_host(self, url: URL) -> None: 
    1032        """Update destination host, port and connection type (ssl).""" 
    1033        # get host/port 
    1034        if not url.raw_host: 
    1035            raise InvalidURL(url) 
    1036 
    1037        # basic auth info 
    1038        if url.raw_user or url.raw_password: 
    1039            self.auth = helpers.BasicAuth(url.user or "", url.password or "") 
    1040 
    1041    def update_version(self, version: Union[http.HttpVersion, str]) -> None: 
    1042        """Convert request version to two elements tuple. 
    1043 
    1044        parser HTTP version '1.1' => (1, 1) 
    1045        """ 
    1046        if isinstance(version, str): 
    1047            v = [part.strip() for part in version.split(".", 1)] 
    1048            try: 
    1049                version = http.HttpVersion(int(v[0]), int(v[1])) 
    1050            except ValueError: 
    1051                raise ValueError( 
    1052                    f"Can not parse http version number: {version}" 
    1053                ) from None 
    1054        self.version = version 
    1055 
    1056    def update_headers(self, headers: Optional[LooseHeaders]) -> None: 
    1057        """Update request headers.""" 
    1058        self.headers: CIMultiDict[str] = CIMultiDict() 
    1059 
    1060        # Build the host header 
    1061        host = self.url.host_port_subcomponent 
    1062 
    1063        # host_port_subcomponent is None when the URL is a relative URL. 
    1064        # but we know we do not have a relative URL here. 
    1065        assert host is not None 
    1066        self.headers[hdrs.HOST] = host 
    1067 
    1068        if not headers: 
    1069            return 
    1070 
    1071        if isinstance(headers, (dict, MultiDictProxy, MultiDict)): 
    1072            headers = headers.items() 
    1073 
    1074        for key, value in headers:  # type: ignore[misc] 
    1075            # A special case for Host header 
    1076            if key in hdrs.HOST_ALL: 
    1077                self.headers[key] = value 
    1078            else: 
    1079                self.headers.add(key, value) 
    1080 
    1081    def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None: 
    1082        if skip_auto_headers is not None: 
    1083            self._skip_auto_headers = CIMultiDict( 
    1084                (hdr, None) for hdr in sorted(skip_auto_headers) 
    1085            ) 
    1086            used_headers = self.headers.copy() 
    1087            used_headers.extend(self._skip_auto_headers)  # type: ignore[arg-type] 
    1088        else: 
    1089            # Fast path when there are no headers to skip 
    1090            # which is the most common case. 
    1091            used_headers = self.headers 
    1092 
    1093        for hdr, val in self.DEFAULT_HEADERS.items(): 
    1094            if hdr not in used_headers: 
    1095                self.headers[hdr] = val 
    1096 
    1097        if hdrs.USER_AGENT not in used_headers: 
    1098            self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE 
    1099 
    1100    def update_cookies(self, cookies: Optional[LooseCookies]) -> None: 
    1101        """Update request cookies header.""" 
    1102        if not cookies: 
    1103            return 
    1104 
    1105        c = SimpleCookie() 
    1106        if hdrs.COOKIE in self.headers: 
    1107            # parse_cookie_header for RFC 6265 compliant Cookie header parsing 
    1108            c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, ""))) 
    1109            del self.headers[hdrs.COOKIE] 
    1110 
    1111        if isinstance(cookies, Mapping): 
    1112            iter_cookies = cookies.items() 
    1113        else: 
    1114            iter_cookies = cookies  # type: ignore[assignment] 
    1115        for name, value in iter_cookies: 
    1116            if isinstance(value, Morsel): 
    1117                # Use helper to preserve coded_value exactly as sent by server 
    1118                c[name] = preserve_morsel_with_coded_value(value) 
    1119            else: 
    1120                c[name] = value  # type: ignore[assignment] 
    1121 
    1122        self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip() 
    1123 
    1124    def update_content_encoding(self, data: Any) -> None: 
    1125        """Set request content encoding.""" 
    1126        if not data: 
    1127            # Don't compress an empty body. 
    1128            self.compress = None 
    1129            return 
    1130 
    1131        if self.headers.get(hdrs.CONTENT_ENCODING): 
    1132            if self.compress: 
    1133                raise ValueError( 
    1134                    "compress can not be set if Content-Encoding header is set" 
    1135                ) 
    1136        elif self.compress: 
    1137            if not isinstance(self.compress, str): 
    1138                self.compress = "deflate" 
    1139            self.headers[hdrs.CONTENT_ENCODING] = self.compress 
    1140            self.chunked = True  # enable chunked, no need to deal with length 
    1141 
    1142    def update_transfer_encoding(self) -> None: 
    1143        """Analyze transfer-encoding header.""" 
    1144        te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower() 
    1145 
    1146        if "chunked" in te: 
    1147            if self.chunked: 
    1148                raise ValueError( 
    1149                    "chunked can not be set " 
    1150                    'if "Transfer-Encoding: chunked" header is set' 
    1151                ) 
    1152 
    1153        elif self.chunked: 
    1154            if hdrs.CONTENT_LENGTH in self.headers: 
    1155                raise ValueError( 
    1156                    "chunked can not be set if Content-Length header is set" 
    1157                ) 
    1158 
    1159            self.headers[hdrs.TRANSFER_ENCODING] = "chunked" 
    1160 
    1161    def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None: 
    1162        """Set basic auth.""" 
    1163        if auth is None: 
    1164            auth = self.auth 
    1165        if auth is None: 
    1166            return 
    1167 
    1168        if not isinstance(auth, helpers.BasicAuth): 
    1169            raise TypeError("BasicAuth() tuple is required instead") 
    1170 
    1171        self.headers[hdrs.AUTHORIZATION] = auth.encode() 
    1172 
    1173    def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None: 
    1174        """Update request body from data.""" 
    1175        if self._body is not None: 
    1176            _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel) 
    1177 
    1178        if body is None: 
    1179            self._body = None 
    1180            # Set Content-Length to 0 when body is None for methods that expect a body 
    1181            if ( 
    1182                self.method not in self.GET_METHODS 
    1183                and not self.chunked 
    1184                and hdrs.CONTENT_LENGTH not in self.headers 
    1185            ): 
    1186                self.headers[hdrs.CONTENT_LENGTH] = "0" 
    1187            return 
    1188 
    1189        # FormData 
    1190        maybe_payload = body() if isinstance(body, FormData) else body 
    1191 
    1192        try: 
    1193            body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None) 
    1194        except payload.LookupError: 
    1195            body_payload = FormData(maybe_payload)()  # type: ignore[arg-type] 
    1196 
    1197        self._body = body_payload 
    1198        # enable chunked encoding if needed 
    1199        if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers: 
    1200            if (size := body_payload.size) is not None: 
    1201                self.headers[hdrs.CONTENT_LENGTH] = str(size) 
    1202            else: 
    1203                self.chunked = True 
    1204 
    1205        # copy payload headers 
    1206        assert body_payload.headers 
    1207        headers = self.headers 
    1208        skip_headers = self._skip_auto_headers 
    1209        for key, value in body_payload.headers.items(): 
    1210            if key in headers or (skip_headers is not None and key in skip_headers): 
    1211                continue 
    1212            headers[key] = value 
    1213 
    1214    def _update_body(self, body: Any) -> None: 
    1215        """Update request body after its already been set.""" 
    1216        # Remove existing Content-Length header since body is changing 
    1217        if hdrs.CONTENT_LENGTH in self.headers: 
    1218            del self.headers[hdrs.CONTENT_LENGTH] 
    1219 
    1220        # Remove existing Transfer-Encoding header to avoid conflicts 
    1221        if self.chunked and hdrs.TRANSFER_ENCODING in self.headers: 
    1222            del self.headers[hdrs.TRANSFER_ENCODING] 
    1223 
    1224        # Now update the body using the existing method 
    1225        # Called from _update_body, add 1 to stacklevel from caller 
    1226        self.update_body_from_data(body, _stacklevel=4) 
    1227 
    1228        # Update transfer encoding headers if needed (same logic as __init__) 
    1229        if body is not None or self.method not in self.GET_METHODS: 
    1230            self.update_transfer_encoding() 
    1231 
    1232    async def update_body(self, body: Any) -> None: 
    1233        """ 
    1234        Update request body and close previous payload if needed. 
    1235 
    1236        This method safely updates the request body by first closing any existing 
    1237        payload to prevent resource leaks, then setting the new body. 
    1238 
    1239        IMPORTANT: Always use this method instead of setting request.body directly. 
    1240        Direct assignment to request.body will leak resources if the previous body 
    1241        contains file handles, streams, or other resources that need cleanup. 
    1242 
    1243        Args: 
    1244            body: The new body content. Can be: 
    1245                - bytes/bytearray: Raw binary data 
    1246                - str: Text data (will be encoded using charset from Content-Type) 
    1247                - FormData: Form data that will be encoded as multipart/form-data 
    1248                - Payload: A pre-configured payload object 
    1249                - AsyncIterable: An async iterable of bytes chunks 
    1250                - File-like object: Will be read and sent as binary data 
    1251                - None: Clears the body 
    1252 
    1253        Usage: 
    1254            # CORRECT: Use update_body 
    1255            await request.update_body(b"new request data") 
    1256 
    1257            # WRONG: Don't set body directly 
    1258            # request.body = b"new request data"  # This will leak resources! 
    1259 
    1260            # Update with form data 
    1261            form_data = FormData() 
    1262            form_data.add_field('field', 'value') 
    1263            await request.update_body(form_data) 
    1264 
    1265            # Clear body 
    1266            await request.update_body(None) 
    1267 
    1268        Note: 
    1269            This method is async because it may need to close file handles or 
    1270            other resources associated with the previous payload. Always await 
    1271            this method to ensure proper cleanup. 
    1272 
    1273        Warning: 
    1274            Setting request.body directly is highly discouraged and can lead to: 
    1275            - Resource leaks (unclosed file handles, streams) 
    1276            - Memory leaks (unreleased buffers) 
    1277            - Unexpected behavior with streaming payloads 
    1278 
    1279            It is not recommended to change the payload type in middleware. If the 
    1280            body was already set (e.g., as bytes), it's best to keep the same type 
    1281            rather than converting it (e.g., to str) as this may result in unexpected 
    1282            behavior. 
    1283 
    1284        See Also: 
    1285            - update_body_from_data: Synchronous body update without cleanup 
    1286            - body property: Direct body access (STRONGLY DISCOURAGED) 
    1287 
    1288        """ 
    1289        # Close existing payload if it exists and needs closing 
    1290        if self._body is not None: 
    1291            await self._body.close() 
    1292        self._update_body(body) 
    1293 
    1294    def update_expect_continue(self, expect: bool = False) -> None: 
    1295        if expect: 
    1296            self.headers[hdrs.EXPECT] = "100-continue" 
    1297        elif ( 
    1298            hdrs.EXPECT in self.headers 
    1299            and self.headers[hdrs.EXPECT].lower() == "100-continue" 
    1300        ): 
    1301            expect = True 
    1302 
    1303        if expect: 
    1304            self._continue = self.loop.create_future() 
    1305 
    1306    def update_proxy( 
    1307        self, 
    1308        proxy: Optional[URL], 
    1309        proxy_auth: Optional[BasicAuth], 
    1310        proxy_headers: Optional[LooseHeaders], 
    1311    ) -> None: 
    1312        self.proxy = proxy 
    1313        if proxy is None: 
    1314            self.proxy_auth = None 
    1315            self.proxy_headers = None 
    1316            return 
    1317 
    1318        if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth): 
    1319            raise ValueError("proxy_auth must be None or BasicAuth() tuple") 
    1320        self.proxy_auth = proxy_auth 
    1321 
    1322        if proxy_headers is not None and not isinstance( 
    1323            proxy_headers, (MultiDict, MultiDictProxy) 
    1324        ): 
    1325            proxy_headers = CIMultiDict(proxy_headers) 
    1326        self.proxy_headers = proxy_headers 
    1327 
    1328    async def write_bytes( 
    1329        self, 
    1330        writer: AbstractStreamWriter, 
    1331        conn: "Connection", 
    1332        content_length: Optional[int] = None, 
    1333    ) -> None: 
    1334        """ 
    1335        Write the request body to the connection stream. 
    1336 
    1337        This method handles writing different types of request bodies: 
    1338        1. Payload objects (using their specialized write_with_length method) 
    1339        2. Bytes/bytearray objects 
    1340        3. Iterable body content 
    1341 
    1342        Args: 
    1343            writer: The stream writer to write the body to 
    1344            conn: The connection being used for this request 
    1345            content_length: Optional maximum number of bytes to write from the body 
    1346                            (None means write the entire body) 
    1347 
    1348        The method properly handles: 
    1349        - Waiting for 100-Continue responses if required 
    1350        - Content length constraints for chunked encoding 
    1351        - Error handling for network issues, cancellation, and other exceptions 
    1352        - Signaling EOF and timeout management 
    1353 
    1354        Raises: 
    1355            ClientOSError: When there's an OS-level error writing the body 
    1356            ClientConnectionError: When there's a general connection error 
    1357            asyncio.CancelledError: When the operation is cancelled 
    1358 
    1359        """ 
    1360        # 100 response 
    1361        if self._continue is not None: 
    1362            # Force headers to be sent before waiting for 100-continue 
    1363            writer.send_headers() 
    1364            await writer.drain() 
    1365            await self._continue 
    1366 
    1367        protocol = conn.protocol 
    1368        assert protocol is not None 
    1369        try: 
    1370            # This should be a rare case but the 
    1371            # self._body can be set to None while 
    1372            # the task is being started or we wait above 
    1373            # for the 100-continue response. 
    1374            # The more likely case is we have an empty 
    1375            # payload, but 100-continue is still expected. 
    1376            if self._body is not None: 
    1377                await self._body.write_with_length(writer, content_length) 
    1378        except OSError as underlying_exc: 
    1379            reraised_exc = underlying_exc 
    1380 
    1381            # Distinguish between timeout and other OS errors for better error reporting 
    1382            exc_is_not_timeout = underlying_exc.errno is not None or not isinstance( 
    1383                underlying_exc, asyncio.TimeoutError 
    1384            ) 
    1385            if exc_is_not_timeout: 
    1386                reraised_exc = ClientOSError( 
    1387                    underlying_exc.errno, 
    1388                    f"Can not write request body for {self.url !s}", 
    1389                ) 
    1390 
    1391            set_exception(protocol, reraised_exc, underlying_exc) 
    1392        except asyncio.CancelledError: 
    1393            # Body hasn't been fully sent, so connection can't be reused 
    1394            conn.close() 
    1395            raise 
    1396        except Exception as underlying_exc: 
    1397            set_exception( 
    1398                protocol, 
    1399                ClientConnectionError( 
    1400                    "Failed to send bytes into the underlying connection " 
    1401                    f"{conn !s}: {underlying_exc!r}", 
    1402                ), 
    1403                underlying_exc, 
    1404            ) 
    1405        else: 
    1406            # Successfully wrote the body, signal EOF and start response timeout 
    1407            await writer.write_eof() 
    1408            protocol.start_timeout() 
    1409 
    1410    async def send(self, conn: "Connection") -> "ClientResponse": 
    1411        # Specify request target: 
    1412        # - CONNECT request must send authority form URI 
    1413        # - not CONNECT proxy must send absolute form URI 
    1414        # - most common is origin form URI 
    1415        if self.method == hdrs.METH_CONNECT: 
    1416            connect_host = self.url.host_subcomponent 
    1417            assert connect_host is not None 
    1418            path = f"{connect_host}:{self.url.port}" 
    1419        elif self.proxy and not self.is_ssl(): 
    1420            path = str(self.url) 
    1421        else: 
    1422            path = self.url.raw_path_qs 
    1423 
    1424        protocol = conn.protocol 
    1425        assert protocol is not None 
    1426        writer = StreamWriter( 
    1427            protocol, 
    1428            self.loop, 
    1429            on_chunk_sent=( 
    1430                functools.partial(self._on_chunk_request_sent, self.method, self.url) 
    1431                if self._traces 
    1432                else None 
    1433            ), 
    1434            on_headers_sent=( 
    1435                functools.partial(self._on_headers_request_sent, self.method, self.url) 
    1436                if self._traces 
    1437                else None 
    1438            ), 
    1439        ) 
    1440 
    1441        if self.compress: 
    1442            writer.enable_compression(self.compress)  # type: ignore[arg-type] 
    1443 
    1444        if self.chunked is not None: 
    1445            writer.enable_chunking() 
    1446 
    1447        # set default content-type 
    1448        if ( 
    1449            self.method in self.POST_METHODS 
    1450            and ( 
    1451                self._skip_auto_headers is None 
    1452                or hdrs.CONTENT_TYPE not in self._skip_auto_headers 
    1453            ) 
    1454            and hdrs.CONTENT_TYPE not in self.headers 
    1455        ): 
    1456            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream" 
    1457 
    1458        v = self.version 
    1459        if hdrs.CONNECTION not in self.headers: 
    1460            if conn._connector.force_close: 
    1461                if v == HttpVersion11: 
    1462                    self.headers[hdrs.CONNECTION] = "close" 
    1463            elif v == HttpVersion10: 
    1464                self.headers[hdrs.CONNECTION] = "keep-alive" 
    1465 
    1466        # status + headers 
    1467        status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}" 
    1468 
    1469        # Buffer headers for potential coalescing with body 
    1470        await writer.write_headers(status_line, self.headers) 
    1471 
    1472        task: Optional["asyncio.Task[None]"] 
    1473        if self._body or self._continue is not None or protocol.writing_paused: 
    1474            coro = self.write_bytes(writer, conn, self._get_content_length()) 
    1475            if sys.version_info >= (3, 12): 
    1476                # Optimization for Python 3.12, try to write 
    1477                # bytes immediately to avoid having to schedule 
    1478                # the task on the event loop. 
    1479                task = asyncio.Task(coro, loop=self.loop, eager_start=True) 
    1480            else: 
    1481                task = self.loop.create_task(coro) 
    1482            if task.done(): 
    1483                task = None 
    1484            else: 
    1485                self._writer = task 
    1486        else: 
    1487            # We have nothing to write because 
    1488            # - there is no body 
    1489            # - the protocol does not have writing paused 
    1490            # - we are not waiting for a 100-continue response 
    1491            protocol.start_timeout() 
    1492            writer.set_eof() 
    1493            task = None 
    1494        response_class = self.response_class 
    1495        assert response_class is not None 
    1496        self.response = response_class( 
    1497            self.method, 
    1498            self.original_url, 
    1499            writer=task, 
    1500            continue100=self._continue, 
    1501            timer=self._timer, 
    1502            request_info=self.request_info, 
    1503            traces=self._traces, 
    1504            loop=self.loop, 
    1505            session=self._session, 
    1506        ) 
    1507        return self.response 
    1508 
    1509    async def close(self) -> None: 
    1510        if self.__writer is not None: 
    1511            try: 
    1512                await self.__writer 
    1513            except asyncio.CancelledError: 
    1514                if ( 
    1515                    sys.version_info >= (3, 11) 
    1516                    and (task := asyncio.current_task()) 
    1517                    and task.cancelling() 
    1518                ): 
    1519                    raise 
    1520 
    1521    def terminate(self) -> None: 
    1522        if self.__writer is not None: 
    1523            if not self.loop.is_closed(): 
    1524                self.__writer.cancel() 
    1525            self.__writer.remove_done_callback(self.__reset_writer) 
    1526            self.__writer = None 
    1527 
    1528    async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None: 
    1529        for trace in self._traces: 
    1530            await trace.send_request_chunk_sent(method, url, chunk) 
    1531 
    1532    async def _on_headers_request_sent( 
    1533        self, method: str, url: URL, headers: "CIMultiDict[str]" 
    1534    ) -> None: 
    1535        for trace in self._traces: 
    1536            await trace.send_request_headers(method, url, headers)