Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/xmltodict.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

328 statements  

1#!/usr/bin/env python 

2"Makes working with XML feel like you are working with JSON" 

3 

4from xml.parsers import expat 

5from xml.sax.saxutils import XMLGenerator, escape 

6from xml.sax.xmlreader import AttributesImpl 

7from io import StringIO 

8from inspect import isgenerator 

9 

class ParsingInterrupted(Exception):
    """Raised to abort parsing when an item callback returns a false value."""

12 

13 

class _DictSAXHandler:
    """Collect expat parser events into nested dictionaries.

    The handler keeps a stack of partially built elements. Element
    attributes become keys prefixed with ``attr_prefix``, text is stored
    under ``cdata_key`` when it has to live next to other keys, and repeated
    child names are gathered into lists. When ``item_depth`` is positive,
    elements closed at exactly that depth are handed to ``item_callback``
    instead of being attached to their parent.
    """

    def __init__(
        self,
        item_depth=0,
        item_callback=lambda *args: True,
        xml_attribs=True,
        attr_prefix="@",
        cdata_key="#text",
        force_cdata=False,
        cdata_separator="",
        postprocessor=None,
        dict_constructor=dict,
        strip_whitespace=True,
        namespace_separator=":",
        namespaces=None,
        force_list=None,
        comment_key="#comment",
    ):
        # Configuration supplied by the caller.
        self.item_depth = item_depth
        self.item_callback = item_callback
        self.xml_attribs = xml_attribs
        self.attr_prefix = attr_prefix
        self.cdata_key = cdata_key
        self.force_cdata = force_cdata
        self.cdata_separator = cdata_separator
        self.postprocessor = postprocessor
        self.dict_constructor = dict_constructor
        self.strip_whitespace = strip_whitespace
        self.namespace_separator = namespace_separator
        self.namespaces = namespaces
        self.force_list = force_list
        self.comment_key = comment_key

        # Mutable parsing state.
        self.path = []    # (name, attrs-or-None) for every open element
        self.stack = []   # saved (item, data) pairs of enclosing elements
        self.data = []    # text chunks collected for the current element
        self.item = None  # mapping being built for the current element
        # Namespace declarations seen since the last start tag.
        self.namespace_declarations = dict_constructor()

    def _build_name(self, full_name):
        """Replace the namespace part of *full_name* using ``self.namespaces``.

        Names are returned unchanged when no namespace mapping is configured
        or when the name contains no separator. A namespace mapped to a false
        value is dropped; an unmapped namespace is kept as is.
        """
        if self.namespaces is None:
            return full_name
        sep_index = full_name.rfind(self.namespace_separator)
        if sep_index == -1:
            return full_name
        namespace = full_name[:sep_index]
        local_name = full_name[sep_index + 1:]
        try:
            short_namespace = self.namespaces[namespace]
        except KeyError:
            short_namespace = namespace
        if short_namespace:
            return self.namespace_separator.join((short_namespace, local_name))
        return local_name

    def _attrs_to_dict(self, attrs):
        """Turn a flat ``[name, value, name, value, ...]`` sequence into a mapping."""
        if isinstance(attrs, dict):
            return attrs
        names = attrs[0::2]
        values = attrs[1::2]
        return self.dict_constructor(zip(names, values))

    def startNamespaceDecl(self, prefix, uri):
        """Remember a namespace declaration for the next start tag."""
        declared_prefix = prefix or ''
        self.namespace_declarations[declared_prefix] = uri

    def startElement(self, full_name, attrs):
        """Open a new element and, at or below ``item_depth``, start collecting it."""
        name = self._build_name(full_name)
        attrs = self._attrs_to_dict(attrs)
        pending_namespaces = self.namespace_declarations
        if pending_namespaces:
            if not attrs:
                attrs = self.dict_constructor()
            # Expose the declarations as a pseudo attribute of this element.
            attrs['xmlns'] = pending_namespaces
            self.namespace_declarations = self.dict_constructor()
        self.path.append((name, attrs or None))
        if len(self.path) < self.item_depth:
            return

        self.stack.append((self.item, self.data))
        converted = None
        if self.xml_attribs:
            entries = []
            for key, value in attrs.items():
                key = self.attr_prefix + self._build_name(key)
                if self.postprocessor:
                    entry = self.postprocessor(self.path, key, value)
                else:
                    entry = (key, value)
                # A false-ish postprocessor result drops the attribute.
                if entry:
                    entries.append(entry)
            converted = self.dict_constructor(entries)
        self.item = converted or None
        self.data = []

    def endElement(self, full_name):
        """Close the current element and attach it to its parent (or emit it)."""
        name = self._build_name(full_name)

        # An item closed at the streaming depth is emitted and then dropped
        # without being attached to its parent, so streamed items do not
        # accumulate in memory when item_depth > 0.
        if len(self.path) == self.item_depth:
            emitted = self.item
            if emitted is None and self.data:
                emitted = self.cdata_separator.join(self.data)

            if not self.item_callback(self.path, emitted):
                raise ParsingInterrupted
            # Restore the parent context without keeping a reference to the
            # emitted item.
            if self.stack:
                self.item, self.data = self.stack.pop()
            else:
                self.item = None
                self.data = []
            self.path.pop()
            return

        if not self.stack:
            self.item = None
            self.data = []
        else:
            text = self.cdata_separator.join(self.data) if self.data else None
            element = self.item
            self.item, self.data = self.stack.pop()
            if self.strip_whitespace and text:
                text = text.strip() or None
            if text and self._should_force_cdata(name, text) and element is None:
                element = self.dict_constructor()
            if element is None:
                payload = text
            else:
                if text:
                    self.push_data(element, self.cdata_key, text)
                payload = element
            self.item = self.push_data(self.item, name, payload)
        self.path.pop()

    def characters(self, data):
        """Collect a chunk of character data for the current element."""
        if self.data:
            self.data.append(data)
        else:
            self.data = [data]

    def comments(self, data):
        """Store a comment on the current item under ``comment_key``."""
        text = data.strip() if self.strip_whitespace else data
        self.item = self.push_data(self.item, self.comment_key, text)

    def push_data(self, item, key, data):
        """Add ``key: data`` to *item* and return the (possibly new) mapping.

        The postprocessor, when set, may rewrite the pair or drop it by
        returning ``None``. A key that is already present is turned into a
        list (or its list is extended); a new key is stored as a one-element
        list only when ``force_list`` says so.
        """
        if self.postprocessor is not None:
            processed = self.postprocessor(self.path, key, data)
            if processed is None:
                return item
            key, data = processed
        if item is None:
            item = self.dict_constructor()
        try:
            existing = item[key]
        except KeyError:
            item[key] = [data] if self._should_force_list(key, data) else data
        else:
            if isinstance(existing, list):
                existing.append(data)
            else:
                item[key] = [existing, data]
        return item

    def _force_option_applies(self, option, key, value):
        """Evaluate a ``force_*`` option given as a bool, a container or a callable."""
        if not option:
            return False
        if isinstance(option, bool):
            return option
        try:
            return key in option
        except TypeError:
            # Not a container: treat the option as a callable predicate.
            return option(self.path[:-1], key, value)

    def _should_force_list(self, key, value):
        """Tell whether *key* must be stored as a list even for a single value."""
        return self._force_option_applies(self.force_list, key, value)

    def _should_force_cdata(self, key, value):
        """Tell whether the text of *key* must be wrapped under ``cdata_key``."""
        return self._force_option_applies(self.force_cdata, key, value)

