Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_utils.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
28"""Utility functions for PDF library."""
29__author__ = "Mathieu Fenniak"
30__author_email__ = "biziqe@mathieu.fenniak.net"
32import functools
33import logging
34import re
35import sys
36import warnings
37from dataclasses import dataclass
38from datetime import datetime, timezone
39from io import DEFAULT_BUFFER_SIZE
40from os import SEEK_CUR
41from re import Pattern
42from typing import (
43 IO,
44 Any,
45 NoReturn,
46 Optional,
47 Union,
48)
50if sys.version_info[:2] >= (3, 10):
51 # Python 3.10+: https://www.python.org/dev/peps/pep-0484/
52 from typing import TypeAlias
53else:
54 from typing_extensions import TypeAlias
56if sys.version_info >= (3, 11):
57 from typing import Self
58else:
59 from typing_extensions import Self
61from .errors import (
62 STREAM_TRUNCATED_PREMATURELY,
63 DeprecationError,
64 PdfStreamError,
65)
# A matrix written as three rows of three floats each.
TransformationMatrixType: TypeAlias = tuple[
    tuple[float, float, float], tuple[float, float, float], tuple[float, float, float]
]
# A flat tuple of six floats.
# NOTE(review): presumably the six variable entries of a transformation
# matrix - confirm against the callers.
CompressedTransformationMatrix: TypeAlias = tuple[
    float, float, float, float, float, float
]

# Any file-like object, regardless of the type it yields.
StreamType = IO[Any]
# A file-like object yielding bytes.
BinaryStreamType = IO[bytes]
# Either a string or a stream.
# NOTE(review): the string is presumably a file path - confirm against the callers.
StrByteType = Union[str, StreamType]
def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
    """
    Parse a PDF date string (``D:YYYYMMDDHHmmSS`` plus optional offset).

    The ``D:`` prefix may be missing, trailing date components may be
    omitted, and the offset may be given as ``Z``/``z``, as ``+HH'mm'`` or
    as hours only.

    Args:
        text: The date string to parse.

    Returns:
        The parsed datetime, or None if ``text`` is None or empty.

    Raises:
        ValueError: If the text matches none of the supported layouts.
    """
    if not text:
        return None

    candidate = text
    if candidate[0].isdigit():
        # The "D:" prefix is missing; add it so one set of formats suffices.
        candidate = "D:" + candidate
    if candidate.endswith(("Z", "z")):
        # A bare "Z" stands for a zero offset.
        candidate += "0000"
    candidate = candidate.replace("z", "+").replace("Z", "+").replace("'", "")

    offset_index = max(candidate.find("+"), candidate.find("-"))
    if offset_index > 0 and offset_index != len(candidate) - 5:
        # The offset is not four digits long: pad the minutes.
        candidate += "00"

    zero_offset = candidate.endswith("+0000")
    supported_formats = (
        "D:%Y",
        "D:%Y%m",
        "D:%Y%m%d",
        "D:%Y%m%d%H",
        "D:%Y%m%d%H%M",
        "D:%Y%m%d%H%M%S",
        "D:%Y%m%d%H%M%S%z",
    )
    for date_format in supported_formats:
        try:
            parsed = datetime.strptime(candidate, date_format)  # noqa: DTZ007
        except ValueError:
            continue
        if zero_offset:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
    raise ValueError(f"Can not convert date: {text}")
def format_iso8824_date(dt: datetime) -> str:
    """
    Convert a datetime object to PDF date string format.

    Naive datetimes are written as ``D:YYYYMMDDHHmmSS``. Aware datetimes
    additionally get their UTC offset appended as ``+HH'mm'`` or ``-HH'mm'``.

    Args:
        dt: A datetime object to convert.

    Returns:
        A date string in PDF format.
    """
    date_str = dt.strftime("D:%Y%m%d%H%M%S")
    # ``utcoffset()`` is None both when there is no tzinfo and when the tzinfo
    # does not know its offset; either way the datetime is naive and no offset
    # can be written. Checking the value avoids relying on an ``assert``,
    # which is stripped when Python runs with ``-O``.
    offset = dt.utcoffset()
    if offset is None:
        return date_str
    total_seconds = int(offset.total_seconds())
    sign = "+" if total_seconds >= 0 else "-"
    hours, remainder = divmod(abs(total_seconds), 3600)
    minutes = remainder // 60
    return f"{date_str}{sign}{hours:02d}'{minutes:02d}'"
