Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_utils.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
28"""Utility functions for PDF library."""
29__author__ = "Mathieu Fenniak"
30__author_email__ = "biziqe@mathieu.fenniak.net"
32import functools
33import logging
34import re
35import sys
36import warnings
37from dataclasses import dataclass
38from datetime import datetime, timezone
39from io import DEFAULT_BUFFER_SIZE
40from os import SEEK_CUR
41from typing import (
42 IO,
43 Any,
44 Dict,
45 List,
46 Optional,
47 Pattern,
48 Tuple,
49 Union,
50 overload,
51)
53if sys.version_info[:2] >= (3, 10):
54 # Python 3.10+: https://www.python.org/dev/peps/pep-0484/
55 from typing import TypeAlias
56else:
57 from typing_extensions import TypeAlias
59if sys.version_info >= (3, 11):
60 from typing import Self
61else:
62 from typing_extensions import Self
64from .errors import (
65 STREAM_TRUNCATED_PREMATURELY,
66 DeprecationError,
67 PdfStreamError,
68)
# A single row of three floats; three of them form the full 3x3 matrix type.
_MatrixRow: TypeAlias = Tuple[float, float, float]
TransformationMatrixType: TypeAlias = Tuple[_MatrixRow, _MatrixRow, _MatrixRow]

# Six floats in one flat tuple.
CompressedTransformationMatrix: TypeAlias = Tuple[
    float, float, float, float, float, float
]

# Any file-like object, and "either a string or such a stream".
StreamType = IO[Any]
StrByteType = Union[str, StreamType]
def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
    """
    Convert a date string of the form ``D:YYYYMMDDHHmmSS+HH'mm'`` to a datetime.

    The ``D:`` prefix may be missing, fields may be omitted from the right
    (down to the year only), and the offset may be written as ``Z``/``z``.

    Args:
        text: The date string to parse, or None.

    Returns:
        None if ``text`` is None, otherwise the parsed datetime. The result
        carries timezone information only if the input contained an offset.

    Raises:
        ValueError: If ``text`` is empty or matches none of the supported
            layouts.

    """
    orgtext = text
    if text is None:
        return None
    if not text:
        # Indexing text[0] below would raise IndexError on an empty string;
        # report it like every other unparsable value instead.
        raise ValueError(f"Can not convert date: {orgtext}")
    if text[0].isdigit():
        text = "D:" + text
    if text.endswith(("Z", "z")):
        # After the replacement below this becomes the offset "+0000".
        text += "0000"
    text = text.replace("z", "+").replace("Z", "+").replace("'", "")
    i = max(text.find("+"), text.find("-"))
    if i > 0 and i != len(text) - 5:
        # The sign is not followed by exactly four characters; presumably an
        # hours-only offset such as "+02", so pad it with zero minutes.
        text += "00"
    for f in (
        "D:%Y",
        "D:%Y%m",
        "D:%Y%m%d",
        "D:%Y%m%d%H",
        "D:%Y%m%d%H%M",
        "D:%Y%m%d%H%M%S",
        "D:%Y%m%d%H%M%S%z",
    ):
        try:
            d = datetime.strptime(text, f)  # noqa: DTZ007
        except ValueError:
            continue
        else:
            if text.endswith("+0000"):
                d = d.replace(tzinfo=timezone.utc)
            return d
    raise ValueError(f"Can not convert date: {orgtext}")
113def _get_max_pdf_version_header(header1: str, header2: str) -> str:
114 versions = (
115 "%PDF-1.3",
116 "%PDF-1.4",
117 "%PDF-1.5",
118 "%PDF-1.6",
119 "%PDF-1.7",
120 "%PDF-2.0",
121 )
122 pdf_header_indices = []
123 if header1 in versions:
124 pdf_header_indices.append(versions.index(header1))
125 if header2 in versions:
126 pdf_header_indices.append(versions.index(header2))
127 if len(pdf_header_indices) == 0:
128 raise ValueError(f"Neither {header1!r} nor {header2!r} are proper headers")
129 return versions[max(pdf_header_indices)]
# The single-byte tokens that the stream helpers in this module skip as
# whitespace, as a tuple, as one bytes object, and as a regex character class.
WHITESPACES = tuple(bytes([code]) for code in b"\x00\t\n\f\r ")
WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
WHITESPACES_AS_REGEXP = b"[" + WHITESPACES_AS_BYTES + b"]"
def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
    """
    Collect bytes from the stream up to the next whitespace byte.

    Reading is done one byte at a time. It ends at end of stream, at the
    first byte for which ``bytes.isspace()`` is true (that byte is consumed
    but not returned), or once ``maxchars`` bytes have been collected.

    Args:
        stream: The data stream to read from.
        maxchars: The maximum number of bytes returned; by default unlimited.

    Returns:
        The bytes collected before reading stopped.

    """
    collected = b""
    while True:
        byte = stream.read(1)
        if not byte or byte.isspace():
            return collected
        collected += byte
        if len(collected) == maxchars:
            return collected
