Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/xmltodict.py: 59%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2"Makes working with XML feel like you are working with JSON"
4from xml.parsers import expat
5from xml.sax.saxutils import XMLGenerator, escape
6from xml.sax.xmlreader import AttributesImpl
7from io import StringIO
8from inspect import isgenerator
9import codecs
class ParsingInterrupted(Exception):
    """Raised to abort parsing when the item callback returns a false value."""
class _DictSAXHandler:
    """Callback target that folds XML parse events into nested mappings.

    The handler keeps three pieces of running state:

    * ``path``  -- list of ``(name, attrs-or-None)`` for every open element;
    * ``stack`` -- saved ``(item, data)`` pairs of the enclosing elements;
    * ``item`` / ``data`` -- mapping and text fragments of the element that
      is currently being built.

    After the root element is closed (and ``item_depth`` is ``0``),
    ``item`` holds the finished document.
    """

    def __init__(
        self,
        item_depth=0,
        item_callback=lambda *args: True,
        xml_attribs=True,
        attr_prefix="@",
        cdata_key="#text",
        force_cdata=False,
        cdata_separator="",
        postprocessor=None,
        dict_constructor=dict,
        strip_whitespace=True,
        namespace_separator=":",
        namespaces=None,
        force_list=None,
        comment_key="#comment",
    ):
        # Running parse state.
        self.path = []
        self.stack = []
        self.data = []
        self.item = None
        # Streaming configuration.
        self.item_depth = item_depth
        self.item_callback = item_callback
        # Output shaping.
        self.xml_attribs = xml_attribs
        self.attr_prefix = attr_prefix
        self.cdata_key = cdata_key
        self.comment_key = comment_key
        self.force_cdata = force_cdata
        self.force_list = force_list
        self.cdata_separator = cdata_separator
        self.strip_whitespace = strip_whitespace
        self.postprocessor = postprocessor
        self.dict_constructor = dict_constructor
        # Namespace handling.
        self.namespace_separator = namespace_separator
        self.namespaces = namespaces
        self.namespace_declarations = dict_constructor()

    def _build_name(self, full_name):
        """Return ``full_name`` with its namespace part remapped.

        Without a ``namespaces`` mapping, or when the separator does not
        occur in ``full_name``, the name is returned unchanged.  Otherwise
        the part before the last separator is looked up in ``namespaces``
        (falling back to itself when unmapped); a false-ish replacement
        drops the namespace part altogether.
        """
        mapping = self.namespaces
        if mapping is None:
            return full_name
        separator = self.namespace_separator
        cut = full_name.rfind(separator)
        if cut == -1:
            return full_name
        uri = full_name[:cut]
        local = full_name[cut + 1:]
        try:
            alias = mapping[uri]
        except KeyError:
            alias = uri
        if alias:
            return separator.join((alias, local))
        return local

    def _attrs_to_dict(self, attrs):
        """Turn a flat ``[name, value, name, value, ...]`` list into a mapping.

        A ``dict`` argument is passed through untouched.
        """
        if not isinstance(attrs, dict):
            names, values = attrs[0::2], attrs[1::2]
            attrs = self.dict_constructor(zip(names, values))
        return attrs

    def startNamespaceDecl(self, prefix, uri):
        """Record a namespace declaration for the next element that opens."""
        self.namespace_declarations[prefix or ''] = uri

    def startElement(self, full_name, attrs):
        """Open an element: extend ``path`` and start a fresh ``item``/``data``."""
        name = self._build_name(full_name)
        attrs = self._attrs_to_dict(attrs)
        pending_ns = self.namespace_declarations
        if pending_ns:
            # Attach the declarations collected since the previous element
            # as an ``xmlns`` pseudo-attribute, then start a new collection.
            if not attrs:
                attrs = self.dict_constructor()
            attrs['xmlns'] = pending_ns
            self.namespace_declarations = self.dict_constructor()
        self.path.append((name, attrs or None))
        if len(self.path) < self.item_depth:
            # Above the streaming depth nothing is accumulated.
            return
        self.stack.append((self.item, self.data))
        converted = None
        if self.xml_attribs:
            pairs = []
            for raw_key, raw_value in attrs.items():
                prefixed = self.attr_prefix + self._build_name(raw_key)
                if self.postprocessor:
                    pair = self.postprocessor(self.path, prefixed, raw_value)
                else:
                    pair = (prefixed, raw_value)
                if pair:
                    pairs.append(pair)
            converted = self.dict_constructor(pairs)
        self.item = converted or None
        self.data = []

    def _joined_text(self):
        """Return the buffered text fragments joined, or ``None`` if empty."""
        if not self.data:
            return None
        return self.cdata_separator.join(self.data)

    def endElement(self, full_name):
        """Close an element and fold its content into the parent item.

        An element closing exactly at ``item_depth`` is handed to
        ``item_callback`` instead and is not attached to its parent, so
        streamed items are not accumulated.  ``ParsingInterrupted`` is raised
        when the callback returns a false value.
        """
        name = self._build_name(full_name)

        if len(self.path) == self.item_depth:
            emitted = self.item
            if emitted is None:
                emitted = self._joined_text()
            if not self.item_callback(self.path, emitted):
                raise ParsingInterrupted
            # Restore the parent context without keeping a reference to
            # the item that was just emitted.
            if self.stack:
                self.item, self.data = self.stack.pop()
            else:
                self.item, self.data = None, []
            self.path.pop()
            return

        if not self.stack:
            self.item, self.data = None, []
            self.path.pop()
            return

        text = self._joined_text()
        element = self.item
        self.item, self.data = self.stack.pop()
        if self.strip_whitespace and text:
            text = text.strip() or None
        if text and self._should_force_cdata(name, text) and element is None:
            element = self.dict_constructor()
        if element is None:
            # Text-only (or empty) element: store the text directly.
            self.item = self.push_data(self.item, name, text)
        else:
            if text:
                self.push_data(element, self.cdata_key, text)
            self.item = self.push_data(self.item, name, element)
        self.path.pop()

    def characters(self, data):
        """Buffer a text fragment of the current element."""
        if self.data:
            self.data.append(data)
        else:
            self.data = [data]

    def comments(self, data):
        """Store a comment on the current item under ``comment_key``."""
        if self.strip_whitespace:
            data = data.strip()
        self.item = self.push_data(self.item, self.comment_key, data)

    def push_data(self, item, key, data):
        """Add ``data`` under ``key`` in ``item`` and return the mapping.

        The postprocessor, when set, may rewrite ``(key, data)`` or return
        ``None`` to drop the entry.  ``item`` is created on demand.  A key
        seen for the second time is promoted to a list; a key seen for the
        first time is wrapped in a list only when ``force_list`` says so.
        """
        if self.postprocessor is not None:
            processed = self.postprocessor(self.path, key, data)
            if processed is None:
                return item
            key, data = processed
        if item is None:
            item = self.dict_constructor()
        try:
            existing = item[key]
            if isinstance(existing, list):
                existing.append(data)
            else:
                item[key] = [existing, data]
        except KeyError:
            item[key] = [data] if self._should_force_list(key, data) else data
        return item

    def _evaluate_force_option(self, option, key, value):
        """Resolve a ``force_*`` option given as bool, container or callable."""
        if not option:
            return False
        if isinstance(option, bool):
            return option
        try:
            return key in option
        except TypeError:
            # Not a container: treat it as a callable predicate.
            return option(self.path[:-1], key, value)

    def _should_force_list(self, key, value):
        """Tell whether ``key`` must hold a list even for a single value."""
        return self._evaluate_force_option(self.force_list, key, value)

    def _should_force_cdata(self, key, value):
        """Tell whether the text of ``key`` must be stored under ``cdata_key``."""
        return self._evaluate_force_option(self.force_cdata, key, value)
