1# SPDX-FileCopyrightText: 2022 James R. Barlow
2# SPDX-License-Identifier: MPL-2.0
3
4"""XMP document handling - pure XMP XML manipulation without PDF awareness."""
5
6from __future__ import annotations
7
8import logging
9from collections.abc import Callable, Iterable, Iterator
10from io import BytesIO
11from typing import Any
12
13from lxml import etree
14from lxml.etree import QName, XMLSyntaxError, _Element, _ElementTree
15
16from pikepdf._xml import parse_xml
17from pikepdf.models.metadata._constants import (
18 DEFAULT_NAMESPACES,
19 LANG_ALTS,
20 XMP_CONTAINERS,
21 XMP_EMPTY,
22 XMP_NS_RDF,
23 XMP_NS_XML,
24 XPACKET_BEGIN,
25 XPACKET_END,
26 AltList,
27 clean,
28 re_xml_illegal_bytes,
29)
30
# Module-level logger, named after this module so that applications can
# configure or filter XMP parsing warnings separately.
log: logging.Logger = logging.getLogger(__name__)
32
33
class NeverRaise(Exception):
    """Placeholder exception type that no code in this module raises.

    It is substituted into ``except`` clauses when a handler should be
    disabled: an ``except NeverRaise`` clause cannot match, so the real
    exception propagates to the caller.
    """
36
37
def _parser_basic(xml: bytes) -> _ElementTree:
    """Parse ``xml`` by handing it to ``parse_xml`` with default settings.

    Args:
        xml: Raw XML bytes.

    Returns:
        The element tree produced by ``parse_xml``.
    """
    stream = BytesIO(xml)
    return parse_xml(stream)
40
41
def _parser_strip_illegal_bytes(xml: bytes) -> _ElementTree:
    """Parse ``xml`` after deleting every match of ``re_xml_illegal_bytes``.

    Args:
        xml: Raw XML bytes, possibly containing bytes the pattern rejects.

    Returns:
        The element tree produced by ``parse_xml`` from the filtered bytes.
    """
    sanitized = re_xml_illegal_bytes.sub(b'', xml)
    stream = BytesIO(sanitized)
    return parse_xml(stream)
44
45
def _parser_recovery(xml: bytes) -> _ElementTree:
    """Parse ``xml`` with ``recover=True`` passed through to ``parse_xml``.

    Args:
        xml: Raw XML bytes.

    Returns:
        The element tree produced by ``parse_xml`` in recovery mode.
    """
    stream = BytesIO(xml)
    tree = parse_xml(stream, recover=True)
    return tree
48
49
def _parser_replace_with_empty_xmp(_xml: bytes = b'') -> _ElementTree:
    """Log a warning and return a tree parsed from ``XMP_EMPTY``.

    Args:
        _xml: Ignored. Present only so this function has the same call
            signature as the other parsers and can sit in a parser chain.

    Returns:
        The result of parsing ``XMP_EMPTY`` with ``_parser_basic``.
    """
    log.warning("Error occurred parsing XMP, replacing with empty XMP.")
    empty_tree = _parser_basic(XMP_EMPTY)
    return empty_tree
53
54
# Strict chain: a single plain parse attempt.
PARSERS_STANDARD: list[Callable[[bytes], _ElementTree]] = [_parser_basic]