def read_non_whitespace(stream: StreamType) -> bytes:
    """
    Skip whitespace and return the first byte that is not in WHITESPACES.

    Args:
        stream: The data stream to read from.

    Returns:
        The first non-whitespace byte, or an empty value if the stream
        ended first.

    """
    while True:
        byte = stream.read(1)
        if byte not in WHITESPACES:
            return byte
def skip_over_whitespace(stream: StreamType) -> bool:
    """
    Consume whitespace from the stream and report whether any was found.

    Bytes are read one at a time until one is not in WHITESPACES. That
    terminating byte is consumed as well.

    Args:
        stream: The data stream to read from.

    Returns:
        True if one or more whitespace was skipped, otherwise return False.

    """
    skipped_any = False
    while stream.read(1) in WHITESPACES:
        skipped_any = True
    return skipped_any
def check_if_whitespace_only(value: bytes) -> bool:
    """
    Tell whether every byte of the value is a whitespace byte.

    Args:
        value: The bytes to check.

    Returns:
        True if the value only has whitespace characters (this includes the
        empty value), otherwise return False.

    """
    for byte in value:
        if byte not in WHITESPACES_AS_BYTES:
            return False
    return True
def skip_over_comment(stream: StreamType) -> None:
    """
    Skip a ``%`` comment if the stream is positioned at one.

    The next byte is peeked (read, then stepped back over). If it is ``%``,
    bytes are consumed up to and including the first CR or LF.

    Args:
        stream: The data stream to read from.

    Raises:
        PdfStreamError: If the stream ends before the comment is terminated.

    """
    peeked = stream.read(1)
    stream.seek(-1, SEEK_CUR)
    if peeked != b"%":
        return
    while True:
        byte = stream.read(1)
        if byte == b"":
            raise PdfStreamError("File ended unexpectedly.")
        if byte in (b"\n", b"\r"):
            return
def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
    """
    Read from the stream until the pattern matches; the match is not consumed.

    Data is read in chunks of 16 bytes and the pattern is searched in
    everything read so far. On a match the stream is moved back to the start
    of the match. Reaching the end of the stream ends the token as well.

    Args:
        stream: The data stream to read from.
        regex: The compiled bytes pattern that terminates the token.

    Returns:
        The bytes before the match, or everything read if EOF came first.

    """
    buffer = b""
    while True:
        chunk = stream.read(16)
        if not chunk:
            break
        buffer += chunk
        found = regex.search(buffer)
        if found is None:
            continue
        cut = found.start()
        # Step back over everything read at or after the start of the match.
        stream.seek(cut - len(buffer), SEEK_CUR)
        return buffer[:cut]
    return buffer
def read_block_backwards(stream: StreamType, to_read: int) -> bytes:
    """
    Read the ``to_read`` bytes that end at the current stream position.

    Afterwards the stream is positioned at the beginning of the block that
    was read.

    Args:
        stream: The data stream to read from.
        to_read: The size of the block in bytes.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If fewer than ``to_read`` bytes precede the current
            position.

    """
    if to_read > stream.tell():
        raise PdfStreamError("Could not read malformed PDF file")
    step_back = -to_read
    # Move to the start of the block, read it, then move back to its start.
    stream.seek(step_back, SEEK_CUR)
    block = stream.read(to_read)
    stream.seek(step_back, SEEK_CUR)
    return block
