Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/rest/_helpers.py: 31%

170 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-07 06:33 +0000

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26from __future__ import annotations 

27import copy 

28import codecs 

29import email.message 

30from json import dumps 

31from typing import ( 

32 Optional, 

33 Union, 

34 Mapping, 

35 Sequence, 

36 Tuple, 

37 IO, 

38 Any, 

39 Iterable, 

40 MutableMapping, 

41 AsyncIterable, 

42 cast, 

43 Dict, 

44 TYPE_CHECKING, 

45) 

46import xml.etree.ElementTree as ET 

47from urllib.parse import urlparse 

48from azure.core.serialization import AzureJSONEncoder 

49from ..utils._pipeline_transport_rest_shared import ( 

50 _format_parameters_helper, 

51 _pad_attr_name, 

52 _prepare_multipart_body_helper, 

53 _serialize_request, 

54 _format_data_helper, 

55) 

56 

57if TYPE_CHECKING: 

58 # This avoid a circular import 

59 from ._rest_py3 import HttpRequest 

60 

61################################### TYPES SECTION ######################### 

62 

63binary_type = str 

64PrimitiveData = Optional[Union[str, int, float, bool]] 

65 

66ParamsType = Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]] 

67 

68FileContent = Union[str, bytes, IO[str], IO[bytes]] 

69FileType = Tuple[Optional[str], FileContent] 

70 

71FilesType = Union[Mapping[str, FileType], Sequence[Tuple[str, FileType]]] 

72 

73ContentTypeBase = Union[str, bytes, Iterable[bytes]] 

74ContentType = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]] 

75 

76DataType = Optional[Union[bytes, Dict[str, Union[str, int]]]] 

77 

78########################### HELPER SECTION ################################# 

79 

80 

81def _verify_data_object(name, value): 

82 if not isinstance(name, str): 

83 raise TypeError("Invalid type for data name. Expected str, got {}: {}".format(type(name), name)) 

84 if value is not None and not isinstance(value, (str, bytes, int, float)): 

85 raise TypeError("Invalid type for data value. Expected primitive type, got {}: {}".format(type(name), name)) 

86 

87 

88def set_urlencoded_body(data, has_files): 

89 body = {} 

90 default_headers = {} 

91 for f, d in data.items(): 

92 if not d: 

93 continue 

94 if isinstance(d, list): 

95 for item in d: 

96 _verify_data_object(f, item) 

97 else: 

98 _verify_data_object(f, d) 

99 body[f] = d 

100 if not has_files: 

101 # little hacky, but for files we don't send a content type with 

102 # boundary so requests / aiohttp etc deal with it 

103 default_headers["Content-Type"] = "application/x-www-form-urlencoded" 

104 return default_headers, body 

105 

106 

107def set_multipart_body(files): 

108 formatted_files = {f: _format_data_helper(d) for f, d in files.items() if d is not None} 

109 return {}, formatted_files 

110 

111 

112def set_xml_body(content): 

113 headers = {} 

114 bytes_content = ET.tostring(content, encoding="utf8") 

115 body = bytes_content.replace(b"encoding='utf8'", b"encoding='utf-8'") 

116 if body: 

117 headers["Content-Length"] = str(len(body)) 

118 return headers, body 

119 

120 

121def set_content_body( 

122 content: Any, 

123) -> Tuple[MutableMapping[str, str], Optional[ContentTypeBase]]: 

124 headers: MutableMapping[str, str] = {} 

125 

126 if isinstance(content, ET.Element): 

127 # XML body 

128 return set_xml_body(content) 

129 if isinstance(content, (str, bytes)): 

130 headers = {} 

131 body = content 

132 if isinstance(content, str): 

133 headers["Content-Type"] = "text/plain" 

134 if body: 

135 headers["Content-Length"] = str(len(body)) 

136 return headers, body 

137 if any(hasattr(content, attr) for attr in ["read", "__iter__", "__aiter__"]): 

138 return headers, content 

139 raise TypeError( 

140 "Unexpected type for 'content': '{}'. ".format(type(content)) 

141 + "We expect 'content' to either be str, bytes, a open file-like object or an iterable/asynciterable." 

142 ) 

143 

144 

145def set_json_body(json: Any) -> Tuple[Dict[str, str], Any]: 

146 headers = {"Content-Type": "application/json"} 

147 if hasattr(json, "read"): 

148 content_headers, body = set_content_body(json) 

149 headers.update(content_headers) 

150 else: 

151 body = dumps(json, cls=AzureJSONEncoder) 

152 headers.update({"Content-Length": str(len(body))}) 

