Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_utils.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
28"""Utility functions for PDF library."""
29__author__ = "Mathieu Fenniak"
30__author_email__ = "biziqe@mathieu.fenniak.net"
32import functools
33import logging
34import re
35import sys
36import warnings
37from dataclasses import dataclass
38from datetime import datetime, timezone
39from io import DEFAULT_BUFFER_SIZE
40from os import SEEK_CUR
41from re import Pattern
42from typing import (
43 IO,
44 Any,
45 Optional,
46 Union,
47 overload,
48)
50if sys.version_info[:2] >= (3, 10):
51 # Python 3.10+: https://www.python.org/dev/peps/pep-0484/
52 from typing import TypeAlias
53else:
54 from typing_extensions import TypeAlias
56if sys.version_info >= (3, 11):
57 from typing import Self
58else:
59 from typing_extensions import Self
61from .errors import (
62 STREAM_TRUNCATED_PREMATURELY,
63 DeprecationError,
64 PdfStreamError,
65)
# A 3x3 matrix expressed as three rows, each a tuple of three floats.
TransformationMatrixType: TypeAlias = tuple[
    tuple[float, float, float], tuple[float, float, float], tuple[float, float, float]
]
# A flat tuple of six floats.
# NOTE(review): presumably the six variable entries of a PDF transformation
# matrix - confirm against callers.
CompressedTransformationMatrix: TypeAlias = tuple[
    float, float, float, float, float, float
]

# Any file-like object; used as the type of the stream parameters in this module.
StreamType = IO[Any]
# Either a string or an open stream.
# NOTE(review): the string is presumably a file path - confirm against callers.
StrByteType = Union[str, StreamType]
def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
    """
    Parse a PDF date string into a datetime object.

    The expected shape is ``D:YYYYMMDDHHmmSS`` followed by an optional UTC
    offset; every field after the year may be omitted. A missing ``D:``
    prefix is tolerated when the text starts with a digit, a trailing
    ``Z``/``z`` is read as an offset of ``+0000``, and apostrophes inside
    the offset are ignored.

    Args:
        text: The date string to parse. May be None or empty.

    Returns:
        None if no text was given, otherwise the parsed datetime. The result
        is timezone-aware only when the text carries an offset.

    Raises:
        ValueError: If the text matches none of the supported formats.
    """
    orgtext = text
    if not text:
        # Both None and "" mean "no date". Without this guard the text[0]
        # lookup below raises IndexError on an empty string.
        return None
    if text[0].isdigit():
        text = "D:" + text
    if text.endswith(("Z", "z")):
        text += "0000"
    text = text.replace("z", "+").replace("Z", "+").replace("'", "")
    i = max(text.find("+"), text.find("-"))
    if i > 0 and i != len(text) - 5:
        # The sign is not followed by exactly four digits (e.g. an hours-only
        # offset such as "+02"); pad so that %z can parse it.
        text += "00"
    for f in (
        "D:%Y",
        "D:%Y%m",
        "D:%Y%m%d",
        "D:%Y%m%d%H",
        "D:%Y%m%d%H%M",
        "D:%Y%m%d%H%M%S",
        "D:%Y%m%d%H%M%S%z",
    ):
        try:
            d = datetime.strptime(text, f)  # noqa: DTZ007
        except ValueError:
            continue
        else:
            if text.endswith("+0000"):
                d = d.replace(tzinfo=timezone.utc)
            return d
    raise ValueError(f"Can not convert date: {orgtext}")
def format_iso8824_date(dt: datetime) -> str:
    """
    Render a datetime as a PDF date string.

    The output has the form D:YYYYMMDDHHmmSS, followed by the UTC offset
    written as OHH'mm' when the datetime is timezone-aware.

    Args:
        dt: The datetime to render.

    Returns:
        The PDF date string.
    """
    stamp = dt.strftime("D:%Y%m%d%H%M%S")
    if dt.tzinfo is None:
        # Naive datetimes carry no offset suffix.
        return stamp

    offset = dt.utcoffset()
    assert offset is not None
    offset_seconds = int(offset.total_seconds())
    sign = "-" if offset_seconds < 0 else "+"
    hours, leftover = divmod(abs(offset_seconds), 3600)
    minutes = leftover // 60
    return f"{stamp}{sign}{hours:02d}'{minutes:02d}'"