def _get_max_pdf_version_header(header1: str, header2: str) -> str:
    """
    Return whichever of the two PDF headers names the higher version.

    Headers that are not in the list of known versions are ignored.

    Args:
        header1: First header, e.g. ``%PDF-1.4``.
        header2: Second header.

    Returns:
        The known header with the highest version.

    Raises:
        ValueError: If neither header is a known version header.
    """
    known_headers = (
        "%PDF-1.3",
        "%PDF-1.4",
        "%PDF-1.5",
        "%PDF-1.6",
        "%PDF-1.7",
        "%PDF-2.0",
    )
    # The tuple is ordered by version, so the index doubles as a rank.
    ranks = [
        known_headers.index(header)
        for header in (header1, header2)
        if header in known_headers
    ]
    if not ranks:
        raise ValueError(f"Neither {header1!r} nor {header2!r} are proper headers")
    return known_headers[max(ranks)]
# Single-byte values treated as whitespace by the stream helpers in this
# module: NUL, tab, line feed, form feed, carriage return and space.
WHITESPACES = (b"\x00", b"\t", b"\n", b"\f", b"\r", b" ")
# The same bytes concatenated into one bytes object.
WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
# The same bytes wrapped in "[...]" to form a regular expression character class.
WHITESPACES_AS_REGEXP = b"[" + WHITESPACES_AS_BYTES + b"]"
def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
    """
    Collect bytes from the stream up to the next whitespace byte.

    Reading is done one byte at a time and stops at end of stream, at the
    first byte for which ``bytes.isspace()`` is true (that byte is consumed
    but not returned), or once ``maxchars`` bytes were collected.

    Args:
        stream: The data stream to read from.
        maxchars: The maximum number of bytes returned; by default unlimited.

    Returns:
        The bytes which were collected.

    """
    collected = []
    while True:
        byte = stream.read(1)
        if not byte or byte.isspace():
            break
        collected.append(byte)
        if len(collected) == maxchars:
            break
    return b"".join(collected)
def read_non_whitespace(stream: BinaryStreamType) -> bytes:
    """
    Skip whitespace and return the first byte which is not whitespace.

    Args:
        stream: The data stream to read from.

    Returns:
        The first byte not listed in WHITESPACES, or an empty bytes object
        if the end of the stream was reached first.

    """
    while True:
        tok = stream.read(1)
        if tok not in WHITESPACES:
            return tok
def skip_over_whitespace(stream: StreamType) -> bool:
    """
    Consume whitespace and report whether any was found.

    Like read_non_whitespace, the first byte which is not whitespace is
    consumed as well; the stream is not moved back.

    Args:
        stream: The data stream to read from.

    Returns:
        True if one or more whitespace was skipped, otherwise return False.

    """
    skipped_any = False
    while stream.read(1) in WHITESPACES:
        skipped_any = True
    return skipped_any
def check_if_whitespace_only(value: bytes) -> bool:
    """
    Tell whether every byte of the value is a whitespace byte.

    Args:
        value: The bytes to check.

    Returns:
        True if the value only has whitespace characters (this includes the
        empty value), otherwise return False.

    """
    for byte in value:
        if byte not in WHITESPACES_AS_BYTES:
            return False
    return True
def skip_over_comment(stream: StreamType) -> None:
    """
    Skip a comment starting at the current stream position, if there is one.

    If the next byte is ``%``, everything up to and including the first
    CR or LF byte is consumed. Otherwise the stream position is left
    unchanged.

    Args:
        stream: The data stream to read from.

    Raises:
        PdfStreamError: If the stream ends before the comment is terminated.
    """
    tok = stream.read(1)
    if not tok:
        # End of stream: no byte was consumed, so there is nothing to undo.
        # Seeking back anyway would rewind over a byte this call never read.
        return
    # Put the peeked byte back.
    stream.seek(-1, 1)
    if tok != b"%":
        return
    while tok not in (b"\n", b"\r"):
        tok = stream.read(1)
        if tok == b"":
            raise PdfStreamError("File ended unexpectedly.")
