Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/xmltodict.py: 58%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2"Makes working with XML feel like you are working with JSON"
4from xml.parsers import expat
5from xml.sax.saxutils import XMLGenerator, escape
6from xml.sax.xmlreader import AttributesImpl
7from io import StringIO
8from inspect import isgenerator
class ParsingInterrupted(Exception):
    """Signal that parsing was stopped because an item callback returned a false-ish value."""
14class _DictSAXHandler:
15 def __init__(
16 self,
17 item_depth=0,
18 item_callback=lambda *args: True,
19 xml_attribs=True,
20 attr_prefix="@",
21 cdata_key="#text",
22 force_cdata=False,
23 cdata_separator="",
24 postprocessor=None,
25 dict_constructor=dict,
26 strip_whitespace=True,
27 namespace_separator=":",
28 namespaces=None,
29 force_list=None,
30 comment_key="#comment",
31 ):
32 self.path = []
33 self.stack = []
34 self.data = []
35 self.item = None
36 self.item_depth = item_depth
37 self.xml_attribs = xml_attribs
38 self.item_callback = item_callback
39 self.attr_prefix = attr_prefix
40 self.cdata_key = cdata_key
41 self.force_cdata = force_cdata
42 self.cdata_separator = cdata_separator
43 self.postprocessor = postprocessor
44 self.dict_constructor = dict_constructor
45 self.strip_whitespace = strip_whitespace
46 self.namespace_separator = namespace_separator
47 self.namespaces = namespaces
48 self.namespace_declarations = dict_constructor()
49 self.force_list = force_list
50 self.comment_key = comment_key
52 def _build_name(self, full_name):
53 if self.namespaces is None:
54 return full_name
55 i = full_name.rfind(self.namespace_separator)
56 if i == -1:
57 return full_name
58 namespace, name = full_name[:i], full_name[i+1:]
59 try:
60 short_namespace = self.namespaces[namespace]
61 except KeyError:
62 short_namespace = namespace
63 if not short_namespace:
64 return name
65 else:
66 return self.namespace_separator.join((short_namespace, name))
68 def _attrs_to_dict(self, attrs):
69 if isinstance(attrs, dict):
70 return attrs
71 return self.dict_constructor(zip(attrs[0::2], attrs[1::2]))
73 def startNamespaceDecl(self, prefix, uri):
74 self.namespace_declarations[prefix or ''] = uri
76 def startElement(self, full_name, attrs):
77 name = self._build_name(full_name)
78 attrs = self._attrs_to_dict(attrs)
79 if self.namespace_declarations:
80 if not attrs:
81 attrs = self.dict_constructor()
82 attrs['xmlns'] = self.namespace_declarations
83 self.namespace_declarations = self.dict_constructor()
84 self.path.append((name, attrs or None))
85 if len(self.path) >= self.item_depth:
86 self.stack.append((self.item, self.data))
87 if self.xml_attribs:
88 attr_entries = []
89 for key, value in attrs.items():
90 key = self.attr_prefix+self._build_name(key)
91 if self.postprocessor:
92 entry = self.postprocessor(self.path, key, value)
93 else:
94 entry = (key, value)
95 if entry:
96 attr_entries.append(entry)
97 attrs = self.dict_constructor(attr_entries)
98 else:
99 attrs = None
100 self.item = attrs or None
101 self.data = []
103 def endElement(self, full_name):
104 name = self._build_name(full_name)
105 # If we just closed an item at the streaming depth, emit it and drop it
106 # without attaching it back to its parent. This avoids accumulating all
107 # streamed items in memory when using item_depth > 0.
108 if len(self.path) == self.item_depth:
109 item = self.item
110 if item is None:
111 item = (None if not self.data
112 else self.cdata_separator.join(self.data))
114 should_continue = self.item_callback(self.path, item)
115 if not should_continue:
116 raise ParsingInterrupted
117 # Reset state for the parent context without keeping a reference to
118 # the emitted item.
119 if self.stack:
120 self.item, self.data = self.stack.pop()
121 else:
122 self.item = None
123 self.data = []
124 self.path.pop()
125 return
126 if self.stack:
127 data = (None if not self.data
128 else self.cdata_separator.join(self.data))
129 item = self.item
130 self.item, self.data = self.stack.pop()
131 if self.strip_whitespace and data:
132 data = data.strip() or None
133 if data and self._should_force_cdata(name, data) and item is None:
134 item = self.dict_constructor()
135 if item is not None:
136 if data:
137 self.push_data(item, self.cdata_key, data)
138 self.item = self.push_data(self.item, name, item)
139 else:
140 self.item = self.push_data(self.item, name, data)
141 else:
142 self.item = None
143 self.data = []
144 self.path.pop()
146 def characters(self, data):
147 if not self.data:
148 self.data = [data]
149 else:
150 self.data.append(data)
152 def comments(self, data):
153 if self.strip_whitespace:
154 data = data.strip()
155 self.item = self.push_data(self.item, self.comment_key, data)
157 def push_data(self, item, key, data):
158 if self.postprocessor is not None:
159 result = self.postprocessor(self.path, key, data)
160 if result is None:
161 return item
162 key, data = result
163 if item is None:
164 item = self.dict_constructor()
165 try:
166 value = item[key]
167 if isinstance(value, list):
168 value.append(data)
169 else:
170 item[key] = [value, data]
171 except KeyError:
172 if self._should_force_list(key, data):
173 item[key] = [data]
174 else:
175 item[key] = data
176 return item
178 def _should_force_list(self, key, value):
179 if not self.force_list:
180 return False
181 if isinstance(self.force_list, bool):
182 return self.force_list
183 try:
184 return key in self.force_list
185 except TypeError:
186 return self.force_list(self.path[:-1], key, value)
188 def _should_force_cdata(self, key, value):
189 if not self.force_cdata:
190 return False
191 if isinstance(self.force_cdata, bool):
192 return self.force_cdata
193 try:
194 return key in self.force_cdata
195 except TypeError:
196 return self.force_cdata(self.path[:-1], key, value)
def parse(xml_input, encoding=None, expat=expat, process_namespaces=False,
          namespace_separator=':', disable_entities=True, process_comments=False, **kwargs):
    """Convert XML input into a dictionary.

    `xml_input` may be a `string`, a file-like object (anything with a `read`
    attribute), or a generator yielding chunks of the document.

    When `xml_attribs` is `True`, attributes are stored next to the child
    elements of their tag, with `@` prepended to the key so they cannot
    clash with child names. When it is `False`, attributes are dropped.

    Simple example::

        >>> import xmltodict
        >>> doc = xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>
        ... \"\"\")
        >>> doc['a']['@prop']
        'x'
        >>> doc['a']['b']
        ['1', '2']

    With `item_depth` set to `0` (the default) the dictionary for the root
    element is returned. With any other depth the function works in
    streaming mode: `item_callback` is invoked for every item found at that
    depth and `None` is returned at the end.

    The callback is given two arguments: the `path` from the document root
    down to the item (a list of name-attribs pairs) and the `item` itself.
    A false-ish return value stops parsing by raising
    :class:`ParsingInterrupted`.

    Streaming example::

        >>> def handle(path, item):
        ...     print('path:%s item:%s' % (path, item))
        ...     return True
        ...
        >>> xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>\"\"\", item_depth=2, item_callback=handle)
        path:[('a', {'prop': 'x'}), ('b', None)] item:1
        path:[('a', {'prop': 'x'}), ('b', None)] item:2

    `postprocessor` is an optional function called with `path`, `key` and
    `value` as positional arguments. It returns a `(key, value)` pair in
    which either part may have been changed. Usage example::

        >>> def postprocessor(path, key, value):
        ...     try:
        ...         return key + ':int', int(value)
        ...     except (ValueError, TypeError):
        ...         return key, value
        >>> xmltodict.parse('<a><b>1</b><b>2</b><b>x</b></a>',
        ...                 postprocessor=postprocessor)
        {'a': {'b:int': [1, 2], 'b': 'x'}}

    A different `expat` implementation (such as `defusedexpat`) can be
    supplied through the `expat` parameter. E.g:

        >>> import defusedexpat
        >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat)
        {'a': 'hello'}

    `force_list` makes the parser create a list even when a key has a single
    child. It can be a tuple of keys: every key listed there always maps to a
    list of children. The check runs on the key produced by the
    `postprocessor`, if one is given.

    For example, given this input:
    <servers>
      <server>
        <name>host1</name>
        <os>Linux</os>
        <interfaces>
          <interface>
            <name>em0</name>
            <ip_address>10.0.0.1</ip_address>
          </interface>
        </interfaces>
      </server>
    </servers>

    calling with force_list=('interface',) produces this dictionary:
    {'servers':
      {'server':
        {'name': 'host1',
         'os': 'Linux',
         'interfaces':
          {'interface':
            [ {'name': 'em0', 'ip_address': '10.0.0.1' } ] } } } }

    `force_list` may also be a callable taking `path`, `key` and `value`,
    for cases where the decision needs more elaborate logic.

    When `process_comments` is `True`, each comment is stored under
    `comment_key` (default=`'#comment'`) in the tag that contains it.

    For example, given this input:
    <a>
      <b>
        <!-- b comment -->
        <c>
            <!-- c comment -->
            1
        </c>
        <d>2</d>
      </b>
    </a>

    calling with `process_comments=True` produces this dictionary:
    'a': {
        'b': {
            '#comment': 'b comment',
            'c': {
                '#comment': 'c comment',
                '#text': '1',
            },
            'd': '2',
        },
    }

    Comments honour `strip_whitespace`: with the default `True`, leading and
    trailing whitespace is removed from the comment text. Pass
    `strip_whitespace=False` to keep it.

    When `disable_entities` is true (the default), any entity declaration in
    the document raises `ValueError`.
    """
    handler = _DictSAXHandler(namespace_separator=namespace_separator,
                              **kwargs)
    if isinstance(xml_input, str):
        encoding = encoding or 'utf-8'
        xml_input = xml_input.encode(encoding)

    # The parser only gets a separator when namespace processing is requested;
    # the handler keeps the caller's separator either way.
    parser_separator = namespace_separator if process_namespaces else None
    parser = expat.ParserCreate(encoding, parser_separator)
    parser.ordered_attributes = True

    parser.StartNamespaceDeclHandler = handler.startNamespaceDecl
    parser.StartElementHandler = handler.startElement
    parser.EndElementHandler = handler.endElement
    parser.CharacterDataHandler = handler.characters
    if process_comments:
        parser.CommentHandler = handler.comments
    parser.buffer_text = True

    if disable_entities:
        def _reject_entity_declaration(*_args, **_kwargs):
            raise ValueError("entities are disabled")

        parser.EntityDeclHandler = _reject_entity_declaration

    if hasattr(xml_input, 'read'):
        parser.ParseFile(xml_input)
    elif isgenerator(xml_input):
        for piece in xml_input:
            parser.Parse(piece, False)
        # An empty final chunk tells the parser the document is complete.
        parser.Parse(b'', True)
    else:
        parser.Parse(xml_input, True)
    return handler.item