153 return headers, body 

154 

155 

156def lookup_encoding(encoding: str) -> bool: 

157 # including check for whether encoding is known taken from httpx 

158 try: 

159 codecs.lookup(encoding) 

160 return True 

161 except LookupError: 

162 return False 

163 

164 

165def get_charset_encoding(response) -> Optional[str]: 

166 content_type = response.headers.get("Content-Type") 

167 

168 if not content_type: 

169 return None 

170 # https://peps.python.org/pep-0594/#cgi 

171 m = email.message.Message() 

172 m["content-type"] = content_type 

173 encoding = cast(str, m.get_param("charset")) # -> utf-8 

174 if encoding is None or not lookup_encoding(encoding): 

175 return None 

176 return encoding 

177 

178 

179def decode_to_text(encoding: Optional[str], content: bytes) -> str: 

180 if not content: 

181 return "" 

182 if encoding == "utf-8": 

183 encoding = "utf-8-sig" 

184 if encoding: 

185 return content.decode(encoding) 

186 return codecs.getincrementaldecoder("utf-8-sig")(errors="replace").decode(content) 

187 

188 

189class HttpRequestBackcompatMixin: 

190 def __getattr__(self, attr: str) -> Any: 

191 backcompat_attrs = [ 

192 "files", 

193 "data", 

194 "multipart_mixed_info", 

195 "query", 

196 "body", 

197 "format_parameters", 

198 "set_streamed_data_body", 

199 "set_text_body", 

200 "set_xml_body", 

201 "set_json_body", 

202 "set_formdata_body", 

203 "set_bytes_body", 

204 "set_multipart_mixed", 

205 "prepare_multipart_body", 

206 "serialize", 

207 ] 

208 attr = _pad_attr_name(attr, backcompat_attrs) 

209 return self.__getattribute__(attr) 

210 

211 def __setattr__(self, attr: str, value: Any) -> None: 

212 backcompat_attrs = [ 

213 "multipart_mixed_info", 

214 "files", 

215 "data", 

216 "body", 

217 ] 

218 attr = _pad_attr_name(attr, backcompat_attrs) 

219 super(HttpRequestBackcompatMixin, self).__setattr__(attr, value) 

220 

221 @property 

222 def _multipart_mixed_info(self) -> Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]: 

223 """DEPRECATED: Information used to make multipart mixed requests. 

224 This is deprecated and will be removed in a later release. 

225 

226 :rtype: tuple 

227 :return: (requests, policies, boundary, kwargs) 

228 """ 

229 try: 

230 return self._multipart_mixed_info_val 

231 except AttributeError: 

232 return None 

233 

234 @_multipart_mixed_info.setter 

235 def _multipart_mixed_info(self, val: Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]): 

236 """DEPRECATED: Set information to make multipart mixed requests. 

237 This is deprecated and will be removed in a later release. 

238 

239 :param tuple val: (requests, policies, boundary, kwargs) 

240 """ 

241 self._multipart_mixed_info_val = val 

242 

243 @property 

244 def _query(self) -> Dict[str, Any]: 

245 """DEPRECATED: Query parameters passed in by user 

246 This is deprecated and will be removed in a later release. 

247 

248 :rtype: dict 

249 :return: Query parameters 

250 """ 

251 query = urlparse(self.url).query 

252 if query: 

253 return {p[0]: p[-1] for p in [p.partition("=") for p in query.split("&")]} 

254 return {} 

255 

256 @property 

257 def _body(self) -> DataType: 

258 """DEPRECATED: Body of the request. You should use the `content` property instead 

259 This is deprecated and will be removed in a later release. 

260 

261 :rtype: bytes 

262 :return: Body of the request 

263 """ 

264 return self._data 

265 

266 @_body.setter 

267 def _body(self, val: DataType) -> None: 

268 """DEPRECATED: Set the body of the request 

269 This is deprecated and will be removed in a later release. 

270 

271 :param bytes val: Body of the request 

272 """ 

273 self._data = val 

274 

275 def _format_parameters(self, params: MutableMapping[str, str]) -> None: 

276 """DEPRECATED: Format the query parameters 

277 This is deprecated and will be removed in a later release. 

278 You should pass the query parameters through the kwarg `params` 

279 instead. 

280 

281 :param dict params: Query parameters 

282 """ 

283 _format_parameters_helper(self, params) 

284 

285 def _set_streamed_data_body(self, data): 

286 """DEPRECATED: Set the streamed request body. 

287 This is deprecated and will be removed in a later release. 

288 You should pass your stream content through the `content` kwarg instead 

289 

290 :param data: Streamed data 

291 :type data: bytes or iterable 

292 """ 

