Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_utils.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

254 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28"""Utility functions for PDF library.""" 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import functools 

33import logging 

34import re 

35import sys 

36import warnings 

37from dataclasses import dataclass 

38from datetime import datetime, timezone 

39from io import DEFAULT_BUFFER_SIZE 

40from os import SEEK_CUR 

41from typing import ( 

42 IO, 

43 Any, 

44 Dict, 

45 List, 

46 Optional, 

47 Pattern, 

48 Tuple, 

49 Union, 

50 overload, 

51) 

52 

53if sys.version_info[:2] >= (3, 10): 

54 # Python 3.10+: TypeAlias was introduced by PEP 613, https://peps.python.org/pep-0613/ 

55 from typing import TypeAlias 

56else: 

57 from typing_extensions import TypeAlias 

58 

59if sys.version_info >= (3, 11): 

60 from typing import Self 

61else: 

62 from typing_extensions import Self 

63 

64from .errors import ( 

65 STREAM_TRUNCATED_PREMATURELY, 

66 DeprecationError, 

67 PdfStreamError, 

68) 

69 

# A 3x3 matrix written as three rows, each a tuple of three floats.
TransformationMatrixType: TypeAlias = Tuple[
    Tuple[float, float, float], Tuple[float, float, float], Tuple[float, float, float]
]
# A flat tuple of six floats.
# NOTE(review): presumably the six variable entries of a 3x3 transformation
# matrix - confirm against the callers.
CompressedTransformationMatrix: TypeAlias = Tuple[
    float, float, float, float, float, float
]

# Any IO object; the element type (text or bytes) is left unconstrained.
StreamType = IO[Any]
# Either a string or a stream as described by StreamType.
StrByteType = Union[str, StreamType]

79 

80 

def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
    """
    Parse a PDF-style date string such as ``D:20230102030405+01'00'``.

    The ``D:`` prefix is optional, trailing fields may be omitted down to the
    year, and the time zone may be given as ``Z``/``z`` or as an offset with
    optional apostrophes.

    Args:
        text: The date string, or None.

    Returns:
        The parsed datetime, or None if ``text`` is None or empty.

    Raises:
        ValueError: If the text matches none of the supported layouts.

    """
    orgtext = text
    # An empty string carries no date; treat it like a missing value instead
    # of failing with an IndexError on ``text[0]`` below.
    if not text:
        return None
    if text[0].isdigit():
        text = "D:" + text
    if text.endswith(("Z", "z")):
        text += "0000"
    # Normalize the zone marker to something ``%z`` understands.
    text = text.replace("z", "+").replace("Z", "+").replace("'", "")
    i = max(text.find("+"), text.find("-"))
    if i > 0 and i != len(text) - 5:
        # Offset was given as hours only; pad the minutes.
        text += "00"
    for f in (
        "D:%Y",
        "D:%Y%m",
        "D:%Y%m%d",
        "D:%Y%m%d%H",
        "D:%Y%m%d%H%M",
        "D:%Y%m%d%H%M%S",
        "D:%Y%m%d%H%M%S%z",
    ):
        try:
            d = datetime.strptime(text, f)  # noqa: DTZ007
        except ValueError:
            continue
        else:
            if text.endswith("+0000"):
                d = d.replace(tzinfo=timezone.utc)
            return d
    raise ValueError(f"Can not convert date: {orgtext}")

111 

112 

113def _get_max_pdf_version_header(header1: str, header2: str) -> str: 

114 versions = ( 

115 "%PDF-1.3", 

116 "%PDF-1.4", 

117 "%PDF-1.5", 

118 "%PDF-1.6", 

119 "%PDF-1.7", 

120 "%PDF-2.0", 

121 ) 

122 pdf_header_indices = [] 

123 if header1 in versions: 

124 pdf_header_indices.append(versions.index(header1)) 

125 if header2 in versions: 

126 pdf_header_indices.append(versions.index(header2)) 

127 if len(pdf_header_indices) == 0: 

128 raise ValueError(f"Neither {header1!r} nor {header2!r} are proper headers") 

129 return versions[max(pdf_header_indices)] 

130 

131 