def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
    """
    Read until the regular expression pattern matched (ignore the match).
    Treats EOF on the underlying stream as the end of the token to be matched.

    On a match, the stream is positioned at the start of the match.

    Args:
        stream: The data stream to read from.
        regex: re.Pattern

    Returns:
        The read bytes, i.e. everything before the match, or everything up
        to EOF if the pattern never matched.

    """
    # Number of trailing bytes re-searched together with the next chunk so
    # that matches spanning a chunk boundary are found. 16 bytes is
    # sufficient for the short delimiter patterns used in PDF parsing.
    overlap_size = 16
    parts: list[bytes] = []
    total_len = 0  # Number of bytes held in ``parts``.
    tail = b""  # The last (up to) ``overlap_size`` bytes held in ``parts``.
    chunk_size = 16
    while True:
        tok = stream.read(chunk_size)
        if not tok:
            return b"".join(parts)
        buf = tail + tok
        m = regex.search(buf)
        if m is not None:
            # Translate the match offset within ``buf`` into an offset
            # within everything read by this call.
            actual_start = total_len - len(tail) + m.start()
            # The stream is total_len + len(tok) bytes past the starting
            # point; move it back to the start of the match.
            stream.seek(actual_start - total_len - len(tok), 1)
            parts.append(tok)
            return b"".join(parts)[:actual_start]
        parts.append(tok)
        total_len += len(tok)
        # Build the overlap from the previous tail as well: a stream may
        # return fewer bytes than requested, and taking only the last chunk
        # would then lose part of a delimiter split across several reads.
        tail = buf[-overlap_size:]
        if chunk_size < 8192:
            chunk_size <<= 1
def read_block_backwards(stream: BinaryStreamType, to_read: int) -> bytes:
    """
    Read the ``to_read`` bytes which end at the current stream position.

    Afterwards the stream is positioned at the beginning of the block which
    was read, i.e. ``to_read`` bytes before where it started.

    Args:
        stream: The data stream to read from.
        to_read: Size of the block in bytes.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If fewer than ``to_read`` bytes precede the current
            position.

    """
    if to_read > stream.tell():
        raise PdfStreamError("Could not read malformed PDF file")
    # Step back to the start of the block and read it ...
    stream.seek(-to_read, SEEK_CUR)
    block = stream.read(to_read)
    # ... then step back again, as reading moved the position forward.
    stream.seek(-to_read, SEEK_CUR)
    return block
def read_previous_line(stream: StreamType) -> bytes:
    """
    Given a byte stream with current position X, return the previous line.

    The returned line consists of all bytes between the first CR/LF byte
    found before X (or the start of the file, if no such byte is found) and
    position X.

    After this call, the stream will be positioned one byte after the
    first non-CRLF character found beyond the first CR/LF byte before X,
    or, if no such byte is found, at the beginning of the stream.

    Args:
        stream: The data stream to read from. It is read backwards in blocks
            of at most DEFAULT_BUFFER_SIZE bytes.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If the stream is already at position 0.

    """
    # Pieces of the line, collected back to front.
    line_content = []
    found_crlf = False
    if stream.tell() == 0:
        raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
    while True:
        to_read = min(DEFAULT_BUFFER_SIZE, stream.tell())
        if to_read == 0:
            # Reached the beginning of the stream.
            break
        # Read the block. After this, the stream is positioned at the
        # start of the block, ready for the next backwards read.
        block = read_block_backwards(stream, to_read)
        idx = len(block) - 1
        if not found_crlf:
            # We haven't found our first CR/LF yet.
            # Read off characters until we hit one.
            while idx >= 0 and block[idx] not in b"\r\n":
                idx -= 1
            if idx >= 0:
                found_crlf = True
        if found_crlf:
            # We found our first CR/LF already (on this block or
            # a previous one).
            # Our combined line is the remainder of the block
            # plus any previously read blocks.
            line_content.append(block[idx + 1 :])
            # Continue to read off any more CRLF characters.
            while idx >= 0 and block[idx] in b"\r\n":
                idx -= 1
        else:
            # Didn't find CR/LF yet - add this block to our
            # previously read blocks and continue.
            line_content.append(block)
        if idx >= 0:
            # We found the next non-CRLF character.
            # The stream is at the start of the block, so move forward to
            # one byte past that character, then break.
            stream.seek(idx + 1, SEEK_CUR)
            break
    # Join all the blocks in the line (which are in reverse order)
    return b"".join(line_content[::-1])