197 

198 

def parse(xml_input, encoding=None, expat=expat, process_namespaces=False,
          namespace_separator=':', disable_entities=True, process_comments=False, **kwargs):
    """Parse the given XML input and convert it into a dictionary.

    `xml_input` can either be a `string`, a file-like object, or a generator
    of strings. A `str` input is encoded with `encoding` (UTF-8 when no
    encoding is given) before it is handed to the parser.

    If `xml_attribs` is `True`, element attributes are put in the dictionary
    among regular child elements, using `@` as a prefix to avoid collisions. If
    set to `False`, they are just ignored.

    Simple example::

        >>> import xmltodict
        >>> doc = xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>
        ... \"\"\")
        >>> doc['a']['@prop']
        'x'
        >>> doc['a']['b']
        ['1', '2']

    If `item_depth` is `0`, the function returns a dictionary for the root
    element (default behavior). Otherwise, it calls `item_callback` every time
    an item at the specified depth is found and returns `None` in the end
    (streaming mode).

    The callback function receives two parameters: the `path` from the document
    root to the item (name-attribs pairs), and the `item` (dict). If the
    callback's return value is false-ish, parsing will be stopped with the
    :class:`ParsingInterrupted` exception.

    Streaming example::

        >>> def handle(path, item):
        ...     print('path:%s item:%s' % (path, item))
        ...     return True
        ...
        >>> xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>\"\"\", item_depth=2, item_callback=handle)
        path:[('a', {'prop': 'x'}), ('b', None)] item:1
        path:[('a', {'prop': 'x'}), ('b', None)] item:2

    The optional argument `postprocessor` is a function that takes `path`,
    `key` and `value` as positional arguments and returns a new `(key, value)`
    pair where both `key` and `value` may have changed. Usage example::

        >>> def postprocessor(path, key, value):
        ...     try:
        ...         return key + ':int', int(value)
        ...     except (ValueError, TypeError):
        ...         return key, value
        >>> xmltodict.parse('<a><b>1</b><b>2</b><b>x</b></a>',
        ...                 postprocessor=postprocessor)
        {'a': {'b:int': [1, 2], 'b': 'x'}}

    You can pass an alternate version of `expat` (such as `defusedexpat`) by
    using the `expat` parameter. E.g:

        >>> import defusedexpat
        >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat)
        {'a': 'hello'}

    When `process_namespaces` is false, the parser is created without a
    namespace separator; otherwise `namespace_separator` is passed to it.

    When `disable_entities` is true (the default), any entity declaration in
    the input makes parsing fail with a `ValueError`.

    You can use the force_list argument to force lists to be created even
    when there is only a single child of a given level of hierarchy. The
    force_list argument is a tuple of keys. If the key for a given level
    of hierarchy is in the force_list argument, that level of hierarchy
    will have a list as a child (even if there is only one sub-element).
    This is applied after any user-supplied postprocessor has already run.

        For example, given this input:
        <servers>
          <server>
            <name>host1</name>
            <os>Linux</os>
            <interfaces>
              <interface>
                <name>em0</name>
                <ip_address>10.0.0.1</ip_address>
              </interface>
            </interfaces>
          </server>
        </servers>

        If called with force_list=('interface',), it will produce
        this dictionary:
        {'servers':
          {'server':
            {'name': 'host1',
             'os': 'Linux',
             'interfaces':
              {'interface':
                [ {'name': 'em0', 'ip_address': '10.0.0.1' } ] } } } }

    `force_list` can also be a callable that receives `path`, `key` and
    `value`. This is helpful in cases where the logic that decides whether
    a list should be forced is more complex.

    If `process_comments` is `True`, comments will be added using `comment_key`
    (default=`'#comment'`) to the tag that contains the comment.

        For example, given this input:
        <a>
          <b>
            <!-- b comment -->
            <c>
                <!-- c comment -->
                1
            </c>
            <d>2</d>
          </b>
        </a>

        If called with `process_comments=True`, it will produce
        this dictionary:
        'a': {
            'b': {
                '#comment': 'b comment',
                'c': {
                    '#comment': 'c comment',
                    '#text': '1',
                },
                'd': '2',
            },
        }

    Comment text is subject to the `strip_whitespace` flag: when it is left
    at the default `True`, comments will have leading and trailing
    whitespace removed. Disable `strip_whitespace` to keep comment
    indentation or padding intact.
    """
    handler = _DictSAXHandler(namespace_separator=namespace_separator,
                              **kwargs)
    if isinstance(xml_input, str):
        encoding = encoding or 'utf-8'
        xml_input = xml_input.encode(encoding)

    # Without namespace processing the parser must not get a separator.
    parser_separator = namespace_separator if process_namespaces else None
    parser = expat.ParserCreate(encoding, parser_separator)

    parser.ordered_attributes = True
    parser.StartNamespaceDeclHandler = handler.startNamespaceDecl
    parser.StartElementHandler = handler.startElement
    parser.EndElementHandler = handler.endElement
    parser.CharacterDataHandler = handler.characters
    if process_comments:
        parser.CommentHandler = handler.comments
    parser.buffer_text = True

    if disable_entities:
        def _forbid_entities(*_args, **_kwargs):
            raise ValueError("entities are disabled")

        parser.EntityDeclHandler = _forbid_entities

    if hasattr(xml_input, 'read'):
        parser.ParseFile(xml_input)
    elif isgenerator(xml_input):
        for chunk in xml_input:
            parser.Parse(chunk, False)
        # Signal the end of the document once the generator is exhausted.
        parser.Parse(b'', True)
    else:
        parser.Parse(xml_input, True)
    return handler.item