def read_previous_line(stream: StreamType) -> bytes:
    """
    Given a byte stream with current position X, return the previous line.

    The result is all bytes between the first CR/LF byte found before X
    (or the start of the file, if no such byte is found) and position X.
    After this call, the stream is positioned one byte after the first
    non-CR/LF byte found before that CR/LF run, or at the beginning of the
    stream if there is no such byte.

    Args:
        stream: The data stream to read from.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If the stream is already at position 0.

    """
    if stream.tell() == 0:
        raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
    pieces = []
    seen_newline = False
    while True:
        chunk_size = min(DEFAULT_BUFFER_SIZE, stream.tell())
        if chunk_size == 0:
            break
        # Read the chunk that ends at the current position. Afterwards the
        # stream sits at the start of that chunk.
        chunk = read_block_backwards(stream, chunk_size)
        pos = len(chunk) - 1
        if not seen_newline:
            # Still looking for the first CR/LF: walk left over line content.
            while pos >= 0 and chunk[pos] not in b"\r\n":
                pos -= 1
            seen_newline = pos >= 0
        if seen_newline:
            # Everything right of the CR/LF belongs to the line (this is
            # empty if the CR/LF was found in an earlier chunk).
            pieces.append(chunk[pos + 1 :])
            # Walk left over the whole run of CR/LF bytes.
            while pos >= 0 and chunk[pos] in b"\r\n":
                pos -= 1
        else:
            # No CR/LF in this chunk; all of it is line content.
            pieces.append(chunk)
        if pos >= 0:
            # Found the byte before the CR/LF run: position just after it.
            stream.seek(pos + 1, SEEK_CUR)
            break
    # The pieces were collected back to front.
    return b"".join(reversed(pieces))
def matrix_multiply(
    a: TransformationMatrixType, b: TransformationMatrixType
) -> TransformationMatrixType:
    """
    Multiply two matrices given as tuples of rows.

    Args:
        a: The left matrix.
        b: The right matrix.

    Returns:
        The product ``a * b`` as a tuple of row tuples.

    """

    def dot(row, column):  # type: ignore  # noqa: ANN001, ANN202
        return sum(float(x) * float(y) for x, y in zip(row, column))

    product_rows = []
    for row in a:
        # zip(*b) yields the columns of b.
        product_rows.append(tuple(dot(row, column) for column in zip(*b)))
    return tuple(product_rows)  # type: ignore[return-value]
def mark_location(stream: StreamType) -> None:
    """
    Create text file showing current location in context.

    Writes up to ``radius`` bytes before the current position, the marker
    ``HERE``, and up to ``radius`` bytes after it to the file
    ``pypdf_pdfLocation.txt`` in the current working directory. The stream
    position is the same after the call as before it.

    Args:
        stream: The seekable data stream to inspect.

    """
    # Mainly for debugging
    radius = 5000
    position = stream.tell()
    # Never seek before the start of the stream when fewer than ``radius``
    # bytes precede the current position.
    start = max(position - radius, 0)
    try:
        stream.seek(start)
        before = stream.read(position - start)
        after = stream.read(radius)
    finally:
        # Restore by absolute position; a relative seek would be wrong
        # whenever fewer than ``radius`` bytes could be read.
        stream.seek(position)
    with open("pypdf_pdfLocation.txt", "wb") as output_fh:
        output_fh.write(before)
        output_fh.write(b"HERE")
        output_fh.write(after)
@overload
def ord_(b: str) -> int:
    ...


@overload
def ord_(b: bytes) -> bytes:
    ...


@overload
def ord_(b: int) -> int:
    ...


def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]:
    """
    Return the code point of a string, and any other value unchanged.

    Args:
        b: A string (passed to ``ord``), or an int or bytes value.

    Returns:
        ``ord(b)`` if ``b`` is a str, otherwise ``b`` itself.

    """
    return ord(b) if isinstance(b, str) else b
def deprecate(msg: str, stacklevel: int = 3) -> None:
    """
    Emit a DeprecationWarning with the given message.

    Args:
        msg: The warning text.
        stacklevel: Passed on to ``warnings.warn`` unchanged.

    """
    warnings.warn(message=msg, category=DeprecationWarning, stacklevel=stacklevel)
def deprecation(msg: str) -> None:
    """
    Raise a DeprecationError carrying the given message.

    Args:
        msg: The error text.

    Raises:
        DeprecationError: Always.

    """
    error = DeprecationError(msg)
    raise error
