Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_utils.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

266 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28"""Utility functions for PDF library.""" 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import functools 

33import logging 

34import re 

35import sys 

36import warnings 

37from dataclasses import dataclass 

38from datetime import datetime, timezone 

39from io import DEFAULT_BUFFER_SIZE 

40from os import SEEK_CUR 

41from re import Pattern 

42from typing import ( 

43 IO, 

44 Any, 

45 Optional, 

46 Union, 

47 overload, 

48) 

49 

50if sys.version_info[:2] >= (3, 10): 

51 # Python 3.10+: https://www.python.org/dev/peps/pep-0484/ 

52 from typing import TypeAlias 

53else: 

54 from typing_extensions import TypeAlias 

55 

56if sys.version_info >= (3, 11): 

57 from typing import Self 

58else: 

59 from typing_extensions import Self 

60 

61from .errors import ( 

62 STREAM_TRUNCATED_PREMATURELY, 

63 DeprecationError, 

64 PdfStreamError, 

65) 

66 

67TransformationMatrixType: TypeAlias = tuple[ 

68 tuple[float, float, float], tuple[float, float, float], tuple[float, float, float] 

69] 

70CompressedTransformationMatrix: TypeAlias = tuple[ 

71 float, float, float, float, float, float 

72] 

73 

74StreamType = IO[Any] 

75StrByteType = Union[str, StreamType] 

76 

77 

def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
    """
    Convert a PDF date string into a datetime object.

    The accepted layout is ``D:YYYYMMDDHHmmSS`` followed by an optional UTC
    offset. The ``D:`` prefix may be missing and trailing fields may be
    omitted (year only, year and month, ...).

    Args:
        text: The date string to parse, or None.

    Returns:
        None if ``text`` is None, otherwise the parsed datetime. The result
        only carries timezone information if the input had an offset.

    Raises:
        ValueError: If ``text`` is empty or does not match any supported
            layout.

    """
    if text is None:
        return None
    orgtext = text
    if not text:
        # Indexing an empty string below would raise IndexError; report it the
        # same way as every other unparsable value instead.
        raise ValueError(f"Can not convert date: {orgtext}")
    if text[0].isdigit():
        text = "D:" + text
    if text.endswith(("Z", "z")):
        # A bare "Z" means UTC; give it an explicit zero offset.
        text += "0000"
    # Normalise the offset to the "+HHMM" form understood by "%z".
    text = text.replace("z", "+").replace("Z", "+").replace("'", "")
    i = max(text.find("+"), text.find("-"))
    if i > 0 and i != len(text) - 5:
        # The offset is not followed by exactly four digits: pad the minutes.
        text += "00"
    for f in (
        "D:%Y",
        "D:%Y%m",
        "D:%Y%m%d",
        "D:%Y%m%d%H",
        "D:%Y%m%d%H%M",
        "D:%Y%m%d%H%M%S",
        "D:%Y%m%d%H%M%S%z",
    ):
        try:
            d = datetime.strptime(text, f)  # noqa: DTZ007
        except ValueError:
            continue
        else:
            if text.endswith("+0000"):
                d = d.replace(tzinfo=timezone.utc)
            return d
    raise ValueError(f"Can not convert date: {orgtext}")

108 

109 

def format_iso8824_date(dt: datetime) -> str:
    """
    Convert a datetime object to PDF date string format.

    Converts datetime to the PDF date format D:YYYYMMDDHHmmSSOHH'mm
    as specified in the PDF Reference.

    Args:
        dt: A datetime object to convert.

    Returns:
        A date string in PDF format. The offset part is only appended if
        ``dt`` reports a UTC offset.
    """
    date_str = dt.strftime("D:%Y%m%d%H%M%S")
    # utcoffset() is None for naive datetimes and for tzinfo implementations
    # which do not know their offset; both are written without an offset
    # instead of relying on an assert that vanishes under ``python -O``.
    offset = dt.utcoffset()
    if offset is None:
        return date_str
    total_seconds = int(offset.total_seconds())
    hours, remainder = divmod(abs(total_seconds), 3600)
    minutes = remainder // 60
    sign = "+" if total_seconds >= 0 else "-"
    return f"{date_str}{sign}{hours:02d}'{minutes:02d}'"

133 

134 

135def _get_max_pdf_version_header(header1: str, header2: str) -> str: 

