1# Copyright 2024 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Transport adapter for Asynchronous HTTP Requests based on aiohttp. 
    16""" 
    17 
    18import asyncio 
    19import logging 
    20from typing import AsyncGenerator, Mapping, Optional 
    21 
    22try: 
    23    import aiohttp  # type: ignore 
    24except ImportError as caught_exc:  # pragma: NO COVER 
    25    raise ImportError( 
    26        "The aiohttp library is not installed from please install the aiohttp package to use the aiohttp transport." 
    27    ) from caught_exc 
    28 
    29from google.auth import _helpers 
    30from google.auth import exceptions 
    31from google.auth.aio import _helpers as _helpers_async 
    32from google.auth.aio import transport 
    33 
    34_LOGGER = logging.getLogger(__name__) 
    35 
    36 
    37class Response(transport.Response): 
    38    """ 
    39    Represents an HTTP response and its data. It is returned by ``google.auth.aio.transport.sessions.AsyncAuthorizedSession``. 
    40 
    41    Args: 
    42        response (aiohttp.ClientResponse): An instance of aiohttp.ClientResponse. 
    43 
    44    Attributes: 
    45        status_code (int): The HTTP status code of the response. 
    46        headers (Mapping[str, str]): The HTTP headers of the response. 
    47    """ 
    48 
    49    def __init__(self, response: aiohttp.ClientResponse): 
    50        self._response = response 
    51 
    52    @property 
    53    @_helpers.copy_docstring(transport.Response) 
    54    def status_code(self) -> int: 
    55        return self._response.status 
    56 
    57    @property 
    58    @_helpers.copy_docstring(transport.Response) 
    59    def headers(self) -> Mapping[str, str]: 
    60        return {key: value for key, value in self._response.headers.items()} 
    61 
    62    @_helpers.copy_docstring(transport.Response) 
    63    async def content(self, chunk_size: int = 1024) -> AsyncGenerator[bytes, None]: 
    64        try: 
    65            async for chunk in self._response.content.iter_chunked( 
    66                chunk_size 
    67            ):  # pragma: no branch 
    68                yield chunk 
    69        except aiohttp.ClientPayloadError as exc: 
    70            raise exceptions.ResponseError( 
    71                "Failed to read from the payload stream." 
    72            ) from exc 
    73 
    74    @_helpers.copy_docstring(transport.Response) 
    75    async def read(self) -> bytes: 
    76        try: 
    77            return await self._response.read() 
    78        except aiohttp.ClientResponseError as exc: 
    79            raise exceptions.ResponseError("Failed to read the response body.") from exc 
    80 
    81    @_helpers.copy_docstring(transport.Response) 
    82    async def close(self): 
    83        self._response.close() 
    84 
    85 
    86class Request(transport.Request): 
    87    """Asynchronous Requests request adapter. 
    88 
    89    This class is used internally for making requests using aiohttp 
    90    in a consistent way. If you use :class:`google.auth.aio.transport.sessions.AsyncAuthorizedSession` 
    91    you do not need to construct or use this class directly. 
    92 
    93    This class can be useful if you want to configure a Request callable 
    94    with a custom ``aiohttp.ClientSession`` in :class:`AuthorizedSession` or if 
    95    you want to manually refresh a :class:`~google.auth.aio.credentials.Credentials` instance:: 
    96 
    97        import aiohttp 
    98        import google.auth.aio.transport.aiohttp 
    99 
    100        # Default example: 
    101        request = google.auth.aio.transport.aiohttp.Request() 
    102        await credentials.refresh(request) 
    103 
    104        # Custom aiohttp Session Example: 
    105        session = session=aiohttp.ClientSession(auto_decompress=False) 
    106        request = google.auth.aio.transport.aiohttp.Request(session=session) 
    107        auth_sesion = google.auth.aio.transport.sessions.AsyncAuthorizedSession(auth_request=request) 
    108 
    109    Args: 
    110        session (aiohttp.ClientSession): An instance :class:`aiohttp.ClientSession` used 
    111            to make HTTP requests. If not specified, a session will be created. 
    112 
    113    .. automethod:: __call__ 
    114    """ 
    115 
    116    def __init__(self, session: aiohttp.ClientSession = None): 
    117        self._session = session 
    118        self._closed = False 
    119 
    120    async def __call__( 
    121        self, 
    122        url: str, 
    123        method: str = "GET", 
    124        body: Optional[bytes] = None, 
    125        headers: Optional[Mapping[str, str]] = None, 
    126        timeout: float = transport._DEFAULT_TIMEOUT_SECONDS, 
    127        **kwargs, 
    128    ) -> transport.Response: 
    129        """ 
    130        Make an HTTP request using aiohttp. 
    131 
    132        Args: 
    133            url (str): The URL to be requested. 
    134            method (Optional[str]): 
    135                The HTTP method to use for the request. Defaults to 'GET'. 
    136            body (Optional[bytes]): 
    137                The payload or body in HTTP request. 
    138            headers (Optional[Mapping[str, str]]): 
    139                Request headers. 
    140            timeout (float): The number of seconds to wait for a 
    141                response from the server. If not specified or if None, the 
    142                requests default timeout will be used. 
    143            kwargs: Additional arguments passed through to the underlying 
    144                aiohttp :meth:`aiohttp.Session.request` method. 
    145 
    146        Returns: 
    147            google.auth.aio.transport.Response: The HTTP response. 
    148 
    149        Raises: 
    150            - google.auth.exceptions.TransportError: If the request fails or if the session is closed. 
    151            - google.auth.exceptions.TimeoutError: If the request times out. 
    152        """ 
    153 
    154        try: 
    155            if self._closed: 
    156                raise exceptions.TransportError("session is closed.") 
    157 
    158            if not self._session: 
    159                self._session = aiohttp.ClientSession() 
    160 
    161            client_timeout = aiohttp.ClientTimeout(total=timeout) 
    162            _helpers.request_log(_LOGGER, method, url, body, headers) 
    163            response = await self._session.request( 
    164                method, 
    165                url, 
    166                data=body, 
    167                headers=headers, 
    168                timeout=client_timeout, 
    169                **kwargs, 
    170            ) 
    171            await _helpers_async.response_log_async(_LOGGER, response) 
    172            return Response(response) 
    173 
    174        except aiohttp.ClientError as caught_exc: 
    175            client_exc = exceptions.TransportError(f"Failed to send request to {url}.") 
    176            raise client_exc from caught_exc 
    177 
    178        except asyncio.TimeoutError as caught_exc: 
    179            timeout_exc = exceptions.TimeoutError( 
    180                f"Request timed out after {timeout} seconds." 
    181            ) 
    182            raise timeout_exc from caught_exc 
    183 
    184    async def close(self) -> None: 
    185        """ 
    186        Close the underlying aiohttp session to release the acquired resources. 
    187        """ 
    188        if not self._closed and self._session: 
    189            await self._session.close() 
    190        self._closed = True