Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/xmltodict.py: 72%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

259 statements  

1#!/usr/bin/env python 

2"Makes working with XML feel like you are working with JSON" 

3 

4from xml.parsers import expat 

5from xml.sax.saxutils import XMLGenerator 

6from xml.sax.xmlreader import AttributesImpl 

7from io import StringIO 

8 

# Default mapping type for parsed and generated structures: the built-in
# dict, swapped for OrderedDict on interpreters older than Python 3.7.
_dict = dict
import platform
if tuple(map(int, platform.python_version_tuple()[:2])) < (3, 7):
    from collections import OrderedDict as _dict

13 

14from inspect import isgenerator 

15 

16__author__ = 'Martin Blech' 

17__version__ = "0.14.2" 

18__license__ = 'MIT' 

19 

20 

class ParsingInterrupted(Exception):
    """Raised to abort parsing when the item callback returns a false value."""

23 

24 

25class _DictSAXHandler: 

26 def __init__(self, 

27 item_depth=0, 

28 item_callback=lambda *args: True, 

29 xml_attribs=True, 

30 attr_prefix='@', 

31 cdata_key='#text', 

32 force_cdata=False, 

33 cdata_separator='', 

34 postprocessor=None, 

35 dict_constructor=_dict, 

36 strip_whitespace=True, 

37 namespace_separator=':', 

38 namespaces=None, 

39 force_list=None, 

40 comment_key='#comment'): 

41 self.path = [] 

42 self.stack = [] 

43 self.data = [] 

44 self.item = None 

45 self.item_depth = item_depth 

46 self.xml_attribs = xml_attribs 

47 self.item_callback = item_callback 

48 self.attr_prefix = attr_prefix 

49 self.cdata_key = cdata_key 

50 self.force_cdata = force_cdata 

51 self.cdata_separator = cdata_separator 

52 self.postprocessor = postprocessor 

53 self.dict_constructor = dict_constructor 

54 self.strip_whitespace = strip_whitespace 

55 self.namespace_separator = namespace_separator 

56 self.namespaces = namespaces 

57 self.namespace_declarations = dict_constructor() 

58 self.force_list = force_list 

59 self.comment_key = comment_key 

60 

61 def _build_name(self, full_name): 

62 if self.namespaces is None: 

63 return full_name 

64 i = full_name.rfind(self.namespace_separator) 

65 if i == -1: 

66 return full_name 

67 namespace, name = full_name[:i], full_name[i+1:] 

68 try: 

69 short_namespace = self.namespaces[namespace] 

70 except KeyError: 

71 short_namespace = namespace 

72 if not short_namespace: 

73 return name 

74 else: 

75 return self.namespace_separator.join((short_namespace, name)) 

76 

77 def _attrs_to_dict(self, attrs): 

78 if isinstance(attrs, dict): 

79 return attrs 

80 return self.dict_constructor(zip(attrs[0::2], attrs[1::2])) 

81 

82 def startNamespaceDecl(self, prefix, uri): 

83 self.namespace_declarations[prefix or ''] = uri 

84 

85 def startElement(self, full_name, attrs): 

86 name = self._build_name(full_name) 

87 attrs = self._attrs_to_dict(attrs) 

88 if attrs and self.namespace_declarations: 

89 attrs['xmlns'] = self.namespace_declarations 

90 self.namespace_declarations = self.dict_constructor() 

91 self.path.append((name, attrs or None)) 

92 if len(self.path) >= self.item_depth: 

93 self.stack.append((self.item, self.data)) 

94 if self.xml_attribs: 

95 attr_entries = [] 

96 for key, value in attrs.items(): 

97 key = self.attr_prefix+self._build_name(key) 

98 if self.postprocessor: 

99 entry = self.postprocessor(self.path, key, value) 

100 else: 

101 entry = (key, value) 

102 if entry: 

103 attr_entries.append(entry) 

104 attrs = self.dict_constructor(attr_entries) 

105 else: 

106 attrs = None 

107 self.item = attrs or None 

108 self.data = [] 

109 

110 def endElement(self, full_name): 

111 name = self._build_name(full_name) 

112 if len(self.path) == self.item_depth: 

113 item = self.item 

114 if item is None: 

115 item = (None if not self.data 

116 else self.cdata_separator.join(self.data)) 

117 

118 should_continue = self.item_callback(self.path, item) 

119 if not should_continue: 

120 raise ParsingInterrupted 

121 if self.stack: 

122 data = (None if not self.data 

123 else self.cdata_separator.join(self.data)) 

124 item = self.item 

125 self.item, self.data = self.stack.pop() 

126 if self.strip_whitespace and data: 

127 data = data.strip() or None 

128 if data and self.force_cdata and item is None: 

129 item = self.dict_constructor() 

130 if item is not None: 

131 if data: 

132 self.push_data(item, self.cdata_key, data) 

