Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/xmltodict.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

340 statements  

1#!/usr/bin/env python 

2"Makes working with XML feel like you are working with JSON" 

3 

4from xml.parsers import expat 

5from xml.sax.saxutils import XMLGenerator, escape 

6from xml.sax.xmlreader import AttributesImpl 

7from io import StringIO 

8from inspect import isgenerator 

9import codecs 

10 

class ParsingInterrupted(Exception):
    """Raised to abort parsing when the streaming item callback returns a false-ish value."""

13 

14 

15class _DictSAXHandler: 

16 def __init__( 

17 self, 

18 item_depth=0, 

19 item_callback=lambda *args: True, 

20 xml_attribs=True, 

21 attr_prefix="@", 

22 cdata_key="#text", 

23 force_cdata=False, 

24 cdata_separator="", 

25 postprocessor=None, 

26 dict_constructor=dict, 

27 strip_whitespace=True, 

28 namespace_separator=":", 

29 namespaces=None, 

30 force_list=None, 

31 comment_key="#comment", 

32 ): 

33 self.path = [] 

34 self.stack = [] 

35 self.data = [] 

36 self.item = None 

37 self.item_depth = item_depth 

38 self.xml_attribs = xml_attribs 

39 self.item_callback = item_callback 

40 self.attr_prefix = attr_prefix 

41 self.cdata_key = cdata_key 

42 self.force_cdata = force_cdata 

43 self.cdata_separator = cdata_separator 

44 self.postprocessor = postprocessor 

45 self.dict_constructor = dict_constructor 

46 self.strip_whitespace = strip_whitespace 

47 self.namespace_separator = namespace_separator 

48 self.namespaces = namespaces 

49 self.namespace_declarations = dict_constructor() 

50 self.force_list = force_list 

51 self.comment_key = comment_key 

52 

53 def _build_name(self, full_name): 

54 if self.namespaces is None: 

55 return full_name 

56 i = full_name.rfind(self.namespace_separator) 

57 if i == -1: 

58 return full_name 

59 namespace, name = full_name[:i], full_name[i+1:] 

60 try: 

61 short_namespace = self.namespaces[namespace] 

62 except KeyError: 

63 short_namespace = namespace 

64 if not short_namespace: 

65 return name 

66 else: 

67 return self.namespace_separator.join((short_namespace, name)) 

68 

69 def _attrs_to_dict(self, attrs): 

70 if isinstance(attrs, dict): 

71 return attrs 

72 return self.dict_constructor(zip(attrs[0::2], attrs[1::2])) 

73 

74 def startNamespaceDecl(self, prefix, uri): 

75 self.namespace_declarations[prefix or ''] = uri 

76 

77 def startElement(self, full_name, attrs): 

78 name = self._build_name(full_name) 

79 attrs = self._attrs_to_dict(attrs) 

80 if self.namespace_declarations: 

81 if not attrs: 

82 attrs = self.dict_constructor() 

83 attrs['xmlns'] = self.namespace_declarations 

84 self.namespace_declarations = self.dict_constructor() 

85 self.path.append((name, attrs or None)) 

86 if len(self.path) >= self.item_depth: 

87 self.stack.append((self.item, self.data)) 

88 if self.xml_attribs: 

89 attr_entries = [] 

90 for key, value in attrs.items(): 

91 key = self.attr_prefix+self._build_name(key) 

92 if self.postprocessor: 

93 entry = self.postprocessor(self.path, key, value) 

94 else: 

95 entry = (key, value) 

96 if entry: 

97 attr_entries.append(entry) 

98 attrs = self.dict_constructor(attr_entries) 

99 else: 

100 attrs = None 

101 self.item = attrs or None 

102 self.data = [] 

103 

104 def endElement(self, full_name): 

105 name = self._build_name(full_name) 

106 # If we just closed an item at the streaming depth, emit it and drop it 

107 # without attaching it back to its parent. This avoids accumulating all 

108 # streamed items in memory when using item_depth > 0. 

109 if len(self.path) == self.item_depth: 

110 item = self.item 

111 if item is None: 

112 item = (None if not self.data 

113 else self.cdata_separator.join(self.data)) 

114 

115 should_continue = self.item_callback(self.path, item) 

116 if not should_continue: 

117 raise ParsingInterrupted 

118 # Reset state for the parent context without keeping a reference to 

119 # the emitted item. 

120 if self.stack: 

121 self.item, self.data = self.stack.pop() 

122 else: 

123 self.item = None 

124 self.data = [] 

125 self.path.pop() 

126 return 

127 if self.stack: 