# Lenient chain, listed from least to most invasive: plain parse, parse with
# illegal bytes removed, parse in recovery mode, and finally discard the
# input in favour of an empty XMP document.
PARSERS_OVERWRITE_INVALID_XML: list[Callable[[bytes], _ElementTree]] = [
    _parser_basic,
    _parser_strip_illegal_bytes,
    _parser_recovery,
    _parser_replace_with_empty_xmp,
]
63
64
class XmpDocument:
    """Pure XMP XML manipulation.

    This class handles parsing, traversing, modifying, and serializing XMP
    metadata without any PDF-specific knowledge. It can be used standalone
    for XMP manipulation.

    Keys may be given as a prefixed name (``'dc:title'``), as a fully
    qualified name (``'{uri}tag'``) or as an :class:`lxml.etree.QName`.

    Example:
        >>> xmp = XmpDocument(xmp_bytes)
        >>> title = xmp.get('dc:title')
        >>> xmp.set_value('dc:title', 'New Title')
        >>> xml_bytes = xmp.to_bytes()
    """

    # Namespace mappings. These are class attributes shared by all instances;
    # register_xml_namespace() mutates them in place.
    NS: dict[str, str] = {prefix: uri for uri, prefix in DEFAULT_NAMESPACES}
    REVERSE_NS: dict[str, str] = dict(DEFAULT_NAMESPACES)

    def __init__(
        self,
        data: bytes = b'',
        *,
        parsers: Iterable[Callable[[bytes], _ElementTree]] | None = None,
        overwrite_invalid_xml: bool = True,
    ):
        """Parse XMP data.

        Args:
            data: XMP XML bytes to parse. Empty creates a new XMP document.
            parsers: Custom parser chain. If None, uses default based on
                overwrite_invalid_xml setting.
            overwrite_invalid_xml: If True, use recovery parsers for invalid XML.
        """
        if parsers is None:
            parsers = (
                PARSERS_OVERWRITE_INVALID_XML
                if overwrite_invalid_xml
                else PARSERS_STANDARD
            )

        self._xmp: _ElementTree = self._parse(data, parsers, overwrite_invalid_xml)

    def _parse(
        self,
        data: bytes,
        parsers: Iterable[Callable[[bytes], _ElementTree]],
        overwrite_invalid_xml: bool,
    ) -> _ElementTree:
        """Parse XMP data using fallback parsers.

        Each parser is tried in order until one returns a tree. The tree is
        then checked for an rdf:RDF element after top-level processing
        instructions have been stripped.

        Args:
            data: XMP XML bytes; whitespace-only input is treated as XMP_EMPTY.
            parsers: Parser callables to try in order.
            overwrite_invalid_xml: If True, parse and validation errors are
                logged and an empty XMP document is substituted. If False,
                those errors propagate to the caller.

        Returns:
            The parsed tree, or an empty XMP tree if recovery was needed.
        """
        if data.strip() == b'':
            data = XMP_EMPTY  # on some platforms lxml chokes on empty documents

        # In strict mode the handlers below are disabled by catching an
        # exception type that is never raised.
        syntax_errors: type[Exception] = (
            XMLSyntaxError if overwrite_invalid_xml else NeverRaise
        )
        validation_errors: type[Exception] = (
            Exception if overwrite_invalid_xml else NeverRaise
        )
        # Messages that indicate there is nothing to recover from the input.
        empty_document_messages = (
            "Start tag expected, '<' not found",
            "Document is empty",
        )

        xmp: _ElementTree | None = None
        for parser in parsers:
            try:
                xmp = parser(data)
            except syntax_errors as e:
                if str(e).startswith(empty_document_messages):
                    xmp = _parser_replace_with_empty_xmp()
                    break
                # Otherwise fall through to the next parser in the chain.
            else:
                break

        if xmp is None:
            # Every parser failed (or the chain was empty).
            log.warning("Error occurred parsing XMP")
            return _parser_replace_with_empty_xmp()

        try:
            pis = xmp.xpath('/processing-instruction()')
            for pi in pis:  # type: ignore[union-attr]
                etree.strip_tags(xmp, pi.tag)  # type: ignore[union-attr]
            # Called for validation only: raises if this is not XMP.
            self._get_rdf_root_from(xmp)
        except validation_errors as e:
            log.warning("Error occurred parsing XMP", exc_info=e)
            xmp = _parser_replace_with_empty_xmp()

        return xmp

    @classmethod
    def register_xml_namespace(cls, uri: str, prefix: str) -> None:
        """Register a new XML/XMP namespace.

        The mapping is stored on the class (affecting all instances) and is
        also registered with lxml via ``etree.register_namespace``.

        Arguments:
            uri: The long form of the namespace.
            prefix: The alias to use when interpreting XMP.
        """
        cls.NS[prefix] = uri
        cls.REVERSE_NS[uri] = prefix
        etree.register_namespace(prefix, uri)

    @classmethod
    def qname(cls, name: QName | str) -> str:
        """Convert name to an XML QName.

        e.g. pdf:Producer -> {http://ns.adobe.com/pdf/1.3/}Producer

        The empty string and names already in ``{uri}tag`` form are returned
        unchanged. A name without a prefix, or with a prefix that is not in
        ``NS``, is returned as the bare tag with no namespace.

        Raises:
            TypeError: If name is neither a QName nor a str.
        """
        if isinstance(name, QName):
            return str(name)
        if not isinstance(name, str):
            raise TypeError(f"{name} must be str")
        if name == '' or name.startswith('{'):
            return name
        prefix, colon, tag = name.partition(':')
        if not colon:
            # If missing the namespace, it belongs in the default namespace.
            prefix, tag = '', name
        uri = cls.NS.get(prefix)
        return str(QName(uri, tag))

    def prefix_from_uri(self, uriname: str) -> str:
        """Given a fully qualified XML name, find a prefix.

        e.g. {http://ns.adobe.com/pdf/1.3/}Producer -> pdf:Producer

        Raises:
            ValueError: If uriname contains no ``}`` separator.
            KeyError: If the namespace URI is not in ``REVERSE_NS``.
        """
        uripart, tag = uriname.split('}', maxsplit=1)
        uri = uripart.replace('{', '')
        return self.REVERSE_NS[uri] + ':' + tag

    def _get_rdf_root_from(self, xmp: _ElementTree) -> _Element:
        """Get the rdf:RDF root element from an XMP tree.

        Looks for an rdf:RDF descendant first; failing that, accepts the
        document root if it is itself rdf:RDF.

        Raises:
            ValueError: If no rdf:RDF element can be found.
        """
        rdf = xmp.find('.//rdf:RDF', self.NS)
        if rdf is not None:
            return rdf
        root = xmp.getroot()
        if root.tag != '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF':
            raise ValueError("Metadata seems to be XML but not XMP")
        return root

    def _get_rdf_root(self) -> _Element:
        """Get the rdf:RDF root element of this document."""
        return self._get_rdf_root_from(self._xmp)

    def _get_elements(
        self, name: str | QName = ''
    ) -> Iterator[tuple[_Element, str | bytes | None, Any, _Element]]:
        """Get elements from XMP.

        Core routine to find elements matching name within the XMP and yield
        them. Only rdf:Description elements with ``rdf:about=""`` that are
        direct children of rdf:RDF are searched.

        For XMP spec 7.9.2.2, rdf:Description with property attributes,
        we yield the node which will have the desired property as one of its
        attributes. qname is returned so that the node.attrib can be used to
        locate the source. When no name is given, attributes with an empty
        value (such as ``rdf:about=""`` itself) are skipped.

        For XMP spec 7.5, simple valued XMP properties, we yield the node,
        None, and the value. For structure or array valued properties we gather
        the elements. We ignore qualifiers.

        Args:
            name: a prefixed name or QName to look for within the
                data section of the XMP; looks for all data keys if omitted

        Yields:
            tuple: (node, qname_attrib, value, parent_node)

        """
        qname = self.qname(name)
        rdf = self._get_rdf_root()
        for rdfdesc in rdf.findall('rdf:Description[@rdf:about=""]', self.NS):
            if qname and qname in rdfdesc.keys():
                yield (rdfdesc, qname, rdfdesc.get(qname), rdf)
            elif not qname:
                for k, v in rdfdesc.items():
                    if v:
                        yield (rdfdesc, k, v, rdf)
            xpath = qname if name else '*'
            for node in rdfdesc.findall(xpath, self.NS):
                if node.text and node.text.strip():
                    # Non-blank text takes precedence over child elements.
                    yield (node, None, node.text, rdfdesc)
                    continue
                values = self._get_subelements(node)
                yield (node, None, values, rdfdesc)

    def _get_subelements(self, node: _Element) -> Any:
        """Gather the sub-elements attached to a node.

        Gather rdf:Bag and rdf:Seq into set and list respectively. For
        alternate languages values, take the first language only for
        simplicity.

        Returns:
            The text of the first rdf:Alt item, a container built from the
            items of the first matching container in XMP_CONTAINERS, or ``''``
            if the node has no recognized container (or an empty rdf:Alt).
        """
        alt = node.find('rdf:Alt', self.NS)
        if alt is not None:
            try:
                return alt[0].text
            except IndexError:
                return ''

        for xmlcontainer, container, insertfn in XMP_CONTAINERS:
            items = node.find(f'rdf:{xmlcontainer}', self.NS)
            if items is None:
                continue
            result = container()
            for item in items:
                insertfn(result, item.text)
            return result
        return ''

    def _get_element_values(self, name: str | QName = '') -> Iterator[Any]:
        """Yield only the value component of each match from _get_elements."""
        for _node, _attrib, value, _parent in self._get_elements(name):
            yield value

    def __contains__(self, key: str | QName) -> bool:
        """Test if XMP key exists with a non-empty value.

        Note: this is a truthiness test over the matching values, so a key
        whose only values are empty (``''``, ``None``, an empty container)
        reports as absent even though ``self[key]`` would return that value.
        """
        return any(self._get_element_values(key))

    def get(self, key: str | QName, default: Any = None) -> Any:
        """Get the first XMP value for key, or default if not found."""
        return next(self._get_element_values(key), default)

    def __getitem__(self, key: str | QName) -> Any:
        """Retrieve the first XMP value for key.

        Raises:
            KeyError: If no element or attribute matches key.
        """
        try:
            return next(self._get_element_values(key))
        except StopIteration:
            raise KeyError(key) from None

    def __iter__(self) -> Iterator[str]:
        """Iterate through XMP metadata attributes and nodes.

        Yields the fully qualified name of each attribute-valued property and
        the tag of each element-valued property.
        """
        for node, attrib, _val, _parents in self._get_elements():
            if attrib:
                yield str(attrib)
            else:
                yield node.tag

    def __len__(self) -> int:
        """Return number of items in metadata."""
        return sum(1 for _ in self)

    def set_value(
        self,
        key: str | QName,
        val: set[str] | list[str] | str,
    ) -> None:
        """Set XMP metadata key to value.

        Updates the first existing property that matches key, or inserts a
        new one if there is none.

        Raises:
            TypeError: If val has a type that cannot be stored for key.
        """
        qkey = self.qname(key)

        try:
            # Update existing node; StopIteration signals "no such node".
            self._setitem_update(key, val, qkey)
        except StopIteration:
            # Insert a new node
            self._setitem_insert(key, val)

    def __setitem__(self, key: str | QName, val: set[str] | list[str] | str) -> None:
        """Set XMP metadata key to value."""
        self.set_value(key, val)

    @staticmethod
    def _clear_content(node: _Element) -> None:
        """Remove the child elements and the text of node.

        The text must be cleared as well as the children: _get_elements
        prefers non-blank text over child containers, so leftover text would
        hide an array written afterwards.
        """
        for child in node.findall('*'):
            node.remove(child)
        node.text = None

    def _setitem_add_array(self, node: _Element, items: Iterable) -> None:
        """Append an RDF array (with one rdf:li per item) as a child of node.

        The array type is chosen from the first XMP_CONTAINERS entry whose
        Python type matches items. Items of an rdf:Alt are tagged with
        ``xml:lang="x-default"``. ``None`` or empty items produce an empty
        rdf:li.

        Raises:
            TypeError: If items matches no container type in XMP_CONTAINERS.
        """
        rdf_type = next(
            (c.rdf_type for c in XMP_CONTAINERS if isinstance(items, c.py_type)),
            None,
        )
        if rdf_type is None:
            # Raise a real error rather than leaking StopIteration from next().
            raise TypeError(f"Cannot store {type(items)} as an XMP array")
        seq = etree.SubElement(node, str(QName(XMP_NS_RDF, rdf_type)))
        tag_attrib: dict[str, str] | None = None
        if rdf_type == 'Alt':
            tag_attrib = {str(QName(XMP_NS_XML, 'lang')): 'x-default'}
        li_tag = str(QName(XMP_NS_RDF, 'li'))
        for item in items:
            el = etree.SubElement(seq, li_tag, attrib=tag_attrib)
            if item is not None:
                inner_text: str | None = clean(item)
                if inner_text == '':
                    inner_text = None
                el.text = inner_text

    def _setitem_update(self, key: str | QName, val: Any, qkey: str) -> None:
        """Replace the value of the first existing property matching key.

        Raises:
            StopIteration: If no property matches key (used by set_value to
                fall back to insertion).
            TypeError: If val cannot be stored in the existing location.
        """
        # Locate existing node to replace
        node, attrib, _oldval, _parent = next(self._get_elements(key))

        if attrib:
            if isinstance(val, str):
                node.set(attrib, clean(val))
            elif qkey == self.qname('dc:creator') and isinstance(val, list | set):
                # dc:creator incorrectly created as an attribute. An array
                # cannot live in an attribute, so drop the attribute and
                # insert a proper child element holding the array.
                del node.attrib[qkey]
                self._setitem_insert(key, val)
            else:
                raise TypeError(f"Setting {key} to {val} with type {type(val)}")
            return

        if isinstance(val, str):
            self._clear_content(node)
            if str(self.qname(key)) in LANG_ALTS:
                self._setitem_add_array(node, AltList([clean(val)]))
            else:
                node.text = clean(val)
        elif isinstance(val, list | set):
            self._clear_content(node)
            self._setitem_add_array(node, val)
        else:
            raise TypeError(f"Setting {key} to {val} with type {type(val)}")

    def _setitem_insert(self, key: str | QName, val: Any) -> None:
        """Insert a new property element for key.

        Keys listed in LANG_ALTS are stored as a single-item rdf:Alt. The
        element is added to the first rdf:Description with ``rdf:about=""``,
        which is created if missing.

        Raises:
            TypeError: If val is not a str, list or set.
        """
        rdf = self._get_rdf_root()
        qkey = self.qname(key)
        if str(qkey) in LANG_ALTS:
            val = AltList([clean(val)])
        # Validate before touching the tree so a failed insert leaves no
        # empty rdf:Description behind.
        if not isinstance(val, list | set | str):
            raise TypeError(f"Setting {key} to {val} with type {type(val)}") from None

        # Reuse existing rdf:Description element if available, to avoid
        # creating multiple Description elements with the same rdf:about=""
        rdfdesc = rdf.find('rdf:Description[@rdf:about=""]', self.NS)
        if rdfdesc is None:
            rdfdesc = etree.SubElement(
                rdf,
                str(QName(XMP_NS_RDF, 'Description')),
                attrib={str(QName(XMP_NS_RDF, 'about')): ''},
            )
        node = etree.SubElement(rdfdesc, qkey)
        if isinstance(val, str):
            node.text = clean(val)
        else:
            self._setitem_add_array(node, val)

    def delete(self, key: str | QName) -> bool:
        """Delete item from XMP metadata.

        Only the first property matching key is removed.

        Returns:
            True if item was found and deleted, False if not found.
        """
        match = next(self._get_elements(key), None)
        if match is None:
            return False

        node, attrib, _oldval, parent = match
        if not attrib:
            parent.remove(node)
            return True

        # Inline (attribute-valued) property
        del node.attrib[attrib]
        about = str(QName(XMP_NS_RDF, 'about'))
        if len(node.attrib) == 1 and len(node) == 0 and about in node.attrib.keys():
            # The only thing left on this node is rdf:about="", so remove it
            parent.remove(node)
        return True

    def __delitem__(self, key: str | QName) -> None:
        """Delete item from XMP metadata.

        Raises:
            KeyError: If no property matches key.
        """
        if not self.delete(key):
            raise KeyError(key)

    def to_bytes(self, xpacket: bool = True) -> bytes:
        """Serialize XMP to XML bytes.

        Args:
            xpacket: If True, wrap in xpacket markers.

        Returns:
            XML bytes representation of the XMP, UTF-8 encoded and
            pretty-printed.
        """
        data = BytesIO()
        if xpacket:
            data.write(XPACKET_BEGIN)
        self._xmp.write(data, encoding='utf-8', pretty_print=True)
        if xpacket:
            data.write(XPACKET_END)
        return data.getvalue()

    def __str__(self) -> str:
        """Convert XMP metadata to XML string (without xpacket markers)."""
        return self.to_bytes(xpacket=False).decode('utf-8')