1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Transport adapter for Asynchronous HTTP Requests based on aiohttp.
16"""
17
18import asyncio
19import logging
20from typing import AsyncGenerator, Mapping, Optional
21
# aiohttp is an optional dependency of this package: fail at import time with
# an actionable message instead of a bare ModuleNotFoundError.
try:
    import aiohttp  # type: ignore
except ImportError as caught_exc:  # pragma: NO COVER
    raise ImportError(
        "The aiohttp library is not installed. Please install the aiohttp package to use the aiohttp transport."
    ) from caught_exc
28
29from google.auth import _helpers
30from google.auth import exceptions
31from google.auth.aio import _helpers as _helpers_async
32from google.auth.aio import transport
33
# Module-level logger named after this module; it is handed to the
# request/response logging helpers each time a request is made.
_LOGGER = logging.getLogger(__name__)
35
36
class Response(transport.Response):
    """
    Represents an HTTP response and its data. It is returned by ``google.auth.aio.transport.sessions.AsyncAuthorizedSession``.

    This is a thin wrapper that adapts an ``aiohttp.ClientResponse`` to the
    ``google.auth.aio.transport.Response`` interface and translates aiohttp
    read errors into ``google.auth.exceptions.ResponseError``.

    Args:
        response (aiohttp.ClientResponse): An instance of aiohttp.ClientResponse.

    Attributes:
        status_code (int): The HTTP status code of the response.
        headers (Mapping[str, str]): The HTTP headers of the response.
    """

    def __init__(self, response: aiohttp.ClientResponse):
        self._response = response

    # NOTE: the methods below take their docstrings from transport.Response
    # via ``copy_docstring``; they are documented with comments only so the
    # decorator remains the single source of their docstrings.

    @property
    @_helpers.copy_docstring(transport.Response)
    def status_code(self) -> int:
        return self._response.status

    @property
    @_helpers.copy_docstring(transport.Response)
    def headers(self) -> Mapping[str, str]:
        # Copy the header pairs into a plain dict so callers get a detached
        # snapshot rather than the underlying aiohttp headers object.
        return dict(self._response.headers.items())

    @_helpers.copy_docstring(transport.Response)
    async def content(self, chunk_size: int = 1024) -> AsyncGenerator[bytes, None]:
        # Stream the body in pieces of at most ``chunk_size`` bytes.
        try:
            async for chunk in self._response.content.iter_chunked(
                chunk_size
            ):  # pragma: no branch
                yield chunk
        except aiohttp.ClientPayloadError as exc:
            raise exceptions.ResponseError(
                "Failed to read from the payload stream."
            ) from exc

    @_helpers.copy_docstring(transport.Response)
    async def read(self) -> bytes:
        try:
            return await self._response.read()
        # Payload failures while reading the body are reported by aiohttp as
        # ClientPayloadError (the same error ``content`` translates), which is
        # not a ClientResponseError, so both are mapped to ResponseError.
        except (aiohttp.ClientResponseError, aiohttp.ClientPayloadError) as exc:
            raise exceptions.ResponseError("Failed to read the response body.") from exc

    @_helpers.copy_docstring(transport.Response)
    async def close(self) -> None:
        # aiohttp's ClientResponse.close() is a plain (non-awaitable) call.
        self._response.close()
84
85
class Request(transport.Request):
    """Asynchronous Requests request adapter.

    This class is used internally for making requests using aiohttp
    in a consistent way. If you use :class:`google.auth.aio.transport.sessions.AsyncAuthorizedSession`
    you do not need to construct or use this class directly.

    This class can be useful if you want to configure a Request callable
    with a custom ``aiohttp.ClientSession`` in :class:`AuthorizedSession` or if
    you want to manually refresh a :class:`~google.auth.aio.credentials.Credentials` instance::

        import aiohttp
        import google.auth.aio.transport.aiohttp

        # Default example:
        request = google.auth.aio.transport.aiohttp.Request()
        await credentials.refresh(request)

        # Custom aiohttp Session Example:
        session = aiohttp.ClientSession(auto_decompress=False)
        request = google.auth.aio.transport.aiohttp.Request(session=session)
        auth_session = google.auth.aio.transport.sessions.AsyncAuthorizedSession(auth_request=request)

    Args:
        session (Optional[aiohttp.ClientSession]): An instance of :class:`aiohttp.ClientSession`
            used to make HTTP requests. If not specified, a session is created
            when the first request is made.

    .. automethod:: __call__
    """

    def __init__(self, session: Optional[aiohttp.ClientSession] = None):
        self._session = session
        # Set by close(); once True, __call__ refuses to send requests.
        self._closed = False

    async def __call__(
        self,
        url: str,
        method: str = "GET",
        body: Optional[bytes] = None,
        headers: Optional[Mapping[str, str]] = None,
        timeout: float = transport._DEFAULT_TIMEOUT_SECONDS,
        **kwargs,
    ) -> transport.Response:
        """
        Make an HTTP request using aiohttp.

        Args:
            url (str): The URL to be requested.
            method (Optional[str]):
                The HTTP method to use for the request. Defaults to 'GET'.
            body (Optional[bytes]):
                The payload or body in HTTP request.
            headers (Optional[Mapping[str, str]]):
                Request headers.
            timeout (float): The number of seconds to wait for a
                response from the server. Defaults to
                ``transport._DEFAULT_TIMEOUT_SECONDS``. The value is applied
                as the total timeout of the aiohttp request.
            kwargs: Additional arguments passed through to the underlying
                aiohttp :meth:`aiohttp.ClientSession.request` method.

        Returns:
            google.auth.aio.transport.Response: The HTTP response.

        Raises:
            - google.auth.exceptions.TransportError: If the request fails or if the session is closed.
            - google.auth.exceptions.TimeoutError: If the request times out.
        """

        try:
            if self._closed:
                raise exceptions.TransportError("session is closed.")

            # Lazily create a session when the caller did not supply one.
            if not self._session:
                self._session = aiohttp.ClientSession()

            client_timeout = aiohttp.ClientTimeout(total=timeout)
            _helpers.request_log(_LOGGER, method, url, body, headers)
            response = await self._session.request(
                method,
                url,
                data=body,
                headers=headers,
                timeout=client_timeout,
                **kwargs,
            )
            await _helpers_async.response_log_async(_LOGGER, response)
            return Response(response)

        # NOTE(review): aiohttp errors that derive from both ClientError and
        # asyncio.TimeoutError are matched by this first handler and surface
        # as TransportError - confirm that this ordering is intended.
        except aiohttp.ClientError as caught_exc:
            client_exc = exceptions.TransportError(f"Failed to send request to {url}.")
            raise client_exc from caught_exc

        except asyncio.TimeoutError as caught_exc:
            timeout_exc = exceptions.TimeoutError(
                f"Request timed out after {timeout} seconds."
            )
            raise timeout_exc from caught_exc

    async def close(self) -> None:
        """
        Close the underlying aiohttp session to release the acquired resources.

        The session is only closed if one exists and this request has not
        already been closed. The request is marked as closed afterwards, so
        subsequent calls raise ``google.auth.exceptions.TransportError``.
        """
        if not self._closed and self._session:
            await self._session.close()
        self._closed = True