Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/rest/_helpers.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

170 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the "Software"), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26from __future__ import annotations 

27import copy 

28import codecs 

29import email.message 

30from json import dumps 

31from typing import ( 

32 Optional, 

33 Union, 

34 Mapping, 

35 Sequence, 

36 Tuple, 

37 IO, 

38 Any, 

39 Iterable, 

40 MutableMapping, 

41 AsyncIterable, 

42 cast, 

43 Dict, 

44 TYPE_CHECKING, 

45) 

46import xml.etree.ElementTree as ET 

47from urllib.parse import urlparse 

48from azure.core.serialization import AzureJSONEncoder 

49from ..utils._pipeline_transport_rest_shared import ( 

50 _format_parameters_helper, 

51 _pad_attr_name, 

52 _prepare_multipart_body_helper, 

53 _serialize_request, 

54 _format_data_helper, 

55 get_file_items, 

56) 

57 

58if TYPE_CHECKING: 

59 # This avoids a circular import 

60 from ._rest_py3 import HttpRequest 

61 

62################################### TYPES SECTION ######################### 

63 

# Type that the deprecated streamed-body check in this file tests against.
binary_type = str

# A single scalar value, or no value at all.
PrimitiveData = Union[str, int, float, bool, None]

# Query parameters: each name maps to one scalar or a sequence of scalars.
ParamsType = Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]]

# Raw payload of an uploaded file: in-memory text/bytes or an open stream.
FileContent = Union[str, bytes, IO[str], IO[bytes]]

# One file entry, in any of the three accepted shapes:
#   * file (or bytes)
#   * (filename, file (or bytes))
#   * (filename, file (or bytes), content_type)
FileType = Union[
    FileContent,
    Tuple[Optional[str], FileContent],
    Tuple[Optional[str], FileContent, Optional[str]],
]

# Collection of files: a mapping keyed by name, or a sequence of (name, file) pairs.
FilesType = Union[Mapping[str, FileType], Sequence[Tuple[str, FileType]]]

# Request content accepted synchronously, and the superset that also allows
# asynchronous byte iterables.
ContentTypeBase = Union[str, bytes, Iterable[bytes]]
ContentType = Union[ContentTypeBase, AsyncIterable[bytes]]

# Body type used by the deprecated `body`/`data` accessors.
DataType = Union[bytes, Dict[str, Union[str, int]], None]

86 

87########################### HELPER SECTION ################################# 

88 

89 

90def _verify_data_object(name, value): 

91 if not isinstance(name, str): 

92 raise TypeError("Invalid type for data name. Expected str, got {}: {}".format(type(name), name)) 

93 if value is not None and not isinstance(value, (str, bytes, int, float)): 

94 raise TypeError("Invalid type for data value. Expected primitive type, got {}: {}".format(type(name), name)) 

95 

96 

def set_urlencoded_body(data, has_files):
    """Build the headers and body for url-encoded form data.

    Falsy values are dropped from the body. Every remaining value (or every
    item of a remaining list value) is checked with ``_verify_data_object``.

    :param data: Mapping of form field names to values.
    :type data: dict
    :param bool has_files: Whether the request also carries files. When it
     does, no Content-Type is set here.
    :rtype: tuple
    :return: (default headers, filtered form fields)
    """
    form_fields = {}
    for field_name, field_value in data.items():
        if not field_value:
            continue
        # A list is validated element by element; anything else as one value.
        to_check = field_value if isinstance(field_value, list) else [field_value]
        for candidate in to_check:
            _verify_data_object(field_name, candidate)
        form_fields[field_name] = field_value

    if has_files:
        # little hacky, but for files we don't send a content type with
        # boundary so requests / aiohttp etc deal with it
        return {}, form_fields
    return {"Content-Type": "application/x-www-form-urlencoded"}, form_fields

114 

115 

def set_multipart_body(files: FilesType):
    """Build the headers and body for multipart file data.

    Entries whose data is ``None`` are skipped; the rest are passed through
    ``_format_data_helper``.

    :param files: The files to send, as a mapping or a sequence of pairs.
    :rtype: tuple
    :return: (empty headers dict, formatted files). The formatted files are a
     dict when ``files`` is a Mapping, otherwise a list of (name, data) pairs.
    """
    formatted_files = []
    for field_name, file_data in get_file_items(files):
        if file_data is not None:
            formatted_files.append((field_name, _format_data_helper(file_data)))

    if isinstance(files, Mapping):
        return {}, dict(formatted_files)
    return {}, formatted_files