def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Warn that a feature is going to be removed and name its replacement.

    Args:
        old_name: The deprecated feature.
        new_name: What to use instead.
        removed_in: The pypdf version in which the feature will be removed.

    """
    message = (
        f"{old_name} is deprecated and will be removed in pypdf {removed_in}. "
        f"Use {new_name} instead."
    )
    deprecate(message, 4)
def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Raise an error that a feature was removed and name its replacement.

    Args:
        old_name: The removed feature.
        new_name: What to use instead.
        removed_in: The pypdf version in which the feature was removed.

    """
    message = (
        f"{old_name} is deprecated and was removed in pypdf {removed_in}. "
        f"Use {new_name} instead."
    )
    deprecation(message)
def deprecate_no_replacement(name: str, removed_in: str) -> None:
    """
    Warn that a feature is going to be removed without a replacement.

    Args:
        name: The deprecated feature.
        removed_in: The pypdf version in which the feature will be removed.

    """
    message = f"{name} is deprecated and will be removed in pypdf {removed_in}."
    deprecate(message, 4)
def deprecation_no_replacement(name: str, removed_in: str) -> None:
    """
    Raise an error that a feature was removed without a replacement.

    Args:
        name: The removed feature.
        removed_in: The pypdf version in which the feature was removed.

    """
    message = f"{name} is deprecated and was removed in pypdf {removed_in}."
    deprecation(message)
def logger_error(msg: str, src: str) -> None:
    """
    Log an error message on the logger named ``src``.

    Call this rather than ``logger.error`` so that the behaviour can be
    overwritten in one place.

    See the docs on when to use which:
    https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html

    Args:
        msg: The message to log.
        src: The name of the logger to use.

    """
    logger = logging.getLogger(src)
    logger.error(msg)
def logger_warning(msg: str, src: str) -> None:
    """
    Log a warning message on the logger named ``src``.

    Call this rather than ``logger.warning`` so that the behaviour can be
    overwritten in one place.

    ## Exception, warnings.warn, logger_warning
    - Raise an exception when the user should write code that deals with an
      error case, e.g. the PDF being completely broken.
    - Use warnings.warn when the user needs to fix their code, e.g. for
      DeprecationWarnings.
    - Use logger_warning when the user needs to know that an issue was
      handled by pypdf, e.g. a non-compliant PDF being read in a way that
      pypdf could apply a robustness fix to still read it. This applies
      mainly to strict=False mode.

    Args:
        msg: The message to log.
        src: The name of the logger to use.

    """
    logger = logging.getLogger(src)
    logger.warning(msg)
def rename_kwargs(
    func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str], fail: bool = False
) -> None:
    """
    Helper function to deprecate arguments.

    Every deprecated keyword found in ``kwargs`` is moved to its new name in
    place and a DeprecationWarning is issued.

    Args:
        func_name: Name of the function whose arguments are being renamed;
            used in the error message.
        kwargs: The keyword arguments received by that function. Modified
            in place.
        aliases: Mapping from deprecated argument name to new argument name.
        fail: If True, raise instead of renaming when a deprecated name
            is present.

    Raises:
        DeprecationError: If ``fail`` is True and a deprecated name is used.
        TypeError: If both the deprecated and the new name are given.

    """
    for old_term, new_term in aliases.items():
        if old_term not in kwargs:
            continue
        notice = f"{old_term} is deprecated as an argument. Use {new_term} instead"
        if fail:
            raise DeprecationError(notice)
        if new_term in kwargs:
            raise TypeError(
                f"{func_name} received both {old_term} and {new_term} as "
                f"an argument. {old_term} is deprecated. "
                f"Use {new_term} instead."
            )
        kwargs[new_term] = kwargs.pop(old_term)
        warnings.warn(message=notice, category=DeprecationWarning, stacklevel=3)
