Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_utils.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

276 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28"""Utility functions for PDF library.""" 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import functools 

33import logging 

34import re 

35import sys 

36import warnings 

37from dataclasses import dataclass 

38from datetime import datetime, timezone 

39from io import DEFAULT_BUFFER_SIZE 

40from os import SEEK_CUR 

41from re import Pattern 

42from typing import ( 

43 IO, 

44 Any, 

45 Optional, 

46 Union, 

47 overload, 

48) 

49 

50if sys.version_info[:2] >= (3, 10): 

51 # Python 3.10+: https://www.python.org/dev/peps/pep-0484/ 

52 from typing import TypeAlias 

53else: 

54 from typing_extensions import TypeAlias 

55 

56if sys.version_info >= (3, 11): 

57 from typing import Self 

58else: 

59 from typing_extensions import Self 

60 

61from .errors import ( 

62 STREAM_TRUNCATED_PREMATURELY, 

63 DeprecationError, 

64 PdfStreamError, 

65) 

66 

67TransformationMatrixType: TypeAlias = tuple[ 

68 tuple[float, float, float], tuple[float, float, float], tuple[float, float, float] 

69] 

70CompressedTransformationMatrix: TypeAlias = tuple[ 

71 float, float, float, float, float, float 

72] 

73 

74StreamType = IO[Any] 

75BinaryStreamType = IO[bytes] 

76StrByteType = Union[str, StreamType] 

77 

78 

def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
    """
    Parse a PDF date string into a datetime object.

    The text is normalised first: a missing ``D:`` prefix is added when the
    text starts with a digit, a trailing ``Z``/``z`` becomes a ``+0000``
    offset, apostrophes are dropped and an offset that is not exactly five
    characters from the end gets ``00`` appended. Progressively longer
    formats, from ``D:%Y`` up to ``D:%Y%m%d%H%M%S%z``, are then tried in turn.

    Args:
        text: The date string to parse; may be None or empty.

    Returns:
        The parsed datetime, or None if ``text`` is None or empty.

    Raises:
        ValueError: If none of the supported formats matches.

    """
    if not text:
        return None
    original = text
    if text[0].isdigit():
        text = "D:" + text
    if text.endswith(("Z", "z")):
        text += "0000"
    text = text.translate(str.maketrans({"z": "+", "Z": "+", "'": None}))
    sign_position = max(text.find("+"), text.find("-"))
    if sign_position > 0 and sign_position != len(text) - 5:
        text += "00"
    # Grow the format one directive at a time, so the same sequence of
    # candidate formats is tried from the shortest to the longest.
    candidate = "D:"
    for directive in ("%Y", "%m", "%d", "%H", "%M", "%S", "%z"):
        candidate += directive
        try:
            parsed = datetime.strptime(text, candidate)  # noqa: DTZ007
        except ValueError:
            continue
        if text.endswith("+0000"):
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
    raise ValueError(f"Can not convert date: {original}")

109 

110 

def format_iso8824_date(dt: datetime) -> str:
    """
    Convert a datetime object to PDF date string format.

    The result has the form ``D:YYYYMMDDHHmmSS``. When the datetime carries
    a UTC offset, ``OHH'mm'`` is appended, where ``O`` is ``+`` or ``-``.
    Seconds in the offset are discarded.

    Args:
        dt: A datetime object to convert.

    Returns:
        A date string in PDF format. No offset is appended for naive
        datetimes or when the tzinfo reports no UTC offset.

    """
    date_str = dt.strftime("D:%Y%m%d%H%M%S")
    # utcoffset() is None for naive datetimes and for tzinfo objects that do
    # not know their offset; either way there is no offset to write.
    offset = dt.utcoffset()
    if offset is None:
        return date_str
    total_seconds = int(offset.total_seconds())
    sign = "+" if total_seconds >= 0 else "-"
    hours, remainder = divmod(abs(total_seconds), 3600)
    minutes = remainder // 60
    return f"{date_str}{sign}{hours:02d}'{minutes:02d}'"

134 

135 

136def _get_max_pdf_version_header(header1: str, header2: str) -> str: 

137 versions = ( 

138 "%PDF-1.3", 

139 "%PDF-1.4", 

140 "%PDF-1.5", 

141 "%PDF-1.6", 

142 "%PDF-1.7", 

143 "%PDF-2.0", 

144 ) 

145 pdf_header_indices = [] 

146 if header1 in versions: 

147 pdf_header_indices.append(versions.index(header1)) 

148 if header2 in versions: 

149 pdf_header_indices.append(versions.index(header2)) 

150 if len(pdf_header_indices) == 0: 

