Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/rest/_helpers.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

171 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26from __future__ import annotations 

27import copy 

28import codecs 

29import email.message 

30from json import dumps 

31from typing import ( 

32 Optional, 

33 Union, 

34 Mapping, 

35 Sequence, 

36 Tuple, 

37 IO, 

38 Any, 

39 Iterable, 

40 MutableMapping, 

41 AsyncIterable, 

42 cast, 

43 Dict, 

44 TYPE_CHECKING, 

45) 

46import xml.etree.ElementTree as ET 

47from urllib.parse import urlparse 

48from azure.core.serialization import AzureJSONEncoder 

49from ..utils._pipeline_transport_rest_shared import ( 

50 _format_parameters_helper, 

51 _pad_attr_name, 

52 _prepare_multipart_body_helper, 

53 _serialize_request, 

54 _format_data_helper, 

55 get_file_items, 

56) 

57 

58if TYPE_CHECKING: 

59 # This avoid a circular import 

60 from ._rest_py3 import HttpRequest 

61 

62################################### TYPES SECTION ######################### 

63 

# Alias consulted by the deprecated streamed-body type check in this module.
binary_type = str

# A single scalar value (or None), used as the element type of ParamsType.
PrimitiveData = Optional[Union[str, int, float, bool]]

# Parameter mapping: each name maps to one primitive or a sequence of them.
ParamsType = Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]]

# Raw file payload: text, bytes, or an open text/binary stream.
FileContent = Union[str, bytes, IO[str], IO[bytes]]

# One file entry, in any of the accepted shapes.
FileType = Union[
    FileContent,  # file (or bytes)
    Tuple[Optional[str], FileContent],  # (filename, file (or bytes))
    Tuple[Optional[str], FileContent, Optional[str]],  # (filename, file (or bytes), content_type)
]

# A collection of file entries, either keyed by name or as ordered (name, entry) pairs.
FilesType = Union[Mapping[str, FileType], Sequence[Tuple[str, FileType]]]

# Body content accepted synchronously; ContentType additionally allows async iterables.
ContentTypeBase = Union[str, bytes, Iterable[bytes]]
ContentType = Union[ContentTypeBase, AsyncIterable[bytes]]

# Type of the deprecated ``body`` / ``data`` value.
DataType = Optional[Union[bytes, Dict[str, Union[str, int]]]]

86 

87########################### HELPER SECTION ################################# 

88 

89 

90def _verify_data_object(name, value): 

91 if not isinstance(name, str): 

92 raise TypeError("Invalid type for data name. Expected str, got {}: {}".format(type(name), name)) 

93 if value is not None and not isinstance(value, (str, bytes, int, float)): 

94 raise TypeError("Invalid type for data value. Expected primitive type, got {}: {}".format(type(name), name)) 

95 

96 

def set_urlencoded_body(data, has_files):
    """Build the body and default headers for a form-encoded request.

    Entries whose value is falsy are left out of the body. Every remaining
    value (or, for a list, every element of it) is checked with
    ``_verify_data_object`` before the entry is kept.

    :param data: Mapping of form field name to value (or list of values).
    :param has_files: Truthy when the request also carries files; in that case
     no Content-Type header is produced.
    :return: A ``(default_headers, body)`` tuple.
    :raises TypeError: Propagated from ``_verify_data_object`` for invalid entries.
    """
    body = {}
    for field, value in data.items():
        if not value:
            continue
        candidates = value if isinstance(value, list) else [value]
        for candidate in candidates:
            _verify_data_object(field, candidate)
        body[field] = value

    if has_files:
        # little hacky, but for files we don't send a content type with
        # boundary so requests / aiohttp etc deal with it
        default_headers = {}
    else:
        default_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    return default_headers, body

114 

115 

def set_multipart_body(files: FilesType):
    """Build the body for a multipart request from the supplied files.

    Entries whose payload is None are dropped; every other payload is passed
    through ``_format_data_helper``.

    :param files: The files to send, as a mapping or a sequence of pairs.
    :return: A ``(headers, body)`` tuple. ``headers`` is always empty; ``body``
     is a dict when ``files`` is a Mapping and a list of pairs otherwise.
    """
    formatted_files = []
    for field, payload in get_file_items(files):
        if payload is not None:
            formatted_files.append((field, _format_data_helper(payload)))

    if isinstance(files, Mapping):
        return {}, dict(formatted_files)
    return {}, formatted_files