119 

120 

def set_xml_body(content):
    """Serialize an XML element into a request body.

    :param content: The XML element to serialize.
    :type content: xml.etree.ElementTree.Element
    :rtype: tuple
    :return: (headers, body). ``body`` is the serialized bytes, starting with
     an XML declaration; ``headers`` carries ``Content-Length`` when the body
     is non-empty.
    """
    headers = {}
    bytes_content = ET.tostring(content, encoding="utf8")
    # ElementTree echoes the requested codec alias ("utf8") in the XML
    # declaration; rewrite it to "utf-8". Only the first occurrence (the
    # declaration, which is emitted first) is touched, so identical text
    # inside the document itself is sent unmodified.
    body = bytes_content.replace(b"encoding='utf8'", b"encoding='utf-8'", 1)
    if body:
        headers["Content-Length"] = str(len(body))
    return headers, body

128 

129 

def set_content_body(
    content: Any,
) -> Tuple[MutableMapping[str, str], Optional[ContentTypeBase]]:
    """Work out the default headers and body for raw request content.

    NOTE(review): for ``str`` content the Content-Length is ``len(content)``,
    i.e. a character count rather than an encoded byte count.

    :param content: XML element, str, bytes, an object with ``read``, or an
     iterable / async iterable.
    :rtype: tuple
    :return: (headers, body)
    :raises TypeError: If ``content`` is none of the supported kinds.
    """
    if isinstance(content, ET.Element):
        # XML body
        return set_xml_body(content)

    if isinstance(content, (str, bytes)):
        # Only text gets a default Content-Type; raw bytes get none.
        text_headers: MutableMapping[str, str] = (
            {"Content-Type": "text/plain"} if isinstance(content, str) else {}
        )
        if content:
            text_headers["Content-Length"] = str(len(content))
        return text_headers, content

    # Streams and (async) iterables are passed through with no default headers.
    if hasattr(content, "read") or hasattr(content, "__iter__") or hasattr(content, "__aiter__"):
        stream_headers: MutableMapping[str, str] = {}
        return stream_headers, content

    raise TypeError(
        "Unexpected type for 'content': '{}'. ".format(type(content))
        + "We expect 'content' to either be str, bytes, a open file-like object or an iterable/asynciterable."
    )

152 

153 

def set_json_body(json: Any) -> Tuple[Dict[str, str], Any]:
    """Build the headers and body for a JSON request.

    Objects exposing ``read`` are treated as streamed content and handed to
    ``set_content_body``; anything else is serialized with
    ``AzureJSONEncoder``.

    :param json: The JSON payload, or a readable object yielding it.
    :rtype: tuple
    :return: (headers, body). The headers always carry an
     ``application/json`` Content-Type.
    """
    headers = {"Content-Type": "application/json"}

    if not hasattr(json, "read"):
        body = dumps(json, cls=AzureJSONEncoder)
        headers["Content-Length"] = str(len(body))
        return headers, body

    # Stream case: merge in whatever headers the content helper produced.
    stream_headers, body = set_content_body(json)
    headers.update(stream_headers)
    return headers, body

163 

164 

def lookup_encoding(encoding: str) -> bool:
    """Tell whether ``encoding`` names a codec known to Python.

    The check for whether the encoding is known is taken from httpx.

    :param str encoding: The encoding name to check.
    :rtype: bool
    :return: True if ``codecs.lookup`` finds the codec, False on LookupError.
    """
    try:
        codecs.lookup(encoding)
    except LookupError:
        return False
    return True

172 

173 

def get_charset_encoding(response) -> Optional[str]:
    """Extract a usable charset from a response's Content-Type header.

    :param response: Object whose ``headers`` supports ``get("Content-Type")``.
    :rtype: str or None
    :return: The ``charset`` parameter of the Content-Type header, or None
     when the header is missing/empty, has no charset, or names an encoding
     that ``lookup_encoding`` rejects.
    """
    content_type = response.headers.get("Content-Type")
    if content_type:
        # Parse the header with email.message instead of the cgi module:
        # https://peps.python.org/pep-0594/#cgi
        header_parser = email.message.Message()
        header_parser["content-type"] = content_type
        charset = cast(str, header_parser.get_param("charset"))  # -> utf-8
        if charset is not None and lookup_encoding(charset):
            return charset
    return None

186 

187 

