Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/_pipeline_client.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

69 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the "Software"), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26from __future__ import annotations 

27import logging 

28from collections.abc import Iterable 

29from typing import TypeVar, Generic, Optional, Any 

30from .configuration import Configuration 

31from .pipeline import Pipeline 

32from .pipeline.transport._base import PipelineClientBase 

33from .pipeline.transport import HttpTransport 

34from .pipeline.policies import ( 

35 ContentDecodePolicy, 

36 DistributedTracingPolicy, 

37 HttpLoggingPolicy, 

38 RequestIdPolicy, 

39 RetryPolicy, 

40 SensitiveHeaderCleanupPolicy, 

41) 

42 

# Logger for this module, keyed by the module's import name.
_LOGGER = logging.getLogger(__name__)

# Type variables that parameterize PipelineClient over the request type it
# accepts and the response type it hands back.
HTTPRequestType = TypeVar("HTTPRequestType")
HTTPResponseType = TypeVar("HTTPResponseType")

47 

48 

class PipelineClient(PipelineClientBase, Generic[HTTPRequestType, HTTPResponseType]):
    """Synchronous service client built around a request pipeline.

    The client either wraps a caller-supplied pipeline or assembles one from a
    configuration object, a transport and an ordered list of policies.

    :param str base_url: URL for the request.
    :keyword ~azure.core.configuration.Configuration config: Configuration to use. If omitted, a
        Configuration is created from the remaining keyword arguments.
    :keyword Pipeline pipeline: Pipeline to use. If omitted, one is built from the configuration.
    :keyword list[HTTPPolicy] policies: Complete policy list. If omitted, the policies carried by the
        configuration object are used.
    :keyword per_call_policies: If specified, the policies will be added into the policy list before RetryPolicy
    :paramtype per_call_policies: Union[HTTPPolicy, SansIOHTTPPolicy, list[HTTPPolicy], list[SansIOHTTPPolicy]]
    :keyword per_retry_policies: If specified, the policies will be added into the policy list after RetryPolicy
    :paramtype per_retry_policies: Union[HTTPPolicy, SansIOHTTPPolicy, list[HTTPPolicy], list[SansIOHTTPPolicy]]
    :keyword HttpTransport transport: If omitted, RequestsTransport is used for synchronous transport.

    .. admonition:: Example:

        .. literalinclude:: ../samples/test_example_sync.py
            :start-after: [START build_pipeline_client]
            :end-before: [END build_pipeline_client]
            :language: python
            :dedent: 4
            :caption: Builds the pipeline client.
    """

    def __init__(
        self,
        base_url: str,
        *,
        pipeline: Optional[Pipeline[HTTPRequestType, HTTPResponseType]] = None,
        config: Optional[Configuration[HTTPRequestType, HTTPResponseType]] = None,
        **kwargs: Any,
    ):
        super().__init__(base_url)
        # A falsy ``config`` is treated like an omitted one; the keyword
        # arguments then seed a fresh Configuration.
        self._config: Configuration[HTTPRequestType, HTTPResponseType] = (
            config if config else Configuration(**kwargs)
        )
        self._base_url = base_url

        # Only build a pipeline when the caller did not hand one in.
        self._pipeline = pipeline if pipeline else self._build_pipeline(self._config, **kwargs)

    def __enter__(self) -> PipelineClient[HTTPRequestType, HTTPResponseType]:
        """Enter the underlying pipeline's context and return this client."""
        self._pipeline.__enter__()
        return self

    def __exit__(self, *exc_details: Any) -> None:
        """Forward the exit call, with any exception details, to the pipeline."""
        self._pipeline.__exit__(*exc_details)

    def close(self) -> None:
        """Close the client by exiting its context with no exception details."""
        self.__exit__()

    def _build_pipeline(
        self,
        config: Configuration[HTTPRequestType, HTTPResponseType],
        *,
        transport: Optional[HttpTransport[HTTPRequestType, HTTPResponseType]] = None,
        policies=None,
        per_call_policies=None,
        per_retry_policies=None,
        **kwargs,
    ) -> Pipeline[HTTPRequestType, HTTPResponseType]:
        """Assemble the pipeline from a transport and an ordered policy list.

        :param config: Configuration whose policy attributes supply the default policy list.
        :type config: ~azure.core.configuration.Configuration
        :keyword transport: Transport to use. If omitted, a RequestsTransport is created
            from the remaining keyword arguments.
        :keyword policies: Complete policy list. ``None`` selects the default list built
            from ``config``; an empty list is honoured as given.
        :keyword per_call_policies: A single policy or an iterable of policies. Placed ahead of
            the redirect and retry policies in the default list, or in front of a supplied
            ``policies`` list.
        :keyword per_retry_policies: A single policy or an iterable of policies. Placed after the
            retry, authentication and custom-hook policies in the default list, or directly
            after the last RetryPolicy of a supplied ``policies`` list.
        :return: The assembled pipeline.
        :rtype: ~azure.core.pipeline.Pipeline
        :raises ValueError: If ``policies`` is supplied together with per-retry policies but
            contains no RetryPolicy.
        """

        def _listify(value):
            # A lone policy object becomes a one-element list; any iterable is copied.
            return list(value) if isinstance(value, Iterable) else [value]

        per_call_policies = per_call_policies or []
        per_retry_policies = per_retry_policies or []

        if policies is not None:  # [] is a valid policy list
            # Caller-supplied list: per-call policies run ahead of everything in it.
            policies = [*_listify(per_call_policies), *policies]

            retry_extras = _listify(per_retry_policies)
            if retry_extras:
                # Per-retry policies are anchored just after the last RetryPolicy.
                retry_positions = [
                    position for position, policy in enumerate(policies) if isinstance(policy, RetryPolicy)
                ]
                if not retry_positions:
                    raise ValueError(
                        "Failed to add per_retry_policies; no RetryPolicy found in the supplied list of policies. "
                    )
                split_at = retry_positions[-1] + 1
                policies = [*policies[:split_at], *retry_extras, *policies[split_at:]]
        else:
            # Default list, in order: request-shaping policies, per-call extras,
            # redirect/retry/auth/hook, per-retry extras, then logging and tracing.
            policies = [
                config.request_id_policy or RequestIdPolicy(**kwargs),
                config.headers_policy,
                config.user_agent_policy,
                config.proxy_policy,
                ContentDecodePolicy(**kwargs),
            ]
            policies += _listify(per_call_policies)
            policies += [
                config.redirect_policy,
                config.retry_policy,
                config.authentication_policy,
                config.custom_hook_policy,
            ]
            policies += _listify(per_retry_policies)
            policies += [
                config.logging_policy,
                DistributedTracingPolicy(**kwargs),
                # Header cleanup is only added when a redirect policy is configured.
                SensitiveHeaderCleanupPolicy(**kwargs) if config.redirect_policy else None,
                config.http_logging_policy or HttpLoggingPolicy(**kwargs),
            ]

        if transport is None:
            # Private import path on purpose: mypy and pyright cope poorly with
            # the PEP 562 lazy import exposed by the public package.
            from .pipeline.transport._requests_basic import RequestsTransport

            transport = RequestsTransport(**kwargs)

        return Pipeline(transport, policies)

    def send_request(self, request: HTTPRequestType, *, stream: bool = False, **kwargs: Any) -> HTTPResponseType:
        """Run a network request through the client's chained policies.

        >>> from azure.core.rest import HttpRequest
        >>> request = HttpRequest('GET', 'http://www.example.com')
        <HttpRequest [GET], url: 'http://www.example.com'>
        >>> response = client.send_request(request)
        <HttpResponse: 200 OK>

        :param request: The network request you want to make. Required.
        :type request: ~azure.core.rest.HttpRequest
        :keyword bool stream: Whether the response payload will be streamed. Defaults to False.
        :return: The response of your network call. Does not do error handling on your response.
        :rtype: ~azure.core.rest.HttpResponse
        """
        # Private switch: when set, the whole pipeline response is returned
        # instead of just its HTTP response. It is removed from kwargs so it is
        # not forwarded to the pipeline run.
        wants_pipeline_response = kwargs.pop("_return_pipeline_response", False)
        outcome = self._pipeline.run(request, stream=stream, **kwargs)
        # This is a private API we don't want to type in signature
        return outcome if wants_pipeline_response else outcome.http_response  # type: ignore