151 raise ValueError(f"Neither {header1!r} nor {header2!r} are proper headers") 

152 return versions[max(pdf_header_indices)] 

153 

154 

# The single-byte values treated as whitespace: NUL, tab, line feed,
# form feed, carriage return and space.
WHITESPACES = (b"\x00", b"\t", b"\n", b"\f", b"\r", b" ")
# The same bytes concatenated into one bytes object.
WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
# Regular-expression character class matching any one of those bytes.
WHITESPACES_AS_REGEXP = b"[%b]" % WHITESPACES_AS_BYTES

158 

159 

def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
    """
    Read bytes one at a time until whitespace, EOF or the limit is hit.

    A byte counts as whitespace when its ``isspace()`` method says so. The
    terminating whitespace byte is consumed from the stream but not returned.

    Args:
        stream: The data stream to read from.
        maxchars: The maximum number of bytes returned; by default unlimited.

    Returns:
        The non-whitespace bytes which were read.

    """
    pieces = []
    size = 0
    while True:
        char = stream.read(1)
        if not char or char.isspace():
            break
        pieces.append(char)
        size += len(char)
        if size == maxchars:
            break
    return b"".join(pieces)

183 

184 

def read_non_whitespace(stream: BinaryStreamType) -> bytes:
    """
    Skip whitespace and return the first byte that is not whitespace.

    Args:
        stream: The data stream to read from.

    Returns:
        The first byte read that is not one of ``WHITESPACES``; this is an
        empty bytes object if the stream ends first.

    """
    while True:
        char = stream.read(1)
        if char not in WHITESPACES:
            return char

200 

201 

def skip_over_whitespace(stream: StreamType) -> bool:
    """
    Consume whitespace and report whether any was found.

    Reading stops after the first byte that is not one of ``WHITESPACES``;
    that byte is consumed as well.

    Args:
        stream: The data stream to read from.

    Returns:
        True if one or more whitespace was skipped, otherwise return False.

    """
    skipped_any = False
    while stream.read(1) in WHITESPACES:
        skipped_any = True
    return skipped_any

220 

221 

def check_if_whitespace_only(value: bytes) -> bool:
    """
    Tell whether every byte of the value is a whitespace byte.

    Args:
        value: The bytes to check.

    Returns:
        True if each byte is contained in ``WHITESPACES_AS_BYTES`` (which is
        also the case for an empty value), otherwise False.

    """
    for byte in value:
        if byte not in WHITESPACES_AS_BYTES:
            return False
    return True

234 

235 

def skip_over_comment(stream: StreamType) -> None:
    """
    Skip a comment starting at the current stream position, if there is one.

    If the next byte is ``%``, everything up to and including the next
    CR or LF byte is consumed. Otherwise the stream position is left
    unchanged.

    Args:
        stream: The data stream to read from.

    Raises:
        PdfStreamError: If the stream ends before the comment is terminated.

    """
    tok = stream.read(1)
    if not tok:
        # At EOF nothing was consumed, so rewinding would step back over a
        # byte that belongs to whatever preceded this call.
        return
    stream.seek(-1, SEEK_CUR)
    if tok != b"%":
        return
    while tok not in (b"\n", b"\r"):
        tok = stream.read(1)
        if tok == b"":
            raise PdfStreamError("File ended unexpectedly.")

244 

245 

def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
    """
    Read until the regular expression pattern matched (ignore the match).
    Treats EOF on the underlying stream as the end of the token to be matched.

    On a match the stream is repositioned to the start of the match.

    Args:
        stream: The data stream to read from.
        regex: re.Pattern

    Returns:
        The bytes read before the match, or everything up to EOF if the
        pattern never matched.

    """
    chunks: list[bytes] = []
    consumed = 0  # Number of bytes stored in ``chunks``.
    carry = b""  # End of the previous chunk, searched again with the next one.
    read_size = 16
    while True:
        chunk = stream.read(read_size)
        if not chunk:
            return b"".join(chunks)
        # Prepend the carried-over bytes so that matches spanning a chunk
        # boundary are found.
        match = regex.search(carry + chunk)
        if match is None:
            chunks.append(chunk)
            consumed += len(chunk)
            # Fixed overlap: 16 bytes is sufficient for the short
            # delimiter patterns used in PDF parsing.
            carry = chunk[-16:]
            if read_size < 8192:
                read_size <<= 1
            continue
        # Translate the match offset inside the search buffer into an offset
        # relative to where reading started.
        match_start = consumed - len(carry) + match.start()
        stream.seek(match_start - consumed - len(chunk), 1)
        chunks.append(chunk)
        return b"".join(chunks)[:match_start]