370 

371 

def _convert_value_to_string(value):
    """Return the text form of *value* used when writing XML.

    Strings and bytes are passed through untouched, booleans become the
    lowercase literals ``"true"``/``"false"`` and every other value goes
    through ``str()``.
    """
    if isinstance(value, bool):
        if value:
            return "true"
        return "false"
    if isinstance(value, (str, bytes)):
        return value
    return str(value)

def _validate_name(value, kind):
    """Reject element/attribute names that could break out of the tag context.

    Only a minimal set of characters is checked; an empty name is accepted.

    value: the candidate name
    kind: 'element' or 'attribute' (used in error messages)

    Raises ValueError naming the first rule the name violates.
    """
    if not isinstance(value, str):
        raise ValueError(f"{kind} name must be a string")
    if value.startswith(("?", "!")):
        raise ValueError(f'Invalid {kind} name: cannot start with "?" or "!"')

    # Checked in this order so the reported reason stays stable.
    forbidden = (
        (("<", ">"), f'Invalid {kind} name: "<" or ">" not allowed'),
        (("/",), f'Invalid {kind} name: "/" not allowed'),
        (('"', "'"), f"Invalid {kind} name: quotes not allowed"),
        (("=",), f'Invalid {kind} name: "=" not allowed'),
    )
    for characters, message in forbidden:
        for character in characters:
            if character in value:
                raise ValueError(message)

    for character in value:
        if character.isspace():
            raise ValueError(f"Invalid {kind} name: whitespace not allowed")

