Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/xmltodict.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

328 statements  

1#!/usr/bin/env python 

2"Makes working with XML feel like you are working with JSON" 

3 

4from xml.parsers import expat 

5from xml.sax.saxutils import XMLGenerator, escape 

6from xml.sax.xmlreader import AttributesImpl 

7from io import StringIO 

8from inspect import isgenerator 

9 

class ParsingInterrupted(Exception):
    """Raised to stop parsing when an item callback returns a false value."""

12 

13 

14class _DictSAXHandler: 

15 def __init__( 

16 self, 

17 item_depth=0, 

18 item_callback=lambda *args: True, 

19 xml_attribs=True, 

20 attr_prefix="@", 

21 cdata_key="#text", 

22 force_cdata=False, 

23 cdata_separator="", 

24 postprocessor=None, 

25 dict_constructor=dict, 

26 strip_whitespace=True, 

27 namespace_separator=":", 

28 namespaces=None, 

29 force_list=None, 

30 comment_key="#comment", 

31 ): 

32 self.path = [] 

33 self.stack = [] 

34 self.data = [] 

35 self.item = None 

36 self.item_depth = item_depth 

37 self.xml_attribs = xml_attribs 

38 self.item_callback = item_callback 

39 self.attr_prefix = attr_prefix 

40 self.cdata_key = cdata_key 

41 self.force_cdata = force_cdata 

42 self.cdata_separator = cdata_separator 

43 self.postprocessor = postprocessor 

44 self.dict_constructor = dict_constructor 

45 self.strip_whitespace = strip_whitespace 

46 self.namespace_separator = namespace_separator 

47 self.namespaces = namespaces 

48 self.namespace_declarations = dict_constructor() 

49 self.force_list = force_list 

50 self.comment_key = comment_key 

51 

52 def _build_name(self, full_name): 

53 if self.namespaces is None: 

54 return full_name 

55 i = full_name.rfind(self.namespace_separator) 

56 if i == -1: 

57 return full_name 

58 namespace, name = full_name[:i], full_name[i+1:] 

59 try: 

60 short_namespace = self.namespaces[namespace] 

61 except KeyError: 

62 short_namespace = namespace 

63 if not short_namespace: 

64 return name 

65 else: 

66 return self.namespace_separator.join((short_namespace, name)) 

67 

68 def _attrs_to_dict(self, attrs): 

69 if isinstance(attrs, dict): 

70 return attrs 

71 return self.dict_constructor(zip(attrs[0::2], attrs[1::2])) 

72 

73 def startNamespaceDecl(self, prefix, uri): 

74 self.namespace_declarations[prefix or ''] = uri 

75 

76 def startElement(self, full_name, attrs): 

77 name = self._build_name(full_name) 

78 attrs = self._attrs_to_dict(attrs) 

79 if self.namespace_declarations: 

80 if not attrs: 

81 attrs = self.dict_constructor() 

82 attrs['xmlns'] = self.namespace_declarations 

83 self.namespace_declarations = self.dict_constructor() 

84 self.path.append((name, attrs or None)) 

85 if len(self.path) >= self.item_depth: 

86 self.stack.append((self.item, self.data)) 

87 if self.xml_attribs: 

88 attr_entries = [] 

89 for key, value in attrs.items(): 

90 key = self.attr_prefix+self._build_name(key) 

91 if self.postprocessor: 

92 entry = self.postprocessor(self.path, key, value) 

93 else: 

94 entry = (key, value) 

95 if entry: 

96 attr_entries.append(entry) 

97 attrs = self.dict_constructor(attr_entries) 

98 else: 

99 attrs = None 

100 self.item = attrs or None 

101 self.data = [] 

102 

103 def endElement(self, full_name): 

104 name = self._build_name(full_name) 

105 # If we just closed an item at the streaming depth, emit it and drop it 

106 # without attaching it back to its parent. This avoids accumulating all 

107 # streamed items in memory when using item_depth > 0. 

108 if len(self.path) == self.item_depth: 

109 item = self.item 

110 if item is None: 

111 item = (None if not self.data 

112 else self.cdata_separator.join(self.data)) 

113 

114 should_continue = self.item_callback(self.path, item) 

115 if not should_continue: 

116 raise ParsingInterrupted 

117 # Reset state for the parent context without keeping a reference to 

118 # the emitted item. 

119 if self.stack: 

120 self.item, self.data = self.stack.pop() 

