Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/xmltodict.py: 59%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2"Makes working with XML feel like you are working with JSON"
4from xml.parsers import expat
5from xml.sax.saxutils import XMLGenerator, escape
6from xml.sax.xmlreader import AttributesImpl
7from io import StringIO
8from inspect import isgenerator
class ParsingInterrupted(Exception):
    """Raised to stop parsing when an item callback returns a false-ish value."""
14class _DictSAXHandler:
15 def __init__(
16 self,
17 item_depth=0,
18 item_callback=lambda *args: True,
19 xml_attribs=True,
20 attr_prefix="@",
21 cdata_key="#text",
22 force_cdata=False,
23 cdata_separator="",
24 postprocessor=None,
25 dict_constructor=dict,
26 strip_whitespace=True,
27 namespace_separator=":",
28 namespaces=None,
29 force_list=None,
30 comment_key="#comment",
31 ):
32 self.path = []
33 self.stack = []
34 self.data = []
35 self.item = None
36 self.item_depth = item_depth
37 self.xml_attribs = xml_attribs
38 self.item_callback = item_callback
39 self.attr_prefix = attr_prefix
40 self.cdata_key = cdata_key
41 self.force_cdata = force_cdata
42 self.cdata_separator = cdata_separator
43 self.postprocessor = postprocessor
44 self.dict_constructor = dict_constructor
45 self.strip_whitespace = strip_whitespace
46 self.namespace_separator = namespace_separator
47 self.namespaces = namespaces
48 self.namespace_declarations = dict_constructor()
49 self.force_list = force_list
50 self.comment_key = comment_key
52 def _build_name(self, full_name):
53 if self.namespaces is None:
54 return full_name
55 i = full_name.rfind(self.namespace_separator)
56 if i == -1:
57 return full_name
58 namespace, name = full_name[:i], full_name[i+1:]
59 try:
60 short_namespace = self.namespaces[namespace]
61 except KeyError:
62 short_namespace = namespace
63 if not short_namespace:
64 return name
65 else:
66 return self.namespace_separator.join((short_namespace, name))
68 def _attrs_to_dict(self, attrs):
69 if isinstance(attrs, dict):
70 return attrs
71 return self.dict_constructor(zip(attrs[0::2], attrs[1::2]))
73 def startNamespaceDecl(self, prefix, uri):
74 self.namespace_declarations[prefix or ''] = uri
76 def startElement(self, full_name, attrs):
77 name = self._build_name(full_name)
78 attrs = self._attrs_to_dict(attrs)
79 if self.namespace_declarations:
80 if not attrs:
81 attrs = self.dict_constructor()
82 attrs['xmlns'] = self.namespace_declarations
83 self.namespace_declarations = self.dict_constructor()
84 self.path.append((name, attrs or None))
85 if len(self.path) >= self.item_depth:
86 self.stack.append((self.item, self.data))
87 if self.xml_attribs:
88 attr_entries = []
89 for key, value in attrs.items():
90 key = self.attr_prefix+self._build_name(key)
91 if self.postprocessor:
92 entry = self.postprocessor(self.path, key, value)
93 else:
94 entry = (key, value)
95 if entry:
96 attr_entries.append(entry)
97 attrs = self.dict_constructor(attr_entries)
98 else:
99 attrs = None
100 self.item = attrs or None
101 self.data = []
103 def endElement(self, full_name):
104 name = self._build_name(full_name)
105 # If we just closed an item at the streaming depth, emit it and drop it
106 # without attaching it back to its parent. This avoids accumulating all
107 # streamed items in memory when using item_depth > 0.
108 if len(self.path) == self.item_depth:
109 item = self.item
110 if item is None:
111 item = (None if not self.data
112 else self.cdata_separator.join(self.data))
114 should_continue = self.item_callback(self.path, item)
115 if not should_continue:
116 raise ParsingInterrupted
117 # Reset state for the parent context without keeping a reference to
118 # the emitted item.
119 if self.stack:
120 self.item, self.data = self.stack.pop()
121 else:
122 self.item = None
123 self.data = []
124 self.path.pop()
125 return
126 if self.stack:
127 data = (None if not self.data
128 else self.cdata_separator.join(self.data))
129 item = self.item
130 self.item, self.data = self.stack.pop()
131 if self.strip_whitespace and data:
132 data = data.strip() or None
133 if data and self._should_force_cdata(name, data) and item is None:
134 item = self.dict_constructor()
135 if item is not None:
136 if data:
137 self.push_data(item, self.cdata_key, data)
138 self.item = self.push_data(self.item, name, item)
139 else:
140 self.item = self.push_data(self.item, name, data)
141 else:
142 self.item = None
143 self.data = []
144 self.path.pop()
146 def characters(self, data):
147 if not self.data:
148 self.data = [data]
149 else:
150 self.data.append(data)
152 def comments(self, data):
153 if self.strip_whitespace:
154 data = data.strip()
155 self.item = self.push_data(self.item, self.comment_key, data)
157 def push_data(self, item, key, data):
158 if self.postprocessor is not None:
159 result = self.postprocessor(self.path, key, data)
160 if result is None:
161 return item
162 key, data = result
163 if item is None:
164 item = self.dict_constructor()
165 try:
166 value = item[key]
167 if isinstance(value, list):
168 value.append(data)
169 else:
170 item[key] = [value, data]
171 except KeyError:
172 if self._should_force_list(key, data):
173 item[key] = [data]
174 else:
175 item[key] = data
176 return item
178 def _should_force_list(self, key, value):
179 if not self.force_list:
180 return False
181 if isinstance(self.force_list, bool):
182 return self.force_list
183 try:
184 return key in self.force_list
185 except TypeError:
186 return self.force_list(self.path[:-1], key, value)
188 def _should_force_cdata(self, key, value):
189 if not self.force_cdata:
190 return False
191 if isinstance(self.force_cdata, bool):
192 return self.force_cdata
193 try:
194 return key in self.force_cdata
195 except TypeError:
196 return self.force_cdata(self.path[:-1], key, value)
def parse(xml_input, encoding=None, expat=expat, process_namespaces=False,
          namespace_separator=':', disable_entities=True, process_comments=False, **kwargs):
    """Parse XML input and return it as a dictionary.

    `xml_input` may be a string, a file-like object (anything exposing
    `read`), or a generator that yields chunks of the document.

    Arguments handled directly by this function:

    * `encoding` -- passed to the expat parser; string input is encoded
      with it first (UTF-8 when not given).
    * `expat` -- module providing `ParserCreate`.  An alternative such as
      `defusedexpat` can be supplied::

          >>> import defusedexpat
          >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat)
          {'a': 'hello'}

    * `process_namespaces` / `namespace_separator` -- when
      `process_namespaces` is true the separator is given to expat,
      otherwise the parser is created without one.
    * `disable_entities` -- when true (the default) an entity declaration
      makes the installed handler raise `ValueError`.
    * `process_comments` -- when true, comments are stored on the element
      that contains them (see below).

    Every other keyword argument is forwarded to the internal handler:
    `item_depth`, `item_callback`, `xml_attribs`, `attr_prefix`,
    `cdata_key`, `force_cdata`, `cdata_separator`, `postprocessor`,
    `dict_constructor`, `strip_whitespace`, `namespaces`, `force_list` and
    `comment_key`.

    **Attributes.**  If `xml_attribs` is `True`, element attributes are put
    in the dictionary among regular child elements, using `@` as a prefix
    to avoid collisions.  If set to `False`, they are ignored.

    Simple example::

        >>> import xmltodict
        >>> doc = xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>
        ... \"\"\")
        >>> doc['a']['@prop']
        'x'
        >>> doc['a']['b']
        ['1', '2']

    **Streaming.**  If `item_depth` is `0` (the default) the function
    returns a dictionary for the root element.  Otherwise `item_callback`
    is called every time an item at the specified depth is closed and
    `None` is returned in the end.

    The callback receives the `path` from the document root to the item
    (name/attributes pairs) and the `item` itself.  If the callback returns
    a false-ish value, parsing stops with :class:`ParsingInterrupted`.

    Streaming example::

        >>> def handle(path, item):
        ...     print('path:%s item:%s' % (path, item))
        ...     return True
        ...
        >>> xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>\"\"\", item_depth=2, item_callback=handle)
        path:[('a', {'prop': 'x'}), ('b', None)] item:1
        path:[('a', {'prop': 'x'}), ('b', None)] item:2

    **Postprocessing.**  `postprocessor` is a function taking `path`, `key`
    and `value` and returning a new `(key, value)` pair in which either
    part may have changed::

        >>> def postprocessor(path, key, value):
        ...     try:
        ...         return key + ':int', int(value)
        ...     except (ValueError, TypeError):
        ...         return key, value
        >>> xmltodict.parse('<a><b>1</b><b>2</b><b>x</b></a>',
        ...                 postprocessor=postprocessor)
        {'a': {'b:int': [1, 2], 'b': 'x'}}

    **Forcing lists.**  `force_list` makes a child always appear as a list,
    even when there is a single occurrence.  It is usually a tuple of keys
    and is applied after any user-supplied postprocessor has run.  Given::

        <servers>
          <server>
            <name>host1</name>
            <os>Linux</os>
            <interfaces>
              <interface>
                <name>em0</name>
                <ip_address>10.0.0.1</ip_address>
              </interface>
            </interfaces>
          </server>
        </servers>

    calling with `force_list=('interface',)` produces::

        {'servers':
          {'server':
            {'name': 'host1',
             'os': 'Linux',
             'interfaces':
              {'interface':
                [{'name': 'em0', 'ip_address': '10.0.0.1'}]}}}}

    `force_list` can also be a callable receiving `path`, `key` and
    `value`, for cases where the decision is more complex.

    **Comments.**  If `process_comments` is `True`, comments are added
    under `comment_key` (default `'#comment'`) to the tag that contains
    them.  Given::

        <a>
          <b>
            <!-- b comment -->
            <c>
                <!-- c comment -->
                1
            </c>
            <d>2</d>
          </b>
        </a>

    the result is::

        'a': {
            'b': {
                '#comment': 'b comment',
                'c': {
                    '#comment': 'c comment',
                    '#text': '1',
                },
                'd': '2',
            },
        }

    Comment text is subject to `strip_whitespace`: with the default `True`
    leading and trailing whitespace is removed; disable it to keep comment
    indentation or padding intact.
    """
    sax_handler = _DictSAXHandler(namespace_separator=namespace_separator,
                                  **kwargs)

    # expat is fed bytes; text input is encoded up front.
    if isinstance(xml_input, str):
        if not encoding:
            encoding = 'utf-8'
        xml_input = xml_input.encode(encoding)

    # Only hand a separator to expat when namespace processing is requested.
    expat_separator = namespace_separator if process_namespaces else None
    xml_parser = expat.ParserCreate(encoding, expat_separator)

    xml_parser.ordered_attributes = True
    xml_parser.buffer_text = True
    xml_parser.StartNamespaceDeclHandler = sax_handler.startNamespaceDecl
    xml_parser.StartElementHandler = sax_handler.startElement
    xml_parser.EndElementHandler = sax_handler.endElement
    xml_parser.CharacterDataHandler = sax_handler.characters
    if process_comments:
        xml_parser.CommentHandler = sax_handler.comments

    if disable_entities:
        def _forbid_entities(*_args, **_kwargs):
            raise ValueError("entities are disabled")

        xml_parser.EntityDeclHandler = _forbid_entities

    if hasattr(xml_input, 'read'):
        xml_parser.ParseFile(xml_input)
    elif isgenerator(xml_input):
        for piece in xml_input:
            xml_parser.Parse(piece, False)
        # An empty final chunk tells expat the document is complete.
        xml_parser.Parse(b'', True)
    else:
        xml_parser.Parse(xml_input, True)
    return sax_handler.item
