Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/pipeline/_base.py: 31%
81 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-07 06:33 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-07 06:33 +0000
1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26from __future__ import annotations
27import logging
28from typing import Generic, TypeVar, Union, Any, List, Dict, Optional, Iterable, ContextManager
29from azure.core.pipeline import (
30 PipelineRequest,
31 PipelineResponse,
32 PipelineContext,
33)
34from azure.core.pipeline.policies import HTTPPolicy, SansIOHTTPPolicy
35from ._tools import await_result as _await_result
36from .transport import HttpTransport
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Generic placeholders for the concrete HTTP request / response classes that
# the policies, runners and pipeline below are parameterized with.
HTTPRequestType = TypeVar("HTTPRequestType")
HTTPResponseType = TypeVar("HTTPResponseType")
def cleanup_kwargs_for_transport(kwargs: Dict[str, str]) -> None:
    """Strip, in place, the options that must not reach the transport layer.

    "insecure_domain_change" is used to indicate that a redirect
    has occurred to a different domain. This tells the SensitiveHeaderCleanupPolicy
    to clean up sensitive headers. It has to be removed before the request is sent
    to the transport layer; doing it here covers the case where the
    SensitiveHeaderCleanupPolicy is not part of the pipeline and therefore
    never popped "insecure_domain_change" itself.

    "enable_cae" is added to the `get_token` method of the `TokenCredential` protocol.

    A falsy ``kwargs`` (empty mapping or ``None``) is left untouched.

    :param kwargs: The keyword arguments. Modified in place.
    :type kwargs: dict
    """
    if not kwargs:
        return
    # pop() with a default keeps the removal silent when a key is absent.
    for transport_only_key in ("insecure_domain_change", "enable_cae"):
        kwargs.pop(transport_only_key, None)
class _SansIOHTTPPolicyRunner(HTTPPolicy[HTTPRequestType, HTTPResponseType]):
    """Sync adapter that runs a SansIO policy as a regular pipeline node.

    The wrapped policy's hooks are called around the hand-off to the next
    policy in the chain.

    :param policy: A SansIO policy.
    :type policy: ~azure.core.pipeline.policies.SansIOHTTPPolicy
    """

    def __init__(self, policy: SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]) -> None:
        super().__init__()
        self._policy = policy

    def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
        """Run the wrapped policy's hooks around the next policy in the chain.

        ``on_request`` is invoked first, then the request is forwarded to
        ``self.next``. If forwarding raises, ``on_exception`` is invoked and the
        exception is re-raised; otherwise ``on_response`` is invoked and the
        response is returned.

        :param request: The PipelineRequest object.
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: The PipelineResponse object.
        :rtype: ~azure.core.pipeline.PipelineResponse
        """
        _await_result(self._policy.on_request, request)
        try:
            response = self.next.send(request)
        except Exception:  # pylint: disable=broad-except
            # Give the policy a chance to observe the failure, then propagate it.
            _await_result(self._policy.on_exception, request)
            raise
        # Only reached when the downstream send succeeded.
        _await_result(self._policy.on_response, request, response)
        return response
class _TransportRunner(HTTPPolicy[HTTPRequestType, HTTPResponseType]):
    """Terminal pipeline node that delegates to the HTTP transport.

    Uses the specified HTTP transport to send the request and wraps what the
    transport returns in a PipelineResponse.

    :param sender: The Http Transport instance.
    :type sender: ~azure.core.pipeline.transport.HttpTransport
    """

    def __init__(self, sender: HttpTransport[HTTPRequestType, HTTPResponseType]) -> None:
        super().__init__()
        self._sender = sender

    def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
        """Send the HTTP request through the transport.

        The context options are cleaned with ``cleanup_kwargs_for_transport``
        and then passed to the transport as keyword arguments.

        :param request: The PipelineRequest object.
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: The PipelineResponse object.
        :rtype: ~azure.core.pipeline.PipelineResponse
        """
        # Drop pipeline-only options before they are expanded into transport kwargs.
        cleanup_kwargs_for_transport(request.context.options)
        http_response = self._sender.send(request.http_request, **request.context.options)
        return PipelineResponse(request.http_request, http_response, context=request.context)