def parse(xml_input, encoding=None, expat=expat, process_namespaces=False,
          namespace_separator=':', disable_entities=True, process_comments=False, **kwargs):
    """Parse XML and return it as a dictionary.

    `xml_input` may be a string, a file-like object (anything with a `read`
    attribute) or a generator yielding chunks.  A string is encoded with
    `encoding` (``'utf-8'`` when `encoding` is not given) before parsing.

    Keyword arguments not listed below are forwarded to the internal handler
    (`item_depth`, `item_callback`, `xml_attribs`, `attr_prefix`,
    `cdata_key`, `force_cdata`, `cdata_separator`, `postprocessor`,
    `dict_constructor`, `strip_whitespace`, `namespaces`, `force_list`,
    `comment_key`).

    Attributes
        With `xml_attribs=True` (the default) element attributes are stored
        next to the child elements, prefixed with ``@`` to avoid collisions.
        With `xml_attribs=False` they are ignored::

            >>> import xmltodict
            >>> doc = xmltodict.parse('<a prop="x"><b>1</b><b>2</b></a>')
            >>> doc['a']['@prop']
            'x'
            >>> doc['a']['b']
            ['1', '2']

    Streaming
        With `item_depth=0` (the default) the dictionary of the root element
        is returned.  For any other depth, `item_callback` is called for each
        element closing at that depth and ``None`` is returned at the end.
        The callback receives the `path` from the root to the item (a list of
        ``(name, attribs)`` pairs) and the `item` itself.  A false-ish return
        value stops parsing with :class:`ParsingInterrupted`::

            >>> def handle(path, item):
            ...     print('path:%s item:%s' % (path, item))
            ...     return True
            ...
            >>> xmltodict.parse('<a prop="x"><b>1</b><b>2</b></a>',
            ...                 item_depth=2, item_callback=handle)
            path:[('a', {'prop': 'x'}), ('b', None)] item:1
            path:[('a', {'prop': 'x'}), ('b', None)] item:2

    Post-processing
        `postprocessor` is called with `path`, `key` and `value` and returns
        the ``(key, value)`` pair to store, or ``None`` to drop the entry::

            >>> def postprocessor(path, key, value):
            ...     try:
            ...         return key + ':int', int(value)
            ...     except (ValueError, TypeError):
            ...         return key, value
            >>> xmltodict.parse('<a><b>1</b><b>2</b><b>x</b></a>',
            ...                 postprocessor=postprocessor)
            {'a': {'b:int': [1, 2], 'b': 'x'}}

    Alternative expat
        Another expat implementation can be supplied through `expat`::

            >>> import defusedexpat
            >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat)
            {'a': 'hello'}

    Namespaces
        With `process_namespaces=True` the parser is created with
        `namespace_separator`; otherwise it is created without one.

    Entities
        With `disable_entities=True` (the default) any entity declaration
        makes parsing fail with ``ValueError("entities are disabled")``.

    Forcing lists
        `force_list` forces a list even for a single child.  It may be
        ``True``, a container of keys, or a callable receiving `path`, `key`
        and `value`.  It is evaluated after the postprocessor has run.
        Given::

            <servers>
              <server>
                <name>host1</name>
                <os>Linux</os>
                <interfaces>
                  <interface>
                    <name>em0</name>
                    <ip_address>10.0.0.1</ip_address>
                  </interface>
                </interfaces>
              </server>
            </servers>

        ``force_list=('interface',)`` produces::

            {'servers':
              {'server':
                {'name': 'host1',
                 'os': 'Linux',
                 'interfaces':
                  {'interface':
                    [{'name': 'em0', 'ip_address': '10.0.0.1'}]}}}}

    Comments
        With `process_comments=True` comments are stored under `comment_key`
        (default ``'#comment'``) in the element that contains them.  Given::

            <a>
              <b>
                <!-- b comment -->
                <c>
                  <!-- c comment -->
                  1
                </c>
                <d>2</d>
              </b>
            </a>

        the result is::

            {'a': {
                'b': {
                    '#comment': 'b comment',
                    'c': {
                        '#comment': 'c comment',
                        '#text': '1',
                    },
                    'd': '2',
                },
            }}

        Comment text honours `strip_whitespace`: with the default ``True``
        leading and trailing whitespace is removed; pass ``False`` to keep
        indentation or padding intact.
    """
    handler = _DictSAXHandler(namespace_separator=namespace_separator, **kwargs)

    if isinstance(xml_input, str):
        encoding = encoding or 'utf-8'
        xml_input = xml_input.encode(encoding)

    # The parser only splits namespaces when explicitly asked to.
    parser_separator = namespace_separator if process_namespaces else None
    parser = expat.ParserCreate(encoding, parser_separator)

    parser.ordered_attributes = True
    parser.StartNamespaceDeclHandler = handler.startNamespaceDecl
    parser.StartElementHandler = handler.startElement
    parser.EndElementHandler = handler.endElement
    parser.CharacterDataHandler = handler.characters
    if process_comments:
        parser.CommentHandler = handler.comments
    parser.buffer_text = True

    if disable_entities:
        def _forbid_entities(*_args, **_kwargs):
            raise ValueError("entities are disabled")

        parser.EntityDeclHandler = _forbid_entities

    if hasattr(xml_input, 'read'):
        parser.ParseFile(xml_input)
    elif isgenerator(xml_input):
        for chunk in xml_input:
            parser.Parse(chunk, False)
        # Signal end of input once the generator is exhausted.
        parser.Parse(b'', True)
    else:
        parser.Parse(xml_input, True)

    return handler.item
