1# SPDX-FileCopyrightText: 2022 James R. Barlow
2# SPDX-License-Identifier: MPL-2.0
3
4"""XMP document handling - pure XMP XML manipulation without PDF awareness."""
5
6from __future__ import annotations
7
8import logging
9from collections.abc import Callable, Iterable, Iterator
10from io import BytesIO
11from typing import TYPE_CHECKING, Any
12
13from pikepdf._xml import parse_xml
14from pikepdf.models.metadata._constants import (
15 DEFAULT_NAMESPACES,
16 XMP_CONTAINERS,
17 XMP_EMPTY,
18 XMP_NS_RDF,
19 XMP_NS_XML,
20 XPACKET_BEGIN,
21 XPACKET_END,
22 AltList,
23 clean,
24 re_xml_illegal_bytes,
25)
26
27if TYPE_CHECKING:
28 from lxml.etree import QName, _Element, _ElementTree
29
30
# Module-level logger, named after this module; used for XMP parse warnings.
log = logging.getLogger(__name__)
32
33
class NeverRaise(Exception):
    """Placeholder exception type that nothing in this module raises.

    Naming it in an ``except`` clause yields a handler that never matches,
    which lets error recovery be switched off without duplicating the
    surrounding ``try`` block.
    """
36
37
def _parser_basic(xml: bytes) -> _ElementTree:
    """Parse ``xml`` by handing it to ``parse_xml`` with no extra options."""
    stream = BytesIO(xml)
    return parse_xml(stream)
40
41
def _parser_strip_illegal_bytes(xml: bytes) -> _ElementTree:
    """Parse ``xml`` after deleting every match of ``re_xml_illegal_bytes``."""
    sanitized = re_xml_illegal_bytes.sub(b'', xml)
    stream = BytesIO(sanitized)
    return parse_xml(stream)
44
45
def _parser_recovery(xml: bytes) -> _ElementTree:
    """Parse ``xml`` by calling ``parse_xml`` with ``recover=True``."""
    stream = BytesIO(xml)
    return parse_xml(stream, recover=True)
48
49
def _parser_replace_with_empty_xmp(_xml: bytes = b'') -> _ElementTree:
    """Log a warning and return a freshly parsed ``XMP_EMPTY`` document.

    The ``_xml`` argument is ignored; it exists so this function has the same
    call signature as the other parsers in the fallback chain.
    """
    log.warning("Error occurred parsing XMP, replacing with empty XMP.")
    empty_tree = _parser_basic(XMP_EMPTY)
    return empty_tree
53
54
# Fallback chain selected by XmpDocument when overwrite_invalid_xml is true.
# Parsers are tried in list order, so the entries run from the plain parser
# to the last resort that discards the input and returns an empty XMP document.
PARSERS_OVERWRITE_INVALID_XML: list[Callable[[bytes], _ElementTree]] = [
    _parser_basic,
    _parser_strip_illegal_bytes,
    _parser_recovery,
    _parser_replace_with_empty_xmp,
]