136 versions = ( 

137 "%PDF-1.3", 

138 "%PDF-1.4", 

139 "%PDF-1.5", 

140 "%PDF-1.6", 

141 "%PDF-1.7", 

142 "%PDF-2.0", 

143 ) 

144 pdf_header_indices = [] 

145 if header1 in versions: 

146 pdf_header_indices.append(versions.index(header1)) 

147 if header2 in versions: 

148 pdf_header_indices.append(versions.index(header2)) 

149 if len(pdf_header_indices) == 0: 

150 raise ValueError(f"Neither {header1!r} nor {header2!r} are proper headers") 

151 return versions[max(pdf_header_indices)] 

152 

153 

# The single bytes which this module treats as whitespace.
WHITESPACES = (
    b"\x00",
    b"\t",
    b"\n",
    b"\f",
    b"\r",
    b" ",
)
# The same bytes concatenated, for membership tests on integer byte values.
WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
# The same bytes wrapped in a regular expression character class.
WHITESPACES_AS_REGEXP = b"[%b]" % WHITESPACES_AS_BYTES

157 

158 

def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
    """
    Collect bytes from the stream up to the next whitespace byte.

    Reading ends at the first whitespace byte (which is consumed but not
    returned), at the end of the stream, or once maxchars bytes were
    collected.

    Args:
        stream: The data stream to read from.
        maxchars: Upper limit for the number of returned bytes; by default
            unlimited.

    Returns:
        The bytes collected before reading stopped.

    """
    collected = bytearray()
    while True:
        byte = stream.read(1)
        if not byte or byte.isspace():
            break
        collected += byte
        if len(collected) == maxchars:
            break
    return bytes(collected)

182 

183 

def read_non_whitespace(stream: StreamType) -> bytes:
    """
    Skip whitespace and return the first byte which is not whitespace.

    Args:
        stream: The data stream to read from.

    Returns:
        The first byte read that is not contained in WHITESPACES. This is
        the empty bytes object if the stream ends first.

    """
    while True:
        byte = stream.read(1)
        if byte not in WHITESPACES:
            return byte

199 

200 

def skip_over_whitespace(stream: StreamType) -> bool:
    """
    Consume whitespace and report whether any was found.

    Works like read_non_whitespace, but the byte that ends the whitespace run
    is read and discarded as well.

    Args:
        stream: The data stream to read from.

    Returns:
        True if at least one whitespace byte was skipped, otherwise False.

    """
    skipped_any = False
    while stream.read(1) in WHITESPACES:
        skipped_any = True
    return skipped_any

219 

220 

def check_if_whitespace_only(value: bytes) -> bool:
    """
    Tell whether every byte of the given value is a whitespace byte.

    Args:
        value: The bytes to inspect.

    Returns:
        False as soon as a byte outside WHITESPACES_AS_BYTES is found,
        otherwise True (this includes the empty value).

    """
    for byte in value:
        if byte not in WHITESPACES_AS_BYTES:
            return False
    return True

233 

234 

def skip_over_comment(stream: StreamType) -> None:
    """
    Skip a comment starting at the current stream position, if there is one.

    The next byte is peeked at. If it is ``%``, everything up to and including
    the next CR or LF byte is consumed. Otherwise the stream is left one byte
    before the position reached by the peek.

    Args:
        stream: The data stream to read from.

    Raises:
        PdfStreamError: If the stream ends before the comment is terminated.

    """
    peeked = stream.read(1)
    # Undo the peek; a comment is consumed again from its "%" below.
    stream.seek(-1, 1)
    if peeked != b"%":
        return
    current = peeked
    while current not in (b"\n", b"\r"):
        current = stream.read(1)
        if current == b"":
            raise PdfStreamError("File ended unexpectedly.")

243 

244 

def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
    """
    Read from the stream until the given pattern matches.

    The match itself is not part of the result; the stream is positioned at
    the start of the match. Reaching the end of the stream without a match
    ends the token as well.

    Args:
        stream: The data stream to read from.
        regex: re.Pattern

    Returns:
        The bytes in front of the match, or everything that was read if the
        pattern never matched.

    """
    collected = b""
    while chunk := stream.read(16):
        candidate = collected + chunk
        found = regex.search(candidate)
        if found is None:
            collected = candidate
            continue
        # Rewind from the end of the data read so far to the match start.
        stream.seek(found.start() - len(candidate), 1)
        return candidate[: found.start()]
    return collected

