Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_utils.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

265 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28"""Utility functions for PDF library.""" 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import functools 

33import logging 

34import re 

35import sys 

36import warnings 

37from dataclasses import dataclass 

38from datetime import datetime, timezone 

39from io import DEFAULT_BUFFER_SIZE 

40from os import SEEK_CUR 

41from typing import ( 

42 IO, 

43 Any, 

44 Dict, 

45 List, 

46 Optional, 

47 Pattern, 

48 Tuple, 

49 Union, 

50 overload, 

51) 

52 

53if sys.version_info[:2] >= (3, 10): 

54 # Python 3.10+: https://www.python.org/dev/peps/pep-0484/ 

55 from typing import TypeAlias 

56else: 

57 from typing_extensions import TypeAlias 

58 

59if sys.version_info >= (3, 11): 

60 from typing import Self 

61else: 

62 from typing_extensions import Self 

63 

64from .errors import ( 

65 STREAM_TRUNCATED_PREMATURELY, 

66 DeprecationError, 

67 PdfStreamError, 

68) 

69 

70TransformationMatrixType: TypeAlias = Tuple[ 

71 Tuple[float, float, float], Tuple[float, float, float], Tuple[float, float, float] 

72] 

73CompressedTransformationMatrix: TypeAlias = Tuple[ 

74 float, float, float, float, float, float 

75] 

76 

77StreamType = IO[Any] 

78StrByteType = Union[str, StreamType] 

79 

80 

def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
    """
    Parse a PDF date string into a datetime object.

    The leading ``D:`` marker is optional, a trailing ``Z``/``z`` is read as
    a zero offset, and apostrophes inside the offset are ignored.

    Args:
        text: The date string to parse, or None.

    Returns:
        None if ``text`` is None, otherwise the parsed datetime. The result
        carries timezone information only if the string contained an offset.

    Raises:
        ValueError: If ``text`` is empty or matches none of the supported
            formats.
    """
    orgtext = text
    if text is None:
        return None
    if not text:
        # Guard the text[0] access below; an empty string is simply
        # unparseable and must fail like any other invalid date.
        raise ValueError(f"Can not convert date: {orgtext}")
    if text[0].isdigit():
        text = "D:" + text
    if text.endswith(("Z", "z")):
        text += "0000"
    text = text.replace("z", "+").replace("Z", "+").replace("'", "")
    i = max(text.find("+"), text.find("-"))
    if i > 0 and i != len(text) - 5:
        # Offset given as hours only: pad the minutes so %z can parse it.
        text += "00"
    for f in (
        "D:%Y",
        "D:%Y%m",
        "D:%Y%m%d",
        "D:%Y%m%d%H",
        "D:%Y%m%d%H%M",
        "D:%Y%m%d%H%M%S",
        "D:%Y%m%d%H%M%S%z",
    ):
        try:
            d = datetime.strptime(text, f)  # noqa: DTZ007
        except ValueError:
            continue
        else:
            if text.endswith("+0000"):
                d = d.replace(tzinfo=timezone.utc)
            return d
    raise ValueError(f"Can not convert date: {orgtext}")

111 

112 

113def format_iso8824_date(dt: datetime) -> str: 

114 """ 

115 Convert a datetime object to PDF date string format. 

116 

117 Converts datetime to the PDF date format D:YYYYMMDDHHmmSSOHH'mm 

118 as specified in the PDF Reference. 

119 

120 Args: 

121 dt: A datetime object to convert. 

122 

123 Returns: 

124 A date string in PDF format. 

125 """ 

126 date_str = dt.strftime("D:%Y%m%d%H%M%S") 

127 if dt.tzinfo is not None: 

128 offset = dt.utcoffset() 

129 assert offset is not None 

130 total_seconds = int(offset.total_seconds()) 

131 hours, remainder = divmod(abs(total_seconds), 3600) 

132 minutes = remainder // 60 

133 sign = "+" if total_seconds >= 0 else "-" 

134 date_str += f"{sign}{hours:02d}'{minutes:02d}'" 

135 return date_str 

136 

137 

138def _get_max_pdf_version_header(header1: str, header2: str) -> str: 

139 versions = ( 

140 "%PDF-1.3", 

141 "%PDF-1.4", 

142 "%PDF-1.5", 

143 "%PDF-1.6", 

144 "%PDF-1.7", 

145 "%PDF-2.0", 

146 ) 