121 else: 

122 self.item = None 

123 self.data = [] 

124 self.path.pop() 

125 return 

126 if self.stack: 

127 data = (None if not self.data 

128 else self.cdata_separator.join(self.data)) 

129 item = self.item 

130 self.item, self.data = self.stack.pop() 

131 if self.strip_whitespace and data: 

132 data = data.strip() or None 

133 if data and self._should_force_cdata(name, data) and item is None: 

134 item = self.dict_constructor() 

135 if item is not None: 

136 if data: 

137 self.push_data(item, self.cdata_key, data) 

138 self.item = self.push_data(self.item, name, item) 

139 else: 

140 self.item = self.push_data(self.item, name, data) 

141 else: 

142 self.item = None 

143 self.data = [] 

144 self.path.pop() 

145 

146 def characters(self, data): 

147 if not self.data: 

148 self.data = [data] 

149 else: 

150 self.data.append(data) 

151 

152 def comments(self, data): 

153 if self.strip_whitespace: 

154 data = data.strip() 

155 self.item = self.push_data(self.item, self.comment_key, data) 

156 

157 def push_data(self, item, key, data): 

158 if self.postprocessor is not None: 

159 result = self.postprocessor(self.path, key, data) 

160 if result is None: 

161 return item 

162 key, data = result 

163 if item is None: 

164 item = self.dict_constructor() 

165 try: 

166 value = item[key] 

167 if isinstance(value, list): 

168 value.append(data) 

169 else: 

170 item[key] = [value, data] 

171 except KeyError: 

172 if self._should_force_list(key, data): 

173 item[key] = [data] 

174 else: 

175 item[key] = data 

176 return item 

177 

178 def _should_force_list(self, key, value): 

179 if not self.force_list: 

180 return False 

181 if isinstance(self.force_list, bool): 

182 return self.force_list 

183 try: 

184 return key in self.force_list 

185 except TypeError: 

186 return self.force_list(self.path[:-1], key, value) 

187 

188 def _should_force_cdata(self, key, value): 

189 if not self.force_cdata: 

190 return False 

191 if isinstance(self.force_cdata, bool): 

192 return self.force_cdata 

193 try: 

194 return key in self.force_cdata 

195 except TypeError: 

196 return self.force_cdata(self.path[:-1], key, value) 

197 

198 