# Chain selected by XmpDocument when overwrite_invalid_xml is false: only the
# plain parser, with no recovery steps.
PARSERS_STANDARD: list[Callable[[bytes], _ElementTree]] = [_parser_basic]
63
64
class XmpDocument:
    """Pure XMP XML manipulation.

    This class handles parsing, traversing, modifying, and serializing XMP
    metadata without any PDF-specific knowledge. It can be used standalone
    for XMP manipulation.

    Example:
        >>> xmp = XmpDocument(xmp_bytes)
        >>> title = xmp.get('dc:title')
        >>> xmp['dc:title'] = 'New Title'
        >>> xml_bytes = xmp.to_bytes()
    """

    # Namespace mappings. These are class attributes, so they are shared by
    # every instance and are mutated by register_xml_namespace().
    # NS maps prefix -> URI; REVERSE_NS maps URI -> prefix.
    NS: dict[str, str] = {prefix: uri for uri, prefix in DEFAULT_NAMESPACES}
    REVERSE_NS: dict[str, str] = dict(DEFAULT_NAMESPACES)

    def __init__(
        self,
        data: bytes = b'',
        *,
        parsers: Iterable[Callable[[bytes], _ElementTree]] | None = None,
        overwrite_invalid_xml: bool = True,
    ):
        """Parse XMP data.

        Args:
            data: XMP XML bytes to parse. Empty creates a new XMP document.
            parsers: Custom parser chain. If None, uses default based on
                overwrite_invalid_xml setting.
            overwrite_invalid_xml: If True, use recovery parsers for invalid XML.
        """
        if parsers is None:
            parsers = (
                PARSERS_OVERWRITE_INVALID_XML
                if overwrite_invalid_xml
                else PARSERS_STANDARD
            )

        self._xmp: _ElementTree = self._parse(data, parsers, overwrite_invalid_xml)

    def _parse(
        self,
        data: bytes,
        parsers: Iterable[Callable[[bytes], _ElementTree]],
        overwrite_invalid_xml: bool,
    ) -> _ElementTree:
        """Parse XMP data using fallback parsers.

        Each parser is tried in order until one returns a tree. Afterwards,
        top-level processing instructions are stripped and the tree is checked
        for an rdf:RDF root.

        Args:
            data: Raw XMP bytes; blank input is replaced by ``XMP_EMPTY``.
            parsers: Parsers to try, in order.
            overwrite_invalid_xml: If True, XML syntax errors and any error
                during post-processing are logged and recovered from by
                substituting an empty XMP document. If False, those errors
                propagate to the caller.

        Returns:
            The parsed tree, or an empty XMP tree if recovery was needed or no
            parser produced a result.
        """
        from lxml import etree
        from lxml.etree import XMLSyntaxError

        if not data.strip():
            data = XMP_EMPTY  # on some platforms lxml chokes on empty documents

        # When recovery is disabled the handlers below name an exception that
        # is never raised, so real errors pass straight through.
        syntax_errors: type[BaseException] = (
            XMLSyntaxError if overwrite_invalid_xml else NeverRaise
        )
        postprocess_errors: type[BaseException] = (
            Exception if overwrite_invalid_xml else NeverRaise
        )
        not_xml_at_all = (
            "Start tag expected, '<' not found",
            "Document is empty",
        )

        xmp: _ElementTree | None = None
        for parser in parsers:
            try:
                xmp = parser(data)
            except syntax_errors as e:  # type: ignore[misc]
                # No XML to salvage: skip the remaining parsers.
                if str(e).startswith(not_xml_at_all):
                    xmp = _parser_replace_with_empty_xmp()
                    break
                # Otherwise fall through and try the next parser.
            else:
                break

        if xmp is None:
            log.warning("Error occurred parsing XMP")
            return _parser_replace_with_empty_xmp()

        try:
            pis = xmp.xpath('/processing-instruction()')
            for pi in pis:  # type: ignore[union-attr]
                etree.strip_tags(xmp, pi.tag)  # type: ignore[union-attr]
            # Called only for validation: raises if this is not XMP.
            self._get_rdf_root_from(xmp)
        except postprocess_errors as e:  # pylint: disable=broad-except
            log.warning("Error occurred parsing XMP", exc_info=e)
            xmp = _parser_replace_with_empty_xmp()

        return xmp

    @classmethod
    def register_xml_namespace(cls, uri: str, prefix: str) -> None:
        """Register a new XML/XMP namespace.

        The registration is class-wide: it updates the shared ``NS`` and
        ``REVERSE_NS`` maps and registers the prefix with lxml.

        Arguments:
            uri: The long form of the namespace.
            prefix: The alias to use when interpreting XMP.
        """
        from lxml import etree

        # Ask lxml first: if it raises for this prefix, the class-level maps
        # are left untouched instead of disagreeing with lxml's registry.
        etree.register_namespace(prefix, uri)
        cls.NS[prefix] = uri
        cls.REVERSE_NS[uri] = prefix

    @classmethod
    def qname(cls, name: QName | str) -> str:
        """Convert name to an XML QName.

        e.g. pdf:Producer -> {http://ns.adobe.com/pdf/1.3/}Producer

        An lxml ``QName`` is converted with ``str()``. An empty string or a
        name already starting with ``{`` is returned unchanged. Otherwise the
        text before the first ``:`` is looked up in ``NS``; a name with no
        ``:`` is looked up under the empty prefix. If the prefix is not in
        ``NS``, the tag is built with a namespace of ``None``.

        Raises:
            TypeError: If name is neither a ``QName`` nor a ``str``.
        """
        from lxml.etree import QName

        if isinstance(name, QName):
            return str(name)
        if not isinstance(name, str):
            raise TypeError(f"{name} must be str")
        if name == '' or name.startswith('{'):
            return name

        prefix, colon, tag = name.partition(':')
        if not colon:
            # If missing the namespace, it belongs in the default namespace.
            prefix, tag = '', name
        uri = cls.NS.get(prefix, None)
        return str(QName(uri, tag))

    def prefix_from_uri(self, uriname: str) -> str:
        """Given a fully qualified XML name, find a prefix.

        e.g. {http://ns.adobe.com/pdf/1.3/}Producer -> pdf:Producer

        Raises:
            ValueError: If uriname contains no ``}``.
            KeyError: If the namespace URI is not in ``REVERSE_NS``.
        """
        uripart, tag = uriname.split('}', maxsplit=1)
        uri = uripart.replace('{', '')
        return self.REVERSE_NS[uri] + ':' + tag

    def _get_rdf_root_from(self, xmp: _ElementTree) -> _Element:
        """Get the rdf:RDF root element from an XMP tree.

        Searches for a descendant rdf:RDF element; if none is found, the
        document root itself is used, provided it is rdf:RDF.

        Raises:
            ValueError: If neither yields an rdf:RDF element.
        """
        rdf = xmp.find('.//rdf:RDF', self.NS)
        if rdf is None:
            rdf = xmp.getroot()
            if rdf.tag != '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF':
                raise ValueError("Metadata seems to be XML but not XMP")
        return rdf

    def _get_rdf_root(self) -> _Element:
        """Get the rdf:RDF root element."""
        return self._get_rdf_root_from(self._xmp)

    def _get_elements(
        self, name: str | QName = ''
    ) -> Iterator[tuple[_Element, str | bytes | None, Any, _Element]]:
        """Get elements from XMP.

        Core routine to find elements matching name within the XMP and yield
        them. Only rdf:Description elements with ``rdf:about=""`` that are
        direct children of the rdf:RDF root are searched.

        For XMP spec 7.9.2.2, rdf:Description with property attributes,
        we yield the node which will have the desired as one of its attributes.
        qname is returned so that the node.attrib can be used to locate the
        source.

        For XMP spec 7.5, simple valued XMP properties, we yield the node,
        None, and the value. For structure or array valued properties we gather
        the elements. We ignore qualifiers.

        Args:
            name: a prefixed name or QName to look for within the
                data section of the XMP; looks for all data keys if omitted

        Yields:
            tuple: (node, qname_attrib, value, parent_node)

        """
        qname = self.qname(name)
        rdf = self._get_rdf_root()
        for rdfdesc in rdf.findall('rdf:Description[@rdf:about=""]', self.NS):
            # Properties written as attributes of rdf:Description.
            if qname and qname in rdfdesc.keys():
                yield (rdfdesc, qname, rdfdesc.get(qname), rdf)
            elif not qname:
                # Listing everything: attributes with empty values are skipped.
                for k, v in rdfdesc.items():
                    if v:
                        yield (rdfdesc, k, v, rdf)

            # Properties written as child elements of rdf:Description.
            xpath = qname if name else '*'
            for node in rdfdesc.findall(xpath, self.NS):
                if node.text and node.text.strip():
                    # Non-blank text takes precedence over any child elements.
                    yield (node, None, node.text, rdfdesc)
                    continue
                values = self._get_subelements(node)
                yield (node, None, values, rdfdesc)

    def _get_subelements(self, node: _Element) -> Any:
        """Gather the sub-elements attached to a node.

        For an rdf:Alt child, the text of its first item is returned (first
        language only, for simplicity), or ``''`` if it has no items.
        Otherwise the first container kind from ``XMP_CONTAINERS`` found under
        the node is collected into that entry's Python container. If nothing
        matches, ``''`` is returned.
        """
        items = node.find('rdf:Alt', self.NS)
        if items is not None:
            try:
                return items[0].text
            except IndexError:
                return ''

        for xmlcontainer, container, insertfn in XMP_CONTAINERS:
            items = node.find(f'rdf:{xmlcontainer}', self.NS)
            if items is None:
                continue
            result = container()
            for item in items:
                insertfn(result, item.text)
            return result
        return ''

    def _get_element_values(self, name: str | QName = '') -> Iterator[Any]:
        """Yield only the value component of each tuple from _get_elements."""
        for _node, _attrib, value, _parent in self._get_elements(name):
            yield value

    def __contains__(self, key: str | QName) -> bool:
        """Test if XMP key exists.

        Note that this is true only when at least one matching value is
        truthy: a key whose value is an empty string or empty container is
        reported as absent, even though ``__getitem__`` would return it.
        """
        return any(self._get_element_values(key))

    def get(self, key: str | QName, default: Any = None) -> Any:
        """Get the first XMP value for key, or default if not found."""
        return next(self._get_element_values(key), default)

    def __getitem__(self, key: str | QName) -> Any:
        """Retrieve the first XMP value for key.

        Raises:
            KeyError: If no element or attribute matches key.
        """
        try:
            return next(self._get_element_values(key))
        except StopIteration:
            raise KeyError(key) from None

    def __iter__(self) -> Iterator[str]:
        """Iterate through XMP metadata attributes and nodes.

        Yields the attribute name for attribute-style properties and the
        element tag for element-style properties.
        """
        for node, attrib, _val, _parents in self._get_elements():
            if attrib:
                yield str(attrib)
            else:
                yield node.tag

    def __len__(self) -> int:
        """Return number of items in metadata."""
        return sum(1 for _ in self)

    def set_value(
        self,
        key: str | QName,
        val: set[str] | list[str] | str,
    ) -> None:
        """Set XMP metadata key to value.

        An existing property is updated in place; otherwise a new element is
        inserted.

        Raises:
            TypeError: If val has a type that cannot be stored for this key.
        """
        qkey = self.qname(key)

        try:
            # Update existing node; StopIteration signals "no such key".
            self._setitem_update(key, val, qkey)
        except StopIteration:
            # Insert a new node
            self._setitem_insert(key, val)

    def __setitem__(self, key: str | QName, val: set[str] | list[str] | str) -> None:
        """Set XMP metadata key to value."""
        self.set_value(key, val)

    def _setitem_add_array(self, node: _Element, items: Iterable) -> None:
        """Append an RDF container holding items as rdf:li children of node.

        The container kind is the first ``XMP_CONTAINERS`` entry whose
        ``py_type`` matches items. For rdf:Alt, every rdf:li is tagged
        ``xml:lang="x-default"``. Items that are ``None`` or clean to an
        empty string produce an rdf:li with no text.

        Raises:
            TypeError: If items matches no entry in ``XMP_CONTAINERS``.
        """
        from lxml import etree
        from lxml.etree import QName

        rdf_type = next(
            (c.rdf_type for c in XMP_CONTAINERS if isinstance(items, c.py_type)),
            None,
        )
        if rdf_type is None:
            # Raise TypeError rather than letting StopIteration escape, which
            # set_value() would mistake for "key not found".
            raise TypeError(f"Cannot store {type(items)} as an XMP array")

        seq = etree.SubElement(node, str(QName(XMP_NS_RDF, rdf_type)))
        tag_attrib: dict[str, str] | None = None
        if rdf_type == 'Alt':
            tag_attrib = {str(QName(XMP_NS_XML, 'lang')): 'x-default'}
        li_tag = str(QName(XMP_NS_RDF, 'li'))
        for item in items:
            el = etree.SubElement(seq, li_tag, attrib=tag_attrib)
            if item is not None:
                inner_text: str | None = clean(item)
                if inner_text == '':
                    inner_text = None
                el.text = inner_text

    def _setitem_update(self, key: str | QName, val: Any, qkey: str) -> None:
        """Replace the value of the first existing property matching key.

        Args:
            key: Property name as given by the caller.
            val: New value (str, list or set).
            qkey: The fully qualified form of key.

        Raises:
            StopIteration: If no property matches key (the caller then
                inserts a new one).
            TypeError: If val cannot be stored in the located property.
        """
        from pikepdf.models.metadata._constants import LANG_ALTS

        # Locate existing node to replace
        node, attrib, _oldval, _parent = next(self._get_elements(key))

        if attrib:
            # Property is stored as an attribute of rdf:Description.
            if isinstance(val, str):
                node.set(attrib, clean(val))
            elif qkey == self.qname('dc:creator') and isinstance(val, list | set):
                # dc:creator incorrectly created as an attribute - we're
                # replacing it anyway, so remove the old one and write a
                # proper dc:creator element holding the array.
                del node.attrib[qkey]
                self._setitem_insert(key, val)
            else:
                raise TypeError(f"Setting {key} to {val} with type {type(val)}")
            return

        if isinstance(val, list | set):
            self._clear_node(node)
            self._setitem_add_array(node, val)
        elif isinstance(val, str):
            self._clear_node(node)
            if str(self.qname(key)) in LANG_ALTS:
                self._setitem_add_array(node, AltList([clean(val)]))
            else:
                node.text = clean(val)
        else:
            raise TypeError(f"Setting {key} to {val} with type {type(val)}")

    @staticmethod
    def _clear_node(node: _Element) -> None:
        """Remove all child elements and the text of node.

        The text must be cleared too: _get_elements prefers non-blank text
        over child containers, so stale text would hide a new array.
        """
        for child in node.findall('*'):
            node.remove(child)
        node.text = None

    def _setitem_insert(self, key: str | QName, val: Any) -> None:
        """Insert a new element for key under rdf:Description.

        The first existing ``rdf:Description[@rdf:about=""]`` is reused; one
        is created if none exists. Keys listed in ``LANG_ALTS`` are stored as
        a single-item language alternative.

        Raises:
            TypeError: If val is not a str, list or set.
        """
        from lxml import etree
        from lxml.etree import QName

        from pikepdf.models.metadata._constants import LANG_ALTS

        rdf = self._get_rdf_root()
        if str(self.qname(key)) in LANG_ALTS:
            val = AltList([clean(val)])

        # Validate before touching the tree so a bad value cannot leave an
        # empty rdf:Description behind.
        if not isinstance(val, list | set | str):
            raise TypeError(f"Setting {key} to {val} with type {type(val)}") from None

        # Reuse existing rdf:Description element if available, to avoid
        # creating multiple Description elements with the same rdf:about=""
        rdfdesc = rdf.find('rdf:Description[@rdf:about=""]', self.NS)
        if rdfdesc is None:
            rdfdesc = etree.SubElement(
                rdf,
                str(QName(XMP_NS_RDF, 'Description')),
                attrib={str(QName(XMP_NS_RDF, 'about')): ''},
            )

        node = etree.SubElement(rdfdesc, self.qname(key))
        if isinstance(val, str):
            node.text = clean(val)
        else:
            self._setitem_add_array(node, val)

    def delete(self, key: str | QName) -> bool:
        """Delete item from XMP metadata.

        Only the first property matching key is removed. If removing an
        attribute-style property leaves an rdf:Description with nothing but
        its rdf:about attribute and no children, that element is removed too.

        Returns:
            True if item was found and deleted, False if not found.
        """
        from lxml.etree import QName

        found = next(self._get_elements(key), None)
        if found is None:
            return False

        node, attrib, _oldval, parent = found
        if not attrib:
            parent.remove(node)
            return True

        # Inline (attribute-style) property.
        del node.attrib[attrib]
        only_about_left = (
            len(node.attrib) == 1
            and len(node) == 0
            and str(QName(XMP_NS_RDF, 'about')) in node.attrib
        )
        if only_about_left:
            # The only thing left on this node is rdf:about="", so remove it
            parent.remove(node)
        return True

    def __delitem__(self, key: str | QName) -> None:
        """Delete item from XMP metadata.

        Raises:
            KeyError: If key is not present.
        """
        if not self.delete(key):
            raise KeyError(key)

    def to_bytes(self, xpacket: bool = True) -> bytes:
        """Serialize XMP to XML bytes.

        Args:
            xpacket: If True, wrap in xpacket markers.

        Returns:
            XML bytes representation of the XMP.
        """
        buf = BytesIO()
        if xpacket:
            buf.write(XPACKET_BEGIN)
        self._xmp.write(buf, encoding='utf-8', pretty_print=True)
        if xpacket:
            buf.write(XPACKET_END)
        return buf.getvalue()

    def __str__(self) -> str:
        """Convert XMP metadata to XML string."""
        return self.to_bytes(xpacket=False).decode('utf-8')