128 data = (None if not self.data 

129 else self.cdata_separator.join(self.data)) 

130 item = self.item 

131 self.item, self.data = self.stack.pop() 

132 if self.strip_whitespace and data: 

133 data = data.strip() or None 

134 if data and self._should_force_cdata(name, data) and item is None: 

135 item = self.dict_constructor() 

136 if item is not None: 

137 if data: 

138 self.push_data(item, self.cdata_key, data) 

139 self.item = self.push_data(self.item, name, item) 

140 else: 

141 self.item = self.push_data(self.item, name, data) 

142 else: 

143 self.item = None 

144 self.data = [] 

145 self.path.pop() 

146 

147 def characters(self, data): 

148 if not self.data: 

149 self.data = [data] 

150 else: 

151 self.data.append(data) 

152 

153 def comments(self, data): 

154 if self.strip_whitespace: 

155 data = data.strip() 

156 self.item = self.push_data(self.item, self.comment_key, data) 

157 

158 def push_data(self, item, key, data): 

159 if self.postprocessor is not None: 

160 result = self.postprocessor(self.path, key, data) 

161 if result is None: 

162 return item 

163 key, data = result 

164 if item is None: 

165 item = self.dict_constructor() 

166 try: 

167 value = item[key] 

168 if isinstance(value, list): 

169 value.append(data) 

170 else: 

171 item[key] = [value, data] 

172 except KeyError: 

173 if self._should_force_list(key, data): 

174 item[key] = [data] 

175 else: 

176 item[key] = data 

177 return item 

178 

179 def _should_force_list(self, key, value): 

180 if not self.force_list: 

181 return False 

182 if isinstance(self.force_list, bool): 

183 return self.force_list 

184 try: 

185 return key in self.force_list 

186 except TypeError: 

187 return self.force_list(self.path[:-1], key, value) 

188 

189 def _should_force_cdata(self, key, value): 

190 if not self.force_cdata: 

191 return False 

192 if isinstance(self.force_cdata, bool): 

193 return self.force_cdata 

194 try: 

195 return key in self.force_cdata 

196 except TypeError: 

197 return self.force_cdata(self.path[:-1], key, value) 

198 

199 