# Single-byte values treated as whitespace: NUL, tab, line feed, form feed,
# carriage return and space.
WHITESPACES = (b"\x00", b"\t", b"\n", b"\f", b"\r", b" ")
# The same bytes concatenated into one bytes object, for membership tests on ints.
WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
# The same bytes wrapped in a regular-expression character class.
WHITESPACES_AS_REGEXP = b"[" + WHITESPACES_AS_BYTES + b"]"

135 

136 

def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
    """
    Collect bytes from the stream up to the next whitespace byte.

    Reading stops at the first byte for which ``bytes.isspace`` is true (that
    byte is consumed but not returned), at the end of the stream, or once
    ``maxchars`` bytes were collected.

    Args:
        stream: The data stream to read from.
        maxchars: The maximum number of bytes returned; by default unlimited.

    Returns:
        The bytes read before the stop condition.

    """
    collected = b""
    byte = stream.read(1)
    while byte and not byte.isspace():
        collected += byte
        if len(collected) == maxchars:
            break
        byte = stream.read(1)
    return collected

160 

161 

def read_non_whitespace(stream: StreamType) -> bytes:
    """
    Skip whitespace and return the first byte that is not whitespace.

    Args:
        stream: The data stream to read from.

    Returns:
        The first byte read that is not in WHITESPACES; this is ``b""`` when
        the end of the stream is reached first.

    """
    while True:
        byte = stream.read(1)
        if byte not in WHITESPACES:
            return byte

177 

178 

def skip_over_whitespace(stream: StreamType) -> bool:
    """
    Consume whitespace and report whether any was present.

    Like read_non_whitespace, the first non-whitespace byte is consumed too,
    but it is discarded instead of returned.

    Args:
        stream: The data stream to read from.

    Returns:
        True if one or more whitespace was skipped, otherwise return False.

    """
    skipped_any = False
    while stream.read(1) in WHITESPACES:
        skipped_any = True
    return skipped_any

197 

198 

def check_if_whitespace_only(value: bytes) -> bool:
    """
    Tell whether every byte of the value is a whitespace byte.

    Args:
        value: The bytes to check.

    Returns:
        True if the value only has whitespace characters (this includes the
        empty value), otherwise return False.

    """
    for byte in value:
        if byte not in WHITESPACES_AS_BYTES:
            return False
    return True

211 

212 

def skip_over_comment(stream: StreamType) -> None:
    """
    Skip a ``%`` comment if the stream is positioned at one.

    If the next byte is ``%``, everything up to and including the first CR or
    LF is consumed. Otherwise the stream position is left unchanged.

    Args:
        stream: The data stream to read from.

    Raises:
        PdfStreamError: If the stream ends before the comment is terminated.

    """
    tok = stream.read(1)
    if not tok:
        # End of stream: nothing was consumed, so rewinding would wrongly
        # step back over the previous byte (or fail at offset 0).
        return
    stream.seek(-1, 1)
    if tok == b"%":
        while tok not in (b"\n", b"\r"):
            tok = stream.read(1)
            if tok == b"":
                raise PdfStreamError("File ended unexpectedly.")

221 

222 

def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
    """
    Read from the stream until the pattern matches, excluding the match.

    The stream is left positioned at the start of the match. Reaching the end
    of the stream counts as the end of the token.

    Args:
        stream: The data stream to read from.
        regex: Compiled bytes pattern marking where to stop.

    Returns:
        The bytes read before the match (or before the end of the stream).

    """
    collected = b""
    while True:
        chunk = stream.read(16)
        if not chunk:
            return collected
        collected += chunk
        found = regex.search(collected)
        if found is not None:
            # Step back so the next read starts at the matched delimiter.
            stream.seek(found.start() - len(collected), 1)
            return collected[: found.start()]

247 

248 

def read_block_backwards(stream: StreamType, to_read: int) -> bytes:
    """
    Read the ``to_read`` bytes that end at the current stream position.

    Afterwards the stream is positioned at the beginning of the block that
    was read.

    Args:
        stream: The data stream to read from.
        to_read: Number of bytes to read before the current position.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If fewer than ``to_read`` bytes precede the position.

    """
    position = stream.tell()
    if position < to_read:
        raise PdfStreamError("Could not read malformed PDF file")
    # Jump to the block start, read forwards, then return to the block start.
    stream.seek(-to_read, SEEK_CUR)
    block = stream.read(to_read)
    stream.seek(-to_read, SEEK_CUR)
    return block