372def _convert_value_to_string(value):
373 """Convert a value to its string representation for XML output.
375 Handles boolean values consistently by converting them to lowercase.
376 """
377 if isinstance(value, (str, bytes)):
378 return value
379 if isinstance(value, bool):
380 return "true" if value else "false"
381 return str(value)
384def _validate_name(value, kind):
385 """Validate an element/attribute name for XML safety.
387 Raises ValueError with a specific reason when invalid.
389 kind: 'element' or 'attribute' (used in error messages)
390 """
391 if not isinstance(value, str):
392 raise ValueError(f"{kind} name must be a string")
393 if value.startswith("?") or value.startswith("!"):
394 raise ValueError(f'Invalid {kind} name: cannot start with "?" or "!"')
395 if "<" in value or ">" in value:
396 raise ValueError(f'Invalid {kind} name: "<" or ">" not allowed')
397 if "/" in value:
398 raise ValueError(f'Invalid {kind} name: "/" not allowed')
399 if '"' in value or "'" in value:
400 raise ValueError(f"Invalid {kind} name: quotes not allowed")
401 if "=" in value:
402 raise ValueError(f'Invalid {kind} name: "=" not allowed')
403 if any(ch.isspace() for ch in value):
404 raise ValueError(f"Invalid {kind} name: whitespace not allowed")
407def _validate_comment(value):
408 if isinstance(value, bytes):
409 try:
410 value = value.decode("utf-8")
411 except UnicodeDecodeError as exc:
412 raise ValueError("Comment text must be valid UTF-8") from exc
413 if not isinstance(value, str):
414 raise ValueError("Comment text must be a string")
415 if "--" in value:
416 raise ValueError("Comment text cannot contain '--'")
417 if value.endswith("-"):
418 raise ValueError("Comment text cannot end with '-'")
419 return value
422def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'):
423 if not isinstance(name, str):
424 return name
425 if not namespaces:
426 return name
427 try:
428 ns, name = name.rsplit(ns_sep, 1)
429 except ValueError:
430 pass
431 else:
432 ns_res = namespaces.get(ns.strip(attr_prefix))
433 name = '{}{}{}{}'.format(
434 attr_prefix if ns.startswith(attr_prefix) else '',
435 ns_res, ns_sep, name) if ns_res else name
436 return name
def _emit(key, value, content_handler,
          attr_prefix='@',
          cdata_key='#text',
          depth=0,
          preprocessor=None,
          pretty=False,
          newl='\n',
          indent='\t',
          namespace_separator=':',
          namespaces=None,
          full_document=True,
          expand_iter=None,
          comment_key='#comment'):
    """Write ``key``/``value`` (and everything nested in it) as XML.

    Args:
        key: Element name, or ``comment_key`` to write comments.
        value: Element content: a dict (attributes, text and children), a
            scalar (text), ``None`` (empty element) or an iterable producing
            one element per entry. ``str``, ``bytes`` and ``dict`` are never
            iterated.
        content_handler: Receives ``startElement``, ``endElement``,
            ``characters``, ``ignorableWhitespace`` and ``comment`` calls.
        attr_prefix: Prefix that marks attribute keys inside a dict.
        cdata_key: Key that holds an element's text inside a dict.
        depth: Nesting level, used for indentation and the root check.
        preprocessor: Optional callable ``(key, value)`` returning a
            replacement pair, or ``None`` to skip the entry.
        pretty: Emit indentation and newlines.
        newl: Line terminator used when ``pretty`` is set.
        indent: Indentation unit; an int means that many spaces.
        namespace_separator: Separator between namespace and local name.
        namespaces: Optional mapping applied to element and attribute names.
        full_document: When true, more than one element at depth 0 is an
            error.
        expand_iter: When set, nested iterables are wrapped in a child
            element of this name instead of being converted to a string.
        comment_key: Key whose value is written as comments.

    Raises:
        ValueError: For an invalid element or attribute name, or for several
            roots in a full document.
    """
    # Normalise once so every branch below can multiply by depth.
    if isinstance(indent, int):
        indent = ' ' * indent

    if isinstance(key, str) and key == comment_key:
        comments_list = value if isinstance(value, list) else [value]
        for comment_text in comments_list:
            if comment_text is None:
                continue
            comment_text = _convert_value_to_string(comment_text)
            if not comment_text:
                continue
            if pretty:
                content_handler.ignorableWhitespace(depth * indent)
            content_handler.comment(comment_text)
            if pretty:
                content_handler.ignorableWhitespace(newl)
        return

    key = _process_namespace(key, namespaces, namespace_separator, attr_prefix)
    if preprocessor is not None:
        result = preprocessor(key, value)
        if result is None:
            return
        key, value = result
    # Minimal validation to avoid breaking out of tag context
    _validate_name(key, "element")
    # bytes is text, not a sequence of integers, so it is wrapped like str.
    if not hasattr(value, '__iter__') or isinstance(value, (str, bytes, dict)):
        value = [value]
    for index, v in enumerate(value):
        if full_document and depth == 0 and index > 0:
            raise ValueError('document with multiple roots')
        if v is None:
            v = {}
        elif not isinstance(v, (dict, str, bytes)):
            if expand_iter and hasattr(v, '__iter__'):
                v = {expand_iter: v}
            else:
                v = _convert_value_to_string(v)
        if isinstance(v, (str, bytes)):
            v = {cdata_key: v}
        cdata = None
        attrs = {}
        children = []
        for ik, iv in v.items():
            if ik == cdata_key:
                if iv is None:
                    cdata = None
                else:
                    cdata = _convert_value_to_string(iv)
                continue
            if isinstance(ik, str) and ik.startswith(attr_prefix):
                ik = _process_namespace(ik, namespaces, namespace_separator,
                                        attr_prefix)
                # Honour a custom attr_prefix; '@xmlns' stays accepted for
                # backward compatibility.
                if isinstance(iv, dict) and ik in (attr_prefix + 'xmlns', '@xmlns'):
                    for ns_prefix, ns_uri in iv.items():
                        _validate_name(ns_prefix, "attribute")
                        attr = 'xmlns{}'.format(f':{ns_prefix}' if ns_prefix else '')
                        attrs[attr] = '' if ns_uri is None else str(ns_uri)
                    continue
                if iv is None:
                    iv = ''
                elif not isinstance(iv, str):
                    iv = str(iv)
                # Namespace processing may return the name without the
                # prefix; only cut the prefix off when it is really there.
                if ik.startswith(attr_prefix):
                    attr_name = ik[len(attr_prefix):]
                else:
                    attr_name = ik
                _validate_name(attr_name, "attribute")
                attrs[attr_name] = iv
                continue
            if isinstance(iv, list) and not iv:
                continue  # Skip empty lists to avoid creating empty child elements
            children.append((ik, iv))
        if pretty:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.startElement(key, AttributesImpl(attrs))
        if pretty and children:
            content_handler.ignorableWhitespace(newl)
        for child_key, child_value in children:
            _emit(child_key, child_value, content_handler,
                  attr_prefix, cdata_key, depth+1, preprocessor,
                  pretty, newl, indent, namespaces=namespaces,
                  namespace_separator=namespace_separator,
                  expand_iter=expand_iter, comment_key=comment_key)
        if cdata is not None:
            content_handler.characters(cdata)
        if pretty and children:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.endElement(key)
        if pretty and depth:
            content_handler.ignorableWhitespace(newl)
