Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/xmltodict.py: 59%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2"Makes working with XML feel like you are working with JSON"
4from xml.parsers import expat
5from xml.sax.saxutils import XMLGenerator, escape
6from xml.sax.xmlreader import AttributesImpl
7from io import StringIO
8from inspect import isgenerator
class ParsingInterrupted(Exception):
    """Raised to stop parsing when a streaming item callback returns a false value."""
14class _DictSAXHandler:
15 def __init__(
16 self,
17 item_depth=0,
18 item_callback=lambda *args: True,
19 xml_attribs=True,
20 attr_prefix="@",
21 cdata_key="#text",
22 force_cdata=False,
23 cdata_separator="",
24 postprocessor=None,
25 dict_constructor=dict,
26 strip_whitespace=True,
27 namespace_separator=":",
28 namespaces=None,
29 force_list=None,
30 comment_key="#comment",
31 ):
32 self.path = []
33 self.stack = []
34 self.data = []
35 self.item = None
36 self.item_depth = item_depth
37 self.xml_attribs = xml_attribs
38 self.item_callback = item_callback
39 self.attr_prefix = attr_prefix
40 self.cdata_key = cdata_key
41 self.force_cdata = force_cdata
42 self.cdata_separator = cdata_separator
43 self.postprocessor = postprocessor
44 self.dict_constructor = dict_constructor
45 self.strip_whitespace = strip_whitespace
46 self.namespace_separator = namespace_separator
47 self.namespaces = namespaces
48 self.namespace_declarations = dict_constructor()
49 self.force_list = force_list
50 self.comment_key = comment_key
52 def _build_name(self, full_name):
53 if self.namespaces is None:
54 return full_name
55 i = full_name.rfind(self.namespace_separator)
56 if i == -1:
57 return full_name
58 namespace, name = full_name[:i], full_name[i+1:]
59 try:
60 short_namespace = self.namespaces[namespace]
61 except KeyError:
62 short_namespace = namespace
63 if not short_namespace:
64 return name
65 else:
66 return self.namespace_separator.join((short_namespace, name))
68 def _attrs_to_dict(self, attrs):
69 if isinstance(attrs, dict):
70 return attrs
71 return self.dict_constructor(zip(attrs[0::2], attrs[1::2]))
73 def startNamespaceDecl(self, prefix, uri):
74 self.namespace_declarations[prefix or ''] = uri
76 def startElement(self, full_name, attrs):
77 name = self._build_name(full_name)
78 attrs = self._attrs_to_dict(attrs)
79 if self.namespace_declarations:
80 if not attrs:
81 attrs = self.dict_constructor()
82 attrs['xmlns'] = self.namespace_declarations
83 self.namespace_declarations = self.dict_constructor()
84 self.path.append((name, attrs or None))
85 if len(self.path) >= self.item_depth:
86 self.stack.append((self.item, self.data))
87 if self.xml_attribs:
88 attr_entries = []
89 for key, value in attrs.items():
90 key = self.attr_prefix+self._build_name(key)
91 if self.postprocessor:
92 entry = self.postprocessor(self.path, key, value)
93 else:
94 entry = (key, value)
95 if entry:
96 attr_entries.append(entry)
97 attrs = self.dict_constructor(attr_entries)
98 else:
99 attrs = None
100 self.item = attrs or None
101 self.data = []
103 def endElement(self, full_name):
104 name = self._build_name(full_name)
105 # If we just closed an item at the streaming depth, emit it and drop it
106 # without attaching it back to its parent. This avoids accumulating all
107 # streamed items in memory when using item_depth > 0.
108 if len(self.path) == self.item_depth:
109 item = self.item
110 if item is None:
111 item = (None if not self.data
112 else self.cdata_separator.join(self.data))
114 should_continue = self.item_callback(self.path, item)
115 if not should_continue:
116 raise ParsingInterrupted
117 # Reset state for the parent context without keeping a reference to
118 # the emitted item.
119 if self.stack:
120 self.item, self.data = self.stack.pop()
121 else:
122 self.item = None
123 self.data = []
124 self.path.pop()
125 return
126 if self.stack:
127 data = (None if not self.data
128 else self.cdata_separator.join(self.data))
129 item = self.item
130 self.item, self.data = self.stack.pop()
131 if self.strip_whitespace and data:
132 data = data.strip() or None
133 if data and self._should_force_cdata(name, data) and item is None:
134 item = self.dict_constructor()
135 if item is not None:
136 if data:
137 self.push_data(item, self.cdata_key, data)
138 self.item = self.push_data(self.item, name, item)
139 else:
140 self.item = self.push_data(self.item, name, data)
141 else:
142 self.item = None
143 self.data = []
144 self.path.pop()
146 def characters(self, data):
147 if not self.data:
148 self.data = [data]
149 else:
150 self.data.append(data)
152 def comments(self, data):
153 if self.strip_whitespace:
154 data = data.strip()
155 self.item = self.push_data(self.item, self.comment_key, data)
157 def push_data(self, item, key, data):
158 if self.postprocessor is not None:
159 result = self.postprocessor(self.path, key, data)
160 if result is None:
161 return item
162 key, data = result
163 if item is None:
164 item = self.dict_constructor()
165 try:
166 value = item[key]
167 if isinstance(value, list):
168 value.append(data)
169 else:
170 item[key] = [value, data]
171 except KeyError:
172 if self._should_force_list(key, data):
173 item[key] = [data]
174 else:
175 item[key] = data
176 return item
178 def _should_force_list(self, key, value):
179 if not self.force_list:
180 return False
181 if isinstance(self.force_list, bool):
182 return self.force_list
183 try:
184 return key in self.force_list
185 except TypeError:
186 return self.force_list(self.path[:-1], key, value)
188 def _should_force_cdata(self, key, value):
189 if not self.force_cdata:
190 return False
191 if isinstance(self.force_cdata, bool):
192 return self.force_cdata
193 try:
194 return key in self.force_cdata
195 except TypeError:
196 return self.force_cdata(self.path[:-1], key, value)
def parse(xml_input, encoding=None, expat=expat, process_namespaces=False,
          namespace_separator=':', disable_entities=True, process_comments=False, **kwargs):
    """Parse the given XML input and convert it into a dictionary.

    `xml_input` can either be a `string`, a file-like object, or a generator
    of strings. A `str` input is encoded with `encoding` (UTF-8 when no
    encoding is given) before it is handed to the parser.

    Keyword arguments handled here:

    * `expat`: the expat module used to create the parser.
    * `process_namespaces`: when true, the parser is created with
      `namespace_separator` so that namespaces are expanded; otherwise no
      separator is passed to the parser.
    * `disable_entities`: when true (the default), any entity declaration
      makes parsing fail with ``ValueError("entities are disabled")``.
    * `process_comments`: when true, comments are collected (see below).

    All remaining keyword arguments are forwarded to the internal handler
    (`item_depth`, `item_callback`, `xml_attribs`, `attr_prefix`,
    `cdata_key`, `force_cdata`, `cdata_separator`, `postprocessor`,
    `dict_constructor`, `strip_whitespace`, `namespaces`, `force_list`,
    `comment_key`).

    If `xml_attribs` is `True`, element attributes are put in the dictionary
    among regular child elements, using `@` as a prefix to avoid collisions. If
    set to `False`, they are just ignored.

    Simple example::

        >>> import xmltodict
        >>> doc = xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>
        ... \"\"\")
        >>> doc['a']['@prop']
        'x'
        >>> doc['a']['b']
        ['1', '2']

    If `item_depth` is `0`, the function returns a dictionary for the root
    element (default behavior). Otherwise, it calls `item_callback` every time
    an item at the specified depth is found and returns `None` in the end
    (streaming mode).

    The callback function receives two parameters: the `path` from the document
    root to the item (name-attribs pairs), and the `item` (dict). If the
    callback's return value is false-ish, parsing will be stopped with the
    :class:`ParsingInterrupted` exception.

    Streaming example::

        >>> def handle(path, item):
        ...     print('path:%s item:%s' % (path, item))
        ...     return True
        ...
        >>> xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>\"\"\", item_depth=2, item_callback=handle)
        path:[('a', {'prop': 'x'}), ('b', None)] item:1
        path:[('a', {'prop': 'x'}), ('b', None)] item:2

    The optional argument `postprocessor` is a function that takes `path`,
    `key` and `value` as positional arguments and returns a new `(key, value)`
    pair where both `key` and `value` may have changed. Usage example::

        >>> def postprocessor(path, key, value):
        ...     try:
        ...         return key + ':int', int(value)
        ...     except (ValueError, TypeError):
        ...         return key, value
        >>> xmltodict.parse('<a><b>1</b><b>2</b><b>x</b></a>',
        ...                 postprocessor=postprocessor)
        {'a': {'b:int': [1, 2], 'b': 'x'}}

    You can pass an alternate version of `expat` (such as `defusedexpat`) by
    using the `expat` parameter. E.g:

        >>> import defusedexpat
        >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat)
        {'a': 'hello'}

    You can use the force_list argument to force lists to be created even
    when there is only a single child of a given level of hierarchy. The
    force_list argument is typically a tuple of keys. If the key for a given
    level of hierarchy is in the force_list argument, that level of hierarchy
    will have a list as a child (even if there is only one sub-element).
    This is applied after any user-supplied postprocessor has already run.

        For example, given this input:
        <servers>
          <server>
            <name>host1</name>
            <os>Linux</os>
            <interfaces>
              <interface>
                <name>em0</name>
                <ip_address>10.0.0.1</ip_address>
              </interface>
            </interfaces>
          </server>
        </servers>

        If called with force_list=('interface',), it will produce
        this dictionary:
        {'servers':
          {'server':
            {'name': 'host1',
             'os': 'Linux',
             'interfaces':
              {'interface':
                [ {'name': 'em0', 'ip_address': '10.0.0.1' } ] } } } }

    `force_list` can also be `True` (every key is forced to a list) or a
    callable that receives `path`, `key` and `value`. The callable form is
    helpful in cases where the logic that decides whether a list should be
    forced is more complex.

    If `process_comments` is `True`, comments will be added using `comment_key`
    (default=`'#comment'`) to the tag that contains the comment.

        For example, given this input:
        <a>
          <b>
            <!-- b comment -->
            <c>
                <!-- c comment -->
                1
            </c>
            <d>2</d>
          </b>
        </a>

        If called with `process_comments=True`, it will produce
        this dictionary:
        'a': {
            'b': {
                '#comment': 'b comment',
                'c': {
                    '#comment': 'c comment',
                    '#text': '1',
                },
                'd': '2',
            },
        }

    Comment text is subject to the `strip_whitespace` flag: when it is left
    at the default `True`, comments will have leading and trailing
    whitespace removed. Disable `strip_whitespace` to keep comment
    indentation or padding intact.
    """
    # The handler always receives the caller's separator; the parser only
    # gets one when namespace processing was requested.
    handler = _DictSAXHandler(namespace_separator=namespace_separator,
                              **kwargs)
    if isinstance(xml_input, str):
        encoding = encoding or 'utf-8'
        xml_input = xml_input.encode(encoding)
    parser_separator = namespace_separator if process_namespaces else None
    parser = expat.ParserCreate(encoding, parser_separator)

    # Attributes arrive as a flat [name, value, ...] list.
    parser.ordered_attributes = True
    parser.StartNamespaceDeclHandler = handler.startNamespaceDecl
    parser.StartElementHandler = handler.startElement
    parser.EndElementHandler = handler.endElement
    parser.CharacterDataHandler = handler.characters
    if process_comments:
        parser.CommentHandler = handler.comments
    parser.buffer_text = True

    if disable_entities:
        def _reject_entity_declaration(*_args, **_kwargs):
            raise ValueError("entities are disabled")

        parser.EntityDeclHandler = _reject_entity_declaration

    if hasattr(xml_input, 'read'):
        parser.ParseFile(xml_input)
    elif isgenerator(xml_input):
        for chunk in xml_input:
            parser.Parse(chunk, False)
        # Signal the end of the document once the generator is exhausted.
        parser.Parse(b'', True)
    else:
        parser.Parse(xml_input, True)
    return handler.item