def parse(xml_input, encoding=None, expat=expat, process_namespaces=False,
          namespace_separator=':', disable_entities=True, process_comments=False, **kwargs):
    """Parse XML input and convert it into a dictionary.

    `xml_input` may be a string, a file-like object (anything with a `read`
    attribute) or a generator yielding chunks. String input is encoded with
    `encoding` (UTF-8 when not given) before being handed to the parser.

    Remaining keyword arguments are forwarded to `_DictSAXHandler`
    (`item_depth`, `item_callback`, `xml_attribs`, `attr_prefix`,
    `cdata_key`, `force_cdata`, `cdata_separator`, `postprocessor`,
    `dict_constructor`, `strip_whitespace`, `namespaces`, `force_list`,
    `comment_key`).

    If `xml_attribs` is `True`, element attributes are put in the dictionary
    among regular child elements, using `@` as a prefix to avoid collisions.
    If set to `False`, they are ignored.

    Simple example::

        >>> import xmltodict
        >>> doc = xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>
        ... \"\"\")
        >>> doc['a']['@prop']
        'x'
        >>> doc['a']['b']
        ['1', '2']

    If `item_depth` is `0` (the default) the function returns a dictionary
    for the root element. Otherwise it calls `item_callback` every time an
    item at the given depth is closed and returns `None` in the end
    (streaming mode).

    The callback receives two arguments: the `path` from the document root to
    the item (name/attributes pairs) and the `item` itself. If the callback
    returns a false-ish value, parsing stops with :class:`ParsingInterrupted`.

    Streaming example::

        >>> def handle(path, item):
        ...     print('path:%s item:%s' % (path, item))
        ...     return True
        ...
        >>> xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>\"\"\", item_depth=2, item_callback=handle)
        path:[('a', {'prop': 'x'}), ('b', None)] item:1
        path:[('a', {'prop': 'x'}), ('b', None)] item:2

    `postprocessor` is a function taking `path`, `key` and `value` and
    returning a new `(key, value)` pair (or `None` to drop the entry)::

        >>> def postprocessor(path, key, value):
        ...     try:
        ...         return key + ':int', int(value)
        ...     except (ValueError, TypeError):
        ...         return key, value
        >>> xmltodict.parse('<a><b>1</b><b>2</b><b>x</b></a>',
        ...                 postprocessor=postprocessor)
        {'a': {'b:int': [1, 2], 'b': 'x'}}

    An alternate `expat` implementation (such as `defusedexpat`) can be
    supplied through the `expat` parameter::

        >>> import defusedexpat
        >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat)
        {'a': 'hello'}

    `force_list` forces a list to be created even when a key occurs only
    once. It may be `True`, a container of keys, or a callable receiving
    `path`, `key` and `value`. It is applied after any user-supplied
    postprocessor has run. For example, given this input::

        <servers>
          <server>
            <name>host1</name>
            <os>Linux</os>
            <interfaces>
              <interface>
                <name>em0</name>
                <ip_address>10.0.0.1</ip_address>
              </interface>
            </interfaces>
          </server>
        </servers>

    calling with `force_list=('interface',)` produces::

        {'servers':
          {'server':
            {'name': 'host1',
             'os': 'Linux',
             'interfaces':
              {'interface':
                [{'name': 'em0', 'ip_address': '10.0.0.1'}]}}}}

    If `process_namespaces` is true, `namespace_separator` is passed to the
    parser so that namespaces are expanded; otherwise no separator is given
    to the parser.

    If `disable_entities` is true (the default), any entity declaration in
    the document raises `ValueError`.

    If `process_comments` is `True`, comments are added under `comment_key`
    (default `'#comment'`) to the element that contains them. For example::

        <a>
          <b>
            <!-- b comment -->
            <c>
              <!-- c comment -->
              1
            </c>
            <d>2</d>
          </b>
        </a>

    produces::

        'a': {
            'b': {
                '#comment': 'b comment',
                'c': {
                    '#comment': 'c comment',
                    '#text': '1',
                },
                'd': '2',
            },
        }

    Comment text honours `strip_whitespace`: with the default `True`, leading
    and trailing whitespace is removed from comments.
    """
    handler = _DictSAXHandler(namespace_separator=namespace_separator, **kwargs)

    if isinstance(xml_input, str):
        if not encoding:
            encoding = 'utf-8'
        xml_input = xml_input.encode(encoding)

    # Only let the parser expand namespaces when explicitly requested.
    parser_separator = namespace_separator if process_namespaces else None
    parser = expat.ParserCreate(encoding, parser_separator)

    parser.ordered_attributes = True
    parser.StartNamespaceDeclHandler = handler.startNamespaceDecl
    parser.StartElementHandler = handler.startElement
    parser.EndElementHandler = handler.endElement
    parser.CharacterDataHandler = handler.characters
    if process_comments:
        parser.CommentHandler = handler.comments
    parser.buffer_text = True

    if disable_entities:
        def _forbid_entities(*_args, **_kwargs):
            raise ValueError("entities are disabled")

        parser.EntityDeclHandler = _forbid_entities

    if hasattr(xml_input, 'read'):
        parser.ParseFile(xml_input)
    elif not isgenerator(xml_input):
        parser.Parse(xml_input, True)
    else:
        for chunk in xml_input:
            parser.Parse(chunk, False)
        # Signal end of input once the generator is exhausted.
        parser.Parse(b'', True)
    return handler.item

371 

372 

373def _convert_value_to_string(value, encoding='utf-8', bytes_errors='replace'): 

374 """Convert a value to its string representation for XML output. 

375 

376 Handles boolean values consistently by converting them to lowercase. 

377 """ 

378 if isinstance(value, str): 

379 return value 

380 if isinstance(value, bool): 

381 return "true" if value else "false" 

382 if isinstance(value, (bytes, bytearray, memoryview)): 

383 return bytes(value).decode(encoding, errors=bytes_errors) 

384 return str(value) 

385 

386 

387def _validate_name(value, kind): 

388 """Validate an element/attribute name for XML safety. 

389 

390 Raises ValueError with a specific reason when invalid. 

391 

392 kind: 'element' or 'attribute' (used in error messages) 

393 """ 

394 if not isinstance(value, str): 

395 raise ValueError(f"{kind} name must be a string") 

396 if value.startswith("?") or value.startswith("!"): 

397 raise ValueError(f'Invalid {kind} name: cannot start with "?" or "!"') 

398 if "<" in value or ">" in value: 

399 raise ValueError(f'Invalid {kind} name: "<" or ">" not allowed') 

400 if "/" in value: 

401 raise ValueError(f'Invalid {kind} name: "/" not allowed') 

402 if '"' in value or "'" in value: 

403 raise ValueError(f"Invalid {kind} name: quotes not allowed") 

404 if "=" in value: 

405 raise ValueError(f'Invalid {kind} name: "=" not allowed') 

406 if any(ch.isspace() for ch in value): 