135def _get_max_pdf_version_header(header1: str, header2: str) -> str:
136 versions = (
137 "%PDF-1.3",
138 "%PDF-1.4",
139 "%PDF-1.5",
140 "%PDF-1.6",
141 "%PDF-1.7",
142 "%PDF-2.0",
143 )
144 pdf_header_indices = []
145 if header1 in versions:
146 pdf_header_indices.append(versions.index(header1))
147 if header2 in versions:
148 pdf_header_indices.append(versions.index(header2))
149 if len(pdf_header_indices) == 0:
150 raise ValueError(f"Neither {header1!r} nor {header2!r} are proper headers")
151 return versions[max(pdf_header_indices)]
# The single-byte values this module treats as whitespace.
WHITESPACES = (b"\x00", b"\t", b"\n", b"\f", b"\r", b" ")
# The same bytes concatenated into one bytes object.
WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
# A regular-expression character class matching any one of those bytes.
WHITESPACES_AS_REGEXP = b"[%b]" % WHITESPACES_AS_BYTES
def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
    """
    Collect bytes from the stream up to the next whitespace byte.

    Reading stops at the first whitespace byte (which is consumed but not
    returned), at the end of the stream, or once maxchars bytes were collected.

    Args:
        stream: The data stream to read from.
        maxchars: The maximum number of bytes returned; by default unlimited.

    Returns:
        The bytes collected before reading stopped.

    """
    collected = bytearray()
    while True:
        byte = stream.read(1)
        if not byte or byte.isspace():
            break
        collected += byte
        if len(collected) == maxchars:
            break
    return bytes(collected)
def read_non_whitespace(stream: StreamType) -> bytes:
    """
    Skip any whitespace and return the first byte that is not whitespace.

    Args:
        stream: The data stream to read from.

    Returns:
        The first byte read that is not in WHITESPACES; this is the empty
        result of read() when the stream is exhausted.

    """
    while True:
        byte = stream.read(1)
        if byte not in WHITESPACES:
            return byte
def skip_over_whitespace(stream: StreamType) -> bool:
    """
    Consume whitespace like read_non_whitespace, but report whether any
    whitespace was seen instead of returning the byte that ended the run.

    The first non-whitespace byte is consumed as well.

    Args:
        stream: The data stream to read from.

    Returns:
        True if one or more whitespace bytes were skipped, otherwise False.

    """
    skipped_any = False
    while stream.read(1) in WHITESPACES:
        skipped_any = True
    return skipped_any
def check_if_whitespace_only(value: bytes) -> bool:
    """
    Tell whether every byte of the given value is a whitespace byte.

    Args:
        value: The bytes to check.

    Returns:
        True if the value contains nothing but whitespace bytes (this
        includes the empty value), otherwise False.

    """
    for byte in value:
        if byte not in WHITESPACES_AS_BYTES:
            return False
    return True
def skip_over_comment(stream: StreamType) -> None:
    """
    Skip a comment if the stream is positioned at one.

    If the next byte is ``%``, everything up to and including the first
    CR or LF byte is consumed. Otherwise the stream position is left
    unchanged.

    Args:
        stream: The data stream to read from.

    Raises:
        PdfStreamError: If the stream ends before the comment is terminated.
    """
    tok = stream.read(1)
    if not tok:
        # End of stream: nothing was consumed, so rewinding would move the
        # position one byte backwards (or fail on a file at offset 0).
        return
    stream.seek(-1, SEEK_CUR)
    if tok == b"%":
        while tok not in (b"\n", b"\r"):
            tok = stream.read(1)
            if tok == b"":
                raise PdfStreamError("File ended unexpectedly.")
def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
    """
    Read from the stream until the pattern matches and return what precedes
    the match.

    The stream is left positioned at the start of the match. Reaching the
    end of the stream without a match returns everything that was read.

    Args:
        stream: The data stream to read from.
        regex: Compiled bytes pattern marking where to stop.

    Returns:
        The bytes read before the match.

    """
    buffered = b""
    while True:
        chunk = stream.read(16)
        if not chunk:
            return buffered
        candidate = buffered + chunk
        found = regex.search(candidate)
        if found is not None:
            # Rewind over everything read at or after the start of the match.
            stream.seek(found.start() - len(candidate), 1)
            return candidate[: found.start()]
        buffered = candidate