class Pipeline(ContextManager["Pipeline"], Generic[HTTPRequestType, HTTPResponseType]):
    """A pipeline implementation.

    The pipeline is a context manager: entering and exiting it enters and exits
    the context of the HTTP sender. Policies are chained in the order given and
    the transport is the last node in the pipeline.

    :param transport: The Http Transport instance
    :type transport: ~azure.core.pipeline.transport.HttpTransport
    :param list policies: List of configured policies.

    .. admonition:: Example:

        .. literalinclude:: ../samples/test_example_sync.py
            :start-after: [START build_pipeline]
            :end-before: [END build_pipeline]
            :language: python
            :dedent: 4
            :caption: Builds the pipeline for synchronous transport.
    """

    def __init__(
        self,
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
        policies: Optional[
            Iterable[
                Union[
                    HTTPPolicy[HTTPRequestType, HTTPResponseType], SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]
                ]
            ]
        ] = None,
    ) -> None:
        self._transport = transport
        self._impl_policies: List[HTTPPolicy[HTTPRequestType, HTTPResponseType]] = []

        chain = self._impl_policies
        for candidate in policies or ():
            if isinstance(candidate, SansIOHTTPPolicy):
                # SansIO policies only expose hooks; wrap them so they can be chained.
                chain.append(_SansIOHTTPPolicyRunner(candidate))
            elif candidate:
                # Falsy entries (e.g. None) are skipped.
                chain.append(candidate)

        # Link each policy to its successor; the last one hands off to the transport.
        for current, successor in zip(chain, chain[1:]):
            current.next = successor
        if chain:
            chain[-1].next = _TransportRunner(self._transport)

    def __enter__(self) -> Pipeline[HTTPRequestType, HTTPResponseType]:
        self._transport.__enter__()
        return self

    def __exit__(self, *exc_details: Any) -> None:  # pylint: disable=arguments-differ
        self._transport.__exit__(*exc_details)

    @staticmethod
    def _prepare_multipart_mixed_request(request: HTTPRequestType) -> None:
        """Will execute the multipart policies.

        Does nothing if "set_multipart_mixed" was never called.

        :param request: The request object.
        :type request: ~azure.core.rest.HttpRequest
        """
        mixed_info = request.multipart_mixed_info  # type: ignore
        if not mixed_info:
            return

        # Positions 0, 1 and 3 hold the sub requests, the policies to apply to
        # them and the options used to build each sub request's context.
        sub_requests: List[HTTPRequestType] = mixed_info[0]
        sansio_policies: List[SansIOHTTPPolicy] = mixed_info[1]
        context_options: Dict[str, Any] = mixed_info[3]

        # Apply on_requests concurrently to all requests
        import concurrent.futures

        def _apply_policies(sub_request):
            if sub_request.multipart_mixed_info:
                # Recursively update changeset "sub requests"
                Pipeline._prepare_multipart_mixed_request(sub_request)
            sub_context = PipelineContext(None, **context_options)
            sub_pipeline_request = PipelineRequest(sub_request, sub_context)
            for sansio_policy in sansio_policies:
                _await_result(sansio_policy.on_request, sub_pipeline_request)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Drain the result iterator so that an exception raised in a
            # worker is re-raised here.
            for _ in executor.map(_apply_policies, sub_requests):
                pass

    def _prepare_multipart(self, request: HTTPRequestType) -> None:
        # This code is fine as long as HTTPRequestType is actually
        # azure.core.pipeline.transport.HTTPRequest, but we don't check it in here
        # since we didn't see (yet) pipeline usage where it's not this actual instance
        # class used
        self._prepare_multipart_mixed_request(request)
        request.prepare_multipart_body()  # type: ignore

    def run(self, request: HTTPRequestType, **kwargs: Any) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
        """Runs the HTTP Request through the chained policies.

        The keyword arguments are passed to the PipelineContext created for
        this request. When no policy is configured, the request is sent
        through a transport runner directly.

        :param request: The HTTP request object.
        :type request: ~azure.core.pipeline.transport.HttpRequest
        :return: The PipelineResponse object
        :rtype: ~azure.core.pipeline.PipelineResponse
        """
        self._prepare_multipart(request)
        pipeline_request: PipelineRequest[HTTPRequestType] = PipelineRequest(
            request, PipelineContext(self._transport, **kwargs)
        )
        entry_point: HTTPPolicy[HTTPRequestType, HTTPResponseType]
        if self._impl_policies:
            entry_point = self._impl_policies[0]
        else:
            entry_point = _TransportRunner(self._transport)
        return entry_point.send(pipeline_request)