372def _convert_value_to_string(value):
373 """Convert a value to its string representation for XML output.
375 Handles boolean values consistently by converting them to lowercase.
376 """
377 if isinstance(value, (str, bytes)):
378 return value
379 if isinstance(value, bool):
380 return "true" if value else "false"
381 return str(value)
382def _validate_name(value, kind):
383 """Validate an element/attribute name for XML safety.
385 Raises ValueError with a specific reason when invalid.
387 kind: 'element' or 'attribute' (used in error messages)
388 """
389 if not isinstance(value, str):
390 raise ValueError(f"{kind} name must be a string")
391 if value.startswith("?") or value.startswith("!"):
392 raise ValueError(f'Invalid {kind} name: cannot start with "?" or "!"')
393 if "<" in value or ">" in value:
394 raise ValueError(f'Invalid {kind} name: "<" or ">" not allowed')
395 if "/" in value:
396 raise ValueError(f'Invalid {kind} name: "/" not allowed')
397 if '"' in value or "'" in value:
398 raise ValueError(f"Invalid {kind} name: quotes not allowed")
399 if "=" in value:
400 raise ValueError(f'Invalid {kind} name: "=" not allowed')
401 if any(ch.isspace() for ch in value):
402 raise ValueError(f"Invalid {kind} name: whitespace not allowed")
405def _validate_comment(value):
406 if isinstance(value, bytes):
407 try:
408 value = value.decode("utf-8")
409 except UnicodeDecodeError as exc:
410 raise ValueError("Comment text must be valid UTF-8") from exc
411 if not isinstance(value, str):
412 raise ValueError("Comment text must be a string")
413 if "--" in value:
414 raise ValueError("Comment text cannot contain '--'")
415 if value.endswith("-"):
416 raise ValueError("Comment text cannot end with '-'")
417 return value
420def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'):
421 if not isinstance(name, str):
422 return name
423 if not namespaces:
424 return name
425 try:
426 ns, name = name.rsplit(ns_sep, 1)
427 except ValueError:
428 pass
429 else:
430 ns_res = namespaces.get(ns.strip(attr_prefix))
431 name = '{}{}{}{}'.format(
432 attr_prefix if ns.startswith(attr_prefix) else '',
433 ns_res, ns_sep, name) if ns_res else name
434 return name
def _emit(key, value, content_handler,
          attr_prefix='@',
          cdata_key='#text',
          depth=0,
          preprocessor=None,
          pretty=False,
          newl='\n',
          indent='\t',
          namespace_separator=':',
          namespaces=None,
          full_document=True,
          expand_iter=None,
          comment_key='#comment'):
    """Write ``key``/``value`` and everything nested below to ``content_handler``.

    * ``key == comment_key``: each non-empty entry of ``value`` (a single
      value or a list) is written as a comment and nothing else happens.
    * Otherwise ``key`` is namespace-processed, optionally rewritten by
      ``preprocessor(key, value)`` (returning ``None`` skips the entry) and
      validated as an element name. One element is written per item of
      ``value``; a string, a dict or a non-iterable counts as one item.

    Inside a mapping, ``cdata_key`` holds the element text, keys starting
    with ``attr_prefix`` become attributes, ``attr_prefix + 'xmlns'`` with a
    dict value becomes namespace declarations, empty lists are skipped and
    every other key is emitted recursively as a child.

    ``pretty``, ``newl`` and ``indent`` control the whitespace written around
    elements; an integer ``indent`` means that many spaces. ``expand_iter``,
    when set, is the child key used to wrap iterable items that are neither
    dicts nor strings.

    Raises ``ValueError`` for invalid element/attribute names and when
    ``full_document`` is true and more than one element would be written at
    depth 0.
    """
    # Normalise the indent once; recursive calls receive the string form.
    if isinstance(indent, int):
        indent = ' ' * indent

    if isinstance(key, str) and key == comment_key:
        comments = value if isinstance(value, list) else [value]
        for comment in comments:
            if comment is None:
                continue
            comment = _convert_value_to_string(comment)
            if not comment:
                continue
            if pretty:
                content_handler.ignorableWhitespace(depth * indent)
            content_handler.comment(comment)
            if pretty:
                content_handler.ignorableWhitespace(newl)
        return

    key = _process_namespace(key, namespaces, namespace_separator, attr_prefix)
    if preprocessor is not None:
        result = preprocessor(key, value)
        if result is None:
            return
        key, value = result
    # Minimal validation to avoid breaking out of tag context
    _validate_name(key, "element")
    if not hasattr(value, '__iter__') or isinstance(value, (str, dict)):
        value = [value]
    for index, element in enumerate(value):
        if full_document and depth == 0 and index > 0:
            raise ValueError('document with multiple roots')
        # Bring every item into mapping form before splitting it up.
        if element is None:
            element = {}
        elif not isinstance(element, (dict, str)):
            if expand_iter and hasattr(element, '__iter__'):
                element = {expand_iter: element}
            else:
                element = _convert_value_to_string(element)
        if isinstance(element, str):
            element = {cdata_key: element}

        cdata = None
        attrs = {}
        children = []
        for ik, iv in element.items():
            if ik == cdata_key:
                cdata = _convert_value_to_string(iv)
                continue
            if isinstance(ik, str) and ik.startswith(attr_prefix):
                ik = _process_namespace(ik, namespaces, namespace_separator,
                                        attr_prefix)
                # Namespace declarations use the configured attribute prefix,
                # matching the key produced when parsing with that prefix.
                if ik == attr_prefix + 'xmlns' and isinstance(iv, dict):
                    for ns_prefix, ns_uri in iv.items():
                        _validate_name(ns_prefix, "attribute")
                        # An empty prefix declares the default namespace.
                        declaration = f'xmlns:{ns_prefix}' if ns_prefix else 'xmlns'
                        attrs[declaration] = str(ns_uri)
                    continue
                if not isinstance(iv, str):
                    iv = str(iv)
                attr_name = ik[len(attr_prefix):]
                _validate_name(attr_name, "attribute")
                attrs[attr_name] = iv
                continue
            if isinstance(iv, list) and not iv:
                continue  # Skip empty lists to avoid creating empty child elements
            children.append((ik, iv))

        if pretty:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.startElement(key, AttributesImpl(attrs))
        if pretty and children:
            content_handler.ignorableWhitespace(newl)
        for child_key, child_value in children:
            _emit(child_key, child_value, content_handler,
                  attr_prefix, cdata_key, depth + 1, preprocessor,
                  pretty, newl, indent, namespaces=namespaces,
                  namespace_separator=namespace_separator,
                  expand_iter=expand_iter, comment_key=comment_key)
        if cdata is not None:
            content_handler.characters(cdata)
        if pretty and children:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.endElement(key)
        if pretty and depth:
            content_handler.ignorableWhitespace(newl)