def read_block_backwards(stream: StreamType, to_read: int) -> bytes:
    """
    Read the to_read bytes that end at the current stream position.

    Afterwards the stream is positioned at the beginning of the block that
    was read.

    Args:
        stream: The data stream to read from.
        to_read: Size of the block in bytes.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If fewer than to_read bytes precede the current
            position.

    """
    if to_read > stream.tell():
        raise PdfStreamError("Could not read malformed PDF file")
    # Step back to where the block begins, read it, then step back again so
    # the caller ends up in front of the block.
    stream.seek(-to_read, SEEK_CUR)
    data = stream.read(to_read)
    stream.seek(-to_read, SEEK_CUR)
    return data
def read_previous_line(stream: StreamType) -> bytes:
    """
    Given a byte stream with current position X, return the previous line.

    The result consists of all bytes between the first CR/LF byte found
    before X (or the start of the stream, if no such byte is found) and
    position X.

    After this call, the stream will be positioned one byte after the
    first non-CRLF character found beyond the first CR/LF byte before X,
    or, if no such byte is found, at the beginning of the stream.

    Args:
        stream: The data stream to read from; it is read backwards in
            blocks of at most DEFAULT_BUFFER_SIZE bytes.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If the stream is already at position 0.

    """
    # Blocks are collected back to front and reversed before joining.
    line_content = []
    found_crlf = False
    if stream.tell() == 0:
        raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
    while True:
        to_read = min(DEFAULT_BUFFER_SIZE, stream.tell())
        if to_read == 0:
            # Reached the beginning of the stream.
            break
        # Read the block. read_block_backwards leaves the stream at the
        # start of the block, so idx below is an offset from the current
        # stream position.
        block = read_block_backwards(stream, to_read)
        idx = len(block) - 1
        if not found_crlf:
            # We haven't found our first CR/LF yet.
            # Read off characters until we hit one.
            while idx >= 0 and block[idx] not in b"\r\n":
                idx -= 1
            if idx >= 0:
                found_crlf = True
        if found_crlf:
            # We found our first CR/LF already (on this block or
            # a previous one).
            # Our combined line is the remainder of the block
            # plus any previously read blocks.
            line_content.append(block[idx + 1 :])
            # Continue to read off any more CRLF characters.
            while idx >= 0 and block[idx] in b"\r\n":
                idx -= 1
        else:
            # Didn't find CR/LF yet - add this block to our
            # previously read blocks and continue.
            line_content.append(block)
        if idx >= 0:
            # We found the next non-CRLF character.
            # Set the stream position correctly, then break
            stream.seek(idx + 1, SEEK_CUR)
            break
    # Join all the blocks in the line (which are in reverse order)
    return b"".join(line_content[::-1])
def matrix_multiply(
    a: TransformationMatrixType, b: TransformationMatrixType
) -> TransformationMatrixType:
    """
    Return the matrix product of a and b.

    Args:
        a: Left operand, given as a tuple of rows.
        b: Right operand, given as a tuple of rows.

    Returns:
        The product as a tuple of row tuples.
    """
    # Transpose b once so each of its columns can be paired with rows of a.
    columns = tuple(zip(*b))
    product = []
    for row in a:
        product.append(
            tuple(
                sum(float(left) * float(right) for left, right in zip(row, column))
                for column in columns
            )
        )
    return tuple(product)  # type: ignore[return-value]
def mark_location(stream: StreamType) -> None:
    """
    Create text file showing current location in context.

    Writes up to 5000 bytes before the current position, the marker
    ``HERE``, and up to 5000 bytes after it to ``pypdf_pdfLocation.txt``
    in the current working directory. The stream position is restored
    afterwards.

    Args:
        stream: A seekable data stream.
    """
    # Mainly for debugging
    radius = 5000
    position = stream.tell()
    # Clamp the window: fewer than `radius` bytes may precede the position,
    # and a relative seek past the start fails or lands on the wrong offset.
    start = max(position - radius, 0)
    stream.seek(start)
    with open("pypdf_pdfLocation.txt", "wb") as output_fh:
        output_fh.write(stream.read(position - start))
        output_fh.write(b"HERE")
        output_fh.write(stream.read(radius))
    # Restore the absolute position; the trailing read may have returned
    # fewer than `radius` bytes, so a relative rewind would be wrong.
    stream.seek(position)
@overload
def ord_(b: str) -> int:
    ...


@overload
def ord_(b: bytes) -> bytes:
    ...


@overload
def ord_(b: int) -> int:
    ...