372def _convert_value_to_string(value):
373 """Convert a value to its string representation for XML output.
375 Handles boolean values consistently by converting them to lowercase.
376 """
377 if isinstance(value, (str, bytes)):
378 return value
379 if isinstance(value, bool):
380 return "true" if value else "false"
381 return str(value)
384def _validate_name(value, kind):
385 """Validate an element/attribute name for XML safety.
387 Raises ValueError with a specific reason when invalid.
389 kind: 'element' or 'attribute' (used in error messages)
390 """
391 if not isinstance(value, str):
392 raise ValueError(f"{kind} name must be a string")
393 if value.startswith("?") or value.startswith("!"):
394 raise ValueError(f'Invalid {kind} name: cannot start with "?" or "!"')
395 if "<" in value or ">" in value:
396 raise ValueError(f'Invalid {kind} name: "<" or ">" not allowed')
397 if "/" in value:
398 raise ValueError(f'Invalid {kind} name: "/" not allowed')
399 if '"' in value or "'" in value:
400 raise ValueError(f"Invalid {kind} name: quotes not allowed")
401 if "=" in value:
402 raise ValueError(f'Invalid {kind} name: "=" not allowed')
403 if any(ch.isspace() for ch in value):
404 raise ValueError(f"Invalid {kind} name: whitespace not allowed")
407def _validate_comment(value):
408 if isinstance(value, bytes):
409 try:
410 value = value.decode("utf-8")
411 except UnicodeDecodeError as exc:
412 raise ValueError("Comment text must be valid UTF-8") from exc
413 if not isinstance(value, str):
414 raise ValueError("Comment text must be a string")
415 if "--" in value:
416 raise ValueError("Comment text cannot contain '--'")
417 if value.endswith("-"):
418 raise ValueError("Comment text cannot end with '-'")
419 return value
422def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'):
423 if not isinstance(name, str):
424 return name
425 if not namespaces:
426 return name
427 try:
428 ns, name = name.rsplit(ns_sep, 1)
429 except ValueError:
430 pass
431 else:
432 ns_res = namespaces.get(ns.strip(attr_prefix))
433 name = '{}{}{}{}'.format(
434 attr_prefix if ns.startswith(attr_prefix) else '',
435 ns_res, ns_sep, name) if ns_res else name
436 return name
def _emit(key, value, content_handler,
          attr_prefix='@',
          cdata_key='#text',
          depth=0,
          preprocessor=None,
          pretty=False,
          newl='\n',
          indent='\t',
          namespace_separator=':',
          namespaces=None,
          full_document=True,
          expand_iter=None,
          comment_key='#comment'):
    """Write `key`/`value` to `content_handler` as XML, recursing into children.

    A key equal to `comment_key` is written as one comment per non-empty
    entry.  Otherwise one element named `key` is written per entry of
    `value` (scalars, strings and dicts count as a single entry).  Inside a
    dict entry, `cdata_key` supplies the text, keys starting with
    `attr_prefix` become attributes, and every other key is emitted as a
    child element; empty lists are skipped.

    `preprocessor(key, value)` may rewrite the pair or return None to skip
    it.  Raises ValueError when a full document would get more than one
    root, or when a name fails validation.
    """
    # An integer indent means "that many spaces".
    if isinstance(indent, int):
        indent = ' ' * indent

    if isinstance(key, str) and key == comment_key:
        comments = value if isinstance(value, list) else [value]
        for raw_comment in comments:
            if raw_comment is None:
                continue
            comment_text = _convert_value_to_string(raw_comment)
            if not comment_text:
                continue
            if pretty:
                content_handler.ignorableWhitespace(depth * indent)
            content_handler.comment(comment_text)
            if pretty:
                content_handler.ignorableWhitespace(newl)
        return

    key = _process_namespace(key, namespaces, namespace_separator, attr_prefix)
    if preprocessor is not None:
        processed = preprocessor(key, value)
        if processed is None:
            return
        key, value = processed
    # Minimal validation to avoid breaking out of tag context
    _validate_name(key, "element")

    if isinstance(value, (str, dict)) or not hasattr(value, '__iter__'):
        value = [value]

    for index, node in enumerate(value):
        if index > 0 and depth == 0 and full_document:
            raise ValueError('document with multiple roots')

        # Normalise every entry to a mapping.
        if node is None:
            node = {}
        elif not isinstance(node, (dict, str)):
            if expand_iter and hasattr(node, '__iter__'):
                node = {expand_iter: node}
            else:
                node = _convert_value_to_string(node)
        if isinstance(node, str):
            node = {cdata_key: node}

        cdata = None
        attrs = {}
        children = []
        for item_key, item_value in node.items():
            if item_key == cdata_key:
                cdata = _convert_value_to_string(item_value)
            elif isinstance(item_key, str) and item_key.startswith(attr_prefix):
                item_key = _process_namespace(item_key, namespaces,
                                              namespace_separator, attr_prefix)
                if item_key == '@xmlns' and isinstance(item_value, dict):
                    # Mapping of prefix -> URI; an empty prefix is the
                    # default namespace.
                    for ns_prefix, ns_uri in item_value.items():
                        _validate_name(ns_prefix, "attribute")
                        xmlns_attr = f'xmlns:{ns_prefix}' if ns_prefix else 'xmlns'
                        attrs[xmlns_attr] = str(ns_uri)
                else:
                    if not isinstance(item_value, str):
                        item_value = str(item_value)
                    attr_name = item_key[len(attr_prefix):]
                    _validate_name(attr_name, "attribute")
                    attrs[attr_name] = item_value
            elif isinstance(item_value, list) and not item_value:
                # Skip empty lists to avoid creating empty child elements
                pass
            else:
                children.append((item_key, item_value))

        if pretty:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.startElement(key, AttributesImpl(attrs))
        if pretty and children:
            content_handler.ignorableWhitespace(newl)
        for child_key, child_value in children:
            _emit(child_key, child_value, content_handler,
                  attr_prefix=attr_prefix,
                  cdata_key=cdata_key,
                  depth=depth + 1,
                  preprocessor=preprocessor,
                  pretty=pretty,
                  newl=newl,
                  indent=indent,
                  namespace_separator=namespace_separator,
                  namespaces=namespaces,
                  expand_iter=expand_iter,
                  comment_key=comment_key)
        if cdata is not None:
            content_handler.characters(cdata)
        if pretty and children:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.endElement(key)
        if pretty and depth:
            content_handler.ignorableWhitespace(newl)