133 self.item = self.push_data(self.item, name, item) 

134 else: 

135 self.item = self.push_data(self.item, name, data) 

136 else: 

137 self.item = None 

138 self.data = [] 

139 self.path.pop() 

140 

141 def characters(self, data): 

142 if not self.data: 

143 self.data = [data] 

144 else: 

145 self.data.append(data) 

146 

147 def comments(self, data): 

148 if self.strip_whitespace: 

149 data = data.strip() 

150 self.item = self.push_data(self.item, self.comment_key, data) 

151 

152 def push_data(self, item, key, data): 

153 if self.postprocessor is not None: 

154 result = self.postprocessor(self.path, key, data) 

155 if result is None: 

156 return item 

157 key, data = result 

158 if item is None: 

159 item = self.dict_constructor() 

160 try: 

161 value = item[key] 

162 if isinstance(value, list): 

163 value.append(data) 

164 else: 

165 item[key] = [value, data] 

166 except KeyError: 

167 if self._should_force_list(key, data): 

168 item[key] = [data] 

169 else: 

170 item[key] = data 

171 return item 

172 

173 def _should_force_list(self, key, value): 

174 if not self.force_list: 

175 return False 

176 if isinstance(self.force_list, bool): 

177 return self.force_list 

178 try: 

179 return key in self.force_list 

180 except TypeError: 

181 return self.force_list(self.path[:-1], key, value) 

182 

183 

def parse(xml_input, encoding=None, expat=expat, process_namespaces=False,
          namespace_separator=':', disable_entities=True, process_comments=False, **kwargs):
    """Convert XML input into a nested dictionary.

    `xml_input` may be a string, a file-like object (anything with a `read`
    attribute) or a generator yielding chunks. A string is encoded with
    `encoding` (UTF-8 when none is given) before it is handed to the parser.

    Any extra keyword argument is forwarded to the internal handler; the
    most common ones are described below.

    If `xml_attribs` is `True`, element attributes are put in the dictionary
    among regular child elements, using `@` as a prefix to avoid collisions.
    If set to `False`, they are ignored.

    Simple example::

        >>> import xmltodict
        >>> doc = xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>
        ... \"\"\")
        >>> doc['a']['@prop']
        'x'
        >>> doc['a']['b']
        ['1', '2']

    If `item_depth` is `0` (the default), the function returns a dictionary
    for the root element. Otherwise `item_callback` is called every time an
    item at the given depth is completed and `None` is returned at the end
    (streaming mode).

    The callback receives two arguments: the `path` from the document root
    to the item (a list of name/attributes pairs) and the `item` itself. If
    the callback returns a false value, parsing stops with the
    :class:`ParsingInterrupted` exception.

    Streaming example::

        >>> def handle(path, item):
        ...     print('path:%s item:%s' % (path, item))
        ...     return True
        ...
        >>> xmltodict.parse(\"\"\"
        ... <a prop="x">
        ...   <b>1</b>
        ...   <b>2</b>
        ... </a>\"\"\", item_depth=2, item_callback=handle)
        path:[('a', {'prop': 'x'}), ('b', None)] item:1
        path:[('a', {'prop': 'x'}), ('b', None)] item:2

    The optional `postprocessor` is a function taking `path`, `key` and
    `value` and returning a new `(key, value)` pair in which either part may
    have changed. Usage example::

        >>> def postprocessor(path, key, value):
        ...     try:
        ...         return key + ':int', int(value)
        ...     except (ValueError, TypeError):
        ...         return key, value
        >>> xmltodict.parse('<a><b>1</b><b>2</b><b>x</b></a>',
        ...                 postprocessor=postprocessor)
        {'a': {'b:int': [1, 2], 'b': 'x'}}

    An alternative `expat` implementation (such as `defusedexpat`) can be
    supplied through the `expat` parameter::

        >>> import defusedexpat
        >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat)
        {'a': 'hello'}

    `force_list` forces a list to be created even when a key has a single
    value. It can be a tuple of keys: every listed key then holds a list,
    even with only one sub-element. It is applied after any user-supplied
    postprocessor has run.

    For example, given this input::

        <servers>
          <server>
            <name>host1</name>
            <os>Linux</os>
            <interfaces>
              <interface>
                <name>em0</name>
                <ip_address>10.0.0.1</ip_address>
              </interface>
            </interfaces>
          </server>
        </servers>

    calling with `force_list=('interface',)` produces::

        {'servers':
          {'server':
            {'name': 'host1',
             'os': 'Linux',
             'interfaces':
              {'interface':
                [{'name': 'em0', 'ip_address': '10.0.0.1'}]}}}}

    `force_list` can also be a callable receiving `path`, `key` and `value`,
    which helps when the decision logic is more complex.

    If `process_comments` is `True`, comments are added under `comment_key`
    (default `'#comment'`) to the element that contains them.

    For example, given this input::

        <a>
          <b>
            <!-- b comment -->
            <c>
                <!-- c comment -->
                1
            </c>
            <d>2</d>
          </b>
        </a>

    calling with `process_comments=True` produces::

        'a': {
            'b': {
                '#comment': 'b comment',
                'c': {
                    '#comment': 'c comment',
                    '#text': '1',
                },
                'd': '2',
            },
        }
    """
    sax_handler = _DictSAXHandler(namespace_separator=namespace_separator,
                                  **kwargs)

    # Text input is encoded up front; the chosen encoding is also the one
    # announced to the parser.
    if isinstance(xml_input, str):
        if not encoding:
            encoding = 'utf-8'
        xml_input = xml_input.encode(encoding)

    # Without a separator the parser performs no namespace processing.
    parser_separator = namespace_separator if process_namespaces else None
    xml_parser = expat.ParserCreate(encoding, parser_separator)

    try:
        xml_parser.ordered_attributes = True
    except AttributeError:
        # Jython's expat does not support ordered_attributes
        pass

    callbacks = [
        ('StartNamespaceDeclHandler', sax_handler.startNamespaceDecl),
        ('StartElementHandler', sax_handler.startElement),
        ('EndElementHandler', sax_handler.endElement),
        ('CharacterDataHandler', sax_handler.characters),
    ]
    if process_comments:
        callbacks.append(('CommentHandler', sax_handler.comments))
    for attribute, callback in callbacks:
        setattr(xml_parser, attribute, callback)
    xml_parser.buffer_text = True

    if disable_entities:
        try:
            # Attempt to disable DTD in Jython's expat parser (Xerces-J).
            xml_parser._reader.setFeature(
                "http://apache.org/xml/features/disallow-doctype-decl", True)
        except AttributeError:
            # For CPython / expat parser.
            # Anything not handled ends up here and entities aren't expanded.
            xml_parser.DefaultHandler = lambda _data: None
            # Expects an integer return; zero means failure -> expat.ExpatError.
            xml_parser.ExternalEntityRefHandler = lambda *_args: 1

    if hasattr(xml_input, 'read'):
        xml_parser.ParseFile(xml_input)
    elif not isgenerator(xml_input):
        xml_parser.Parse(xml_input, True)
    else:
        for piece in xml_input:
            xml_parser.Parse(piece, False)
        # Signal end of input once the generator is exhausted.
        xml_parser.Parse(b'', True)
    return sax_handler.item