def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]:
    """
    Return the code point of a string; pass any other value through.

    Args:
        b: A string accepted by ord(), or an int or bytes value.

    Returns:
        ord(b) for a string argument, otherwise b itself.
    """
    return ord(b) if isinstance(b, str) else b
def deprecate(msg: str, stacklevel: int = 3) -> None:
    """
    Emit a DeprecationWarning carrying the given message.

    Args:
        msg: The warning text.
        stacklevel: Passed on to warnings.warn.
    """
    warnings.warn(message=msg, category=DeprecationWarning, stacklevel=stacklevel)
def deprecation(msg: str) -> None:
    """
    Raise a DeprecationError carrying the given message.

    Args:
        msg: The error text.

    Raises:
        DeprecationError: Always.
    """
    error = DeprecationError(msg)
    raise error
def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Warn that a feature is going to be removed and name its replacement.

    Args:
        old_name: Name of the deprecated feature.
        new_name: Name of the feature to use instead.
        removed_in: The pypdf version named in the message as the removal version.
    """
    message = f"{old_name} is deprecated and will be removed in pypdf {removed_in}. Use {new_name} instead."
    deprecate(message, 4)
def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Fail because a feature has been removed, naming its replacement.

    Args:
        old_name: Name of the removed feature.
        new_name: Name of the feature to use instead.
        removed_in: The pypdf version named in the message as the removal version.
    """
    message = f"{old_name} is deprecated and was removed in pypdf {removed_in}. Use {new_name} instead."
    deprecation(message)
def deprecate_no_replacement(name: str, removed_in: str) -> None:
    """
    Warn that a feature is going to be removed and has no replacement.

    Args:
        name: Name of the deprecated feature.
        removed_in: The pypdf version named in the message as the removal version.
    """
    message = f"{name} is deprecated and will be removed in pypdf {removed_in}."
    deprecate(message, 4)
def deprecation_no_replacement(name: str, removed_in: str) -> None:
    """
    Fail because a feature has been removed and has no replacement.

    Args:
        name: Name of the removed feature.
        removed_in: The pypdf version named in the message as the removal version.
    """
    message = f"{name} is deprecated and was removed in pypdf {removed_in}."
    deprecation(message)
def logger_error(msg: str, src: str) -> None:
    """
    Log an error; use this rather than calling logger.error directly.

    Going through this function makes it easier for people to overwrite
    the behavior.

    See the docs on when to use which:
    https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html

    Args:
        msg: The message to log.
        src: Name of the logger to log to.
    """
    source_logger = logging.getLogger(src)
    source_logger.error(msg)
def logger_warning(msg: str, src: str) -> None:
    """
    Log a warning; use this rather than calling logger.warning directly.

    Going through this function makes it easier for people to overwrite
    the behavior.

    ## Exception, warnings.warn, logger_warning
    - Exceptions should be used if the user should write code that deals with
      an error case, e.g. the PDF being completely broken.
    - warnings.warn should be used if the user needs to fix their code, e.g.
      DeprecationWarnings
    - logger_warning should be used if the user needs to know that an issue was
      handled by pypdf, e.g. a non-compliant PDF being read in a way that
      pypdf could apply a robustness fix to still read it. This applies mainly
      to strict=False mode.

    Args:
        msg: The message to log.
        src: Name of the logger to log to.
    """
    source_logger = logging.getLogger(src)
    source_logger.warning(msg)
def rename_kwargs(
    func_name: str, kwargs: dict[str, Any], aliases: dict[str, str], fail: bool = False
) -> None:
    """
    Move deprecated keyword arguments to their new names.

    The kwargs dictionary is modified in place and a DeprecationWarning is
    issued for every argument that was renamed.

    Args:
        func_name: Name of the function, used in the error message.
        kwargs: The keyword arguments that were passed to the function.
        aliases: Mapping from deprecated argument name to its new name.
        fail: If True, raise instead of renaming when a deprecated name is used.

    Raises:
        DeprecationError: If fail is set and a deprecated name is present.
        TypeError: If both the deprecated and the new name are present.

    """
    for old_term, new_term in aliases.items():
        if old_term not in kwargs:
            continue
        notice = f"{old_term} is deprecated as an argument. Use {new_term} instead"
        if fail:
            raise DeprecationError(notice)
        if new_term in kwargs:
            raise TypeError(
                f"{func_name} received both {old_term} and {new_term} as "
                f"an argument. {old_term} is deprecated. "
                f"Use {new_term} instead."
            )
        kwargs[new_term] = kwargs.pop(old_term)
        warnings.warn(message=notice, category=DeprecationWarning, stacklevel=3)