269 

270 

def read_block_backwards(stream: StreamType, to_read: int) -> bytes:
    """
    Read the to_read bytes which end at the current stream position.

    Afterwards the stream points to the first byte of the block that was
    read.

    Args:
        stream: The data stream to read from.
        to_read: The size of the block in bytes.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If fewer than to_read bytes precede the current
            position.

    """
    if to_read > stream.tell():
        raise PdfStreamError("Could not read malformed PDF file")
    step_back = -to_read
    # Jump to the first byte of the block, read it, then jump back again so
    # that the caller continues in front of the block.
    stream.seek(step_back, SEEK_CUR)
    block = stream.read(to_read)
    stream.seek(step_back, SEEK_CUR)
    return block

294 

295 

def read_previous_line(stream: StreamType) -> bytes:
    """
    Return the line in front of the current position X of a byte stream.

    The result consists of all bytes between the closest CR/LF byte before X
    (or the start of the stream if there is none) and X itself. The run of
    CR/LF bytes in front of the result is skipped: afterwards the stream
    points directly behind the last byte preceding that run, or to the start
    of the stream if no such byte exists.

    Args:
        stream: The data stream to read from.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If the stream is already at its start.

    """
    if stream.tell() == 0:
        raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
    newline_bytes = b"\r\n"
    # Collected back to front, as the stream is walked backwards.
    pieces = []
    seen_newline = False
    while (chunk_size := min(DEFAULT_BUFFER_SIZE, stream.tell())) != 0:
        # After this call the stream points to the start of the chunk.
        chunk = read_block_backwards(stream, chunk_size)
        pos = len(chunk) - 1
        if not seen_newline:
            # Walk backwards over the line content until a CR/LF shows up.
            while pos >= 0 and chunk[pos] not in newline_bytes:
                pos -= 1
            seen_newline = pos >= 0
        if seen_newline:
            # Everything behind the CR/LF belongs to the line. If the CR/LF
            # was found in an earlier chunk, this slice is empty.
            pieces.append(chunk[pos + 1 :])
            # Consume the remaining bytes of the CR/LF run.
            while pos >= 0 and chunk[pos] in newline_bytes:
                pos -= 1
        else:
            # Still inside the line: keep the whole chunk and go on.
            pieces.append(chunk)
        if pos >= 0:
            # A byte in front of the CR/LF run was found; park the stream
            # directly behind it and stop.
            stream.seek(pos + 1, SEEK_CUR)
            break
    return b"".join(reversed(pieces))

352 

353 

def matrix_multiply(
    a: TransformationMatrixType, b: TransformationMatrixType
) -> TransformationMatrixType:
    """
    Multiply two matrices given as nested tuples.

    Args:
        a: The left operand, as a sequence of rows.
        b: The right operand, as a sequence of rows.

    Returns:
        The product as a tuple of row tuples.

    """
    columns = list(zip(*b))
    product = []
    for row in a:
        product.append(
            tuple(
                sum(float(left) * float(right) for left, right in zip(row, column))
                for column in columns
            )
        )
    return tuple(product)  # type: ignore[return-value]

361 

362 

def mark_location(stream: StreamType) -> None:
    """
    Create text file showing current location in context.

    Writes up to ``radius`` bytes before the current position, the marker
    ``HERE`` and up to ``radius`` bytes after it to ``pypdf_pdfLocation.txt``
    in the current working directory. Mainly for debugging.

    Args:
        stream: The data stream to inspect. Its position is restored before
            returning.

    """
    radius = 5000
    position = stream.tell()
    # Clamp to the start of the stream: a blind relative seek of -radius
    # fails on real files (and misplaces the marker on in-memory streams)
    # if fewer than radius bytes precede the current position.
    start = max(0, position - radius)
    try:
        stream.seek(start)
        with open("pypdf_pdfLocation.txt", "wb") as output_fh:
            output_fh.write(stream.read(position - start))
            output_fh.write(b"HERE")
            output_fh.write(stream.read(radius))
    finally:
        # Restore the exact position, even if less than radius bytes
        # followed it or writing the file failed.
        stream.seek(position)

373 

374 

@overload
def ord_(b: str) -> int: ...


@overload
def ord_(b: bytes) -> bytes: ...