def parse(xml_input, encoding=None, expat=expat, process_namespaces=False,
          namespace_separator=':', disable_entities=True, process_comments=False, **kwargs):
    """Convert XML input into a dictionary.

    `xml_input` may be a string, a file-like object (anything with a `read`
    attribute) or a generator yielding chunks of the document. A string is
    encoded with `encoding` (UTF-8 when not given) before being parsed.

    Any extra keyword argument is handed to the underlying handler; the
    options described below (`xml_attribs`, `item_depth`, `item_callback`,
    `postprocessor`, `force_list`, `comment_key`, `strip_whitespace`, ...)
    are passed that way.

    If `xml_attribs` is `True`, element attributes are stored in the
    dictionary next to the child elements, with `@` prepended to their name
    to avoid collisions. If it is `False`, attributes are ignored.

    Simple example::

        >>> import xmltodict
        >>> doc = xmltodict.parse('<a prop="x"><b>1</b><b>2</b></a>')
        >>> doc['a']['@prop']
        'x'
        >>> doc['a']['b']
        ['1', '2']

    With `item_depth` set to `0` (the default) the dictionary for the root
    element is returned. With any other depth the function works in
    streaming mode: `item_callback` is invoked for every item found at that
    depth and `None` is returned at the end.

    The callback receives the `path` from the document root to the item (a
    list of name/attributes pairs) and the `item` itself. When it returns a
    false-ish value, parsing stops with :class:`ParsingInterrupted`.

    Streaming example::

        >>> def handle(path, item):
        ...     print('path:%s item:%s' % (path, item))
        ...     return True
        ...
        >>> xmltodict.parse('<a prop="x"><b>1</b><b>2</b></a>',
        ...                 item_depth=2, item_callback=handle)
        path:[('a', {'prop': 'x'}), ('b', None)] item:1
        path:[('a', {'prop': 'x'}), ('b', None)] item:2

    `postprocessor` is an optional function called with `path`, `key` and
    `value`; it returns the `(key, value)` pair to store, where either part
    may differ from what it was given. Usage example::

        >>> def postprocessor(path, key, value):
        ...     try:
        ...         return key + ':int', int(value)
        ...     except (ValueError, TypeError):
        ...         return key, value
        >>> xmltodict.parse('<a><b>1</b><b>2</b><b>x</b></a>',
        ...                 postprocessor=postprocessor)
        {'a': {'b:int': [1, 2], 'b': 'x'}}

    An alternative `expat` implementation (such as `defusedexpat`) can be
    supplied through the `expat` parameter::

        >>> import defusedexpat
        >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat)
        {'a': 'hello'}

    `force_list` forces a list to be created even when an element occurs
    only once. It can be a tuple of keys: every key listed there always maps
    to a list. It is applied after any user-supplied postprocessor has run.

    For example, given this input::

        <servers>
          <server>
            <name>host1</name>
            <os>Linux</os>
            <interfaces>
              <interface>
                <name>em0</name>
                <ip_address>10.0.0.1</ip_address>
              </interface>
            </interfaces>
          </server>
        </servers>

    calling with ``force_list=('interface',)`` produces::

        {'servers':
          {'server':
            {'name': 'host1',
             'os': 'Linux',
             'interfaces':
              {'interface':
                [{'name': 'em0', 'ip_address': '10.0.0.1'}]}}}}

    `force_list` can also be a callable receiving `path`, `key` and `value`,
    which helps when the decision needs more complex logic.

    If `process_comments` is `True`, comments are stored under `comment_key`
    (default ``'#comment'``) inside the element that contains them.

    For example, given this input::

        <a>
          <b>
            <!-- b comment -->
            <c>
              <!-- c comment -->
              1
            </c>
            <d>2</d>
          </b>
        </a>

    calling with ``process_comments=True`` produces::

        {'a': {
            'b': {
                '#comment': 'b comment',
                'c': {
                    '#comment': 'c comment',
                    '#text': '1',
                },
                'd': '2',
            },
        }}

    Comment text honours `strip_whitespace`: with the default `True`,
    leading and trailing whitespace is removed from comments. Disable
    `strip_whitespace` to keep comment indentation or padding intact.

    When `disable_entities` is true (the default), any entity declaration
    in the document aborts parsing with ``ValueError``.
    """
    handler = _DictSAXHandler(namespace_separator=namespace_separator, **kwargs)

    if isinstance(xml_input, str):
        encoding = encoding or 'utf-8'
        xml_input = xml_input.encode(encoding)

    # expat only splits namespaces when it is given a separator.
    parser = expat.ParserCreate(
        encoding,
        namespace_separator if process_namespaces else None,
    )
    parser.ordered_attributes = True
    for hook, callback in (
        ('StartNamespaceDeclHandler', handler.startNamespaceDecl),
        ('StartElementHandler', handler.startElement),
        ('EndElementHandler', handler.endElement),
        ('CharacterDataHandler', handler.characters),
    ):
        setattr(parser, hook, callback)
    if process_comments:
        parser.CommentHandler = handler.comments
    parser.buffer_text = True

    if disable_entities:
        def _reject_entity_declaration(*_args, **_kwargs):
            raise ValueError("entities are disabled")

        parser.EntityDeclHandler = _reject_entity_declaration

    if hasattr(xml_input, 'read'):
        parser.ParseFile(xml_input)
    elif isgenerator(xml_input):
        for chunk in xml_input:
            parser.Parse(chunk, False)
        # Tell expat that the document is complete.
        parser.Parse(b'', True)
    else:
        parser.Parse(xml_input, True)
    return handler.item

370 

371 

372def _convert_value_to_string(value): 

373 """Convert a value to its string representation for XML output. 

374 

375 Handles boolean values consistently by converting them to lowercase. 

376 """ 

377 if isinstance(value, (str, bytes)): 

378 return value 

379 if isinstance(value, bool): 

380 return "true" if value else "false" 

381 return str(value) 

382 

383 

384def _validate_name(value, kind): 

385 """Validate an element/attribute name for XML safety. 

386 

387 Raises ValueError with a specific reason when invalid. 

388 

389 kind: 'element' or 'attribute' (used in error messages) 

390 """ 

391 if not isinstance(value, str): 

392 raise ValueError(f"{kind} name must be a string") 

393 if value.startswith("?") or value.startswith("!"): 

394 raise ValueError(f'Invalid {kind} name: cannot start with "?" or "!"') 

395 if "<" in value or ">" in value: 