495def _human_readable_bytes(bytes: int) -> str:
496 if bytes < 10**3:
497 return f"{bytes} Byte"
498 if bytes < 10**6:
499 return f"{bytes / 10**3:.1f} kB"
500 if bytes < 10**9:
501 return f"{bytes / 10**6:.1f} MB"
502 return f"{bytes / 10**9:.1f} GB"
505# The following class has been copied from Django:
506# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65
507# It received some modifications to comply with our own coding standards.
508#
509# Original license:
510#
511# ---------------------------------------------------------------------------------
512# Copyright (c) Django Software Foundation and individual contributors.
513# All rights reserved.
514#
515# Redistribution and use in source and binary forms, with or without modification,
516# are permitted provided that the following conditions are met:
517#
518# 1. Redistributions of source code must retain the above copyright notice,
519# this list of conditions and the following disclaimer.
520#
521# 2. Redistributions in binary form must reproduce the above copyright
522# notice, this list of conditions and the following disclaimer in the
523# documentation and/or other materials provided with the distribution.
524#
525# 3. Neither the name of Django nor the names of its contributors may be used
526# to endorse or promote products derived from this software without
527# specific prior written permission.
528#
529# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
530# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
531# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
532# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
533# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
534# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
535# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
536# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
537# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
538# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
539# ---------------------------------------------------------------------------------
class classproperty:  # noqa: N801
    """
    Decorator turning a method that takes only ``cls`` into an attribute
    that can be read directly from the class.
    """

    def __init__(self, method=None) -> None:  # type: ignore  # noqa: ANN001
        # The wrapped callable; it can also be supplied later via getter().
        self.fget = method

    def __get__(self, instance, cls=None) -> Any:  # type: ignore  # noqa: ANN001
        # The instance is not used: the value is always computed from cls.
        wrapped = self.fget
        return wrapped(cls)

    def getter(self, method) -> Self:  # type: ignore  # noqa: ANN001
        """Replace the wrapped callable and return this descriptor."""
        self.fget = method
        return self
@dataclass
class File:
    from .generic import IndirectObject  # noqa: PLC0415

    name: str = ""
    """
    Filename as identified within the PDF file.
    """
    data: bytes = b""
    """
    Data as bytes.
    """
    indirect_reference: Optional[IndirectObject] = None
    """
    Reference to the object storing the stream.
    """

    def __str__(self) -> str:
        # Show the payload size instead of dumping the payload itself.
        readable_size = _human_readable_bytes(len(self.data))
        return f"{self.__class__.__name__}(name={self.name}, data: {readable_size})"

    def __repr__(self) -> str:
        # Reuse the __str__ output, dropping its closing parenthesis so the
        # hash of the data can be appended.
        return f"{self.__str__()[:-1]}, hash: {hash(self.data)})"
@functools.total_ordering
class Version:
    """
    A dot-separated version string that supports comparison.

    Each component is split into a leading integer and the remaining text,
    e.g. "1.0rc1" becomes [(1, ""), (0, "rc1")]. Components without leading
    digits get the integer 0. Versions compare component by component, integer
    first and text second; if all shared components are equal, the version
    with fewer components is the smaller one.
    """

    COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")

    def __init__(self, version_str: str) -> None:
        self.version_str = version_str
        self.components = self._parse_version(version_str)

    def _parse_version(self, version_str: str) -> list[tuple[int, str]]:
        """Split the version string into (integer, suffix) pairs."""
        parsed_components: list[tuple[int, str]] = []
        for component in version_str.split("."):
            match = Version.COMPONENT_PATTERN.match(component)
            if match is None:
                # No leading digits: keep the whole component as the suffix.
                parsed_components.append((0, component))
                continue
            # The first group requires at least one digit, so it is never
            # None once the pattern has matched.
            parsed_components.append((int(match.group(1)), match.group(2)))
        return parsed_components

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Version):
            return False
        return self.components == other.components

    def __hash__(self) -> int:
        # Convert to tuple as lists cannot be hashed.
        return hash((self.__class__, tuple(self.components)))

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Version):
            raise ValueError(f"Version cannot be compared against {type(other)}")

        # Lists of tuples compare lexicographically: the first differing
        # (integer, suffix) pair decides, and if one list is a prefix of the
        # other, the shorter list is the smaller one.
        return self.components < other.components