def _convert_value_to_string(value, encoding='utf-8', bytes_errors='replace'):
    """Return the text used for `value` in XML output.

    Strings are returned as they are, booleans become ``"true"`` or
    ``"false"``, bytes-like objects are decoded with `encoding` using the
    `bytes_errors` error handler, and anything else goes through ``str()``.
    """
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (bytes, bytearray, memoryview)):
        raw = bytes(value)
        return raw.decode(encoding, errors=bytes_errors)
    return value if isinstance(value, str) else str(value)
def _validate_name(value, kind):
    """Reject element/attribute names that could break out of tag context.

    `kind` is ``'element'`` or ``'attribute'`` and is only used to build the
    error message.  Returns ``None`` when the name is acceptable.

    Raises ValueError when `value` is not a string, starts with ``?`` or
    ``!``, or contains ``<``, ``>``, ``/``, a quote, ``=`` or whitespace.
    """
    if not isinstance(value, str):
        raise ValueError(f"{kind} name must be a string")

    # The checks are ordered; the first one that matches decides the message.
    if value.startswith(("?", "!")):
        reason = 'cannot start with "?" or "!"'
    elif "<" in value or ">" in value:
        reason = '"<" or ">" not allowed'
    elif "/" in value:
        reason = '"/" not allowed'
    elif '"' in value or "'" in value:
        reason = "quotes not allowed"
    elif "=" in value:
        reason = '"=" not allowed'
    elif any(map(str.isspace, value)):
        reason = "whitespace not allowed"
    else:
        return
    raise ValueError(f"Invalid {kind} name: {reason}")
