Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py: 31%

81 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-07 06:33 +0000

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26from __future__ import annotations 

27import logging 

28from typing import Generic, TypeVar, Union, Any, List, Dict, Optional, Iterable, ContextManager 

29from azure.core.pipeline import ( 

30 PipelineRequest, 

31 PipelineResponse, 

32 PipelineContext, 

33) 

34from azure.core.pipeline.policies import HTTPPolicy, SansIOHTTPPolicy 

35from ._tools import await_result as _await_result 

36from .transport import HttpTransport 

37 

38HTTPResponseType = TypeVar("HTTPResponseType") 

39HTTPRequestType = TypeVar("HTTPRequestType") 

40 

41_LOGGER = logging.getLogger(__name__) 

42 

43 

44def cleanup_kwargs_for_transport(kwargs: Dict[str, str]) -> None: 

45 """Remove kwargs that are not meant for the transport layer. 

46 :param kwargs: The keyword arguments. 

47 :type kwargs: dict 

48 

49 "insecure_domain_change" is used to indicate that a redirect 

50 has occurred to a different domain. This tells the SensitiveHeaderCleanupPolicy 

51 to clean up sensitive headers. We need to remove it before sending the request 

52 to the transport layer. This code is needed to handle the case that the 

53 SensitiveHeaderCleanupPolicy is not added into the pipeline and "insecure_domain_change" is not popped. 

54 "enable_cae" is added to the `get_token` method of the `TokenCredential` protocol. 

55 """ 

56 kwargs_to_remove = ["insecure_domain_change", "enable_cae"] 

57 if not kwargs: 

58 return 

59 for key in kwargs_to_remove: 

60 kwargs.pop(key, None) 

61 

62 

63class _SansIOHTTPPolicyRunner(HTTPPolicy[HTTPRequestType, HTTPResponseType]): 

64 """Sync implementation of the SansIO policy. 

65 

66 Modifies the request and sends to the next policy in the chain. 

67 

68 :param policy: A SansIO policy. 

69 :type policy: ~azure.core.pipeline.policies.SansIOHTTPPolicy 

70 """ 

71 

72 def __init__(self, policy: SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]) -> None: 

73 super(_SansIOHTTPPolicyRunner, self).__init__() 

74 self._policy = policy 

75 

76 def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]: 

77 """Modifies the request and sends to the next policy in the chain. 

78 

79 :param request: The PipelineRequest object. 

80 :type request: ~azure.core.pipeline.PipelineRequest 

81 :return: The PipelineResponse object. 

82 :rtype: ~azure.core.pipeline.PipelineResponse 

83 """ 

84 _await_result(self._policy.on_request, request) 

85 try: 

86 response = self.next.send(request) 

87 except Exception: # pylint: disable=broad-except 

88 _await_result(self._policy.on_exception, request) 

89 raise 

90 else: 

91 _await_result(self._policy.on_response, request, response) 

92 return response 

93 

94 

95class _TransportRunner(HTTPPolicy[HTTPRequestType, HTTPResponseType]): 

96 """Transport runner. 

97 

98 Uses specified HTTP transport type to send request and returns response. 

99 

100 :param sender: The Http Transport instance. 

101 :type sender: ~azure.core.pipeline.transport.HttpTransport 

102 """ 

103 

104 def __init__(self, sender: HttpTransport[HTTPRequestType, HTTPResponseType]) -> None: 

105 super(_TransportRunner, self).__init__() 

106 self._sender = sender 

107 

108 def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]: 

109 """HTTP transport send method. 

110 

111 :param request: The PipelineRequest object. 

112 :type request: ~azure.core.pipeline.PipelineRequest 

113 :return: The PipelineResponse object. 

114 :rtype: ~azure.core.pipeline.PipelineResponse 

115 """ 

116 cleanup_kwargs_for_transport(request.context.options) 

117 return PipelineResponse( 

118 request.http_request, 

119 self._sender.send(request.http_request, **request.context.options), 

120 context=request.context, 

121 ) 

122 

123 

124class Pipeline(ContextManager["Pipeline"], Generic[HTTPRequestType, HTTPResponseType]): 