272 

273 

def read_previous_line(stream: StreamType) -> bytes:
    """
    Given a byte stream with current position X, return the previous line.

    The result is all bytes between the first CR/LF byte found before X
    (or the start of the file, if no such byte is found) and position X.
    After this call, the stream will be positioned one byte after the
    first non-CRLF character found beyond the first CR/LF byte before X,
    or, if no such byte is found, at the beginning of the stream.

    Args:
        stream: The data stream to read from; read backwards in blocks of at
            most DEFAULT_BUFFER_SIZE bytes.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If the stream is already at position 0.

    """
    line_content = []
    found_crlf = False
    if stream.tell() == 0:
        raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
    while True:
        to_read = min(DEFAULT_BUFFER_SIZE, stream.tell())
        if to_read == 0:
            # Reached the start of the stream.
            break
        # Read the block. After this, the stream is positioned at the
        # start of the block that was just read.
        block = read_block_backwards(stream, to_read)
        idx = len(block) - 1
        if not found_crlf:
            # We haven't found our first CR/LF yet.
            # Read off characters until we hit one.
            while idx >= 0 and block[idx] not in b"\r\n":
                idx -= 1
            if idx >= 0:
                found_crlf = True
        if found_crlf:
            # We found our first CR/LF already (on this block or
            # a previous one).
            # Our combined line is the remainder of the block
            # plus any previously read blocks.
            line_content.append(block[idx + 1 :])
            # Continue to read off any more CRLF characters.
            while idx >= 0 and block[idx] in b"\r\n":
                idx -= 1
        else:
            # Didn't find CR/LF yet - add this block to our
            # previously read blocks and continue.
            line_content.append(block)
        if idx >= 0:
            # We found the next non-CRLF character.
            # The stream sits at the block start, so moving forward by
            # idx + 1 lands one byte after that character; then stop.
            stream.seek(idx + 1, SEEK_CUR)
            break
    # Join all the blocks in the line (which are in reverse order)
    return b"".join(line_content[::-1])

330 

331 

def matrix_multiply(
    a: TransformationMatrixType, b: TransformationMatrixType
) -> TransformationMatrixType:
    """
    Multiply matrix ``a`` by matrix ``b`` (row-by-column product).

    Args:
        a: Left operand, a sequence of rows.
        b: Right operand, a sequence of rows.

    Returns:
        The product as a tuple of row tuples; entries are converted with
        ``float`` before multiplying.

    """
    product = []
    for row in a:
        product_row = []
        # zip(*b) walks the columns of b.
        for column in zip(*b):
            product_row.append(
                sum(float(left) * float(right) for left, right in zip(row, column))
            )
        product.append(tuple(product_row))
    return tuple(product)  # type: ignore[return-value]

339 

340 

def mark_location(stream: StreamType) -> None:
    """
    Create text file showing current location in context.

    Writes up to ``radius`` bytes before the current position, the marker
    ``HERE``, and up to ``radius`` bytes after it to ``pypdf_pdfLocation.txt``
    in the current working directory. The stream position is restored
    afterwards.

    Args:
        stream: A seekable data stream.

    """
    # Mainly for debugging
    radius = 5000
    position = stream.tell()
    # Clamp the window so a position close to the start neither fails the
    # seek nor shifts the marker away from the real location.
    start = max(0, position - radius)
    stream.seek(start, 0)
    with open("pypdf_pdfLocation.txt", "wb") as output_fh:
        output_fh.write(stream.read(position - start))
        output_fh.write(b"HERE")
        output_fh.write(stream.read(radius))
    # Restore the exact position even when fewer than radius bytes followed.
    stream.seek(position, 0)

351 

352 

@overload
def ord_(b: str) -> int:
    ...


@overload
def ord_(b: bytes) -> bytes:
    ...


@overload
def ord_(b: int) -> int:
    ...


def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]:
    """
    Return the code point for a string; pass ints and bytes through unchanged.

    Args:
        b: A string accepted by ``ord``, an int, or a bytes object.

    Returns:
        ``ord(b)`` for a string, otherwise ``b`` itself.

    """
    return ord(b) if isinstance(b, str) else b