361 

362 

363def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'): 

364 if not namespaces: 

365 return name 

366 try: 

367 ns, name = name.rsplit(ns_sep, 1) 

368 except ValueError: 

369 pass 

370 else: 

371 ns_res = namespaces.get(ns.strip(attr_prefix)) 

372 name = '{}{}{}{}'.format( 

373 attr_prefix if ns.startswith(attr_prefix) else '', 

374 ns_res, ns_sep, name) if ns_res else name 

375 return name 

376 

377 

def _emit(key, value, content_handler,
          attr_prefix='@',
          cdata_key='#text',
          depth=0,
          preprocessor=None,
          pretty=False,
          newl='\n',
          indent='\t',
          namespace_separator=':',
          namespaces=None,
          full_document=True,
          expand_iter=None):
    """Write the element(s) described by `key`/`value` to `content_handler`.

    Args:
        key: Element name; rewritten through `_process_namespace`.
        value: Element content. A dict describes attributes, text and
            children; a string is text; `None` is an empty element; any
            other iterable produces one element per entry.
        content_handler: SAX content handler receiving the output events.
        attr_prefix: Prefix marking attribute keys inside a dict value.
        cdata_key: Key holding the text of an element inside a dict value.
        depth: Nesting level, used for indentation and the root check.
        preprocessor: Optional `f(key, value)` returning a replacement
            `(key, value)` pair, or `None` to skip the element.
        pretty: Emit indentation and line breaks.
        newl: Line terminator used when `pretty` is set.
        indent: Indentation unit; an int means that many spaces.
        namespace_separator: Separator between namespace and local name.
        namespaces: Mapping handed to `_process_namespace`.
        full_document: When true, more than one root element is an error.
        expand_iter: If set, a nested iterable entry is wrapped in a dict
            under this key instead of being converted with `str`.

    Raises:
        ValueError: If `full_document` is true and `value` yields more than
            one element at depth 0.
    """
    key = _process_namespace(key, namespaces, namespace_separator, attr_prefix)
    if preprocessor is not None:
        result = preprocessor(key, value)
        if result is None:
            return
        key, value = result
    if not hasattr(value, '__iter__') or isinstance(value, (str, dict)):
        value = [value]
    # Loop invariant: normalise an integer indent once.
    if isinstance(indent, int):
        indent = ' ' * indent
    # The parser stores namespace declarations under attr_prefix + 'xmlns'.
    xmlns_key = attr_prefix + 'xmlns'
    for index, v in enumerate(value):
        if full_document and depth == 0 and index > 0:
            raise ValueError('document with multiple roots')
        if v is None:
            v = _dict()
        elif isinstance(v, bool):
            v = 'true' if v else 'false'
        elif not isinstance(v, (dict, str)):
            if expand_iter and hasattr(v, '__iter__'):
                v = _dict(((expand_iter, v),))
            else:
                v = str(v)
        if isinstance(v, str):
            v = _dict(((cdata_key, v),))
        cdata = None
        attrs = _dict()
        children = []
        for ik, iv in v.items():
            if ik == cdata_key:
                # The generator only accepts str/bytes text; convert other
                # values the same way element values are converted so that
                # numbers and booleans are written instead of failing.
                if isinstance(iv, bool):
                    iv = 'true' if iv else 'false'
                elif iv is not None and not isinstance(iv, (str, bytes)):
                    iv = str(iv)
                cdata = iv
                continue
            if ik.startswith(attr_prefix):
                ik = _process_namespace(ik, namespaces, namespace_separator,
                                        attr_prefix)
                if ik == xmlns_key and isinstance(iv, dict):
                    # Expand {prefix: uri} into xmlns / xmlns:prefix attributes.
                    for ns_prefix, ns_uri in iv.items():
                        attr = 'xmlns{}'.format(
                            f':{ns_prefix}' if ns_prefix else '')
                        attrs[attr] = str(ns_uri)
                    continue
                if not isinstance(iv, str):
                    iv = str(iv)
                attrs[ik[len(attr_prefix):]] = iv
                continue
            children.append((ik, iv))
        if pretty:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.startElement(key, AttributesImpl(attrs))
        if pretty and children:
            content_handler.ignorableWhitespace(newl)
        for child_key, child_value in children:
            _emit(child_key, child_value, content_handler,
                  attr_prefix, cdata_key, depth+1, preprocessor,
                  pretty, newl, indent, namespaces=namespaces,
                  namespace_separator=namespace_separator,
                  expand_iter=expand_iter)
        if cdata is not None:
            content_handler.characters(cdata)
        if pretty and children:
            content_handler.ignorableWhitespace(depth * indent)
        content_handler.endElement(key)
        if pretty and depth:
            content_handler.ignorableWhitespace(newl)