def _validate_comment(value):
    """Return `value` as comment text, or raise ValueError if it is unusable.

    ``bytes`` input is decoded as UTF-8 first.  The text must be a string,
    must not contain ``--`` and must not end with ``-``.
    """
    text = value
    if isinstance(text, bytes):
        try:
            text = text.decode("utf-8")
        except UnicodeDecodeError as exc:
            raise ValueError("Comment text must be valid UTF-8") from exc
    elif not isinstance(text, str):
        raise ValueError("Comment text must be a string")

    if text.find("--") != -1:
        raise ValueError("Comment text cannot contain '--'")
    if text[-1:] == "-":
        raise ValueError("Comment text cannot end with '-'")
    return text
def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'):
    """Replace the namespace part of `name` using the `namespaces` mapping.

    Non-string names, and any name when `namespaces` is empty, are returned
    unchanged, as are names that do not contain `ns_sep`.  Otherwise the part
    before the last `ns_sep` (with `attr_prefix` characters stripped) is
    looked up in `namespaces`:

    * a truthy result replaces it, keeping a leading `attr_prefix`;
    * a missing or false-ish result leaves only the local part.
    """
    if not isinstance(name, str) or not namespaces:
        return name
    try:
        qualifier, local = name.rsplit(ns_sep, 1)
    except ValueError:
        # No separator present: nothing to translate.
        return name
    resolved = namespaces.get(qualifier.strip(attr_prefix))
    if not resolved:
        return local
    lead = attr_prefix if qualifier.startswith(attr_prefix) else ''
    return f'{lead}{resolved}{ns_sep}{local}'