372 

373 

def deprecate(msg: str, stacklevel: int = 3) -> None:
    """
    Emit a DeprecationWarning with the given message.

    Args:
        msg: The warning text.
        stacklevel: Passed to ``warnings.warn`` to select the reported frame.

    """
    warnings.warn(message=msg, category=DeprecationWarning, stacklevel=stacklevel)

376 

377 

def deprecation(msg: str) -> None:
    """
    Signal use of an already removed feature.

    Args:
        msg: The error text.

    Raises:
        DeprecationError: Always.

    """
    error = DeprecationError(msg)
    raise error

380 

381 

def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """Issue a warning that a feature will be removed, but has a replacement."""
    message = (
        f"{old_name} is deprecated and will be removed in pypdf {removed_in}. "
        f"Use {new_name} instead."
    )
    # One level deeper than the default of deprecate() because of this wrapper.
    deprecate(message, stacklevel=4)

388 

389 

def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """Raise an exception that a feature was already removed, but has a replacement."""
    message = (
        f"{old_name} is deprecated and was removed in pypdf {removed_in}. "
        f"Use {new_name} instead."
    )
    deprecation(message)

395 

396 

def deprecate_no_replacement(name: str, removed_in: str) -> None:
    """Issue a warning that a feature will be removed without replacement."""
    message = f"{name} is deprecated and will be removed in pypdf {removed_in}."
    # One level deeper than the default of deprecate() because of this wrapper.
    deprecate(message, stacklevel=4)

400 

401 

def deprecation_no_replacement(name: str, removed_in: str) -> None:
    """Raise an exception that a feature was already removed without replacement."""
    message = f"{name} is deprecated and was removed in pypdf {removed_in}."
    deprecation(message)

405 

406 

def logger_error(msg: str, src: str) -> None:
    """
    Log an error through the logger named ``src``.

    Call this rather than ``logger.error`` so that users have a single
    function to overwrite.

    See the docs on when to use which:
    https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html

    Args:
        msg: The message to log.
        src: Name of the logger to use.

    """
    source_logger = logging.getLogger(src)
    source_logger.error(msg)

417 

418 

def logger_warning(msg: str, src: str) -> None:
    """
    Log a warning through the logger named ``src``.

    Call this rather than ``logger.warning`` so that users have a single
    function to overwrite.

    ## Exception, warnings.warn, logger_warning
    - Exceptions are for error cases the user should handle in their own
      code, e.g. the PDF being completely broken.
    - warnings.warn is for cases where the user needs to fix their code, e.g.
      DeprecationWarnings
    - logger_warning is for issues that pypdf handled itself but the user
      should know about, e.g. a non-compliant PDF that could still be read
      thanks to a robustness fix. This applies mainly to strict=False mode.

    Args:
        msg: The message to log.
        src: Name of the logger to use.

    """
    source_logger = logging.getLogger(src)
    source_logger.warning(msg)

436 

437 

def rename_kwargs(
    func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str], fail: bool = False
) -> None:
    """
    Move deprecated keyword arguments to their new names, in place.

    Args:
        func_name: Name of the function whose arguments are checked; used in
            the error message only.
        kwargs: The keyword arguments received; modified in place.
        aliases: Mapping of deprecated argument name to its replacement.
        fail: If True, a deprecated argument raises instead of warning.

    Raises:
        DeprecationError: If ``fail`` is set and a deprecated name is used.
        TypeError: If both the deprecated and the new name are present.

    """
    for old_term, new_term in aliases.items():
        if old_term not in kwargs:
            continue
        if fail:
            raise DeprecationError(
                f"{old_term} is deprecated as an argument. Use {new_term} instead"
            )
        if new_term in kwargs:
            raise TypeError(
                f"{func_name} received both {old_term} and {new_term} as "
                f"an argument. {old_term} is deprecated. "
                f"Use {new_term} instead."
            )
        kwargs[new_term] = kwargs.pop(old_term)
        notice = f"{old_term} is deprecated as an argument. Use {new_term} instead"
        warnings.warn(message=notice, category=DeprecationWarning, stacklevel=3)

471 

472 

473def _human_readable_bytes(bytes: int) -> str: 

474 if bytes < 10**3: 