403 

404 

def _validate_comment(value):
    """Return comment text as ``str`` after checking it is safe to write.

    Bytes are decoded as UTF-8 first. Raises ValueError when the value is
    not text, is not valid UTF-8, contains ``--`` or ends with ``-``.
    """
    text = value
    if isinstance(text, bytes):
        try:
            text = text.decode("utf-8")
        except UnicodeDecodeError as exc:
            raise ValueError("Comment text must be valid UTF-8") from exc
    if not isinstance(text, str):
        raise ValueError("Comment text must be a string")
    if text.find("--") != -1:
        raise ValueError("Comment text cannot contain '--'")
    if text[-1:] == "-":
        raise ValueError("Comment text cannot end with '-'")
    return text

418 

419 

def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'):
    """Replace the namespace part of *name* with its configured short prefix.

    name: element or attribute key; attribute keys start with `attr_prefix`.
        Non-string names are returned unchanged.
    namespaces: mapping from namespace (as written in the key) to the prefix
        to emit. When empty or ``None`` the name is returned unchanged.
    ns_sep: separator between the namespace and the local name; the key is
        split at its last occurrence.
    attr_prefix: marker that identifies attribute keys.

    A namespace mapped to a non-empty prefix yields ``prefix + ns_sep +
    local``. A namespace mapped to an empty value, or missing from
    `namespaces`, is dropped and only the local name is kept. In every case
    an attribute key keeps its leading `attr_prefix`, so callers can still
    recognise it as an attribute.
    """
    if not isinstance(name, str) or not namespaces:
        return name
    try:
        ns, local_name = name.rsplit(ns_sep, 1)
    except ValueError:
        # No separator in the name (or an unusable separator): nothing to do.
        return name

    is_attribute = bool(attr_prefix) and ns.startswith(attr_prefix)
    marker = attr_prefix if is_attribute else ''
    # Remove exactly one leading marker; never touch the rest of the namespace.
    short_prefix = namespaces.get(ns[len(marker):])
    if short_prefix:
        return f'{marker}{short_prefix}{ns_sep}{local_name}'
    return marker + local_name