147 pdf_header_indices = [] 

148 if header1 in versions: 

149 pdf_header_indices.append(versions.index(header1)) 

150 if header2 in versions: 

151 pdf_header_indices.append(versions.index(header2)) 

152 if len(pdf_header_indices) == 0: 

153 raise ValueError(f"Neither {header1!r} nor {header2!r} are proper headers") 

154 return versions[max(pdf_header_indices)] 

155 

156 

# Single-byte whitespace values, also provided as one bytes object and as a
# regular-expression character class.
WHITESPACES = (b"\x00", b"\t", b"\n", b"\f", b"\r", b" ")
WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
WHITESPACES_AS_REGEXP = b"[%b]" % WHITESPACES_AS_BYTES

160 

161 

def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
    """
    Collect bytes from the stream up to the next whitespace byte.

    Reading is done one byte at a time and ends at end of stream, at the
    first whitespace byte (which is consumed but not returned), or once
    maxchars bytes have been collected.

    Args:
        stream: The data stream to read from.
        maxchars: The maximum number of bytes returned; by default unlimited.

    Returns:
        The non-whitespace bytes which were read.

    """
    collected = bytearray()
    while True:
        byte = stream.read(1)
        if not byte or byte.isspace():
            break
        collected += byte
        if len(collected) == maxchars:
            break
    return bytes(collected)

185 

186 

def read_non_whitespace(stream: StreamType) -> bytes:
    """
    Skip whitespace and return the first byte that is not whitespace.

    Args:
        stream: The data stream to read from.

    Returns:
        The first byte read that is not in WHITESPACES; this is empty if the
        end of the stream was reached first.

    """
    while True:
        byte = stream.read(1)
        if byte not in WHITESPACES:
            return byte

202 

203 

def skip_over_whitespace(stream: StreamType) -> bool:
    """
    Consume whitespace and report whether any was found.

    Like read_non_whitespace, the first byte that is not whitespace is
    consumed as well; it is just not returned.

    Args:
        stream: The data stream to read from.

    Returns:
        True if one or more whitespace was skipped, otherwise return False.

    """
    skipped_any = False
    while stream.read(1) in WHITESPACES:
        skipped_any = True
    return skipped_any

222 

223 

def check_if_whitespace_only(value: bytes) -> bool:
    """
    Tell whether every byte of the given value is a whitespace byte.

    Args:
        value: The bytes to check.

    Returns:
        True if the value only has whitespace characters (this includes the
        empty value), otherwise return False.

    """
    for byte in value:
        if byte not in WHITESPACES_AS_BYTES:
            return False
    return True

236 

237 

def skip_over_comment(stream: StreamType) -> None:
    """
    Skip a comment starting at the current stream position, if there is one.

    If the next byte is ``%``, everything up to and including the first CR
    or LF byte is consumed. Otherwise the stream position is left unchanged.

    Args:
        stream: The data stream to read from.

    Raises:
        PdfStreamError: If the stream ends before the comment is terminated.

    """
    tok = stream.read(1)
    # Rewind only what was actually consumed: at end of stream nothing was
    # read, and stepping back a full byte would move the position backwards.
    stream.seek(-len(tok), 1)
    if tok == b"%":
        while tok not in (b"\n", b"\r"):
            tok = stream.read(1)
            if tok == b"":
                raise PdfStreamError("File ended unexpectedly.")

246 

247 

def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
    """
    Read from the stream up to, but not including, the first regex match.

    Data is fetched in chunks of 16 bytes. When the pattern matches, the
    stream is repositioned to the start of the match. Reaching the end of
    the stream without a match ends the token as well.

    Args:
        stream: The data stream to read from.
        regex: re.Pattern

    Returns:
        The read bytes.

    """
    buffered = b""
    chunk = stream.read(16)
    while chunk:
        buffered += chunk
        found = regex.search(buffered)
        if found is not None:
            # Step back over everything read from the match start onwards.
            stream.seek(found.start() - len(buffered), 1)
            return buffered[: found.start()]
        chunk = stream.read(16)
    return buffered

272 

273 