def _emit(key, value, content_handler,
          attr_prefix='@',
          cdata_key='#text',
          depth=0,
          preprocessor=None,
          pretty=False,
          newl='\n',
          indent='\t',
          namespace_separator=':',
          namespaces=None,
          full_document=True,
          expand_iter=None,
          encoding='utf-8',
          bytes_errors='replace',
          comment_key='#comment'):
    """Write `key`/`value` to `content_handler` as XML, recursing into children.

    * A `key` equal to `comment_key` emits one comment per non-empty entry
      of `value` (a single value or a list) and nothing else.
    * Otherwise `key` is namespace-translated, optionally rewritten by
      `preprocessor` (which may return ``None`` to skip the entry) and
      validated as an element name.  One element is written per item of
      `value`; scalars, strings, bytes-like objects and dicts count as a
      single item.
    * Inside an item mapping, `cdata_key` holds the element text, keys
      starting with `attr_prefix` become attributes, empty lists are
      skipped, and every other key becomes a child element.
    * A mapping stored under ``attr_prefix + 'xmlns'`` is expanded into
      ``xmlns`` / ``xmlns:<prefix>`` declarations.  The literal ``'@xmlns'``
      is still accepted for backward compatibility.
    * When `expand_iter` is set, iterable items that are not dicts, strings
      or bytes-like are wrapped as ``{expand_iter: item}``.
    * `indent` may be a string or an integer number of spaces; `pretty`
      turns on indentation and `newl` line breaks.

    Raises ValueError for a second root element at depth 0 of a full
    document, and for names rejected by ``_validate_name``.
    """
    # An integer indent means "that many spaces"; normalise it once.
    if isinstance(indent, int):
        indent = ' ' * indent

    if isinstance(key, str) and key == comment_key:
        comments = value if isinstance(value, list) else [value]
        for comment_text in comments:
            if comment_text is None:
                continue
            comment_text = _convert_value_to_string(
                comment_text, encoding=encoding, bytes_errors=bytes_errors
            )
            if not comment_text:
                continue
            if pretty:
                content_handler.ignorableWhitespace(depth * indent)
            content_handler.comment(comment_text)
            if pretty:
                content_handler.ignorableWhitespace(newl)
        return

    key = _process_namespace(key, namespaces, namespace_separator, attr_prefix)
    if preprocessor is not None:
        result = preprocessor(key, value)
        if result is None:
            return
        key, value = result
    # Minimal validation to avoid breaking out of tag context.
    _validate_name(key, "element")
    if not hasattr(value, '__iter__') or isinstance(value, (str, bytes, bytearray, memoryview, dict)):
        value = [value]

    for index, item in enumerate(value):
        if full_document and depth == 0 and index > 0:
            raise ValueError('document with multiple roots')
        if item is None:
            item = {}
        elif not isinstance(item, (dict, str)):
            if expand_iter and hasattr(item, '__iter__') and not isinstance(item, (bytes, bytearray, memoryview)):
                item = {expand_iter: item}
            else:
                item = _convert_value_to_string(item, encoding=encoding, bytes_errors=bytes_errors)
        if isinstance(item, str):
            item = {cdata_key: item}

        cdata = None
        attrs = {}
        children = []
        for ik, iv in item.items():
            if ik == cdata_key:
                if iv is None:
                    cdata = None
                else:
                    cdata = _convert_value_to_string(iv, encoding=encoding, bytes_errors=bytes_errors)
                continue
            if isinstance(ik, str) and ik.startswith(attr_prefix):
                ik = _process_namespace(ik, namespaces, namespace_separator,
                                        attr_prefix)
                # Honour a custom attr_prefix for namespace declarations
                # (the parser stores them under attr_prefix + 'xmlns'); the
                # historical literal '@xmlns' keeps working as before.
                is_xmlns = ik == f'{attr_prefix}xmlns' or ik == '@xmlns'
                if is_xmlns and isinstance(iv, dict):
                    for ns_prefix, ns_uri in iv.items():
                        _validate_name(ns_prefix, "attribute")
                        decl = f'xmlns:{ns_prefix}' if ns_prefix else 'xmlns'
                        attrs[decl] = '' if ns_uri is None else _convert_value_to_string(
                            ns_uri, encoding=encoding, bytes_errors=bytes_errors
                        )
                    continue
                if iv is None:
                    iv = ''
                elif not isinstance(iv, str):
                    iv = _convert_value_to_string(iv, encoding=encoding, bytes_errors=bytes_errors)
                attr_name = ik[len(attr_prefix):]
                _validate_name(attr_name, "attribute")
                attrs[attr_name] = iv
                continue
            if isinstance(iv, list) and not iv:
                continue  # Skip empty lists to avoid creating empty child elements
            children.append((ik, iv))

        if pretty:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.startElement(key, AttributesImpl(attrs))
        if pretty and children:
            content_handler.ignorableWhitespace(newl)
        for child_key, child_value in children:
            _emit(child_key, child_value, content_handler,
                  attr_prefix, cdata_key, depth + 1, preprocessor,
                  pretty, newl, indent, namespaces=namespaces,
                  namespace_separator=namespace_separator,
                  expand_iter=expand_iter, encoding=encoding,
                  bytes_errors=bytes_errors, comment_key=comment_key)
        if cdata is not None:
            content_handler.characters(cdata)
        if pretty and children:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.endElement(key)
        if pretty and depth:
            content_handler.ignorableWhitespace(newl)