435 

436 

def _emit(key, value, content_handler,
          attr_prefix='@',
          cdata_key='#text',
          depth=0,
          preprocessor=None,
          pretty=False,
          newl='\n',
          indent='\t',
          namespace_separator=':',
          namespaces=None,
          full_document=True,
          expand_iter=None,
          comment_key='#comment'):
    """Write one dictionary entry (and everything below it) as XML events.

    key: element name, or `comment_key` to write comments instead.
    value: element content. A dict describes attributes (`attr_prefix` keys),
        text (`cdata_key`) and children; a non-string iterable produces one
        element per entry; ``None`` produces an empty element; other values
        are converted to text.
    content_handler: receives ``startElement``/``endElement``/``characters``/
        ``ignorableWhitespace``/``comment`` calls.
    depth: nesting level, used for indentation and the single-root check.
    preprocessor: optional callable ``(key, value)`` returning a replacement
        pair, or ``None`` to skip the entry.
    pretty, newl, indent: pretty-printing controls; an integer `indent` means
        that many spaces per level.
    namespaces, namespace_separator: passed to ``_process_namespace``.
    full_document: when true, more than one element at depth 0 is an error.
    expand_iter: when set, iterable entries are wrapped as ``{expand_iter: v}``.

    Raises ValueError for invalid names or multiple roots.
    """
    if isinstance(indent, int):
        indent = ' ' * indent

    if isinstance(key, str) and key == comment_key:
        comments_list = value if isinstance(value, list) else [value]
        for comment_text in comments_list:
            if comment_text is None:
                continue
            comment_text = _convert_value_to_string(comment_text)
            if not comment_text:
                continue
            if pretty:
                content_handler.ignorableWhitespace(depth * indent)
            content_handler.comment(comment_text)
            if pretty:
                content_handler.ignorableWhitespace(newl)
        return

    key = _process_namespace(key, namespaces, namespace_separator, attr_prefix)
    if preprocessor is not None:
        result = preprocessor(key, value)
        if result is None:
            return
        key, value = result
    # Minimal validation to avoid breaking out of tag context
    _validate_name(key, "element")
    if not hasattr(value, '__iter__') or isinstance(value, (str, dict)):
        value = [value]
    for index, element in enumerate(value):
        if full_document and depth == 0 and index > 0:
            raise ValueError('document with multiple roots')
        if element is None:
            element = {}
        elif not isinstance(element, (dict, str)):
            if expand_iter and hasattr(element, '__iter__'):
                element = {expand_iter: element}
            else:
                element = _convert_value_to_string(element)
        if isinstance(element, str):
            element = {cdata_key: element}
        cdata = None
        attrs = {}
        children = []
        for ik, iv in element.items():
            if ik == cdata_key:
                cdata = _convert_value_to_string(iv)
                continue
            if isinstance(ik, str) and ik.startswith(attr_prefix):
                ik = _process_namespace(ik, namespaces, namespace_separator,
                                        attr_prefix)
                # The parser stores namespace declarations under
                # attr_prefix + 'xmlns', so honour a custom attr_prefix here
                # instead of assuming '@'.
                if ik == attr_prefix + 'xmlns' and isinstance(iv, dict):
                    for ns_prefix, ns_uri in iv.items():
                        _validate_name(ns_prefix, "attribute")
                        attr = 'xmlns{}'.format(
                            f':{ns_prefix}' if ns_prefix else '')
                        attrs[attr] = str(ns_uri)
                    continue
                if not isinstance(iv, str):
                    iv = str(iv)
                attr_name = ik[len(attr_prefix):]
                _validate_name(attr_name, "attribute")
                attrs[attr_name] = iv
                continue
            if isinstance(iv, list) and not iv:
                continue  # Skip empty lists to avoid creating empty child elements
            children.append((ik, iv))
        if pretty:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.startElement(key, AttributesImpl(attrs))
        if pretty and children:
            content_handler.ignorableWhitespace(newl)
        for child_key, child_value in children:
            _emit(child_key, child_value, content_handler,
                  attr_prefix, cdata_key, depth+1, preprocessor,
                  pretty, newl, indent, namespaces=namespaces,
                  namespace_separator=namespace_separator,
                  expand_iter=expand_iter, comment_key=comment_key)
        if cdata is not None:
            content_handler.characters(cdata)
        if pretty and children:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.endElement(key)
        if pretty and depth:
            content_handler.ignorableWhitespace(newl)