def read_block_backwards(stream: StreamType, to_read: int) -> bytes:
    """
    Read the to_read bytes that end at the current stream position.

    Afterwards the stream is positioned at the beginning of the block which
    was read.

    Args:
        stream: The data stream to read from.
        to_read: The number of bytes to read.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If fewer than to_read bytes precede the current
            position.

    """
    if to_read > stream.tell():
        raise PdfStreamError("Could not read malformed PDF file")
    # Jump to the block start, read forward, then jump back to the start.
    stream.seek(-to_read, SEEK_CUR)
    data = stream.read(to_read)
    stream.seek(-to_read, SEEK_CUR)
    return data

297 

298 

def read_previous_line(stream: StreamType) -> bytes:
    """
    Given a byte stream with current position X, return the previous line.

    The result holds all bytes between the first CR/LF byte found before X
    (or the start of the stream, if there is no such byte) and position X.
    After this call, the stream is positioned one byte after the first
    non-CR/LF byte found before that CR/LF run, or at the beginning of the
    stream if there is no such byte.

    Args:
        stream: The data stream to read from.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If the stream is already at position 0.

    """
    if stream.tell() == 0:
        raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
    line_breaks = b"\r\n"
    # Pieces of the line, collected back to front.
    pieces = []
    seen_line_break = False
    while True:
        size = min(DEFAULT_BUFFER_SIZE, stream.tell())
        if size == 0:
            break
        # After this read the stream sits at the start of the block.
        block = read_block_backwards(stream, size)
        pos = len(block) - 1
        if not seen_line_break:
            # Walk backwards until the first CR/LF byte, if the block has one.
            while pos >= 0 and block[pos] not in line_breaks:
                pos -= 1
            seen_line_break = pos >= 0
        if seen_line_break:
            # Keep what follows the line break (empty for later blocks), then
            # skip the rest of the CR/LF run.
            pieces.append(block[pos + 1 :])
            while pos >= 0 and block[pos] in line_breaks:
                pos -= 1
        else:
            # No line break yet: the whole block belongs to the line.
            pieces.append(block)
        if pos >= 0:
            # Found the byte preceding the CR/LF run; stop right after it.
            stream.seek(pos + 1, SEEK_CUR)
            break
    pieces.reverse()
    return b"".join(pieces)

355 

356 

def matrix_multiply(
    a: TransformationMatrixType, b: TransformationMatrixType
) -> TransformationMatrixType:
    """
    Multiply two matrices given as tuples of rows.

    Args:
        a: The left matrix.
        b: The right matrix.

    Returns:
        The matrix product as a tuple of row tuples, computed with floats.
    """
    product = []
    for row in a:
        cells = []
        # zip(*b) yields the columns of b.
        for column in zip(*b):
            cells.append(sum(float(x) * float(y) for x, y in zip(row, column)))
        product.append(tuple(cells))
    return tuple(product)  # type: ignore[return-value]

364 

365 

def mark_location(stream: StreamType) -> None:
    """
    Create text file showing current location in context.

    Writes up to 5000 bytes before the current position, the marker
    ``HERE``, and up to 5000 bytes after it to ``pypdf_pdfLocation.txt`` in
    the current working directory. The stream position is restored
    afterwards.

    Args:
        stream: The data stream to inspect.
    """
    # Mainly for debugging
    radius = 5000
    here = stream.tell()
    # Clamp at the stream start so that a position closer than ``radius``
    # bytes to the beginning neither fails nor misplaces the marker.
    start = max(here - radius, 0)
    stream.seek(start, 0)
    with open("pypdf_pdfLocation.txt", "wb") as output_fh:
        output_fh.write(stream.read(here - start))
        output_fh.write(b"HERE")
        output_fh.write(stream.read(radius))
    # Restore the exact original position, even if the reads were short.
    stream.seek(here, 0)

376 

377 

@overload
def ord_(b: str) -> int:
    ...


@overload
def ord_(b: bytes) -> bytes:
    ...


@overload
def ord_(b: int) -> int:
    ...


def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]:
    """
    Return the code point of a str; pass any other value through unchanged.

    Args:
        b: A one-character string, or an int or bytes value.

    Returns:
        ``ord(b)`` for a str, otherwise ``b`` itself.
    """
    return ord(b) if isinstance(b, str) else b

397 

398 

def deprecate(msg: str, stacklevel: int = 3) -> None:
    """
    Issue a DeprecationWarning with the given message.

    Args:
        msg: The warning text.
        stacklevel: Passed on to warnings.warn to select the reported frame.
    """
    warnings.warn(msg, category=DeprecationWarning, stacklevel=stacklevel)