407 raise ValueError(f"Invalid {kind} name: whitespace not allowed") 

408 

409 

410def _validate_comment(value): 

411 if isinstance(value, bytes): 

412 try: 

413 value = value.decode("utf-8") 

414 except UnicodeDecodeError as exc: 

415 raise ValueError("Comment text must be valid UTF-8") from exc 

416 if not isinstance(value, str): 

417 raise ValueError("Comment text must be a string") 

418 if "--" in value: 

419 raise ValueError("Comment text cannot contain '--'") 

420 if value.endswith("-"): 

421 raise ValueError("Comment text cannot end with '-'") 

422 return value 

423 

424 

425def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'): 

426 if not isinstance(name, str): 

427 return name 

428 if not namespaces: 

429 return name 

430 try: 

431 ns, name = name.rsplit(ns_sep, 1) 

432 except ValueError: 

433 pass 

434 else: 

435 ns_res = namespaces.get(ns.strip(attr_prefix)) 

436 name = '{}{}{}{}'.format( 

437 attr_prefix if ns.startswith(attr_prefix) else '', 

438 ns_res, ns_sep, name) if ns_res else name 

439 return name 

440 

441 

442def _emit(key, value, content_handler, 

443 attr_prefix='@', 

444 cdata_key='#text', 

445 depth=0, 

446 preprocessor=None, 

447 pretty=False, 

448 newl='\n', 

449 indent='\t', 

450 namespace_separator=':', 

451 namespaces=None, 

452 full_document=True, 

453 expand_iter=None, 

454 encoding='utf-8', 

455 bytes_errors='replace', 

456 comment_key='#comment'): 

457 if isinstance(key, str) and key == comment_key: 

458 comments_list = value if isinstance(value, list) else [value] 

459 if isinstance(indent, int): 

460 indent = " " * indent 

461 for comment_text in comments_list: 

462 if comment_text is None: 

463 continue 

464 comment_text = _convert_value_to_string( 

465 comment_text, encoding=encoding, bytes_errors=bytes_errors 

466 ) 

467 if not comment_text: 

468 continue 

469 if pretty: 

470 content_handler.ignorableWhitespace(depth * indent) 

471 content_handler.comment(comment_text) 

472 if pretty: 

473 content_handler.ignorableWhitespace(newl) 

474 return 

475 

476 key = _process_namespace(key, namespaces, namespace_separator, attr_prefix) 

477 if preprocessor is not None: 

478 result = preprocessor(key, value) 

479 if result is None: 

480 return 

481 key, value = result 

482 # Minimal validation to avoid breaking out of tag context 

483 _validate_name(key, "element") 

484 if not hasattr(value, '__iter__') or isinstance(value, (str, bytes, bytearray, memoryview, dict)): 

485 value = [value] 

486 for index, v in enumerate(value): 

487 if full_document and depth == 0 and index > 0: 

488 raise ValueError('document with multiple roots') 

489 if v is None: 

490 v = {} 

491 elif not isinstance(v, (dict, str)): 

492 if expand_iter and hasattr(v, '__iter__') and not isinstance(v, (bytes, bytearray, memoryview)): 

493 v = {expand_iter: v} 

494 else: 

495 v = _convert_value_to_string(v, encoding=encoding, bytes_errors=bytes_errors) 

496 if isinstance(v, str): 

497 v = {cdata_key: v} 

498 cdata = None 

499 attrs = {} 

500 children = [] 

501 for ik, iv in v.items(): 

502 if ik == cdata_key: 

503 if iv is None: 

504 cdata = None 

505 else: 

506 cdata = _convert_value_to_string(iv, encoding=encoding, bytes_errors=bytes_errors) 

507 continue 

508 if isinstance(ik, str) and ik.startswith(attr_prefix): 

509 ik = _process_namespace(ik, namespaces, namespace_separator, 

510 attr_prefix) 

511 if ik == '@xmlns' and isinstance(iv, dict): 

512 for k, v in iv.items(): 

513 _validate_name(k, "attribute") 

514 attr = 'xmlns{}'.format(f':{k}' if k else '') 

515 attrs[attr] = '' if v is None else _convert_value_to_string( 

516 v, encoding=encoding, bytes_errors=bytes_errors 

517 ) 

518 continue 

519 if iv is None: 

520 iv = '' 

521 elif not isinstance(iv, str): 

522 iv = _convert_value_to_string(iv, encoding=encoding, bytes_errors=bytes_errors) 

523 attr_name = ik[len(attr_prefix) :] 

524 _validate_name(attr_name, "attribute") 

525 attrs[attr_name] = iv 