473def _human_readable_bytes(bytes: int) -> str:
474 if bytes < 10**3:
475 return f"{bytes} Byte"
476 if bytes < 10**6:
477 return f"{bytes / 10**3:.1f} kB"
478 if bytes < 10**9:
479 return f"{bytes / 10**6:.1f} MB"
480 return f"{bytes / 10**9:.1f} GB"
483# The following class has been copied from Django:
484# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65
485# It received some modifications to comply with our own coding standards.
486#
487# Original license:
488#
489# ---------------------------------------------------------------------------------
490# Copyright (c) Django Software Foundation and individual contributors.
491# All rights reserved.
492#
493# Redistribution and use in source and binary forms, with or without modification,
494# are permitted provided that the following conditions are met:
495#
496# 1. Redistributions of source code must retain the above copyright notice,
497# this list of conditions and the following disclaimer.
498#
499# 2. Redistributions in binary form must reproduce the above copyright
500# notice, this list of conditions and the following disclaimer in the
501# documentation and/or other materials provided with the distribution.
502#
503# 3. Neither the name of Django nor the names of its contributors may be used
504# to endorse or promote products derived from this software without
505# specific prior written permission.
506#
507# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
508# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
509# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
510# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
511# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
512# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
513# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
514# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
515# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
516# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
517# ---------------------------------------------------------------------------------
class classproperty:  # noqa: N801
    """
    Descriptor that exposes a method taking only ``cls`` as a read-only
    attribute, available on the class itself as well as on its instances.
    """

    def __init__(self, method=None) -> None:  # type: ignore  # noqa: ANN001
        """Store the wrapped method; it may also be set later via getter()."""
        self.fget = method

    def __get__(self, instance, cls=None) -> Any:  # type: ignore  # noqa: ANN001
        """Call the wrapped method with the owning class; the instance is ignored."""
        getter_function = self.fget
        return getter_function(cls)

    def getter(self, method) -> Self:  # type: ignore  # noqa: ANN001
        """Replace the wrapped method and return this descriptor."""
        self.fget = method
        return self
@dataclass
class File:
    """A named blob of bytes, optionally linked to the object it came from."""

    # Imported in the class body rather than at module level; presumably to
    # avoid a circular import -- TODO confirm.
    from .generic import IndirectObject  # noqa: PLC0415

    name: str = ""
    """
    Filename as identified within the PDF file.
    """
    data: bytes = b""
    """
    Data as bytes.
    """
    indirect_reference: Optional[IndirectObject] = None
    """
    Reference to the object storing the stream.
    """

    def __str__(self) -> str:
        """Return the class name, the file name and the human-readable data size."""
        size = _human_readable_bytes(len(self.data))
        return f"{self.__class__.__name__}(name={self.name}, data: {size})"

    def __repr__(self) -> str:
        """Return the ``__str__`` text with the hash of the data appended."""
        summary = self.__str__()
        # Drop the closing parenthesis, then re-close after the hash.
        return f"{summary[:-1]}, hash: {hash(self.data)})"
@functools.total_ordering
class Version:
    """
    A dotted version string that can be compared and hashed.

    Each dot-separated component is split into a leading integer and the
    remaining text; components are compared as ``(integer, text)`` pairs.
    """

    COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")

    def __init__(self, version_str: str) -> None:
        """Keep the original string and its parsed components."""
        self.version_str = version_str
        self.components = self._parse_version(version_str)

    def _parse_version(self, version_str: str) -> List[Tuple[int, str]]:
        """
        Split the version string into ``(integer, suffix)`` pairs.

        A component without leading digits is stored as ``(0, component)``.
        """
        parsed: List[Tuple[int, str]] = []
        for part in version_str.split("."):
            found = Version.COMPONENT_PATTERN.match(part)
            if found is None:
                parsed.append((0, part))
            else:
                digits, suffix = found.groups()
                parsed.append((int(digits), suffix))
        return parsed

    def __eq__(self, other: object) -> bool:
        """Versions are equal if their parsed components are equal."""
        return isinstance(other, Version) and self.components == other.components

    def __hash__(self) -> int:
        """Hash the class together with the components."""
        # Convert to tuple as lists cannot be hashed.
        return hash((self.__class__, tuple(self.components)))

    def __lt__(self, other: Any) -> bool:
        """
        Compare component by component; on a common prefix the shorter wins.

        Raises:
            ValueError: If ``other`` is not a Version.

        """
        if not isinstance(other, Version):
            raise ValueError(f"Version cannot be compared against {type(other)}")

        for own, theirs in zip(self.components, other.components):
            # Pairs compare by integer first, then by suffix.
            if own != theirs:
                return own < theirs

        return len(self.components) < len(other.components)