396 raise ValueError(f'Invalid {kind} name: "<" or ">" not allowed') 

397 if "/" in value: 

398 raise ValueError(f'Invalid {kind} name: "/" not allowed') 

399 if '"' in value or "'" in value: 

400 raise ValueError(f"Invalid {kind} name: quotes not allowed") 

401 if "=" in value: 

402 raise ValueError(f'Invalid {kind} name: "=" not allowed') 

403 if any(ch.isspace() for ch in value): 

404 raise ValueError(f"Invalid {kind} name: whitespace not allowed") 

405 

406 

407def _validate_comment(value): 

408 if isinstance(value, bytes): 

409 try: 

410 value = value.decode("utf-8") 

411 except UnicodeDecodeError as exc: 

412 raise ValueError("Comment text must be valid UTF-8") from exc 

413 if not isinstance(value, str): 

414 raise ValueError("Comment text must be a string") 

415 if "--" in value: 

416 raise ValueError("Comment text cannot contain '--'") 

417 if value.endswith("-"): 

418 raise ValueError("Comment text cannot end with '-'") 

419 return value 

420 

421 

422def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'): 

423 if not isinstance(name, str): 

424 return name 

425 if not namespaces: 

426 return name 

427 try: 

428 ns, name = name.rsplit(ns_sep, 1) 

429 except ValueError: 

430 pass 

431 else: 

432 ns_res = namespaces.get(ns.strip(attr_prefix)) 

433 name = '{}{}{}{}'.format( 

434 attr_prefix if ns.startswith(attr_prefix) else '', 

435 ns_res, ns_sep, name) if ns_res else name 

436 return name 

437 

438 

def _emit(key, value, content_handler,
          attr_prefix='@',
          cdata_key='#text',
          depth=0,
          preprocessor=None,
          pretty=False,
          newl='\n',
          indent='\t',
          namespace_separator=':',
          namespaces=None,
          full_document=True,
          expand_iter=None,
          comment_key='#comment'):
    """Write `key`/`value` as XML events on `content_handler`.

    A key equal to `comment_key` produces one comment per non-empty entry.
    Any other key produces one element per entry of `value` (a scalar,
    string or dict counts as a single entry). Inside a dict entry, keys
    starting with `attr_prefix` become attributes, `cdata_key` supplies the
    character data and all other keys are emitted recursively as children.

    `preprocessor`, when given, receives ``(key, value)`` and returns the
    pair to use, or ``None`` to skip the entry altogether.

    Raises ``ValueError`` for invalid element/attribute names and, when
    `full_document` is true, for several entries at depth 0.
    """
    # An integer indent means "that many spaces".
    if isinstance(indent, int):
        indent = ' ' * indent

    if isinstance(key, str) and key == comment_key:
        comments = value if isinstance(value, list) else [value]
        for raw_comment in comments:
            if raw_comment is None:
                continue
            comment_text = _convert_value_to_string(raw_comment)
            if not comment_text:
                continue
            if pretty:
                content_handler.ignorableWhitespace(depth * indent)
            content_handler.comment(comment_text)
            if pretty:
                content_handler.ignorableWhitespace(newl)
        return

    key = _process_namespace(key, namespaces, namespace_separator, attr_prefix)
    if preprocessor is not None:
        processed = preprocessor(key, value)
        if processed is None:
            return
        key, value = processed
    # Minimal validation to avoid breaking out of tag context
    _validate_name(key, "element")

    if isinstance(value, (str, dict)) or not hasattr(value, '__iter__'):
        value = [value]

    for position, node in enumerate(value):
        if position > 0 and depth == 0 and full_document:
            raise ValueError('document with multiple roots')

        # Normalise every entry to a mapping.
        if node is None:
            node = {}
        elif not isinstance(node, (dict, str)):
            if expand_iter and hasattr(node, '__iter__'):
                node = {expand_iter: node}
            else:
                node = _convert_value_to_string(node)
        if isinstance(node, str):
            node = {cdata_key: node}

        cdata = None
        attrs = {}
        children = []
        for item_key, item_value in node.items():
            if item_key == cdata_key:
                cdata = _convert_value_to_string(item_value)
            elif isinstance(item_key, str) and item_key.startswith(attr_prefix):
                item_key = _process_namespace(item_key, namespaces,
                                              namespace_separator, attr_prefix)
                if item_key == '@xmlns' and isinstance(item_value, dict):
                    # Namespace declarations: an empty prefix is the default.
                    for ns_prefix, ns_uri in item_value.items():
                        _validate_name(ns_prefix, "attribute")
                        declaration = f'xmlns:{ns_prefix}' if ns_prefix else 'xmlns'
                        attrs[declaration] = str(ns_uri)
                    continue
                if not isinstance(item_value, str):
                    item_value = str(item_value)
                attr_name = item_key[len(attr_prefix):]
                _validate_name(attr_name, "attribute")
                attrs[attr_name] = item_value
            elif isinstance(item_value, list) and not item_value:
                # Skip empty lists to avoid creating empty child elements
                continue
            else:
                children.append((item_key, item_value))

        has_children = bool(children)
        if pretty:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.startElement(key, AttributesImpl(attrs))
        if pretty and has_children:
            content_handler.ignorableWhitespace(newl)
        for child_key, child_value in children:
            _emit(child_key, child_value, content_handler,
                  attr_prefix, cdata_key, depth + 1, preprocessor,
                  pretty, newl, indent, namespaces=namespaces,
                  namespace_separator=namespace_separator,
                  expand_iter=expand_iter, comment_key=comment_key)
        if cdata is not None:
            content_handler.characters(cdata)
        if pretty and has_children:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.endElement(key)
        if pretty and depth:
            content_handler.ignorableWhitespace(newl)