284 

285 

def read_block_backwards(stream: BinaryStreamType, to_read: int) -> bytes:
    """
    Given a stream at position X, read a block of size to_read ending at position X.

    This changes the stream's position to the beginning of where the block was
    read.

    Args:
        stream: The data stream to read from.
        to_read: The number of bytes to read.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If fewer than ``to_read`` bytes precede the current
            position.

    """
    if to_read > stream.tell():
        raise PdfStreamError("Could not read malformed PDF file")
    # Step back to the start of the block, read it (which moves the position
    # forward again), then step back once more.
    stream.seek(-to_read, SEEK_CUR)
    block = stream.read(to_read)
    stream.seek(-to_read, SEEK_CUR)
    return block

309 

310 

def read_previous_line(stream: StreamType) -> bytes:
    """
    Given a byte stream with current position X, return the previous line.

    The result consists of all characters between the first CR/LF byte found
    before X (or the start of the file, if no such byte is found) and
    position X. After this call, the stream will be positioned one byte
    after the first non-CRLF character found beyond the first CR/LF byte
    before X, or, if no such byte is found, at the beginning of the stream.

    Args:
        stream: The data stream to read from.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If the stream is already at position 0.

    """
    if stream.tell() == 0:
        raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
    pieces = []  # Parts of the line, collected back to front.
    seen_newline = False
    while True:
        size = min(DEFAULT_BUFFER_SIZE, stream.tell())
        if size == 0:
            break
        # Afterwards the stream is positioned at the start of this block.
        block = read_block_backwards(stream, size)
        pos = len(block) - 1
        if not seen_newline:
            # Walk backwards until the first CR/LF byte shows up.
            while pos >= 0 and block[pos] not in b"\r\n":
                pos -= 1
            seen_newline = pos >= 0
        if seen_newline:
            # Everything after the CR/LF belongs to the line. In later blocks
            # this slice is empty because ``pos`` starts at the last byte.
            pieces.append(block[pos + 1 :])
            # Swallow the whole run of CR/LF bytes.
            while pos >= 0 and block[pos] in b"\r\n":
                pos -= 1
        else:
            # No CR/LF in this block: all of it is part of the line.
            pieces.append(block)
        if pos >= 0:
            # ``pos`` is the last byte before the CR/LF run; move the stream
            # to just after it and stop.
            stream.seek(pos + 1, SEEK_CUR)
            break
    pieces.reverse()
    return b"".join(pieces)

367 

368 

def matrix_multiply(
    a: TransformationMatrixType, b: TransformationMatrixType
) -> TransformationMatrixType:
    """
    Multiply matrix ``a`` by matrix ``b``.

    Args:
        a: Left operand, given as a sequence of rows.
        b: Right operand, given as a sequence of rows.

    Returns:
        The product as a tuple of row tuples; all operands are converted to
        float before multiplying.

    """
    columns = list(zip(*b))
    product = []
    for row in a:
        product.append(
            tuple(
                sum(float(x) * float(y) for x, y in zip(row, column))
                for column in columns
            )
        )
    return tuple(product)  # type: ignore[return-value]

376 

377 

def mark_location(stream: StreamType) -> None:
    """
    Create text file showing current location in context.

    Writes up to 5000 bytes before the current position, the marker
    ``HERE`` and up to 5000 bytes after it to ``pypdf_pdfLocation.txt``.
    The stream position is restored afterwards. Mainly for debugging.

    Args:
        stream: The seekable data stream to inspect.

    """
    radius = 5000
    position = stream.tell()
    # Clamp to the start of the stream so that positions closer than
    # ``radius`` to the beginning do not cause a negative seek.
    start = max(0, position - radius)
    stream.seek(start)
    with open("pypdf_pdfLocation.txt", "wb") as output_fh:
        output_fh.write(stream.read(position - start))
        output_fh.write(b"HERE")
        output_fh.write(stream.read(radius))
    # Restore by absolute position; a relative seek would be off whenever
    # fewer than ``radius`` bytes followed the original position.
    stream.seek(position)

388 

389 

@overload
def ord_(b: str) -> int:
    ...


@overload
def ord_(b: bytes) -> bytes:
    ...


@overload
def ord_(b: int) -> int:
    ...


def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]:
    """
    Return the code point of a string; pass anything else through unchanged.

    Args:
        b: A string accepted by ``ord``, or an int or bytes value.

    Returns:
        ``ord(b)`` for a string, otherwise ``b`` itself.

    """
    return ord(b) if isinstance(b, str) else b

409 

410 