119 

120 

def set_xml_body(content):
    """Serialize an XML element into request body bytes.

    :param content: The element to serialize.
    :type content: xml.etree.ElementTree.Element
    :return: A ``(headers, body)`` tuple; ``headers`` carries Content-Length
     whenever the serialized body is non-empty.
    """
    serialized = ET.tostring(content, encoding="utf8")
    # Swap the "utf8" encoding label emitted by the serializer for "utf-8".
    body = serialized.replace(b"encoding='utf8'", b"encoding='utf-8'")
    headers = {"Content-Length": str(len(body))} if body else {}
    return headers, body

128 

129 

def set_content_body(
    content: Any,
) -> Tuple[MutableMapping[str, str], Optional[ContentTypeBase]]:
    """Work out the body and default headers for raw request content.

    :param content: An XML element, a str, bytes, or any object exposing
     ``read``, ``__iter__`` or ``__aiter__``.
    :return: A ``(headers, body)`` tuple. XML elements are delegated to
     ``set_xml_body``; str/bytes get a Content-Length when non-empty (and str
     additionally gets a text/plain Content-Type); streams and iterables are
     returned untouched with no headers.
    :raises TypeError: If ``content`` is none of the supported kinds.
    """
    if isinstance(content, ET.Element):
        # XML body
        return set_xml_body(content)

    if isinstance(content, (str, bytes)):
        literal_headers: MutableMapping[str, str] = (
            {"Content-Type": "text/plain"} if isinstance(content, str) else {}
        )
        if content:
            # NOTE(review): for str this is a character count, not a byte count.
            literal_headers["Content-Length"] = str(len(content))
        return literal_headers, content

    streamable = hasattr(content, "read") or hasattr(content, "__iter__") or hasattr(content, "__aiter__")
    if not streamable:
        raise TypeError(
            "Unexpected type for 'content': '{}'. ".format(type(content))
            + "We expect 'content' to either be str, bytes, a open file-like object or an iterable/asynciterable."
        )
    return {}, content

152 

153 

def set_json_body(json: Any) -> Tuple[Dict[str, str], Any]:
    """Work out the body and default headers for a JSON request.

    :param json: Either an object exposing ``read`` (sent as-is through
     ``set_content_body``) or a value to serialize with ``AzureJSONEncoder``.
    :return: A ``(headers, body)`` tuple. ``headers`` always starts from an
     application/json Content-Type; serialized bodies also get a Content-Length.
    """
    headers = {"Content-Type": "application/json"}

    if not hasattr(json, "read"):
        body = dumps(json, cls=AzureJSONEncoder)
        headers["Content-Length"] = str(len(body))
        return headers, body

    # Stream-like input: reuse the generic content handling and merge its headers.
    stream_headers, body = set_content_body(json)
    headers.update(stream_headers)
    return headers, body

163 

164 

def lookup_encoding(encoding: str) -> bool:
    """Report whether ``encoding`` names a codec known to this interpreter.

    :param str encoding: The codec name to look up.
    :return: True if ``codecs.lookup`` finds the codec, False otherwise.
    :rtype: bool
    """
    # including check for whether encoding is known taken from httpx
    try:
        codecs.lookup(encoding)
    except (LookupError, ValueError):
        # LookupError: the name is well-formed but no such codec exists.
        # ValueError: the name itself is rejected before any search happens
        # (an embedded NUL character, or characters that cannot be encoded).
        # Either way the answer to "is this a known encoding?" is no.
        return False
    return True

172 

173 

def get_charset_encoding(response) -> Optional[str]:
    """Extract a usable charset from a response's Content-Type header.

    :param response: An object whose ``headers`` supports ``get``.
    :return: The ``charset`` parameter of the Content-Type header when it is
     present and accepted by ``lookup_encoding``; otherwise None.
    :rtype: str or None
    """
    header_value = response.headers.get("Content-Type")
    if not header_value:
        return None

    # The stdlib email parser stands in for the removed ``cgi`` helpers:
    # https://peps.python.org/pep-0594/#cgi
    parser = email.message.Message()
    parser["content-type"] = header_value
    charset = cast(str, parser.get_param("charset"))  # -> utf-8

    if charset is not None and lookup_encoding(charset):
        return charset
    return None

186 

187 

