Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pikepdf/models/metadata/_xmp.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

223 statements  

1# SPDX-FileCopyrightText: 2022 James R. Barlow 

2# SPDX-License-Identifier: MPL-2.0 

3 

4"""XMP document handling - pure XMP XML manipulation without PDF awareness.""" 

5 

6from __future__ import annotations 

7 

8import logging 

9from collections.abc import Callable, Iterable, Iterator 

10from io import BytesIO 

11from typing import Any 

12 

13from lxml import etree 

14from lxml.etree import QName, XMLSyntaxError, _Element, _ElementTree 

15 

16from pikepdf._xml import parse_xml 

17from pikepdf.models.metadata._constants import ( 

18 DEFAULT_NAMESPACES, 

19 LANG_ALTS, 

20 XMP_CONTAINERS, 

21 XMP_EMPTY, 

22 XMP_NS_RDF, 

23 XMP_NS_XML, 

24 XPACKET_BEGIN, 

25 XPACKET_END, 

26 AltList, 

27 clean, 

28 re_xml_illegal_bytes, 

29) 

30 

# Module-level logger, named after this module, used for all XMP parse warnings.
log = logging.getLogger(__name__)

32 

33 

class NeverRaise(Exception):
    """Placeholder exception type that no code in this module raises.

    It is substituted into ``except`` clauses when error recovery is disabled,
    so that the clause matches nothing and the real exception propagates.
    """

36 

37 

def _parser_basic(xml: bytes) -> _ElementTree:
    """Parse ``xml`` by handing the unmodified bytes to ``parse_xml``."""
    stream = BytesIO(xml)
    return parse_xml(stream)

40 

41 

def _parser_strip_illegal_bytes(xml: bytes) -> _ElementTree:
    """Parse ``xml`` after deleting every match of ``re_xml_illegal_bytes``."""
    sanitized = re_xml_illegal_bytes.sub(b'', xml)
    stream = BytesIO(sanitized)
    return parse_xml(stream)

44 

45 

def _parser_recovery(xml: bytes) -> _ElementTree:
    """Parse ``xml`` with ``parse_xml`` called in ``recover=True`` mode."""
    stream = BytesIO(xml)
    return parse_xml(stream, recover=True)

48 

49 

def _parser_replace_with_empty_xmp(_xml: bytes = b'') -> _ElementTree:
    """Log a warning and return a freshly parsed empty XMP document.

    Args:
        _xml: Ignored. Accepted only so this function has the same call
            signature as the other parsers in the fallback chain.

    Returns:
        The tree obtained by parsing ``XMP_EMPTY`` with ``_parser_basic``.
    """
    log.warning("Error occurred parsing XMP, replacing with empty XMP.")
    empty_tree = _parser_basic(XMP_EMPTY)
    return empty_tree

53 

54 

# Parser chain used when invalid XML may be overwritten. Parsers are tried in
# list order: the plain parse first, then progressively more forgiving ones,
# and finally a parser that discards the input and yields an empty document.
PARSERS_OVERWRITE_INVALID_XML: list[Callable[[bytes], _ElementTree]] = [
    _parser_basic,
    _parser_strip_illegal_bytes,
    _parser_recovery,
    _parser_replace_with_empty_xmp,
]

# Parser chain used when invalid XML must not be overwritten: plain parse only.
PARSERS_STANDARD: list[Callable[[bytes], _ElementTree]] = [
    _parser_basic,
]

63 

64 