class _XMLGenerator(XMLGenerator):
    """``XMLGenerator`` extended with the ability to write comments."""

    def comment(self, text):
        """Write ``<!--text-->`` to the output.

        `text` is checked with ``_validate_comment`` (which raises ValueError
        for unusable text) and passed through ``escape`` before writing.
        """
        text = _validate_comment(text)
        # With short_empty_elements enabled the base class postpones the
        # closing '>' of a start tag until it knows whether the element is
        # empty.  Flush it first, otherwise the comment would be written
        # inside the still-open tag (e.g. ``<a<!--c-->/>``).
        self._finish_pending_start_element()
        self._write(f"<!--{escape(text)}-->")
def unparse(input_dict, output=None, encoding='utf-8', full_document=True,
            short_empty_elements=False, comment_key='#comment',
            **kwargs):
    r"""Serialize `input_dict` to XML (the reverse of `parse`).

    The document is returned as a string unless `output` (a file-like
    object) is given, in which case it is written there and ``None`` is
    returned.

    Keys starting with `attr_prefix` (default ``'@'``) become attributes of
    the enclosing element; the `cdata_key` key (default ``'#text'``) becomes
    its character data; keys equal to `comment_key` become comments.

    Empty lists are omitted entirely: ``{"a": []}`` produces no ``<a>``
    element.  Use a placeholder entry such as ``{"a": [""]}`` when an empty
    element has to be written.

    `pretty` (default ``False``) enables pretty-printing: lines end with
    ``'\n'`` and are indented with ``'\t'`` unless `newl` / `indent` say
    otherwise.  `bytes_errors` (default ``'replace'``) names the error
    handler used when byte values are decoded.  Remaining keyword arguments
    are passed on to the emitter.

    Raises ValueError when `bytes_errors` is not a registered error handler,
    or when `full_document` is true and `input_dict` does not contain
    exactly one root element (comment entries are not counted).
    """
    bytes_errors = kwargs.pop('bytes_errors', 'replace')
    try:
        codecs.lookup_error(bytes_errors)
    except LookupError as exc:
        raise ValueError(f"Invalid bytes_errors handler: {bytes_errors}") from exc

    must_return = output is None
    if must_return:
        output = StringIO()

    content_handler = (
        _XMLGenerator(output, encoding, True)
        if short_empty_elements
        else _XMLGenerator(output, encoding)
    )

    if full_document:
        content_handler.startDocument()

    seen_root = False
    for key, value in input_dict.items():
        is_root = key != comment_key
        if is_root and full_document and seen_root:
            raise ValueError("Document must have exactly one root.")
        _emit(
            key,
            value,
            content_handler,
            full_document=full_document,
            encoding=encoding,
            bytes_errors=bytes_errors,
            comment_key=comment_key,
            **kwargs,
        )
        if is_root:
            seen_root = True

    if full_document:
        if not seen_root:
            raise ValueError("Document must have exactly one root.")
        content_handler.endDocument()

    if not must_return:
        return None
    value = output.getvalue()
    try:  # pragma no cover
        value = value.decode(encoding)
    except AttributeError:  # pragma no cover
        # Already text: nothing to decode.
        pass
    return value
if __name__ == '__main__':  # pragma: no cover
    # Command-line use: read XML from standard input and marshal every
    # (path, item) pair found at the depth given as the single argument to
    # standard output.
    import marshal
    import sys

    [depth_arg] = sys.argv[1:]
    item_depth = int(depth_arg)

    stdin = sys.stdin.buffer
    stdout = sys.stdout.buffer

    def handle_item(path, item):
        """Marshal one (path, item) pair to stdout and keep parsing."""
        marshal.dump((path, item), stdout)
        return True

    try:
        root = parse(
            stdin,
            item_depth=item_depth,
            item_callback=handle_item,
            dict_constructor=dict,
        )
        # At depth 0 nothing is streamed, so emit the whole document once.
        if item_depth == 0:
            handle_item([], root)
    except KeyboardInterrupt:
        pass