def decode_to_text(encoding: Optional[str], content: bytes) -> str:
    """Decode response bytes into text.

    :param encoding: The codec to use, or a falsy value to fall back to a
     lenient UTF-8 decode.
    :param bytes content: The raw bytes to decode.
    :return: The decoded text; an empty string for empty content.
    :rtype: str
    """
    if not content:
        return ""

    if not encoding:
        # No declared encoding: decode as UTF-8 (dropping a leading BOM) and
        # substitute undecodable bytes instead of raising.
        lenient_decoder = codecs.getincrementaldecoder("utf-8-sig")(errors="replace")
        return lenient_decoder.decode(content)

    # "utf-8" is upgraded to the BOM-aware variant; other codecs are used as given.
    codec_name = "utf-8-sig" if encoding == "utf-8" else encoding
    return content.decode(codec_name)

196 

197 

class HttpRequestBackcompatMixin:
    """Mixin exposing the deprecated request attributes and body setters.

    Attribute reads and writes are routed through ``_pad_attr_name`` so that
    the listed backcompat names resolve to the members defined on this class.
    NOTE(review): presumably the helper prefixes a listed name with an
    underscore -- confirm against ``_pad_attr_name``.
    """

    def __getattr__(self, attr: str) -> Any:
        # Only reached when normal attribute lookup has already failed.
        backcompat_attrs = [
            "files",
            "data",
            "multipart_mixed_info",
            "query",
            "body",
            "format_parameters",
            "set_streamed_data_body",
            "set_text_body",
            "set_xml_body",
            "set_json_body",
            "set_formdata_body",
            "set_bytes_body",
            "set_multipart_mixed",
            "prepare_multipart_body",
            "serialize",
        ]
        resolved = _pad_attr_name(attr, backcompat_attrs)
        return self.__getattribute__(resolved)

    def __setattr__(self, attr: str, value: Any) -> None:
        backcompat_attrs = [
            "multipart_mixed_info",
            "files",
            "data",
            "body",
        ]
        resolved = _pad_attr_name(attr, backcompat_attrs)
        super().__setattr__(resolved, value)

    @property
    def _multipart_mixed_info(
        self,
    ) -> Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]:
        """DEPRECATED: Information used to make multipart mixed requests.
        This is deprecated and will be removed in a later release.

        :rtype: tuple
        :return: (requests, policies, boundary, kwargs), or None if never set
        """
        return getattr(self, "_multipart_mixed_info_val", None)

    @_multipart_mixed_info.setter
    def _multipart_mixed_info(self, val: Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]):
        """DEPRECATED: Set information to make multipart mixed requests.
        This is deprecated and will be removed in a later release.

        :param tuple val: (requests, policies, boundary, kwargs)
        """
        self._multipart_mixed_info_val = val

    @property
    def _query(self) -> Dict[str, Any]:
        """DEPRECATED: Query parameters passed in by user
        This is deprecated and will be removed in a later release.

        :rtype: dict
        :return: Query parameters parsed from the request URL
        """
        query_string = urlparse(self.url).query
        if not query_string:
            return {}

        parsed: Dict[str, Any] = {}
        for pair in query_string.split("&"):
            # Text before the first "=" is the key, the rest is the value;
            # a repeated key keeps its last value.
            key, _, val = pair.partition("=")
            parsed[key] = val
        return parsed

    @property
    def _body(self) -> DataType:
        """DEPRECATED: Body of the request. You should use the `content` property instead
        This is deprecated and will be removed in a later release.

        :rtype: bytes
        :return: Body of the request
        """
        return self._data

    @_body.setter
    def _body(self, val: DataType) -> None:
        """DEPRECATED: Set the body of the request
        This is deprecated and will be removed in a later release.

        :param bytes val: Body of the request
        """
        self._data = val

    def _format_parameters(self, params: MutableMapping[str, str]) -> None:
        """DEPRECATED: Format the query parameters
        This is deprecated and will be removed in a later release.
        You should pass the query parameters through the kwarg `params`
        instead.

        :param dict params: Query parameters
        """
        _format_parameters_helper(self, params)

    def _set_streamed_data_body(self, data):
        """DEPRECATED: Set the streamed request body.
        This is deprecated and will be removed in a later release.
        You should pass your stream content through the `content` kwarg instead

        :param data: Streamed data
        :type data: bytes or iterable
        :raises TypeError: If ``data`` is neither a ``binary_type`` instance nor
         an object exposing ``read``, ``__iter__`` or ``__aiter__``.
        """
        acceptable = isinstance(data, binary_type) or any(
            hasattr(data, attr) for attr in ["read", "__iter__", "__aiter__"]
        )
        if not acceptable:
            raise TypeError("A streamable data source must be an open file-like object or iterable.")
        new_headers = self._set_body(content=data)
        self._files = None
        self.headers.update(new_headers)

    def _set_text_body(self, data):
        """DEPRECATED: Set the text body
        This is deprecated and will be removed in a later release.
        You should pass your text content through the `content` kwarg instead

        :param str data: Text data
        """
        new_headers = self._set_body(content=data)
        self.headers.update(new_headers)
        self._files = None

    def _set_xml_body(self, data):
        """DEPRECATED: Set the xml body.
        This is deprecated and will be removed in a later release.
        You should pass your xml content through the `content` kwarg instead

        :param data: XML data
        :type data: xml.etree.ElementTree.Element
        """
        new_headers = self._set_body(content=data)
        self.headers.update(new_headers)
        self._files = None

    def _set_json_body(self, data):
        """DEPRECATED: Set the json request body.
        This is deprecated and will be removed in a later release.
        You should pass your json content through the `json` kwarg instead

        :param data: JSON data
        :type data: dict
        """
        new_headers = self._set_body(json=data)
        self.headers.update(new_headers)
        self._files = None

    def _set_formdata_body(self, data=None):
        """DEPRECATED: Set the formrequest body.
        This is deprecated and will be removed in a later release.
        You should pass your stream content through the `files` kwarg instead

        :param data: Form data
        :type data: dict
        """
        if data is None:
            data = {}

        # The existing Content-Type (if any) is removed from the headers and
        # used only to choose between url-encoded and multipart handling.
        content_type = None
        if self.headers:
            content_type = self.headers.pop("Content-Type", None)

        if content_type and content_type.lower() == "application/x-www-form-urlencoded":
            new_headers = self._set_body(data=data)
            self._files = None
        else:  # Assume "multipart/form-data"
            new_headers = self._set_body(files=data)
            self._data = None
        self.headers.update(new_headers)

    def _set_bytes_body(self, data):
        """DEPRECATED: Set the bytes request body.
        This is deprecated and will be removed in a later release.
        You should pass your bytes content through the `content` kwarg instead

        :param bytes data: Bytes data
        """
        new_headers = self._set_body(content=data)
        # we don't want default Content-Type
        # in 2.7, byte strings are still strings, so they get set with text/plain content type
        new_headers.pop("Content-Type", None)
        self.headers.update(new_headers)
        self._files = None

    def _set_multipart_mixed(self, *requests: HttpRequest, **kwargs: Any) -> None:
        """DEPRECATED: Set the multipart mixed info.
        This is deprecated and will be removed in a later release.

        :param requests: Requests to be sent in the multipart request
        :type requests: list[HttpRequest]
        """
        # "policies" and "boundary" are pulled out of kwargs; whatever is left
        # travels along as the fourth element of the tuple.
        policies = kwargs.pop("policies", [])
        boundary = kwargs.pop("boundary", None)
        self.multipart_mixed_info: Tuple[Sequence[HttpRequest], Sequence[Any], str, Dict[str, Any]] = (
            requests,
            policies,
            boundary,
            kwargs,
        )

    def _prepare_multipart_body(self, content_index=0):
        """DEPRECATED: Prepare your request body for multipart requests.
        This is deprecated and will be removed in a later release.

        :param int content_index: The index of the request to be sent in the multipart request
        :returns: The updated index after all parts in this request have been added.
        :rtype: int
        """
        return _prepare_multipart_body_helper(self, content_index)

    def _serialize(self):
        """DEPRECATED: Serialize this request using application/http spec.
        This is deprecated and will be removed in a later release.

        :rtype: bytes
        :return: The serialized request
        """
        return _serialize_request(self)

    def _add_backcompat_properties(self, request, memo):
        """While deepcopying, we also need to add the private backcompat attrs.

        :param HttpRequest request: The request that receives the copied attrs
        :param dict memo: The memo dict used by deepcopy
        """
        copied_info = copy.deepcopy(self._multipart_mixed_info, memo)
        request._multipart_mixed_info = copied_info  # pylint: disable=protected-access