293 if not isinstance(data, binary_type) and not any( 

294 hasattr(data, attr) for attr in ["read", "__iter__", "__aiter__"] 

295 ): 

296 raise TypeError("A streamable data source must be an open file-like object or iterable.") 

297 headers = self._set_body(content=data) 

298 self._files = None 

299 self.headers.update(headers) 

300 

301 def _set_text_body(self, data): 

302 """DEPRECATED: Set the text body 

303 This is deprecated and will be removed in a later release. 

304 You should pass your text content through the `content` kwarg instead 

305 

306 :param str data: Text data 

307 """ 

308 headers = self._set_body(content=data) 

309 self.headers.update(headers) 

310 self._files = None 

311 

312 def _set_xml_body(self, data): 

313 """DEPRECATED: Set the xml body. 

314 This is deprecated and will be removed in a later release. 

315 You should pass your xml content through the `content` kwarg instead 

316 

317 :param data: XML data 

318 :type data: xml.etree.ElementTree.Element 

319 """ 

320 headers = self._set_body(content=data) 

321 self.headers.update(headers) 

322 self._files = None 

323 

324 def _set_json_body(self, data): 

325 """DEPRECATED: Set the json request body. 

326 This is deprecated and will be removed in a later release. 

327 You should pass your json content through the `json` kwarg instead 

328 

329 :param data: JSON data 

330 :type data: dict 

331 """ 

332 headers = self._set_body(json=data) 

333 self.headers.update(headers) 

334 self._files = None 

335 

336 def _set_formdata_body(self, data=None): 

337 """DEPRECATED: Set the formrequest body. 

338 This is deprecated and will be removed in a later release. 

339 You should pass your stream content through the `files` kwarg instead 

340 

341 :param data: Form data 

342 :type data: dict 

343 """ 

344 if data is None: 

345 data = {} 

346 content_type = self.headers.pop("Content-Type", None) if self.headers else None 

347 

348 if content_type and content_type.lower() == "application/x-www-form-urlencoded": 

349 headers = self._set_body(data=data) 

350 self._files = None 

351 else: # Assume "multipart/form-data" 

352 headers = self._set_body(files=data) 

353 self._data = None 

354 self.headers.update(headers) 

355 

356 def _set_bytes_body(self, data): 

357 """DEPRECATED: Set the bytes request body. 

358 This is deprecated and will be removed in a later release. 

359 You should pass your bytes content through the `content` kwarg instead 

360 

361 :param bytes data: Bytes data 

362 """ 

363 headers = self._set_body(content=data) 

364 # we don't want default Content-Type 

365 # in 2.7, byte strings are still strings, so they get set with text/plain content type 

366 

367 headers.pop("Content-Type", None) 

368 self.headers.update(headers) 

369 self._files = None 

370 

371 def _set_multipart_mixed(self, *requests: HttpRequest, **kwargs: Any) -> None: 

372 """DEPRECATED: Set the multipart mixed info. 

373 This is deprecated and will be removed in a later release. 

374 

375 :param requests: Requests to be sent in the multipart request 

376 :type requests: list[HttpRequest] 

377 """ 

378 self.multipart_mixed_info: Tuple[Sequence[HttpRequest], Sequence[Any], str, Dict[str, Any]] = ( 

379 requests, 

380 kwargs.pop("policies", []), 

381 kwargs.pop("boundary", None), 

382 kwargs, 

383 ) 

384 

385 def _prepare_multipart_body(self, content_index=0): 

386 """DEPRECATED: Prepare your request body for multipart requests. 

387 This is deprecated and will be removed in a later release. 

388 

389 :param int content_index: The index of the request to be sent in the multipart request 

390 :returns: The updated index after all parts in this request have been added. 

391 :rtype: int 

392 """ 

393 return _prepare_multipart_body_helper(self, content_index) 

394 

395 def _serialize(self): 

396 """DEPRECATED: Serialize this request using application/http spec. 

397 This is deprecated and will be removed in a later release. 

398 

399 :rtype: bytes 

400 :return: The serialized request 

401 """ 

402 return _serialize_request(self) 

403 

404 def _add_backcompat_properties(self, request, memo): 

405 """While deepcopying, we also need to add the private backcompat attrs. 

406 

407 :param HttpRequest request: The request to copy from 

408 :param dict memo: The memo dict used by deepcopy 

409 """ 

410 request._multipart_mixed_info = copy.deepcopy( # pylint: disable=protected-access 

411 self._multipart_mixed_info, memo 

412 )