class XmpDocument:
    """Pure XMP XML manipulation.

    This class handles parsing, traversing, modifying, and serializing XMP
    metadata without any PDF-specific knowledge. It can be used standalone
    for XMP manipulation.

    Example:
        >>> xmp = XmpDocument(xmp_bytes)
        >>> title = xmp.get('dc:title')
        >>> xmp.set_value('dc:title', 'New Title')
        >>> xml_bytes = xmp.to_bytes()
    """

    # Namespace mappings. DEFAULT_NAMESPACES is iterated as (uri, prefix)
    # pairs, so NS maps prefix -> uri and REVERSE_NS maps uri -> prefix.
    # Both dicts live on the class and are mutated in place by
    # register_xml_namespace(), so they are shared by every instance.
    NS: dict[str, str] = {prefix: uri for uri, prefix in DEFAULT_NAMESPACES}
    REVERSE_NS: dict[str, str] = dict(DEFAULT_NAMESPACES)

    def __init__(
        self,
        data: bytes = b'',
        *,
        parsers: Iterable[Callable[[bytes], _ElementTree]] | None = None,
        overwrite_invalid_xml: bool = True,
    ):
        """Parse XMP data.

        Args:
            data: XMP XML bytes to parse. Empty creates a new XMP document.
            parsers: Custom parser chain. If None, uses default based on
                overwrite_invalid_xml setting.
            overwrite_invalid_xml: If True, use recovery parsers for invalid XML.
        """
        if parsers is None:
            parsers = (
                PARSERS_OVERWRITE_INVALID_XML
                if overwrite_invalid_xml
                else PARSERS_STANDARD
            )

        self._xmp: _ElementTree = self._parse(data, parsers, overwrite_invalid_xml)

    def _parse(
        self,
        data: bytes,
        parsers: Iterable[Callable[[bytes], _ElementTree]],
        overwrite_invalid_xml: bool,
    ) -> _ElementTree:
        """Parse XMP data using fallback parsers.

        Each parser is tried in order until one succeeds. The resulting tree
        is then stripped of top-level processing instructions and checked for
        an rdf:RDF root.

        Args:
            data: Raw XMP bytes; blank input is replaced by ``XMP_EMPTY``.
            parsers: Parser callables to try, in order.
            overwrite_invalid_xml: If True, parse and validation errors are
                logged and an empty XMP document is substituted. If False,
                those errors propagate to the caller.

        Returns:
            The parsed (or substituted empty) element tree.
        """
        if data.strip() == b'':
            data = XMP_EMPTY  # on some platforms lxml chokes on empty documents

        # With recovery disabled, NeverRaise makes the except clauses below
        # match nothing, so the original exception reaches the caller.
        syntax_errors = XMLSyntaxError if overwrite_invalid_xml else NeverRaise
        validation_errors = Exception if overwrite_invalid_xml else NeverRaise

        xmp: _ElementTree | None = None
        for parser in parsers:
            try:
                xmp = parser(data)
            except syntax_errors as e:  # type: ignore[misc]
                # A document with no markup at all cannot be recovered by a
                # later parser; go straight to the empty replacement.
                if str(e).startswith(
                    ("Start tag expected, '<' not found", "Document is empty")
                ):
                    xmp = _parser_replace_with_empty_xmp()
                    break
            else:
                break

        if xmp is None:
            # Every parser failed (or none were supplied).
            log.warning("Error occurred parsing XMP")
            return _parser_replace_with_empty_xmp()

        try:
            pis = xmp.xpath('/processing-instruction()')
            for pi in pis:  # type: ignore[union-attr]
                etree.strip_tags(xmp, pi.tag)  # type: ignore[union-attr]
            # Called only for its validation side effect (raises ValueError).
            self._get_rdf_root_from(xmp)
        except validation_errors as e:  # pylint: disable=broad-except
            log.warning("Error occurred parsing XMP", exc_info=e)
            xmp = _parser_replace_with_empty_xmp()

        return xmp

    @classmethod
    def register_xml_namespace(cls, uri: str, prefix: str) -> None:
        """Register a new XML/XMP namespace.

        The mapping is stored on the class (affecting all instances) and is
        also registered with lxml via ``etree.register_namespace``.

        Arguments:
            uri: The long form of the namespace.
            prefix: The alias to use when interpreting XMP.
        """
        cls.NS[prefix] = uri
        cls.REVERSE_NS[uri] = prefix
        etree.register_namespace(prefix, uri)

    @classmethod
    def qname(cls, name: QName | str) -> str:
        """Convert name to an XML QName.

        e.g. pdf:Producer -> {http://ns.adobe.com/pdf/1.3/}Producer

        An empty string and names already in ``{uri}tag`` form are returned
        unchanged. A prefix that is not present in ``NS`` is looked up as
        ``None``.

        Raises:
            TypeError: If name is neither a QName nor a str.
        """
        if isinstance(name, QName):
            return str(name)
        if not isinstance(name, str):
            raise TypeError(f"{name} must be str")
        if name == '' or name.startswith('{'):
            return name
        try:
            prefix, tag = name.split(':', maxsplit=1)
        except ValueError:
            # If missing the namespace, it belongs in the default namespace.
            prefix, tag = '', name
        uri = cls.NS.get(prefix, None)
        return str(QName(uri, tag))

    def prefix_from_uri(self, uriname: str) -> str:
        """Given a fully qualified XML name, find a prefix.

        e.g. {http://ns.adobe.com/pdf/1.3/}Producer -> pdf:Producer

        Raises:
            ValueError: If uriname contains no ``}`` separator.
            KeyError: If the namespace URI is not in ``REVERSE_NS``.
        """
        uripart, tag = uriname.split('}', maxsplit=1)
        uri = uripart.replace('{', '')
        return self.REVERSE_NS[uri] + ':' + tag

    def _get_rdf_root_from(self, xmp: _ElementTree) -> _Element:
        """Get the rdf:RDF root element from an XMP tree.

        Searches for a descendant rdf:RDF element first and falls back to the
        document root.

        Raises:
            ValueError: If neither is an rdf:RDF element.
        """
        rdf = xmp.find('.//rdf:RDF', self.NS)
        if rdf is None:
            rdf = xmp.getroot()
            if rdf.tag != '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF':
                raise ValueError("Metadata seems to be XML but not XMP")
        return rdf

    def _get_rdf_root(self) -> _Element:
        """Get the rdf:RDF root element."""
        return self._get_rdf_root_from(self._xmp)

    def _get_elements(
        self, name: str | QName = ''
    ) -> Iterator[tuple[_Element, str | bytes | None, Any, _Element]]:
        """Get elements from XMP.

        Core routine to find elements matching name within the XMP and yield
        them.

        For XMP spec 7.9.2.2, rdf:Description with property attributes,
        we yield the node which will have the desired as one of its attributes.
        qname is returned so that the node.attrib can be used to locate the
        source.

        For XMP spec 7.5, simple valued XMP properties, we yield the node,
        None, and the value. For structure or array valued properties we gather
        the elements. We ignore qualifiers.

        Only rdf:Description elements with ``rdf:about=""`` that are direct
        children of the rdf:RDF root are searched.

        Args:
            name: a prefixed name or QName to look for within the
                data section of the XMP; looks for all data keys if omitted

        Yields:
            tuple: (node, qname_attrib, value, parent_node)

        """
        qname = self.qname(name)
        rdf = self._get_rdf_root()
        for rdfdesc in rdf.findall('rdf:Description[@rdf:about=""]', self.NS):
            if qname and qname in rdfdesc.keys():
                yield (rdfdesc, qname, rdfdesc.get(qname), rdf)
            elif not qname:
                # No name requested: report every attribute with a non-empty
                # value.
                for k, v in rdfdesc.items():
                    if v:
                        yield (rdfdesc, k, v, rdf)
            xpath = qname if name else '*'
            for node in rdfdesc.findall(xpath, self.NS):
                # Non-blank text takes precedence over any child elements.
                if node.text and node.text.strip():
                    yield (node, None, node.text, rdfdesc)
                    continue
                values = self._get_subelements(node)
                yield (node, None, values, rdfdesc)

    def _get_subelements(self, node: _Element) -> Any:
        """Gather the sub-elements attached to a node.

        Gather rdf:Bag and rdf:Seq into set and list respectively. For
        alternate languages values, take the first language only for
        simplicity.

        Returns:
            The text of the first rdf:Alt item (``''`` if rdf:Alt is empty),
            or a container built from the first matching ``XMP_CONTAINERS``
            entry, or ``''`` if the node has no recognized container.
        """
        items = node.find('rdf:Alt', self.NS)
        if items is not None:
            try:
                return items[0].text
            except IndexError:
                return ''

        for xmlcontainer, container, insertfn in XMP_CONTAINERS:
            items = node.find(f'rdf:{xmlcontainer}', self.NS)
            if items is None:
                continue
            result = container()
            for item in items:
                insertfn(result, item.text)
            return result
        return ''

    def _get_element_values(self, name: str | QName = '') -> Iterator[Any]:
        """Yield only the value component of each match from _get_elements."""
        yield from (v[2] for v in self._get_elements(name))

    def __contains__(self, key: str | QName) -> bool:
        """Test if XMP key exists.

        The test is on truthiness of the stored values, so a key whose only
        values are empty (``''``, an empty list or set, ``None``) is reported
        as absent even though ``__getitem__`` would return it.
        """
        return any(self._get_element_values(key))

    def get(self, key: str | QName, default: Any = None) -> Any:
        """Get XMP value for key, or default if not found."""
        return next(self._get_element_values(key), default)

    def __getitem__(self, key: str | QName) -> Any:
        """Retrieve XMP metadata for key.

        Raises:
            KeyError: If no element or attribute matches key.
        """
        try:
            return next(self._get_element_values(key))
        except StopIteration:
            raise KeyError(key) from None

    def __iter__(self) -> Iterator[str]:
        """Iterate through XMP metadata attributes and nodes."""
        for node, attrib, _val, _parents in self._get_elements():
            if attrib:
                yield str(attrib)
            else:
                yield node.tag

    def __len__(self) -> int:
        """Return number of items in metadata."""
        return sum(1 for _ in self)

    def set_value(
        self,
        key: str | QName,
        val: set[str] | list[str] | str,
    ) -> None:
        """Set XMP metadata key to value.

        Updates the first existing match for key, or inserts a new property
        if there is none.

        Raises:
            TypeError: If val is not a str, list or set.
        """
        qkey = self.qname(key)

        try:
            # Update existing node
            self._setitem_update(key, val, qkey)
        except StopIteration:
            # Raised by the lookup in _setitem_update when key is absent.
            # Insert a new node
            self._setitem_insert(key, val)

    def __setitem__(self, key: str | QName, val: set[str] | list[str] | str) -> None:
        """Set XMP metadata key to value."""
        self.set_value(key, val)

    def _setitem_add_array(self, node: _Element, items: Iterable) -> None:
        """Append an RDF array container holding items as a child of node.

        The container type is the ``rdf_type`` of the first ``XMP_CONTAINERS``
        entry whose ``py_type`` matches items. Items of an rdf:Alt container
        are tagged ``xml:lang="x-default"``.

        Raises:
            TypeError: If items matches no entry in ``XMP_CONTAINERS``.
        """
        rdf_type = next(
            (c.rdf_type for c in XMP_CONTAINERS if isinstance(items, c.py_type)),
            None,
        )
        if rdf_type is None:
            # Must not leak StopIteration: set_value() interprets that as
            # "key not found" and would fall through to an insert.
            raise TypeError(f"Cannot store {type(items)} as an XMP array")
        seq = etree.SubElement(node, str(QName(XMP_NS_RDF, rdf_type)))
        tag_attrib: dict[str, str] | None = None
        if rdf_type == 'Alt':
            tag_attrib = {str(QName(XMP_NS_XML, 'lang')): 'x-default'}
        for item in items:
            el = etree.SubElement(seq, str(QName(XMP_NS_RDF, 'li')), attrib=tag_attrib)
            if item is not None:
                inner_text: str | None = clean(item)
                if inner_text == '':
                    inner_text = None
                el.text = inner_text

    def _setitem_update(self, key: str | QName, val: Any, qkey: str) -> None:
        """Replace the value of the first existing match for key.

        Args:
            key: The property name as given by the caller.
            val: The new value (str, list or set).
            qkey: key converted to ``{uri}tag`` form.

        Raises:
            StopIteration: If key has no existing match (the caller's cue to
                insert instead).
            TypeError: If val has an unsupported type for the matched item.
        """
        # Locate existing node to replace
        node, attrib, _oldval, _parent = next(self._get_elements(key))

        if attrib:
            # The property is stored as an attribute of rdf:Description.
            if isinstance(val, str):
                node.set(attrib, clean(val))
                return
            if qkey != self.qname('dc:creator') or not isinstance(val, list | set):
                # Validate before mutating so a rejected value leaves the
                # document untouched.
                raise TypeError(f"Setting {key} to {val} with type {type(val)}")
            # dc:creator incorrectly created as an attribute - we're
            # replacing it anyway, so remove the old one and re-create it as
            # a property element. The array container must hang off that
            # element, not off rdf:Description itself.
            del node.attrib[qkey]
            prop = etree.SubElement(node, qkey)
            self._setitem_add_array(prop, val)
            return

        if isinstance(val, list | set):
            array_items: Any = val
        elif isinstance(val, str):
            if str(self.qname(key)) in LANG_ALTS:
                array_items = AltList([clean(val)])
            else:
                array_items = None
        else:
            raise TypeError(f"Setting {key} to {val} with type {type(val)}")

        for child in node.findall('*'):
            node.remove(child)

        if array_items is None:
            node.text = clean(val)
            return

        # A previous simple text value would otherwise shadow the new array,
        # because _get_elements prefers non-blank node text over children.
        if node.text and node.text.strip():
            node.text = None
        self._setitem_add_array(node, array_items)

    def _setitem_insert(self, key: str | QName, val: Any) -> None:
        """Insert a new property element for key holding val.

        Raises:
            TypeError: If val is not a str, list or set.
        """
        rdf = self._get_rdf_root()
        if str(self.qname(key)) in LANG_ALTS:
            val = AltList([clean(val)])
        # Reuse existing rdf:Description element if available, to avoid
        # creating multiple Description elements with the same rdf:about=""
        rdfdesc = rdf.find('rdf:Description[@rdf:about=""]', self.NS)
        if rdfdesc is None:
            rdfdesc = etree.SubElement(
                rdf,
                str(QName(XMP_NS_RDF, 'Description')),
                attrib={str(QName(XMP_NS_RDF, 'about')): ''},
            )
        if isinstance(val, list | set):
            node = etree.SubElement(rdfdesc, self.qname(key))
            self._setitem_add_array(node, val)
        elif isinstance(val, str):
            node = etree.SubElement(rdfdesc, self.qname(key))
            node.text = clean(val)
        else:
            # "from None" hides the StopIteration context when this runs
            # inside set_value()'s except handler.
            raise TypeError(f"Setting {key} to {val} with type {type(val)}") from None

    def delete(self, key: str | QName) -> bool:
        """Delete item from XMP metadata.

        Only the first match for key is removed.

        Returns:
            True if item was found and deleted, False if not found.
        """
        try:
            node, attrib, _oldval, parent = next(self._get_elements(key))
        except StopIteration:
            return False

        if attrib:  # Inline
            del node.attrib[attrib]
            if (
                len(node.attrib) == 1
                and len(node) == 0
                and QName(XMP_NS_RDF, 'about') in node.attrib.keys()
            ):
                # The only thing left on this node is rdf:about="", so remove it
                parent.remove(node)
        else:
            parent.remove(node)
        return True

    def __delitem__(self, key: str | QName) -> None:
        """Delete item from XMP metadata.

        Raises:
            KeyError: If key is not present.
        """
        if not self.delete(key):
            raise KeyError(key)

    def to_bytes(self, xpacket: bool = True) -> bytes:
        """Serialize XMP to XML bytes.

        Args:
            xpacket: If True, wrap in xpacket markers.

        Returns:
            XML bytes representation of the XMP.
        """
        data = BytesIO()
        if xpacket:
            data.write(XPACKET_BEGIN)
        self._xmp.write(data, encoding='utf-8', pretty_print=True)
        if xpacket:
            data.write(XPACKET_END)
        return data.getvalue()

    def __str__(self) -> str:
        """Convert XMP metadata to XML string."""
        return self.to_bytes(xpacket=False).decode('utf-8')