452 

453 

def unparse(input_dict, output=None, encoding='utf-8', full_document=True,
            short_empty_elements=False,
            **kwargs):
    """Serialise `input_dict` as XML (the reverse of `parse`).

    When `output` (a file-like object) is given, the document is written to
    it and nothing is returned; otherwise the document is returned as a
    string.

    Dictionary keys starting with `attr_prefix` (default `'@'`) become XML
    attributes, and the key equal to `cdata_key` (default `'#text'`) holds
    the character data of an element.

    `pretty` (default `False`) turns on pretty-printing. Lines then end with
    a newline and are indented with a tab unless `newl` and `indent` say
    otherwise.

    With `full_document` set, an XML declaration is written and `input_dict`
    must contain exactly one root, otherwise `ValueError` is raised.
    `short_empty_elements` selects the self-closing form for empty elements.
    Remaining keyword arguments are passed on to the emitter.
    """
    if full_document and len(input_dict) != 1:
        raise ValueError('Document must have exactly one root.')

    return_text = output is None
    if return_text:
        output = StringIO()

    generator = XMLGenerator(output, encoding, bool(short_empty_elements))
    if full_document:
        generator.startDocument()
    for root_key, root_value in input_dict.items():
        _emit(root_key, root_value, generator, full_document=full_document,
              **kwargs)
    if full_document:
        generator.endDocument()

    if not return_text:
        return None
    # The buffer is a StringIO created above, so its value is already text.
    return output.getvalue()

495 

496 

if __name__ == '__main__':  # pragma: no cover
    # Command-line mode: read XML from standard input and marshal each
    # (path, item) pair found at the depth given as the only argument.
    import marshal
    import sys

    try:
        # Prefer the binary streams when the interpreter provides them.
        stdin, stdout = sys.stdin.buffer, sys.stdout.buffer
    except AttributeError:
        stdin, stdout = sys.stdin, sys.stdout

    (depth_argument,) = sys.argv[1:]
    item_depth = int(depth_argument)

    def handle_item(path, item):
        """Dump one parsed item to standard output and keep parsing."""
        marshal.dump((path, item), stdout)
        return True

    try:
        root = parse(stdin,
                     item_depth=item_depth,
                     item_callback=handle_item,
                     dict_constructor=dict)
        # At depth 0 the callback never fires, so dump the whole document.
        if item_depth == 0:
            handle_item([], root)
    except KeyboardInterrupt:
        pass