536 

537 

class _XMLGenerator(XMLGenerator):
    """`XMLGenerator` with an extra method for writing comments."""

    def comment(self, text):
        """Validate `text` and write it as an ``<!--...-->`` comment."""
        validated = _validate_comment(text)
        self._write("<!--" + escape(validated) + "-->")

542 

543 

def unparse(input_dict, output=None, encoding='utf-8', full_document=True,
            short_empty_elements=False, comment_key='#comment',
            **kwargs):
    """Write `input_dict` out as an XML document (the reverse of `parse`).

    The document is returned as a string unless `output` (a file-like
    object) is given, in which case it is written there and nothing is
    returned.

    Keys starting with `attr_prefix` (default ``'@'``) are emitted as
    attributes of their element, and the key equal to `cdata_key` (default
    ``'#text'``) provides its character data.

    Empty lists are left out completely: ``{"a": []}`` yields no ``<a>``
    element. Use a placeholder entry such as ``{"a": [""]}`` when an empty
    container element has to appear in the output.

    Pass ``pretty=True`` to pretty-print. Lines then end with a newline and
    are indented with a tab by default; both can be changed through the
    `newl` and `indent` parameters.

    With `full_document` true (the default) the XML declaration is written
    and `input_dict` must contain exactly one key other than `comment_key`;
    otherwise ``ValueError`` is raised.
    """
    must_return = output is None
    if must_return:
        output = StringIO()

    generator_args = (output, encoding, True) if short_empty_elements else (output, encoding)
    content_handler = _XMLGenerator(*generator_args)

    if full_document:
        content_handler.startDocument()

    seen_root = False
    for key, value in input_dict.items():
        is_root = key != comment_key
        if is_root and full_document and seen_root:
            raise ValueError("Document must have exactly one root.")
        _emit(key, value, content_handler, full_document=full_document, comment_key=comment_key, **kwargs)
        if is_root:
            seen_root = True

    if full_document:
        if not seen_root:
            raise ValueError("Document must have exactly one root.")
        content_handler.endDocument()

    if not must_return:
        return None
    value = output.getvalue()
    try:  # pragma no cover
        value = value.decode(encoding)
    except AttributeError:  # pragma no cover
        pass
    return value

593 

594 

if __name__ == '__main__':  # pragma: no cover
    # Command-line use: read XML from stdin and marshal every item found at
    # the depth given as the only argument to stdout.
    import marshal
    import sys

    stdin, stdout = sys.stdin.buffer, sys.stdout.buffer

    # Exactly one command-line argument is expected.
    [depth_argument] = sys.argv[1:]
    item_depth = int(depth_argument)

    def handle_item(path, item):
        """Dump one ``(path, item)`` pair and ask the parser to continue."""
        marshal.dump((path, item), stdout)
        return True

    try:
        root = parse(
            stdin,
            item_depth=item_depth,
            item_callback=handle_item,
            dict_constructor=dict,
        )
        # Without streaming, the whole document is the single item.
        if item_depth == 0:
            handle_item([], root)
    except KeyboardInterrupt:
        pass