class _XMLGenerator(XMLGenerator):
    """`XMLGenerator` extended with the ability to write comments."""

    def comment(self, text):
        """Validate `text`, escape it and write it as an XML comment."""
        safe_text = escape(_validate_comment(text))
        self._write("<!--" + safe_text + "-->")
def unparse(input_dict, output=None, encoding='utf-8', full_document=True,
            short_empty_elements=False, comment_key='#comment',
            **kwargs):
    """Emit an XML document for `input_dict` (the reverse of `parse`).

    The XML is returned as a string unless `output` (a file-like object) is
    given, in which case it is written there and None is returned.

    Keys starting with `attr_prefix` (default `'@'`) become attributes of
    the enclosing element, and the key equal to `cdata_key` (default
    `'#text'`) supplies its character data.  Keys equal to `comment_key`
    are written as comments and do not count as a document root.

    Empty lists are omitted entirely: ``{"a": []}`` produces no ``<a>``
    element.  Use a placeholder entry (for example ``{"a": [""]}``) when an
    explicit empty element is required.

    With `full_document` true, the XML declaration is written and the input
    must contain exactly one root element; otherwise ValueError is raised.

    `pretty` (default `False`) enables pretty-printing: lines end with
    `newl` (default `'\\n'`) and are indented with `indent` (default
    `'\\t'`).  These and any other keyword arguments are forwarded to the
    emitter.
    """
    must_return = output is None
    if must_return:
        output = StringIO()

    content_handler = _XMLGenerator(output, encoding, bool(short_empty_elements))
    if full_document:
        content_handler.startDocument()

    seen_root = False
    for key, value in input_dict.items():
        is_root = key != comment_key
        if is_root and full_document and seen_root:
            raise ValueError("Document must have exactly one root.")
        _emit(key, value, content_handler, full_document=full_document, comment_key=comment_key, **kwargs)
        if is_root:
            seen_root = True

    if full_document:
        if not seen_root:
            raise ValueError("Document must have exactly one root.")
        content_handler.endDocument()

    if not must_return:
        return None
    xml_text = output.getvalue()
    try:  # pragma no cover
        xml_text = xml_text.decode(encoding)
    except AttributeError:  # pragma no cover
        pass
    return xml_text
if __name__ == '__main__':  # pragma: no cover
    # Command-line use: read XML from stdin and marshal each (path, item)
    # pair to stdout.  Exactly one argument is expected: the item depth.
    import marshal
    import sys

    stdin = sys.stdin.buffer
    stdout = sys.stdout.buffer

    [depth_argument] = sys.argv[1:]
    item_depth = int(depth_argument)

    def handle_item(path, item):
        """Write one (path, item) pair to stdout and keep parsing."""
        marshal.dump((path, item), stdout)
        return True

    try:
        root = parse(
            stdin,
            item_depth=item_depth,
            item_callback=handle_item,
            dict_constructor=dict,
        )
        # Without streaming nothing has been emitted yet: dump the whole
        # document once.
        if item_depth == 0:
            handle_item([], root)
    except KeyboardInterrupt:
        pass