@overload
def ord_(b: int) -> int: ...


def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]:
    """
    Return the code point of a string; pass any other value through.

    Args:
        b: A string accepted by ``ord``, or an int or bytes value.

    Returns:
        ``ord(b)`` for strings, otherwise ``b`` itself.

    """
    return ord(b) if isinstance(b, str) else b

394 

395 

def deprecate(msg: str, stacklevel: int = 3) -> None:
    """
    Issue a DeprecationWarning with the given message.

    Args:
        msg: The warning text.
        stacklevel: Passed on to ``warnings.warn`` to select the reported
            stack frame.

    """
    warnings.warn(message=msg, category=DeprecationWarning, stacklevel=stacklevel)

398 

399 

def deprecation(msg: str) -> None:
    """
    Raise a DeprecationError with the given message.

    Args:
        msg: The error text.

    Raises:
        DeprecationError: Always.

    """
    error = DeprecationError(msg)
    raise error

402 

403 

def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """Warn that a feature is going to be removed and name its replacement."""
    message = (
        f"{old_name} is deprecated and will be removed in pypdf {removed_in}. "
        f"Use {new_name} instead."
    )
    deprecate(message, stacklevel=4)

410 

411 

def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """Raise an error for a feature that was removed and name its replacement."""
    message = (
        f"{old_name} is deprecated and was removed in pypdf {removed_in}. "
        f"Use {new_name} instead."
    )
    deprecation(message)

417 

418 

def deprecate_no_replacement(name: str, removed_in: str) -> None:
    """Warn that a feature is going to be removed and has no replacement."""
    message = f"{name} is deprecated and will be removed in pypdf {removed_in}."
    deprecate(message, stacklevel=4)

422 

423 

def deprecation_no_replacement(name: str, removed_in: str) -> None:
    """Raise an error for a feature that was removed and has no replacement."""
    message = f"{name} is deprecated and was removed in pypdf {removed_in}."
    deprecation(message)

427 

428 

def logger_error(msg: str, src: str) -> None:
    """
    Log an error; call this rather than logger.error itself.

    Going through this function makes it easier for people to overwrite the
    behavior.

    The documentation explains when to use which mechanism:
    https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html

    Args:
        msg: The message to log.
        src: The name of the logger to use.

    """
    source_logger = logging.getLogger(src)
    source_logger.error(msg)

439 

440 

def logger_warning(msg: str, src: str) -> None:
    """
    Log a warning; call this rather than logger.warning itself.

    Going through this function makes it easier for people to overwrite the
    behavior.

    ## Exception, warnings.warn, logger_warning
    - Raise an exception if users are expected to write code which handles
      the error case, e.g. the PDF being completely broken.
    - Call warnings.warn if users have to fix their own code, e.g. for
      DeprecationWarnings.
    - Call logger_warning if users should learn that pypdf dealt with an
      issue on its own, e.g. a non-compliant PDF which could still be read
      thanks to a robustness fix. This applies mainly to strict=False mode.

    Args:
        msg: The message to log.
        src: The name of the logger to use.

    """
    source_logger = logging.getLogger(src)
    source_logger.warning(msg)

458 

459 

def rename_kwargs(
    func_name: str, kwargs: dict[str, Any], aliases: dict[str, str], fail: bool = False
) -> None:
    """
    Move deprecated keyword arguments to their new names.

    Args:
        func_name: Name of the function whose arguments are checked; only
            used in the error message.
        kwargs: The keyword arguments received by that function. The
            dictionary is modified in place.
        aliases: Maps each deprecated argument name to its replacement.
        fail: If True, using a deprecated name is an error instead of a
            warning.

    Raises:
        DeprecationError: If a deprecated name is used and fail is True.
        TypeError: If a deprecated name and its replacement are both given.

    """
    for deprecated, replacement in aliases.items():
        if deprecated not in kwargs:
            continue
        notice = f"{deprecated} is deprecated as an argument. Use {replacement} instead"
        if fail:
            raise DeprecationError(notice)
        if replacement in kwargs:
            raise TypeError(
                f"{func_name} received both {deprecated} and {replacement} as "
                f"an argument. {deprecated} is deprecated. "
                f"Use {replacement} instead."
            )
        kwargs[replacement] = kwargs.pop(deprecated)
        warnings.warn(notice, DeprecationWarning, stacklevel=3)

493 