def matrix_multiply(
    a: TransformationMatrixType, b: TransformationMatrixType
) -> TransformationMatrixType:
    """
    Multiply two matrices given as nested tuples of rows.

    Args:
        a: The left-hand matrix.
        b: The right-hand matrix.

    Returns:
        The product ``a x b`` as a tuple of row tuples.
    """
    product_rows = []
    for row in a:
        cells = []
        # zip(*b) yields the columns of b.
        for column in zip(*b):
            cells.append(
                sum(float(left) * float(right) for left, right in zip(row, column))
            )
        product_rows.append(tuple(cells))
    return tuple(product_rows)  # type: ignore[return-value]
def mark_location(stream: StreamType) -> None:
    """
    Create text file showing current location in context.

    Writes up to 5000 bytes before the current position, the marker
    ``HERE`` and up to 5000 bytes after it to ``pypdf_pdfLocation.txt`` in
    the current working directory. The stream position is restored
    afterwards. Mainly for debugging.

    Args:
        stream: The seekable data stream to inspect.
    """
    radius = 5000
    position = stream.tell()
    # Fewer than ``radius`` bytes may precede the position; clamp so that
    # the marker still lands exactly at the current location.
    start = max(0, position - radius)
    stream.seek(start, 0)
    try:
        with open("pypdf_pdfLocation.txt", "wb") as output_fh:
            output_fh.write(stream.read(position - start))
            output_fh.write(b"HERE")
            output_fh.write(stream.read(radius))
    finally:
        # Restore the exact position, however many bytes were actually read.
        stream.seek(position, 0)
def deprecate(msg: str, stacklevel: int = 3) -> None:
    """
    Emit a DeprecationWarning.

    Args:
        msg: The warning text.
        stacklevel: Passed on to ``warnings.warn`` to select the stack frame
            the warning is attributed to.
    """
    warnings.warn(message=msg, category=DeprecationWarning, stacklevel=stacklevel)
def deprecation(msg: str) -> NoReturn:
    """
    Raise a DeprecationError carrying the given message.

    Args:
        msg: The error text.

    Raises:
        DeprecationError: Always.
    """
    error = DeprecationError(msg)
    raise error