def deprecate(msg: str, stacklevel: int = 3) -> None:
    """
    Issue a DeprecationWarning with the given message.

    Args:
        msg: The warning text.
        stacklevel: Passed on to ``warnings.warn``.

    """
    warnings.warn(message=msg, category=DeprecationWarning, stacklevel=stacklevel)

413 

414 

def deprecation(msg: str) -> None:
    """
    Raise a DeprecationError with the given message.

    Args:
        msg: The error text.

    Raises:
        DeprecationError: Always.

    """
    error = DeprecationError(msg)
    raise error

417 

418 

def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Issue a warning that a feature will be removed, but has a replacement.

    Args:
        old_name: Name of the deprecated feature.
        new_name: Name of the feature to use instead.
        removed_in: The pypdf version in which the feature will be removed.

    """
    message = (
        f"{old_name} is deprecated and will be removed in pypdf {removed_in}. Use {new_name} instead."
    )
    deprecate(message, stacklevel=4)

425 

426 

def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Raise an exception that a feature was already removed, but has a replacement.

    Args:
        old_name: Name of the removed feature.
        new_name: Name of the feature to use instead.
        removed_in: The pypdf version in which the feature was removed.

    """
    message = (
        f"{old_name} is deprecated and was removed in pypdf {removed_in}. Use {new_name} instead."
    )
    deprecation(message)

432 

433 

def deprecate_no_replacement(name: str, removed_in: str) -> None:
    """
    Issue a warning that a feature will be removed without replacement.

    Args:
        name: Name of the deprecated feature.
        removed_in: The pypdf version in which the feature will be removed.

    """
    message = f"{name} is deprecated and will be removed in pypdf {removed_in}."
    deprecate(message, stacklevel=4)

437 

438 

def deprecation_no_replacement(name: str, removed_in: str) -> None:
    """
    Raise an exception that a feature was already removed without replacement.

    Args:
        name: Name of the removed feature.
        removed_in: The pypdf version in which the feature was removed.

    """
    message = f"{name} is deprecated and was removed in pypdf {removed_in}."
    deprecation(message)

442 

443 

def logger_error(message: str, *, source: str, **values: Any) -> None:
    """
    Use this instead of logger.error directly.

    That allows people to overwrite it more easily.

    See the docs on when to use which:
    https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html

    Args:
        message: The message to log. It may contain ``%(name)s`` style
            placeholders which are filled from ``values``.
        source: Name of the logger to use.
        **values: Values for the placeholders in ``message``.

    """
    logger = logging.getLogger(source)
    if values:
        logger.error(message, values)
    else:
        # An empty dict passed as the only argument is not treated as a
        # mapping by logging; the message would be %-formatted against a
        # one-element tuple and fail. Log the message without arguments.
        logger.error(message)

454 

455 

def logger_warning(msg: str, src: str) -> None:
    """
    Use this instead of logger.warning directly.

    That allows people to overwrite it more easily.

    ## Exception, warnings.warn, logger_warning
    - Exceptions should be used if the user should write code that deals with
      an error case, e.g. the PDF being completely broken.
    - warnings.warn should be used if the user needs to fix their code, e.g.
      DeprecationWarnings
    - logger_warning should be used if the user needs to know that an issue was
      handled by pypdf, e.g. a non-compliant PDF being read in a way that
      pypdf could apply a robustness fix to still read it. This applies mainly
      to strict=False mode.

    Args:
        msg: The message to log.
        src: Name of the logger to use.

    """
    logger = logging.getLogger(src)
    logger.warning(msg)

473 

474 

def rename_kwargs(
    func_name: str, kwargs: dict[str, Any], aliases: dict[str, str], fail: bool = False
) -> None:
    """
    Helper function to deprecate arguments.

    Each deprecated key found in ``kwargs`` is moved to its new name in
    place and a DeprecationWarning is issued.

    Args:
        func_name: Name of the function whose arguments are renamed; only
            used in the error message.
        kwargs: The keyword arguments to update in place.
        aliases: Mapping from deprecated argument name to its new name.
        fail: If True, raise instead of renaming.

    Raises:
        DeprecationError: If ``fail`` is True and a deprecated name is used.
        TypeError: If both the deprecated and the new name are given.

    """
    for old_term, new_term in aliases.items():
        if old_term not in kwargs:
            continue
        if fail:
            raise DeprecationError(
                f"{old_term} is deprecated as an argument. Use {new_term} instead"
            )
        if new_term in kwargs:
            raise TypeError(
                f"{func_name} received both {old_term} and {new_term} as "
                f"an argument. {old_term} is deprecated. "
                f"Use {new_term} instead."
            )
        value = kwargs.pop(old_term)
        kwargs[new_term] = value
        warnings.warn(
            f"{old_term} is deprecated as an argument. Use {new_term} instead",
            DeprecationWarning,
            stacklevel=3,
        )