543class _XMLGenerator(XMLGenerator):
544 def comment(self, text):
545 text = _validate_comment(text)
546 self._write(f"<!--{escape(text)}-->")
def unparse(input_dict, output=None, encoding='utf-8', full_document=True,
            short_empty_elements=False, comment_key='#comment',
            **kwargs):
    """Write `input_dict` as an XML document (the reverse of `parse`).

    When `output` (a file-like object) is given, the document is written to
    it and nothing is returned. Otherwise the document is returned as a
    string.

    Keys that start with `attr_prefix` (default=`'@'`) become attributes of
    their XML node, and the key equal to `cdata_key` (default=`'#text'`)
    supplies the node's character data. Entries stored under `comment_key`
    are written as comments and do not count as the document root.

    An empty list produces nothing at all: ``{"a": []}`` emits no ``<a>``
    element. Use a placeholder entry such as ``{"a": [""]}`` when an empty
    element has to appear in the output.

    Pass `pretty=True` for pretty-printed output. Lines then end with `'\n'`
    and are indented with `'\t'`; both can be changed through the `newl` and
    `indent` parameters.

    With `full_document` set, a `ValueError` is raised unless `input_dict`
    contains exactly one root element.
    """
    return_text = output is None
    if return_text:
        output = StringIO()
    content_handler = _XMLGenerator(output, encoding, bool(short_empty_elements))
    if full_document:
        content_handler.startDocument()

    root_emitted = False
    for key, value in input_dict.items():
        is_root = key != comment_key
        if is_root and full_document and root_emitted:
            raise ValueError("Document must have exactly one root.")
        _emit(key, value, content_handler, full_document=full_document, comment_key=comment_key, **kwargs)
        if is_root:
            root_emitted = True

    if full_document:
        if not root_emitted:
            raise ValueError("Document must have exactly one root.")
        content_handler.endDocument()

    if not return_text:
        return None
    value = output.getvalue()
    try:  # pragma no cover
        value = value.decode(encoding)
    except AttributeError:  # pragma no cover
        pass
    return value
if __name__ == '__main__':  # pragma: no cover
    # Command-line mode: parse XML from stdin and marshal the items to stdout.
    import marshal
    import sys

    stdin = sys.stdin.buffer
    stdout = sys.stdout.buffer

    # Exactly one argument is expected: the streaming depth.
    (depth_argument,) = sys.argv[1:]
    item_depth = int(depth_argument)

    def handle_item(path, item):
        """Dump one (path, item) pair to stdout and keep parsing."""
        marshal.dump((path, item), stdout)
        return True

    try:
        root = parse(
            stdin,
            item_depth=item_depth,
            item_callback=handle_item,
            dict_constructor=dict,
        )
        # At depth 0 nothing is streamed, so dump the whole document once.
        if item_depth == 0:
            handle_item([], root)
    except KeyboardInterrupt:
        pass