def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Issue a warning that a feature will be removed, but has a replacement.

    Args:
        old_name: Name of the deprecated feature.
        new_name: Name of the feature to use instead.
        removed_in: The pypdf version in which the feature will be removed.
    """
    message = f"{old_name} is deprecated and will be removed in pypdf {removed_in}. Use {new_name} instead."
    deprecate(message, stacklevel=4)
def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> NoReturn:
    """
    Raise an exception that a feature was already removed, but has a replacement.

    Args:
        old_name: Name of the removed feature.
        new_name: Name of the feature to use instead.
        removed_in: The pypdf version in which the feature was removed.
    """
    message = f"{old_name} is deprecated and was removed in pypdf {removed_in}. Use {new_name} instead."
    deprecation(message)
def deprecate_no_replacement(name: str, removed_in: str) -> None:
    """
    Issue a warning that a feature will be removed without replacement.

    Args:
        name: Name of the deprecated feature.
        removed_in: The pypdf version in which the feature will be removed.
    """
    message = f"{name} is deprecated and will be removed in pypdf {removed_in}."
    deprecate(message, stacklevel=4)
def deprecation_no_replacement(name: str, removed_in: str) -> NoReturn:
    """
    Raise an exception that a feature was already removed without replacement.

    Args:
        name: Name of the removed feature.
        removed_in: The pypdf version in which the feature was removed.
    """
    message = f"{name} is deprecated and was removed in pypdf {removed_in}."
    deprecation(message)
def logger_error(message: str, *, source: str, **values: Any) -> None:
    """
    Use this instead of logger.error directly.

    That allows people to overwrite it more easily.

    See the docs on when to use which:
    https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html

    Args:
        message: The log message; may contain ``%(name)s`` placeholders.
        source: Name of the logger to log to.
        **values: Values for the placeholders in the message.
    """
    # Only hand over formatting arguments if there are any: without them,
    # the logging module leaves the message text untouched.
    format_args = (values,) if values else ()
    logging.getLogger(source).error(message, *format_args)
def logger_warning(message: str, *, source: str, **values: Any) -> None:
    """
    Use this instead of logger.warning directly.

    That allows people to overwrite it more easily.

    ## Exception, warnings.warn, logger_warning
    - Exceptions should be used if the user should write code that deals with
      an error case, e.g. the PDF being completely broken.
    - warnings.warn should be used if the user needs to fix their code, e.g.
      DeprecationWarnings
    - logger_warning should be used if the user needs to know that an issue was
      handled by pypdf, e.g. a non-compliant PDF being read in a way that
      pypdf could apply a robustness fix to still read it. This applies mainly
      to strict=False mode.

    Args:
        message: The log message; may contain ``%(name)s`` placeholders.
        source: Name of the logger to log to.
        **values: Values for the placeholders in the message.
    """
    # Keep parity with logger_error and support plain warning messages.
    # Passing an empty dict to logging is not equivalent to passing no args:
    # plain messages would fail while being formatted.
    format_args = (values,) if values else ()
    logging.getLogger(source).warning(message, *format_args)
def rename_kwargs(
    func_name: str, kwargs: dict[str, Any], aliases: dict[str, str], fail: bool = False
) -> None:
    """
    Helper function to deprecate arguments.

    Every deprecated key found in ``kwargs`` is moved to its new name
    in place and a DeprecationWarning is issued.

    Args:
        func_name: Name of the function whose arguments are deprecated;
            used in the error message.
        kwargs: The keyword arguments received by that function. Modified
            in place.
        aliases: Mapping of deprecated argument name to new argument name.
        fail: If True, raise instead of renaming when a deprecated
            argument is used.

    Raises:
        DeprecationError: If ``fail`` is set and a deprecated argument is used.
        TypeError: If both the deprecated and the new argument are given.

    """
    for deprecated_name, replacement_name in aliases.items():
        if deprecated_name not in kwargs:
            continue
        if fail:
            raise DeprecationError(
                f"{deprecated_name} is deprecated as an argument. Use {replacement_name} instead"
            )
        if replacement_name in kwargs:
            raise TypeError(
                f"{func_name} received both {deprecated_name} and {replacement_name} as "
                f"an argument. {deprecated_name} is deprecated. "
                f"Use {replacement_name} instead."
            )
        kwargs[replacement_name] = kwargs.pop(deprecated_name)
        notice = f"{deprecated_name} is deprecated as an argument. Use {replacement_name} instead"
        warnings.warn(message=notice, category=DeprecationWarning, stacklevel=3)
def _human_readable_bytes(bytes: int) -> str:
    """
    Format a byte count using decimal (powers of 1000) units.

    Args:
        bytes: The number of bytes.

    Returns:
        The count as ``Byte``, or with one decimal place as ``kB``, ``MB``
        or ``GB``.
    """
    if bytes < 10**3:
        return f"{bytes} Byte"
    # (exclusive upper bound, divisor, unit) for the scaled units.
    for upper_bound, divisor, unit in ((10**6, 10**3, "kB"), (10**9, 10**6, "MB")):
        if bytes < upper_bound:
            return f"{bytes / divisor:.1f} {unit}"
    return f"{bytes / 10**9:.1f} GB"
508# The following class has been copied from Django:
509# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65
510# It received some modifications to comply with our own coding standards.
511#
512# Original license:
513#
514# ---------------------------------------------------------------------------------
515# Copyright (c) Django Software Foundation and individual contributors.
516# All rights reserved.
517#
518# Redistribution and use in source and binary forms, with or without modification,
519# are permitted provided that the following conditions are met:
520#
521# 1. Redistributions of source code must retain the above copyright notice,
522# this list of conditions and the following disclaimer.
523#
524# 2. Redistributions in binary form must reproduce the above copyright
525# notice, this list of conditions and the following disclaimer in the
526# documentation and/or other materials provided with the distribution.
527#
528# 3. Neither the name of Django nor the names of its contributors may be used
529# to endorse or promote products derived from this software without
530# specific prior written permission.
531#
532# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
533# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
534# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
535# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
536# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
537# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
538# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
539# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
540# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
541# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
542# ---------------------------------------------------------------------------------
543class classproperty: # noqa: N801
544 """
545 Decorator that converts a method with a single cls argument into a property
546 that can be accessed directly from the class.
547 """
549 def __init__(self, method=None) -> None: # type: ignore # noqa: ANN001
550 self.fget = method
552 def __get__(self, instance, cls=None) -> Any: # type: ignore # noqa: ANN001
553 return self.fget(cls)
555 def getter(self, method) -> Self: # type: ignore # noqa: ANN001
556 self.fget = method
557 return self
@dataclass
class File:
    # A named blob of bytes together with an optional reference to the
    # object it came from. The import lives in the class body, so it only
    # runs when the class is created rather than at the top of the module.
    from .generic import IndirectObject  # noqa: PLC0415

    name: str = ""
    """
    Filename as identified within the PDF file.
    """
    data: bytes = b""
    """
    Data as bytes.
    """
    indirect_reference: Optional[IndirectObject] = None
    """
    Reference to the object storing the stream.
    """

    def __str__(self) -> str:
        # Show the size of the data instead of the data itself.
        size = _human_readable_bytes(len(self.data))
        return f"{self.__class__.__name__}(name={self.name}, data: {size})"

    def __repr__(self) -> str:
        # Same as __str__, with a hash of the data inserted before the
        # closing parenthesis.
        without_closing_paren = self.__str__()[:-1]
        return f"{without_closing_paren}, hash: {hash(self.data)})"
@functools.total_ordering
class Version:
    """
    A dotted version string which can be compared and hashed.

    Every dot-separated component is split into a leading integer and the
    remaining text, e.g. ``"10b"`` becomes ``(10, "b")``. Components without
    a leading integer are stored as ``(0, component)``.
    """

    COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")

    def __init__(self, version_str: str) -> None:
        self.version_str = version_str
        self.components = self._parse_version(version_str)

    def _parse_version(self, version_str: str) -> list[tuple[int, str]]:
        """Split the version string into ``(integer, suffix)`` components."""
        parsed: list[tuple[int, str]] = []
        for raw_component in version_str.split("."):
            match = Version.COMPONENT_PATTERN.match(raw_component)
            if match:
                # The pattern requires at least one digit, so the first
                # group is always a valid integer literal.
                digits, suffix = match.groups()
                parsed.append((int(digits), suffix))
            else:
                parsed.append((0, raw_component))
        return parsed

    def __eq__(self, other: object) -> bool:
        """Versions are equal if all their components are equal."""
        return isinstance(other, Version) and self.components == other.components

    def __hash__(self) -> int:
        # Convert to tuple as lists cannot be hashed.
        return hash((self.__class__, tuple(self.components)))

    def __lt__(self, other: Any) -> bool:
        """
        Compare component by component; on a tie, the shorter version is smaller.

        Raises:
            ValueError: If ``other`` is not a Version.
        """
        if not isinstance(other, Version):
            raise ValueError(f"Version cannot be compared against {type(other)}")

        for own, theirs in zip(self.components, other.components):
            # Tuples compare by integer first, then by suffix.
            if own != theirs:
                return own < theirs

        return len(self.components) < len(other.components)