526 continue 

527 if isinstance(iv, list) and not iv: 

528 continue # Skip empty lists to avoid creating empty child elements 

529 children.append((ik, iv)) 

530 if isinstance(indent, int): 

531 indent = ' ' * indent 

532 if pretty: 

533 content_handler.ignorableWhitespace(depth * indent) 

534 content_handler.startElement(key, AttributesImpl(attrs)) 

535 if pretty and children: 

536 content_handler.ignorableWhitespace(newl) 

537 for child_key, child_value in children: 

538 _emit(child_key, child_value, content_handler, 

539 attr_prefix, cdata_key, depth+1, preprocessor, 

540 pretty, newl, indent, namespaces=namespaces, 

541 namespace_separator=namespace_separator, 

542 expand_iter=expand_iter, encoding=encoding, 

543 bytes_errors=bytes_errors, comment_key=comment_key) 

544 if cdata is not None: 

545 content_handler.characters(cdata) 

546 if pretty and children: 

547 content_handler.ignorableWhitespace(depth * indent) 

548 content_handler.endElement(key) 

549 if pretty and depth: 

550 content_handler.ignorableWhitespace(newl) 

551 

552 

553class _XMLGenerator(XMLGenerator): 

554 def comment(self, text): 

555 text = _validate_comment(text) 

556 self._write(f"<!--{escape(text)}-->") 

557 

558 

def unparse(input_dict, output=None, encoding='utf-8', full_document=True,
            short_empty_elements=False, comment_key='#comment',
            **kwargs):
    """Emit an XML document for `input_dict` (the reverse of `parse`).

    The XML is returned as a string, unless `output` (a file-like object) is
    given, in which case it is written there and ``None`` is returned.

    Keys prefixed with `attr_prefix` (default ``'@'``) are emitted as
    attributes, the `cdata_key` key (default ``'#text'``) as character data
    and `comment_key` entries as comments.

    Empty lists are omitted entirely: ``{"a": []}`` produces no ``<a>``
    element. Use a placeholder entry (for example ``{"a": [""]}``) when an
    explicit empty element is required.

    With `full_document` true, the XML declaration is written and the input
    must contain exactly one root element (comments do not count as roots).

    `pretty` (default ``False``) enables pretty-printing; lines then end with
    `newl` (default ``'\\n'``) and are indented with `indent` (default
    ``'\\t'``). `bytes_errors` (default ``'replace'``) selects the codec error
    handler used when decoding byte values. Other keyword arguments are
    passed through to `_emit`.

    Raises:
        ValueError: if `bytes_errors` is not a registered error handler, or
            if a full document has no root or more than one root.
    """
    bytes_errors = kwargs.pop('bytes_errors', 'replace')
    try:
        codecs.lookup_error(bytes_errors)
    except LookupError as exc:
        raise ValueError(f"Invalid bytes_errors handler: {bytes_errors}") from exc

    return_text = output is None
    if return_text:
        output = StringIO()
    content_handler = _XMLGenerator(output, encoding, bool(short_empty_elements))

    if full_document:
        content_handler.startDocument()

    root_emitted = False
    for key, value in input_dict.items():
        is_root = key != comment_key
        if is_root and full_document and root_emitted:
            raise ValueError("Document must have exactly one root.")
        _emit(
            key,
            value,
            content_handler,
            full_document=full_document,
            encoding=encoding,
            bytes_errors=bytes_errors,
            comment_key=comment_key,
            **kwargs,
        )
        if is_root:
            root_emitted = True

    if full_document:
        if not root_emitted:
            raise ValueError("Document must have exactly one root.")
        content_handler.endDocument()

    if not return_text:
        return None
    # The internal StringIO buffer already holds text, so no decoding is needed.
    return output.getvalue()

625 

626 

if __name__ == '__main__':  # pragma: no cover
    # Command-line mode: read XML from stdin and marshal (path, item) pairs
    # to stdout. The single required argument is the streaming item depth.
    import marshal
    import sys

    (item_depth,) = sys.argv[1:]
    item_depth = int(item_depth)

    stdin = sys.stdin.buffer
    stdout = sys.stdout.buffer

    def handle_item(path, item):
        """Serialize one (path, item) pair to stdout and keep parsing."""
        marshal.dump((path, item), stdout)
        return True

    try:
        root = parse(
            stdin,
            item_depth=item_depth,
            item_callback=handle_item,
            dict_constructor=dict,
        )
        # With depth 0 nothing is streamed, so emit the whole document once.
        if item_depth == 0:
            handle_item([], root)
    except KeyboardInterrupt:
        pass