def decode_to_text(encoding: Optional[str], content: bytes) -> str:
    """Decode response bytes into text.

    :param encoding: Encoding to use. ``"utf-8"`` is swapped for
     ``"utf-8-sig"`` so a leading byte-order mark is stripped. When falsy,
     the content is decoded as ``utf-8-sig`` with undecodable bytes replaced.
    :type encoding: str or None
    :param bytes content: The bytes to decode.
    :rtype: str
    :return: The decoded text, or ``""`` for empty content.
    """
    if not content:
        return ""

    if not encoding:
        # No declared encoding: decode leniently instead of raising.
        lenient_decoder = codecs.getincrementaldecoder("utf-8-sig")(errors="replace")
        return lenient_decoder.decode(content)

    codec_name = "utf-8-sig" if encoding == "utf-8" else encoding
    return content.decode(codec_name)

196 

197 

class HttpRequestBackcompatMixin:
    """Mixin exposing the deprecated request attributes and methods.

    Attribute reads and writes are routed through ``_pad_attr_name`` together
    with a list of legacy names, and the name it returns is the one actually
    looked up or set. The helper presumably maps a listed legacy name onto the
    underscore-prefixed member defined here (TODO confirm against the helper).
    """

    def __getattr__(self, attr: str) -> Any:
        """Resolve an attribute that normal lookup did not find.

        :param str attr: The requested attribute name.
        :rtype: any
        :return: The attribute found under the name returned by ``_pad_attr_name``.
        """
        legacy_names = [
            "files",
            "data",
            "multipart_mixed_info",
            "query",
            "body",
            "format_parameters",
            "set_streamed_data_body",
            "set_text_body",
            "set_xml_body",
            "set_json_body",
            "set_formdata_body",
            "set_bytes_body",
            "set_multipart_mixed",
            "prepare_multipart_body",
            "serialize",
        ]
        resolved_name = _pad_attr_name(attr, legacy_names)
        return self.__getattribute__(resolved_name)

    def __setattr__(self, attr: str, value: Any) -> None:
        """Set an attribute under the name returned by ``_pad_attr_name``.

        :param str attr: The attribute name being assigned.
        :param any value: The value to store.
        """
        legacy_names = [
            "multipart_mixed_info",
            "files",
            "data",
            "body",
        ]
        resolved_name = _pad_attr_name(attr, legacy_names)
        super().__setattr__(resolved_name, value)

    @property
    def _multipart_mixed_info(self) -> Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]:
        """DEPRECATED: Information used to make multipart mixed requests.
        This is deprecated and will be removed in a later release.

        :rtype: tuple
        :return: (requests, policies, boundary, kwargs), or None if never set
        """
        # A missing backing attribute surfaces as AttributeError -> None.
        return getattr(self, "_multipart_mixed_info_val", None)

    @_multipart_mixed_info.setter
    def _multipart_mixed_info(self, val: Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]):
        """DEPRECATED: Set information to make multipart mixed requests.
        This is deprecated and will be removed in a later release.

        :param tuple val: (requests, policies, boundary, kwargs)
        """
        self._multipart_mixed_info_val = val

    @property
    def _query(self) -> Dict[str, Any]:
        """DEPRECATED: Query parameters passed in by user
        This is deprecated and will be removed in a later release.

        :rtype: dict
        :return: Query parameters parsed from the query string of ``self.url``
        """
        query_string = urlparse(self.url).query
        if not query_string:
            return {}
        parsed: Dict[str, Any] = {}
        for pair in query_string.split("&"):
            # Split on the first "=" only; a pair without "=" maps to "".
            key, _, value = pair.partition("=")
            parsed[key] = value
        return parsed

    @property
    def _body(self) -> DataType:
        """DEPRECATED: Body of the request. You should use the `content` property instead
        This is deprecated and will be removed in a later release.

        :rtype: bytes
        :return: Body of the request
        """
        return self._data

    @_body.setter
    def _body(self, val: DataType) -> None:
        """DEPRECATED: Set the body of the request
        This is deprecated and will be removed in a later release.

        :param bytes val: Body of the request
        """
        self._data = val

    def _format_parameters(self, params: MutableMapping[str, str]) -> None:
        """DEPRECATED: Format the query parameters
        This is deprecated and will be removed in a later release.
        You should pass the query parameters through the kwarg `params`
        instead.

        :param dict params: Query parameters
        """
        _format_parameters_helper(self, params)

    def _set_streamed_data_body(self, data):
        """DEPRECATED: Set the streamed request body.
        This is deprecated and will be removed in a later release.
        You should pass your stream content through the `content` kwarg instead

        :param data: Streamed data
        :type data: bytes or iterable
        :raises TypeError: If ``data`` is neither a ``binary_type`` instance
         nor an object with ``read``, ``__iter__`` or ``__aiter__``.
        """
        is_streamable = isinstance(data, binary_type) or any(
            hasattr(data, marker) for marker in ["read", "__iter__", "__aiter__"]
        )
        if not is_streamable:
            raise TypeError("A streamable data source must be an open file-like object or iterable.")
        body_headers = self._set_body(content=data)
        self._files = None
        self.headers.update(body_headers)

    def _set_text_body(self, data):
        """DEPRECATED: Set the text body
        This is deprecated and will be removed in a later release.
        You should pass your text content through the `content` kwarg instead

        :param str data: Text data
        """
        body_headers = self._set_body(content=data)
        self.headers.update(body_headers)
        self._files = None

    def _set_xml_body(self, data):
        """DEPRECATED: Set the xml body.
        This is deprecated and will be removed in a later release.
        You should pass your xml content through the `content` kwarg instead

        :param data: XML data
        :type data: xml.etree.ElementTree.Element
        """
        body_headers = self._set_body(content=data)
        self.headers.update(body_headers)
        self._files = None

    def _set_json_body(self, data):
        """DEPRECATED: Set the json request body.
        This is deprecated and will be removed in a later release.
        You should pass your json content through the `json` kwarg instead

        :param data: JSON data
        :type data: dict
        """
        body_headers = self._set_body(json=data)
        self.headers.update(body_headers)
        self._files = None

    def _set_formdata_body(self, data=None):
        """DEPRECATED: Set the formrequest body.
        This is deprecated and will be removed in a later release.
        You should pass your stream content through the `files` kwarg instead

        :param data: Form data
        :type data: dict
        """
        if data is None:
            data = {}
        # The current Content-Type is removed from the headers and decides
        # how the form data is encoded.
        content_type = self.headers.pop("Content-Type", None) if self.headers else None
        is_urlencoded = bool(content_type) and content_type.lower() == "application/x-www-form-urlencoded"

        if is_urlencoded:
            body_headers = self._set_body(data=data)
            self._files = None
        else:  # Assume "multipart/form-data"
            body_headers = self._set_body(files=data)
            self._data = None
        self.headers.update(body_headers)

    def _set_bytes_body(self, data):
        """DEPRECATED: Set the bytes request body.
        This is deprecated and will be removed in a later release.
        You should pass your bytes content through the `content` kwarg instead

        :param bytes data: Bytes data
        """
        body_headers = self._set_body(content=data)
        # we don't want default Content-Type
        # in 2.7, byte strings are still strings, so they get set with text/plain content type
        body_headers.pop("Content-Type", None)
        self.headers.update(body_headers)
        self._files = None

    def _set_multipart_mixed(self, *requests: HttpRequest, **kwargs: Any) -> None:
        """DEPRECATED: Set the multipart mixed info.
        This is deprecated and will be removed in a later release.

        :param requests: Requests to be sent in the multipart request
        :type requests: list[HttpRequest]
        """
        # "policies" and "boundary" are removed from kwargs; whatever is left
        # is stored as the last element of the tuple.
        policies = kwargs.pop("policies", [])
        boundary = kwargs.pop("boundary", None)
        mixed_info: Tuple[Sequence[HttpRequest], Sequence[Any], str, Dict[str, Any]] = (
            requests,
            policies,
            boundary,
            kwargs,
        )
        self.multipart_mixed_info = mixed_info

    def _prepare_multipart_body(self, content_index=0):
        """DEPRECATED: Prepare your request body for multipart requests.
        This is deprecated and will be removed in a later release.

        :param int content_index: The index of the request to be sent in the multipart request
        :returns: The updated index after all parts in this request have been added.
        :rtype: int
        """
        return _prepare_multipart_body_helper(self, content_index)

    def _serialize(self):
        """DEPRECATED: Serialize this request using application/http spec.
        This is deprecated and will be removed in a later release.

        :rtype: bytes
        :return: The serialized request
        """
        return _serialize_request(self)

    def _add_backcompat_properties(self, request, memo):
        """While deepcopying, we also need to add the private backcompat attrs.

        :param HttpRequest request: The request to copy from
        :param dict memo: The memo dict used by deepcopy
        """
        mixed_info_copy = copy.deepcopy(self._multipart_mixed_info, memo)
        request._multipart_mixed_info = mixed_info_copy  # pylint: disable=protected-access