class _XMLGenerator(XMLGenerator):
    """``XMLGenerator`` extended with support for writing comments."""

    def comment(self, text):
        """Write ``text`` as an XML comment.

        The text is validated first, so nothing is written when it is
        rejected with ``ValueError``. It is passed through ``escape`` before
        being written.
        """
        text = _validate_comment(text)
        # With short_empty_elements enabled the generator defers the '>' of
        # the last start tag. Close that tag first, as the other XMLGenerator
        # output methods do, so the comment is not written inside it.
        self._finish_pending_start_element()
        self._write(f"<!--{escape(text)}-->")
def unparse(input_dict, output=None, encoding='utf-8', full_document=True,
            short_empty_elements=False, comment_key='#comment',
            **kwargs):
    """Emit an XML document for the given `input_dict` (reverse of `parse`).

    The resulting XML document is returned as a string, but if `output` (a
    file-like object) is specified, it is written there instead and `None`
    is returned.

    Dictionary keys prefixed with `attr_prefix` (default=`'@'`) are interpreted
    as XML node attributes, whereas keys equal to `cdata_key`
    (default=`'#text'`) are treated as character data. Keys equal to
    `comment_key` (default=`'#comment'`) are written as XML comments and do
    not count as a root element.

    Empty lists are omitted entirely: ``{"a": []}`` produces no ``<a>`` element.
    Provide a placeholder entry (for example ``{"a": [""]}``) when an explicit
    empty container element must be emitted.

    The `pretty` parameter (default=`False`) enables pretty-printing. In this
    mode, lines are terminated with `'\\n'` and indented with `'\\t'`, but this
    can be customized with the `newl` and `indent` parameters.

    When `full_document` is true (the default) the output is wrapped in
    start/end-of-document events and `input_dict` must contain exactly one
    root element; otherwise `ValueError` is raised. `short_empty_elements`
    is passed on to the underlying XML generator. Any other keyword argument
    is forwarded to the element writer.
    """
    must_return = output is None
    if must_return:
        output = StringIO()
    content_handler = _XMLGenerator(output, encoding, bool(short_empty_elements))
    if full_document:
        content_handler.startDocument()
    seen_root = False
    for key, value in input_dict.items():
        is_root = key != comment_key
        if is_root and full_document and seen_root:
            raise ValueError("Document must have exactly one root.")
        _emit(key, value, content_handler, full_document=full_document, comment_key=comment_key, **kwargs)
        if is_root:
            seen_root = True
    if full_document:
        if not seen_root:
            raise ValueError("Document must have exactly one root.")
        content_handler.endDocument()
    if must_return:
        # StringIO.getvalue() already returns str; no decoding is needed.
        return output.getvalue()
if __name__ == '__main__':  # pragma: no cover
    # Command-line mode: read XML from stdin and write marshalled
    # (path, item) tuples to stdout. Takes exactly one argument, the depth
    # at which items are emitted.
    import marshal
    import sys

    stdin = sys.stdin.buffer
    stdout = sys.stdout.buffer

    [depth_argument] = sys.argv[1:]
    item_depth = int(depth_argument)

    def handle_item(path, item):
        """Dump one item to stdout and ask the parser to keep going."""
        marshal.dump((path, item), stdout)
        return True

    try:
        root = parse(
            stdin,
            item_depth=item_depth,
            item_callback=handle_item,
            dict_constructor=dict,
        )
        # Without streaming nothing has been emitted yet: dump the whole
        # document as a single item.
        if item_depth == 0:
            handle_item([], root)
    except KeyboardInterrupt:
        pass