508 

509 

510def _human_readable_bytes(bytes: int) -> str: 

511 if bytes < 10**3: 

512 return f"{bytes} Byte" 

513 if bytes < 10**6: 

514 return f"{bytes / 10**3:.1f} kB" 

515 if bytes < 10**9: 

516 return f"{bytes / 10**6:.1f} MB" 

517 return f"{bytes / 10**9:.1f} GB" 

518 

519 

520# The following class has been copied from Django: 

521# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65 

522# It received some modifications to comply with our own coding standards. 

523# 

524# Original license: 

525# 

526# --------------------------------------------------------------------------------- 

527# Copyright (c) Django Software Foundation and individual contributors. 

528# All rights reserved. 

529# 

530# Redistribution and use in source and binary forms, with or without modification, 

531# are permitted provided that the following conditions are met: 

532# 

533# 1. Redistributions of source code must retain the above copyright notice, 

534# this list of conditions and the following disclaimer. 

535# 

536# 2. Redistributions in binary form must reproduce the above copyright 

537# notice, this list of conditions and the following disclaimer in the 

538# documentation and/or other materials provided with the distribution. 

539# 

540# 3. Neither the name of Django nor the names of its contributors may be used 

541# to endorse or promote products derived from this software without 

542# specific prior written permission. 

543# 

544# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 

545# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 

546# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

547# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 

548# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 

549# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 

550# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 

551# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 

552# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 

553# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

554# --------------------------------------------------------------------------------- 

class classproperty:  # noqa: N801
    """
    Decorator that converts a method with a single cls argument into a property
    that can be accessed directly from the class.
    """

    def __init__(self, method=None) -> None:  # type: ignore  # noqa: ANN001
        # The wrapped function; it may be supplied later through getter().
        self.fget = method

    def __get__(self, instance, cls=None) -> Any:  # type: ignore  # noqa: ANN001
        # The instance is ignored: the wrapped function always receives the
        # class the attribute was looked up on.
        fget = self.fget
        return fget(cls)

    def getter(self, method) -> Self:  # type: ignore  # noqa: ANN001
        """Replace the wrapped function and return this descriptor."""
        self.fget = method
        return self

570 

571 

# Container for a file found inside a PDF: its name, its raw data and an
# optional reference to the object that stores it.
@dataclass
class File:
    from .generic import IndirectObject  # noqa: PLC0415

    name: str = ""
    """
    Filename as identified within the PDF file.
    """
    data: bytes = b""
    """
    Data as bytes.
    """
    indirect_reference: Optional[IndirectObject] = None
    """
    Reference to the object storing the stream.
    """

    def __str__(self) -> str:
        # Show the class name, the file name and a human readable data size.
        class_name = self.__class__.__name__
        size = _human_readable_bytes(len(self.data))
        return f"{class_name}(name={self.name}, data: {size})"

    def __repr__(self) -> str:
        # Same as __str__, with the data hash inserted before the closing
        # parenthesis.
        summary = self.__str__()
        return summary[:-1] + f", hash: {hash(self.data)})"

594 

595 

@functools.total_ordering
class Version:
    """
    Comparable version string.

    The string is split on dots and each component is stored as a pair of
    leading integer and remaining suffix. Ordering compares these pairs
    component by component.
    """

    COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")

    def __init__(self, version_str: str) -> None:
        self.version_str = version_str
        self.components = self._parse_version(version_str)

    def _parse_version(self, version_str: str) -> list[tuple[int, str]]:
        """
        Split a version string into (integer, suffix) pairs.

        A component without leading digits is stored as ``(0, component)``.
        """
        parsed = []
        for component in version_str.split("."):
            match = Version.COMPONENT_PATTERN.match(component)
            if match:
                number, suffix = match.groups()
                parsed.append((int(number), suffix))
            else:
                parsed.append((0, component))
        return parsed

    def __eq__(self, other: object) -> bool:
        # Versions are equal only to other versions with the same components.
        return isinstance(other, Version) and self.components == other.components

    def __hash__(self) -> int:
        # Convert to tuple as lists cannot be hashed.
        return hash((self.__class__, tuple(self.components)))

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Version):
            raise ValueError(f"Version cannot be compared against {type(other)}")

        # The first differing component decides: pairs compare by integer
        # first and by suffix second.
        for mine, theirs in zip(self.components, other.components):
            if mine != theirs:
                return mine < theirs

        # All shared components are equal: the shorter version is smaller.
        return len(self.components) < len(other.components)