494 

495def _human_readable_bytes(bytes: int) -> str: 

496 if bytes < 10**3: 

497 return f"{bytes} Byte" 

498 if bytes < 10**6: 

499 return f"{bytes / 10**3:.1f} kB" 

500 if bytes < 10**9: 

501 return f"{bytes / 10**6:.1f} MB" 

502 return f"{bytes / 10**9:.1f} GB" 

503 

504 

505# The following class has been copied from Django: 

506# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65 

507# It received some modifications to comply with our own coding standards. 

508# 

509# Original license: 

510# 

511# --------------------------------------------------------------------------------- 

512# Copyright (c) Django Software Foundation and individual contributors. 

513# All rights reserved. 

514# 

515# Redistribution and use in source and binary forms, with or without modification, 

516# are permitted provided that the following conditions are met: 

517# 

518# 1. Redistributions of source code must retain the above copyright notice, 

519# this list of conditions and the following disclaimer. 

520# 

521# 2. Redistributions in binary form must reproduce the above copyright 

522# notice, this list of conditions and the following disclaimer in the 

523# documentation and/or other materials provided with the distribution. 

524# 

525# 3. Neither the name of Django nor the names of its contributors may be used 

526# to endorse or promote products derived from this software without 

527# specific prior written permission. 

528# 

529# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 

530# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 

531# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

532# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 

533# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 

534# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 

535# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 

536# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 

537# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 

538# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

539# --------------------------------------------------------------------------------- 

class classproperty:  # noqa: N801
    """
    Descriptor which exposes a method taking only cls as a read-only
    attribute, so that it can be read from the class without calling it.
    """

    def __init__(self, method=None) -> None:  # type: ignore  # noqa: ANN001
        # The wrapped callable; may be supplied later through getter().
        self.fget = method

    def __get__(self, instance, cls=None) -> Any:  # type: ignore  # noqa: ANN001
        # The instance is ignored: the value is always computed from cls.
        compute = self.fget
        return compute(cls)

    def getter(self, method) -> Self:  # type: ignore  # noqa: ANN001
        """Replace the wrapped callable and return this descriptor."""
        self.fget = method
        return self

555 

556 

# Plain record for a named chunk of bytes, optionally tied to the PDF object
# which holds the data.
@dataclass
class File:
    # Imported in the class body rather than at the top of the module.
    from .generic import IndirectObject  # noqa: PLC0415

    name: str = ""
    """Filename as identified within the PDF file."""
    data: bytes = b""
    """Data as bytes."""
    indirect_reference: Optional[IndirectObject] = None
    """Reference to the object storing the stream."""

    def __str__(self) -> str:
        size = _human_readable_bytes(len(self.data))
        return f"{self.__class__.__name__}(name={self.name}, data: {size})"

    def __repr__(self) -> str:
        # Reuse the short form, reopening its closing parenthesis to append
        # the hash of the data.
        unclosed = self.__str__()[:-1]
        return f"{unclosed}, hash: {hash(self.data)})"

579 

580 

@functools.total_ordering
class Version:
    """
    Version string split at dots into comparable (number, suffix) pairs.

    Instances can be compared and sorted; the comparison methods not defined
    here are filled in by functools.total_ordering.
    """

    COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")

    def __init__(self, version_str: str) -> None:
        self.version_str = version_str
        self.components = self._parse_version(version_str)

    def _parse_version(self, version_str: str) -> list[tuple[int, str]]:
        """
        Split a version string into one (number, suffix) pair per component.

        Args:
            version_str: The version, with components separated by dots.

        Returns:
            One tuple per component. Components which do not start with
            digits get the number 0 and the whole component as suffix.

        """
        pairs = []
        for part in version_str.split("."):
            found = Version.COMPONENT_PATTERN.match(part)
            if found:
                digits, rest = found.groups()
                pairs.append((int(digits), rest))
            else:
                pairs.append((0, part))
        return pairs

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Version) and self.components == other.components

    def __hash__(self) -> int:
        # Lists are not hashable, so hash a tuple of the components.
        return hash((self.__class__, tuple(self.components)))

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Version):
            raise ValueError(f"Version cannot be compared against {type(other)}")
        # Lexicographic list comparison: the first differing component decides
        # (number first, then suffix); with a common prefix the version with
        # fewer components is the smaller one.
        return self.components < other.components