Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/pipeline/transport/_base_async.py: 68%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

40 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26from __future__ import annotations 

27import abc 

28from collections.abc import AsyncIterator 

29from typing import ( 

30 AsyncIterator as AsyncIteratorType, 

31 TypeVar, 

32 Generic, 

33 Any, 

34 AsyncContextManager, 

35 Optional, 

36 Type, 

37 TYPE_CHECKING, 

38) 

39from types import TracebackType 

40 

41from ._base import _HttpResponseBase, _HttpClientTransportResponse, HttpRequest 

42from ...utils._pipeline_transport_rest_shared_async import _PartGenerator 

43 

44 

45AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType") 

46HTTPResponseType = TypeVar("HTTPResponseType") 

47HTTPRequestType = TypeVar("HTTPRequestType") 

48 

49if TYPE_CHECKING: 

50 # We need a transport to define a pipeline, this "if" avoid a circular import 

51 from .._base_async import AsyncPipeline 

52 

53 

54class _ResponseStopIteration(Exception): 

55 pass 

56 

57 

58def _iterate_response_content(iterator): 

59 """To avoid the following error from Python: 

60 > TypeError: StopIteration interacts badly with generators and cannot be raised into a Future 

61 

62 :param iterator: An iterator 

63 :type iterator: iterator 

64 :return: The next item in the iterator 

65 :rtype: any 

66 """ 

67 try: 

68 return next(iterator) 

69 except StopIteration: 

70 raise _ResponseStopIteration() # pylint: disable=raise-missing-from 

71 

72 

73class AsyncHttpResponse(_HttpResponseBase, AsyncContextManager["AsyncHttpResponse"]): 

74 """An AsyncHttpResponse ABC. 

75 

76 Allows for the asynchronous streaming of data from the response. 

77 """ 

78 

79 def stream_download( 

80 self, 

81 pipeline: AsyncPipeline[HttpRequest, "AsyncHttpResponse"], 

82 *, 

83 decompress: bool = True, 

84 **kwargs: Any, 

85 ) -> AsyncIteratorType[bytes]: 

86 """Generator for streaming response body data. 

87 

88 Should be implemented by sub-classes if streaming download 

89 is supported. Will return an asynchronous generator. 

90 

91 :param pipeline: The pipeline object 

92 :type pipeline: azure.core.pipeline.Pipeline 

93 :keyword bool decompress: If True which is default, will attempt to decode the body based 

94 on the *content-encoding* header. 

95 :return: An async iterator of bytes 

96 :rtype: AsyncIterator[bytes] 

97 """ 

98 raise NotImplementedError("stream_download is not implemented.") 

99 

100 def parts(self) -> AsyncIterator["AsyncHttpResponse"]: 

101 """Assuming the content-type is multipart/mixed, will return the parts as an async iterator. 

102 

103 :return: An async iterator of the parts 

104 :rtype: AsyncIterator 

105 :raises ValueError: If the content is not multipart/mixed 

106 """ 

107 if not self.content_type or not self.content_type.startswith("multipart/mixed"): 

108 raise ValueError("You can't get parts if the response is not multipart/mixed") 

109 

110 return _PartGenerator(self, default_http_response_type=AsyncHttpClientTransportResponse) 

111 

112 async def __aexit__( 

113 self, 

114 exc_type: Optional[Type[BaseException]] = None, 

115 exc_value: Optional[BaseException] = None, 

116 traceback: Optional[TracebackType] = None, 

117 ) -> None: 

118 return None 

119 

120 

121class AsyncHttpClientTransportResponse( 

122 _HttpClientTransportResponse, AsyncHttpResponse 

123): # pylint: disable=abstract-method 

124 """Create a HTTPResponse from an http.client response. 

125 

126 Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary. 

127 

128 :param HttpRequest request: The request. 

129 :param httpclient_response: The object returned from an HTTP(S)Connection from http.client 

130 """ 

131 

132 

133class AsyncHttpTransport( 

134 AsyncContextManager["AsyncHttpTransport"], 

135 abc.ABC, 

136 Generic[HTTPRequestType, AsyncHTTPResponseType], 

137): 

138 """An http sender ABC.""" 

139 

140 @abc.abstractmethod 

141 async def send(self, request: HTTPRequestType, **kwargs: Any) -> AsyncHTTPResponseType: 

142 """Send the request using this HTTP sender. 

143 

144 :param request: The request object. Exact type can be inferred from the pipeline. 

145 :type request: any 

146 :return: The response object. Exact type can be inferred from the pipeline. 

147 :rtype: any 

148 """ 

149 

150 @abc.abstractmethod 

151 async def open(self) -> None: 

152 """Assign new session if one does not already exist.""" 

153 

154 @abc.abstractmethod 

155 async def close(self) -> None: 

156 """Close the session if it is not externally owned.""" 

157 

158 async def sleep(self, duration: float) -> None: 

159 """Sleep for the specified duration. 

160 

161 You should always ask the transport to sleep, and not call directly 

162 the stdlib. This is mostly important in async, as the transport 

163 may not use asyncio but other implementation like trio and they their own 

164 way to sleep, but to keep design 

165 consistent, it's cleaner to always ask the transport to sleep and let the transport 

166 implementor decide how to do it. 

167 By default, this method will use "asyncio", and don't need to be overridden 

168 if your transport does too. 

169 

170 :param float duration: The number of seconds to sleep. 

171 """ 

172 import asyncio # pylint: disable=do-not-import-asyncio 

173 

174 await asyncio.sleep(duration)