401 

402 

def deprecation(msg: str) -> None:
    """
    Raise a DeprecationError with the given message.

    Args:
        msg: The error text.

    Raises:
        DeprecationError: Always.
    """
    raise DeprecationError(msg)

405 

406 

def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Warn that a feature is going to be removed and name its replacement.

    Args:
        old_name: Name of the deprecated feature.
        new_name: Name of the feature to use instead.
        removed_in: The pypdf version in which the feature will be removed.
    """
    message = f"{old_name} is deprecated and will be removed in pypdf {removed_in}. Use {new_name} instead."
    deprecate(message, 4)

413 

414 

def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """
    Fail because a feature has been removed, and name its replacement.

    Args:
        old_name: Name of the removed feature.
        new_name: Name of the feature to use instead.
        removed_in: The pypdf version in which the feature was removed.
    """
    message = f"{old_name} is deprecated and was removed in pypdf {removed_in}. Use {new_name} instead."
    deprecation(message)

420 

421 

def deprecate_no_replacement(name: str, removed_in: str) -> None:
    """
    Warn that a feature is going to be removed and has no replacement.

    Args:
        name: Name of the deprecated feature.
        removed_in: The pypdf version in which the feature will be removed.
    """
    message = f"{name} is deprecated and will be removed in pypdf {removed_in}."
    deprecate(message, 4)

425 

426 

def deprecation_no_replacement(name: str, removed_in: str) -> None:
    """
    Fail because a feature has been removed and has no replacement.

    Args:
        name: Name of the removed feature.
        removed_in: The pypdf version in which the feature was removed.
    """
    message = f"{name} is deprecated and was removed in pypdf {removed_in}."
    deprecation(message)

430 

431 

def logger_error(msg: str, src: str) -> None:
    """
    Log an error message on the logger named ``src``.

    Call this instead of logger.error directly, so that people can
    overwrite it more easily.

    See the docs on when to use which:
    https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html

    Args:
        msg: The message to log.
        src: The name of the logger to use.
    """
    target = logging.getLogger(src)
    target.error(msg)

442 

443 

def logger_warning(msg: str, src: str) -> None:
    """
    Log a warning message on the logger named ``src``.

    Call this instead of logger.warning directly, so that people can
    overwrite it more easily.

    ## Exception, warnings.warn, logger_warning
    - Exceptions are for cases the user should handle in their own code,
      e.g. the PDF being completely broken.
    - warnings.warn is for cases where the user needs to fix their code,
      e.g. DeprecationWarnings.
    - logger_warning is for cases where the user needs to know that pypdf
      handled an issue, e.g. a non-compliant PDF that could still be read
      because pypdf applied a robustness fix. This applies mainly to
      strict=False mode.

    Args:
        msg: The message to log.
        src: The name of the logger to use.
    """
    target = logging.getLogger(src)
    target.warning(msg)

461 

462 

def rename_kwargs(
    func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str], fail: bool = False
) -> None:
    """
    Helper function to deprecate arguments.

    Each deprecated key found in ``kwargs`` is moved to its new name in
    place, and a DeprecationWarning is issued for it.

    Args:
        func_name: Name of the function whose arguments are checked; only
            used in the error message.
        kwargs: The keyword arguments to update in place.
        aliases: Mapping of deprecated argument names to their new names.
        fail: If True, a deprecated argument raises an error instead of
            being renamed.

    Raises:
        DeprecationError: If ``fail`` is set and a deprecated name is used.
        TypeError: If both the deprecated and the new name are given.

    """
    for old_term, new_term in aliases.items():
        if old_term not in kwargs:
            continue
        if fail:
            raise DeprecationError(
                f"{old_term} is deprecated as an argument. Use {new_term} instead"
            )
        if new_term in kwargs:
            raise TypeError(
                f"{func_name} received both {old_term} and {new_term} as "
                f"an argument. {old_term} is deprecated. "
                f"Use {new_term} instead."
            )
        kwargs[new_term] = kwargs.pop(old_term)
        warnings.warn(
            message=(
                f"{old_term} is deprecated as an argument. Use {new_term} instead"
            ),
            category=DeprecationWarning,
            stacklevel=3,
        )

496 

497 

498def _human_readable_bytes(bytes: int) -> str: 

499 if bytes < 10**3: 

500 return f"{bytes} Byte" 

501 if bytes < 10**6: 

502 return f"{bytes / 10**3:.1f} kB" 

503 if bytes < 10**9: 

504 return f"{bytes / 10**6:.1f} MB" 

505 return f"{bytes / 10**9:.1f} GB" 

506 

507 

508# The following class has been copied from Django: 

509# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65 

510# It received some modifications to comply with our own coding standards. 

511# 

512# Original license: 

513# 

514# --------------------------------------------------------------------------------- 

515# Copyright (c) Django Software Foundation and individual contributors. 

516# All rights reserved. 

517# 

518# Redistribution and use in source and binary forms, with or without modification, 

519# are permitted provided that the following conditions are met: 

520# 

521# 1. Redistributions of source code must retain the above copyright notice, 

522# this list of conditions and the following disclaimer. 

523# 

524# 2. Redistributions in binary form must reproduce the above copyright 

525# notice, this list of conditions and the following disclaimer in the 

526# documentation and/or other materials provided with the distribution. 

527# 

528# 3. Neither the name of Django nor the names of its contributors may be used 

529# to endorse or promote products derived from this software without 

530# specific prior written permission. 

531# 

532# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 

533# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 

534# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

535# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 

536# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 

537# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 

538# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 

539# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 

540# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 

541# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

542# --------------------------------------------------------------------------------- 

class classproperty:  # noqa: N801
    """
    Descriptor that exposes a method taking only ``cls`` as a read-only
    attribute, available on the class itself as well as on its instances.
    """

    def __init__(self, method=None) -> None:  # type: ignore # noqa: ANN001
        # The wrapped function; it may also be supplied later via getter().
        self.fget = method

    def getter(self, method) -> Self:  # type: ignore # noqa: ANN001
        """Replace the wrapped function and return this descriptor."""
        self.fget = method
        return self

    def __get__(self, instance, cls=None) -> Any:  # type: ignore # noqa: ANN001
        """Call the wrapped function with the owning class."""
        wrapped = self.fget
        return wrapped(cls)

558 

559 

# Container for a named chunk of binary data taken from a PDF file, with an
# optional reference to the object it was stored in.
@dataclass
class File:
    from .generic import IndirectObject  # noqa: PLC0415

    name: str = ""
    """
    Filename as identified within the PDF file.
    """
    data: bytes = b""
    """
    Data as bytes.
    """
    indirect_reference: Optional[IndirectObject] = None
    """
    Reference to the object storing the stream.
    """

    def __str__(self) -> str:
        # Shows the name and a human-readable size instead of the raw data.
        return f"{self.__class__.__name__}(name={self.name}, data: {_human_readable_bytes(len(self.data))})"

    def __repr__(self) -> str:
        # Same text as __str__, with a hash of the data inserted before the
        # closing parenthesis.
        summary = self.__str__()
        return summary[:-1] + f", hash: {hash(self.data)})"

582 

583 

@functools.total_ordering
class Version:
    """
    Comparable representation of a dotted version string.

    Each dot-separated component is stored as an ``(int, str)`` pair holding
    its leading number and the remaining suffix. Versions are compared
    component by component; if all shared components are equal, the version
    with fewer components is the smaller one.
    """

    COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")

    def __init__(self, version_str: str) -> None:
        self.version_str = version_str
        self.components = self._parse_version(version_str)

    def _parse_version(self, version_str: str) -> List[Tuple[int, str]]:
        """Split the version string into ``(number, suffix)`` pairs."""
        parsed: List[Tuple[int, str]] = []
        for component in version_str.split("."):
            match = Version.COMPONENT_PATTERN.match(component)
            if match:
                digits, suffix = match.groups()
                parsed.append((int(digits), suffix))
            else:
                # No leading digits: count the number as 0 and keep the
                # whole component as suffix.
                parsed.append((0, component))
        return parsed

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Version):
            return self.components == other.components
        return False

    def __hash__(self) -> int:
        # Convert to tuple as lists cannot be hashed.
        return hash((self.__class__, tuple(self.components)))

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Version):
            raise ValueError(f"Version cannot be compared against {type(other)}")

        # Tuples compare the number first and the suffix second, so the
        # first differing pair decides the result.
        for mine, theirs in zip(self.components, other.components):
            if mine != theirs:
                return mine < theirs

        return len(self.components) < len(other.components)