125 """A pipeline implementation. 

126 

127 This is implemented as a context manager, that will activate the context 

128 of the HTTP sender. The transport is the last node in the pipeline. 

129 

130 :param transport: The Http Transport instance 

131 :type transport: ~azure.core.pipeline.transport.HttpTransport 

132 :param list policies: List of configured policies. 

133 

134 .. admonition:: Example: 

135 

136 .. literalinclude:: ../samples/test_example_sync.py 

137 :start-after: [START build_pipeline] 

138 :end-before: [END build_pipeline] 

139 :language: python 

140 :dedent: 4 

141 :caption: Builds the pipeline for synchronous transport. 

142 """ 

143 

144 def __init__( 

145 self, 

146 transport: HttpTransport[HTTPRequestType, HTTPResponseType], 

147 policies: Optional[ 

148 Iterable[ 

149 Union[ 

150 HTTPPolicy[HTTPRequestType, HTTPResponseType], SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType] 

151 ] 

152 ] 

153 ] = None, 

154 ) -> None: 

155 self._impl_policies: List[HTTPPolicy[HTTPRequestType, HTTPResponseType]] = [] 

156 self._transport = transport 

157 

158 for policy in policies or []: 

159 if isinstance(policy, SansIOHTTPPolicy): 

160 self._impl_policies.append(_SansIOHTTPPolicyRunner(policy)) 

161 elif policy: 

162 self._impl_policies.append(policy) 

163 for index in range(len(self._impl_policies) - 1): 

164 self._impl_policies[index].next = self._impl_policies[index + 1] 

165 if self._impl_policies: 

166 self._impl_policies[-1].next = _TransportRunner(self._transport) 

167 

168 def __enter__(self) -> Pipeline[HTTPRequestType, HTTPResponseType]: 

169 self._transport.__enter__() 

170 return self 

171 

172 def __exit__(self, *exc_details: Any) -> None: # pylint: disable=arguments-differ 

173 self._transport.__exit__(*exc_details) 

174 

175 @staticmethod 

176 def _prepare_multipart_mixed_request(request: HTTPRequestType) -> None: 

177 """Will execute the multipart policies. 

178 

179 Does nothing if "set_multipart_mixed" was never called. 

180 

181 :param request: The request object. 

182 :type request: ~azure.core.rest.HttpRequest 

183 """ 

184 multipart_mixed_info = request.multipart_mixed_info # type: ignore 

185 if not multipart_mixed_info: 

186 return 

187 

188 requests: List[HTTPRequestType] = multipart_mixed_info[0] 

189 policies: List[SansIOHTTPPolicy] = multipart_mixed_info[1] 

190 pipeline_options: Dict[str, Any] = multipart_mixed_info[3] 

191 

192 # Apply on_requests concurrently to all requests 

193 import concurrent.futures 

194 

195 def prepare_requests(req): 

196 if req.multipart_mixed_info: 

197 # Recursively update changeset "sub requests" 

198 Pipeline._prepare_multipart_mixed_request(req) 

199 context = PipelineContext(None, **pipeline_options) 

200 pipeline_request = PipelineRequest(req, context) 

201 for policy in policies: 

202 _await_result(policy.on_request, pipeline_request) 

203 

204 with concurrent.futures.ThreadPoolExecutor() as executor: 

205 # List comprehension to raise exceptions if happened 

206 [ # pylint: disable=expression-not-assigned, unnecessary-comprehension 

207 _ for _ in executor.map(prepare_requests, requests) 

208 ] 

209 

210 def _prepare_multipart(self, request: HTTPRequestType) -> None: 

211 # This code is fine as long as HTTPRequestType is actually 

212 # azure.core.pipeline.transport.HTTPRequest, bu we don't check it in here 

213 # since we didn't see (yet) pipeline usage where it's not this actual instance 

214 # class used 

215 self._prepare_multipart_mixed_request(request) 

216 request.prepare_multipart_body() # type: ignore 

217 

218 def run(self, request: HTTPRequestType, **kwargs: Any) -> PipelineResponse[HTTPRequestType, HTTPResponseType]: 

219 """Runs the HTTP Request through the chained policies. 

220 

221 :param request: The HTTP request object. 

222 :type request: ~azure.core.pipeline.transport.HttpRequest 

223 :return: The PipelineResponse object 

224 :rtype: ~azure.core.pipeline.PipelineResponse 

225 """ 

226 self._prepare_multipart(request) 

227 context = PipelineContext(self._transport, **kwargs) 

228 pipeline_request: PipelineRequest[HTTPRequestType] = PipelineRequest(request, context) 

229 first_node = self._impl_policies[0] if self._impl_policies else _TransportRunner(self._transport) 

230 return first_node.send(pipeline_request)