534 

535 

class _XMLGenerator(XMLGenerator):
    """``XMLGenerator`` that can additionally write XML comments."""

    def comment(self, text):
        """Write *text* as an XML comment.

        The text is validated first (``ValueError`` on unsafe content) and
        written entity-escaped between ``<!--`` and ``-->``.
        """
        text = _validate_comment(text)
        # With short_empty_elements enabled the base class postpones the ">"
        # of a start tag; close it first so the comment cannot end up inside
        # the tag.
        self._finish_pending_start_element()
        self._write(f"<!--{escape(text)}-->")

540 

541 

def unparse(input_dict, output=None, encoding='utf-8', full_document=True,
            short_empty_elements=False, comment_key='#comment',
            **kwargs):
    """Emit an XML document for the given `input_dict` (reverse of `parse`).

    The resulting XML document is returned as a string, but if `output` (a
    file-like object) is specified, it is written there instead.

    Dictionary keys prefixed with `attr_prefix` (default=`'@'`) are interpreted
    as XML node attributes, whereas keys equal to `cdata_key`
    (default=`'#text'`) are treated as character data. Keys equal to
    `comment_key` (default=`'#comment'`) are written as comments and do not
    count as a root element.

    Empty lists are omitted entirely: ``{"a": []}`` produces no ``<a>`` element.
    Provide a placeholder entry (for example ``{"a": [""]}``) when an explicit
    empty container element must be emitted.

    The `pretty` parameter (default=`False`) enables pretty-printing. In this
    mode, lines are terminated with `'\\n'` and indented with `'\\t'`, but this
    can be customized with the `newl` and `indent` parameters.

    With `full_document` (default=`True`) the XML declaration is written and
    `input_dict` must contain exactly one non-comment key; otherwise a
    `ValueError` is raised.

    """
    must_return = output is None
    if must_return:
        output = StringIO()
    content_handler = _XMLGenerator(output, encoding,
                                    bool(short_empty_elements))
    if full_document:
        content_handler.startDocument()

    seen_root = False
    for key, value in input_dict.items():
        is_comment = key == comment_key
        if full_document and seen_root and not is_comment:
            raise ValueError("Document must have exactly one root.")
        _emit(key, value, content_handler, full_document=full_document,
              comment_key=comment_key, **kwargs)
        if not is_comment:
            seen_root = True

    if full_document:
        if not seen_root:
            raise ValueError("Document must have exactly one root.")
        content_handler.endDocument()

    if not must_return:
        return None
    value = output.getvalue()
    try:  # pragma no cover
        value = value.decode(encoding)
    except AttributeError:  # pragma no cover
        pass
    return value

591 

592 

if __name__ == '__main__':  # pragma: no cover
    # Command-line mode: read XML from stdin and marshal the parsed items to
    # stdout. The single argument is the item depth.
    import marshal
    import sys

    stdin = sys.stdin.buffer
    stdout = sys.stdout.buffer

    (depth_argument,) = sys.argv[1:]
    item_depth = int(depth_argument)

    def handle_item(path, item):
        """Dump one ``(path, item)`` pair and ask the parser to continue."""
        marshal.dump((path, item), stdout)
        return True

    try:
        root = parse(
            stdin,
            item_depth=item_depth,
            item_callback=handle_item,
            dict_constructor=dict,
        )
        # With depth 0 nothing is streamed, so emit the whole document once.
        if item_depth == 0:
            handle_item([], root)
    except KeyboardInterrupt:
        pass