Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/aio/transport/aiohttp.py: 9%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

64 statements  

1# Copyright 2024 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Transport adapter for Asynchronous HTTP Requests based on aiohttp. 

16""" 

17 

18import asyncio 

19import logging 

20from typing import AsyncGenerator, Mapping, Optional 

21 

# aiohttp is an optional dependency of this package; fail at import time with
# an actionable message instead of a bare "No module named 'aiohttp'".
try:
    import aiohttp  # type: ignore
except ImportError as caught_exc:  # pragma: NO COVER
    raise ImportError(
        "The aiohttp library is not installed. Please install the aiohttp package to use the aiohttp transport."
    ) from caught_exc

28 

29from google.auth import _helpers 

30from google.auth import exceptions 

31from google.auth.aio import _helpers as _helpers_async 

32from google.auth.aio import transport 

33 

34_LOGGER = logging.getLogger(__name__) 

35 

36 

class Response(transport.Response):
    """
    Represents an HTTP response and its data. It is returned by ``google.auth.aio.transport.sessions.AsyncAuthorizedSession``.

    Args:
        response (aiohttp.ClientResponse): An instance of aiohttp.ClientResponse.

    Attributes:
        status_code (int): The HTTP status code of the response.
        headers (Mapping[str, str]): The HTTP headers of the response.
    """

    # NOTE: the methods below are decorated with ``_helpers.copy_docstring``,
    # which supplies their docstrings from ``transport.Response``. They are
    # therefore documented with comments rather than docstrings of their own.

    def __init__(self, response: aiohttp.ClientResponse):
        # Keep the raw aiohttp response; every accessor below delegates to it.
        self._response = response

    @property
    @_helpers.copy_docstring(transport.Response)
    def status_code(self) -> int:
        # aiohttp exposes the status code as ``status``.
        return self._response.status

    @property
    @_helpers.copy_docstring(transport.Response)
    def headers(self) -> Mapping[str, str]:
        # Copy the header pairs into a plain dict so callers receive a
        # standalone mapping instead of the aiohttp headers object.
        return {key: value for key, value in self._response.headers.items()}

    @_helpers.copy_docstring(transport.Response)
    async def content(self, chunk_size: int = 1024) -> AsyncGenerator[bytes, None]:
        # Stream the body in pieces of at most ``chunk_size`` bytes, translating
        # aiohttp payload failures into the library's own ResponseError.
        try:
            async for chunk in self._response.content.iter_chunked(
                chunk_size
            ):  # pragma: no branch
                yield chunk
        except aiohttp.ClientPayloadError as exc:
            raise exceptions.ResponseError(
                "Failed to read from the payload stream."
            ) from exc

    @_helpers.copy_docstring(transport.Response)
    async def read(self) -> bytes:
        # Read the whole body at once. ClientPayloadError is translated here as
        # well, mirroring ``content()``: both methods consume the same payload
        # stream, so a broken body should surface as ResponseError from either.
        try:
            return await self._response.read()
        except (aiohttp.ClientResponseError, aiohttp.ClientPayloadError) as exc:
            raise exceptions.ResponseError("Failed to read the response body.") from exc

    @_helpers.copy_docstring(transport.Response)
    async def close(self) -> None:
        # ``aiohttp.ClientResponse.close`` is called synchronously here; this
        # method is a coroutine only to satisfy the transport interface.
        self._response.close()

84 

85 

class Request(transport.Request):
    """Asynchronous request adapter backed by aiohttp.

    This class is used internally for making requests using aiohttp
    in a consistent way. If you use :class:`google.auth.aio.transport.sessions.AsyncAuthorizedSession`
    you do not need to construct or use this class directly.

    This class can be useful if you want to configure a Request callable
    with a custom ``aiohttp.ClientSession`` in :class:`AuthorizedSession` or if
    you want to manually refresh a :class:`~google.auth.aio.credentials.Credentials` instance::

        import aiohttp
        import google.auth.aio.transport.aiohttp

        # Default example:
        request = google.auth.aio.transport.aiohttp.Request()
        await credentials.refresh(request)

        # Custom aiohttp Session Example:
        session = aiohttp.ClientSession(auto_decompress=False)
        request = google.auth.aio.transport.aiohttp.Request(session=session)
        auth_session = google.auth.aio.transport.sessions.AsyncAuthorizedSession(auth_request=request)

    Args:
        session (Optional[aiohttp.ClientSession]): An instance of
            :class:`aiohttp.ClientSession` used to make HTTP requests. If not
            specified, a session is created on the first request.

    .. automethod:: __call__
    """

    def __init__(self, session: Optional[aiohttp.ClientSession] = None):
        # The session may be None here; it is then created lazily in __call__.
        self._session = session
        # Set by close(); once True, __call__ refuses to send requests.
        self._closed = False

    async def __call__(
        self,
        url: str,
        method: str = "GET",
        body: Optional[bytes] = None,
        headers: Optional[Mapping[str, str]] = None,
        timeout: float = transport._DEFAULT_TIMEOUT_SECONDS,
        **kwargs,
    ) -> transport.Response:
        """
        Make an HTTP request using aiohttp.

        Args:
            url (str): The URL to be requested.
            method (Optional[str]):
                The HTTP method to use for the request. Defaults to 'GET'.
            body (Optional[bytes]):
                The payload or body in HTTP request.
            headers (Optional[Mapping[str, str]]):
                Request headers.
            timeout (float): The total number of seconds allowed for the
                request; it is passed as ``total`` to
                :class:`aiohttp.ClientTimeout`. Defaults to
                ``transport._DEFAULT_TIMEOUT_SECONDS``.
            kwargs: Additional arguments passed through to the underlying
                :meth:`aiohttp.ClientSession.request` method.

        Returns:
            google.auth.aio.transport.Response: The HTTP response.

        Raises:
            - google.auth.exceptions.TransportError: If the request fails or if the session is closed.
            - google.auth.exceptions.TimeoutError: If the request times out.
        """

        try:
            if self._closed:
                raise exceptions.TransportError("session is closed.")

            # Create the session on first use when the caller did not supply one.
            if self._session is None:
                self._session = aiohttp.ClientSession()

            client_timeout = aiohttp.ClientTimeout(total=timeout)
            _helpers.request_log(_LOGGER, method, url, body, headers)
            response = await self._session.request(
                method,
                url,
                data=body,
                headers=headers,
                timeout=client_timeout,
                **kwargs,
            )
            await _helpers_async.response_log_async(_LOGGER, response)
            return Response(response)

        except aiohttp.ClientError as caught_exc:
            # Any aiohttp client failure is reported as the library's own
            # TransportError, with the aiohttp exception kept as the cause.
            client_exc = exceptions.TransportError(f"Failed to send request to {url}.")
            raise client_exc from caught_exc

        except asyncio.TimeoutError as caught_exc:
            timeout_exc = exceptions.TimeoutError(
                f"Request timed out after {timeout} seconds."
            )
            raise timeout_exc from caught_exc

    async def close(self) -> None:
        """
        Close the underlying aiohttp session to release the acquired resources.

        Calling this more than once is harmless. After it has been called,
        ``__call__`` raises ``TransportError`` instead of sending a request.
        """
        # Only close a session that exists and has not been closed through
        # this adapter already.
        if not self._closed and self._session:
            await self._session.close()
        # Mark closed even when no session was ever created, so that later
        # calls do not lazily open a new one.
        self._closed = True

190 self._closed = True