475 return f"{bytes} Byte" 

476 if bytes < 10**6: 

477 return f"{bytes / 10**3:.1f} kB" 

478 if bytes < 10**9: 

479 return f"{bytes / 10**6:.1f} MB" 

480 return f"{bytes / 10**9:.1f} GB" 

481 

482 

483# The following class has been copied from Django: 

484# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65 

485# It received some modifications to comply with our own coding standards. 

486# 

487# Original license: 

488# 

489# --------------------------------------------------------------------------------- 

490# Copyright (c) Django Software Foundation and individual contributors. 

491# All rights reserved. 

492# 

493# Redistribution and use in source and binary forms, with or without modification, 

494# are permitted provided that the following conditions are met: 

495# 

496# 1. Redistributions of source code must retain the above copyright notice, 

497# this list of conditions and the following disclaimer. 

498# 

499# 2. Redistributions in binary form must reproduce the above copyright 

500# notice, this list of conditions and the following disclaimer in the 

501# documentation and/or other materials provided with the distribution. 

502# 

503# 3. Neither the name of Django nor the names of its contributors may be used 

504# to endorse or promote products derived from this software without 

505# specific prior written permission. 

506# 

507# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 

508# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 

509# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

510# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 

511# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 

512# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 

513# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 

514# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 

515# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 

516# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

517# --------------------------------------------------------------------------------- 

class classproperty:  # noqa: N801
    """
    Descriptor turning a method that takes only ``cls`` into a read-only
    attribute available on the class itself (and on its instances).
    """

    def __init__(self, method=None) -> None:  # type: ignore  # noqa: ANN001
        """Store the wrapped method; it may be supplied later through getter()."""
        self.fget = method

    def __get__(self, instance, cls=None) -> Any:  # type: ignore  # noqa: ANN001
        """Call the wrapped method with the owning class; the instance is ignored."""
        wrapped = self.fget
        return wrapped(cls)

    def getter(self, method) -> Self:  # type: ignore  # noqa: ANN001
        """Replace the wrapped method and return this descriptor."""
        self.fget = method
        return self

533 

534 

@dataclass
class File:
    """A named blob of bytes together with an optional reference to its PDF object."""

    from .generic import IndirectObject  # noqa: PLC0415

    name: str = ""
    """
    Filename as identified within the PDF file.
    """
    data: bytes = b""
    """
    Data as bytes.
    """
    indirect_reference: Optional[IndirectObject] = None
    """
    Reference to the object storing the stream.
    """

    def __str__(self) -> str:
        # Show the payload size rather than the payload itself.
        size = _human_readable_bytes(len(self.data))
        return f"{self.__class__.__name__}(name={self.name}, data: {size})"

    def __repr__(self) -> str:
        # Same as str(), with the closing parenthesis replaced by a hash suffix.
        without_paren = self.__str__()[:-1]
        return without_paren + f", hash: {hash(self.data)})"

557 

558 

@functools.total_ordering
class Version:
    """
    Dotted version string that can be compared and hashed.

    Each dot-separated component is split into a leading integer and the
    remaining suffix; components without a leading integer count as 0 with
    the whole component as suffix.
    """

    COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")

    def __init__(self, version_str: str) -> None:
        self.version_str = version_str
        self.components = self._parse_version(version_str)

    def _parse_version(self, version_str: str) -> List[Tuple[int, str]]:
        """Split the version string into ``(number, suffix)`` pairs."""
        parsed: List[Tuple[int, str]] = []
        for part in version_str.split("."):
            found = Version.COMPONENT_PATTERN.match(part)
            if found:
                number, suffix = found.groups()
                parsed.append((int(number), suffix))
            else:
                parsed.append((0, part))
        return parsed

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Version) and self.components == other.components

    def __hash__(self) -> int:
        # Convert to tuple as lists cannot be hashed.
        return hash((self.__class__, tuple(self.components)))

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Version):
            raise ValueError(f"Version cannot be compared against {type(other)}")

        # Tuples compare number first, then suffix; the first differing
        # component decides.
        for own, theirs in zip(self.components, other.components):
            if own != theirs:
                return own < theirs

        # All shared components are equal: the shorter version is smaller.
        return len(self.components) < len(other.components)