Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/generic/_data_structures.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

979 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import logging 

33import re 

34import sys 

35from collections.abc import Iterable, Sequence 

36from io import BytesIO 

37from math import ceil 

38from typing import ( 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 

47from .._utils import ( 

48 WHITESPACES, 

49 BinaryStreamType, 

50 StreamType, 

51 deprecation_no_replacement, 

52 logger_warning, 

53 read_non_whitespace, 

54 read_until_regex, 

55 read_until_whitespace, 

56 skip_over_comment, 

57) 

58from ..constants import ( 

59 CheckboxRadioButtonAttributes, 

60 FieldDictionaryAttributes, 

61 OutlineFontFlag, 

62 StreamAttributes, 

63) 

64from ..constants import FilterTypes as FT 

65from ..constants import TypArguments as TA 

66from ..constants import TypFitArguments as TF 

67from ..errors import STREAM_TRUNCATED_PREMATURELY, LimitReachedError, PdfReadError, PdfStreamError 

68from ._base import ( 

69 BooleanObject, 

70 ByteStringObject, 

71 FloatObject, 

72 IndirectObject, 

73 NameObject, 

74 NullObject, 

75 NumberObject, 

76 PdfObject, 

77 TextStringObject, 

78 is_null_or_none, 

79) 

80from ._fit import Fit 

81from ._image_inline import ( 

82 extract_inline__ascii85_decode, 

83 extract_inline__ascii_hex_decode, 

84 extract_inline__dct_decode, 

85 extract_inline__run_length_decode, 

86 extract_inline_default, 

87) 

88from ._utils import read_hex_string_from_stream, read_string_from_stream 

89 

90if sys.version_info >= (3, 11): 

91 from typing import Self 

92else: 

93 from typing_extensions import Self 

94 

95logger = logging.getLogger(__name__) 

96 

97IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]") 

98 

99 

class ArrayObject(list[Any], PdfObject):
    """PDF array: a ``list`` of items that is also a :class:`PdfObject`."""

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a copy of this array for ``pdf_dest``.

        Items exposing a ``replicate`` method are replicated recursively;
        every other item is appended unchanged.

        Args:
            pdf_dest: Writer the copy is created for.

        Returns:
            The ArrayObject obtained from ``_reference_clone``, filled with
            the replicated items.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Writer the clone is created for.
            force_duplicate: When false and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: Forwarded untouched to the ``clone`` call of
                every item.

        Returns:
            ``self`` (see ``force_duplicate``) or the newly filled clone.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: an array without a usable indirect reference is
            # simply cloned.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate=True),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored through their indirect reference.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif isinstance(data, IndirectObject) and isinstance(resolved := data.get_object(), StreamObject):
                # Reference to a stream: clone the resolved stream and store
                # the reference of the clone.
                dup = data._reference_clone(
                    resolved.clone(pdf_dest, force_duplicate=True, ignore_fields=ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalise an operand of ``+``, ``+=`` or ``-=`` into a plain list.

        Args:
            lst: list/tuple/set (converted with ``list``), a PdfObject
                (wrapped), a ``str`` (wrapped as NameObject when it starts
                with "/", otherwise as TextStringObject), ``bytes``
                (wrapped as ByteStringObject) or any other value (wrapped
                unchanged).

        Returns:
            The list of elements to add or remove.

        """
        result: list[Any]
        if isinstance(lst, (list, tuple, set)):
            result = list(lst)
        elif isinstance(lst, PdfObject):
            result = [lst]
        elif isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not raise
            # IndexError; it becomes an empty TextStringObject.
            if lst.startswith("/"):
                result = [NameObject(lst)]
            else:
                result = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            result = [ByteStringObject(lst)]
        else:  # for numbers,...
            result = [lst]
        return result

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding a list or a single element.

        Args:
            lst: lists, tuples and sets extend the array;
                other types (numbers, ...) are appended.
                A str is converted into TextStringObject
                or NameObject (if starting with "/");
                bytes are converted into ByteStringObject.

        Returns:
            New ArrayObject with all elements; ``self`` is left untouched.

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Extend the array in place with a list or a single element.

        Args:
            lst: lists, tuples and sets extend the array;
                other types (numbers, ...) are appended.
                A str is converted into TextStringObject
                or NameObject (if starting with "/");
                bytes are converted into ByteStringObject.

        Returns:
            ``self``.

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Remove items in place.

        For each element derived from ``lst`` (same conversion as ``+=``)
        the first equal entry is removed; elements that are not present are
        ignored.
        """
        for x in self._to_lst(lst):
            try:
                index = self.index(x)
                del self[index]
            except ValueError:
                # Not in the array: nothing to remove.
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialise the array as ``[ item item ... ]``.

        Args:
            stream: Binary stream written to.
            encryption_key: Deprecated; any value other than None is
                reported through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: Stream positioned on the opening ``[``.
            pdf: Reader forwarded to ``read_object``.
            forced_encoding: Forwarded to ``read_object``.

        Returns:
            The parsed array. If the stream ends before ``]`` is found, the
            items read so far are returned.

        Raises:
            PdfReadError: If the first byte is not ``[``.

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                # End of data: stop with what has been collected.
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            stream.seek(-1, 1)
            # check for array ending
            peek_ahead = stream.read(1)
            if peek_ahead == b"]":
                break
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr

272 

273 

class DictionaryObject(dict[Any, Any], PdfObject):
    """
    PDF dictionary: a ``dict`` that is also a :class:`PdfObject`.

    Item access through ``[]`` resolves the stored value with
    ``get_object()``; ``raw_get`` returns the stored value itself.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a copy of this dictionary for ``pdf_dest``.

        Keys are replicated; values are replicated when they expose a
        ``replicate`` method and copied unchanged otherwise.

        Args:
            pdf_dest: Writer the copy is created for.

        Returns:
            The filled copy.

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Writer the clone is created for.
            force_duplicate: When false and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: Keys to leave out, see ``_clone``. ``None`` is
                treated as an empty list.

        Returns:
            ``self`` or the clone.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: no usable indirect reference means "clone it".
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only an empty target is filled; a non-empty one is returned as is.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: Dictionary whose entries are cloned into ``self``.
            pdf_dest: Writer the clones are created for.
            force_duplicate: Forwarded to every nested clone call.
            ignore_fields: Keys that are not copied. An ``int`` entry is a
                counter: it is decremented on each call, and once it is
                ``<= 0`` the counter and the entry following it are dropped
                from the list. Must not be None.
            visited: ``(idnum, generation)`` pairs of clones already
                handled while walking chained entries; updated in place.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Drop the exhausted counter and the entry after it.
                    # NOTE(review): raises IndexError if the counter is the
                    # last element - confirm callers never pass that.
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively, creating an empty
                        # clone per node and linking it to its predecessor.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # No further link (or it cannot be
                                # resolved): the chain ends here.
                                cur_obj = None
                        # Fill the clones only once the whole chain is linked.
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the value stored under ``key`` without calling ``get_object()`` on it."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Delegate to the parent dictionary, which applies the same rule.
        return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
            key, default
        )

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: If the key or the value is not a PdfObject.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        Like ``dict.setdefault`` with the type checks of ``__setitem__``.

        The default ``value=None`` does not pass the PdfObject check, so a
        value has to be supplied.

        Raises:
            ValueError: If the key or the value is not a PdfObject.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return the value stored under ``key``, resolved with ``get_object()``."""
        return cast(PdfObject, dict.__getitem__(self, key).get_object())

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialise the dictionary as ``<< key value ... >>``, one entry per line.

        Keys of the form ``/%...%`` are skipped and never written.

        Args:
            stream: Binary stream written to.
            encryption_key: Deprecated; any value other than None is
                reported through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @classmethod
    def _get_next_object_position(
        cls, position_before: int, position_end: int, generations: list[int], pdf: PdfReaderProtocol
    ) -> int:
        """
        Find the smallest xref offset in ``(position_before, position_end]``.

        Args:
            position_before: Exclusive lower bound.
            position_end: Inclusive upper bound, also the fallback result.
            generations: Keys of ``pdf.xref`` to inspect.
            pdf: Reader providing the ``xref`` table.

        Returns:
            The smallest matching offset, or ``position_end`` if none matches.

        """
        out = position_end
        for generation in generations:
            location = pdf.xref[generation]
            values = [x for x in location.values() if position_before < x <= position_end]
            if values:
                out = min(out, *values)
        return out

    @classmethod
    def _read_unsized_from_stream(
        cls, stream: BinaryStreamType, pdf: PdfReaderProtocol
    ) -> bytes:
        """
        Read stream data whose length is unknown or wrong.

        The data is searched for ``endstream`` between the current position
        and the next object offset found in ``pdf.xref``.

        Args:
            stream: Stream positioned at the start of the stream data.
            pdf: Reader providing the ``xref`` table.

        Returns:
            The bytes before ``endstream`` minus the one byte directly
            preceding the marker. The stream is left just after the marker.

        Raises:
            PdfReadError: If ``endstream`` is not found.

        """
        object_position = cls._get_next_object_position(
            position_before=stream.tell(), position_end=2 ** 32, generations=list(pdf.xref), pdf=pdf
        ) - 1
        current_position = stream.tell()
        # Read until the next object position.
        read_value = stream.read(object_position - stream.tell())
        endstream_position = read_value.find(b"endstream")
        if endstream_position < 0:
            raise PdfReadError(
                f"Unable to find 'endstream' marker for obj starting at {current_position}."
            )
        # 9 = len(b"endstream")
        stream.seek(current_position + endstream_position + 9)
        # Clamp at 0: with the marker at offset 0 a slice end of -1 would
        # return nearly the whole buffer instead of empty data.
        return read_value[: max(endstream_position - 1, 0)]

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream data following it if present.

        Args:
            stream: Stream positioned on the opening ``<<``.
            pdf: Reader used for nested objects and for its ``strict``
                flag; may be None, which is handled like non-strict.
            forced_encoding: Forwarded to ``read_object`` for values.

        Returns:
            A DictionaryObject, or the result of
            ``StreamObject.initialize_from_dictionary`` when a ``stream``
            keyword follows the dictionary. In non-strict mode a partially
            read dictionary is returned if reading an entry fails.

        Raises:
            PdfReadError: Missing ``<<``, recursion/limit errors, entry
                errors or duplicate keys in strict mode, or a missing
                ``endstream`` that cannot be recovered.
            PdfStreamError: Truncated input, no newline after ``stream``,
                or missing stream length in strict mode.
            LimitReachedError: Declared stream length above
                ``MAX_DECLARED_STREAM_LENGTH``.

        """
        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # Consume the second ">" of the closing ">>".
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except (RecursionError, LimitReachedError) as exc:
                raise PdfReadError(exc.__repr__()) from exc
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__()) from exc
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # A key whose stored value is falsy counts as "not yet defined"
            # here and is overwritten without a warning.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # Lone "\r": the byte just read already belongs to the data.
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if StreamAttributes.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(StreamAttributes.LENGTH)] = NumberObject(-1)
            length = data[StreamAttributes.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolving the reference may move the stream position.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                from ..filters import MAX_DECLARED_STREAM_LENGTH  # noqa: PLC0415
                if length > MAX_DECLARED_STREAM_LENGTH:
                    raise LimitReachedError(f"Declared stream length of {length} exceeds maximum allowed length.")

                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # Ignore the declared length and search for the marker.
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = DictionaryObject._read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # No stream keyword: rewind to just after the dictionary.
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval

692 

693 

class TreeObject(DictionaryObject):
    """
    Dictionary whose children form a linked list.

    The node keeps ``/First``, ``/Last`` and ``/Count``; every child keeps
    ``/Parent`` and the sibling links ``/Next`` and ``/Prev``.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Create the node, optionally pre-filled from ``dct``.

        Args:
            dct: Entries copied with ``update``; ignored when None or empty.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True if the node has a ``/First`` entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (not over the dictionary keys)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children from ``/First`` along their ``/Next`` links.

        Iteration ends after the ``/Last`` child, at a missing or null
        ``/Next``, or - with a warning - when a child object is met twice.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        last = self[NameObject("/Last")]
        child = child_ref.get_object()
        # Guard against cyclic /Next chains; tracks object identities.
        visited: set[int] = set()
        while True:
            child_id = id(child)
            if child_id in visited:
                logger_warning(f"Detected cycle in outline structure for {child}", __name__)
                return
            visited.add(child_id)

            yield child

            if child == last:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Append ``child``; shorthand for ``insert_child(child, None, pdf)``."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to ``/Count`` of ``parent`` and of its ancestors.

        The count never drops below 0. The walk stops at a null/None
        parent or at the first node without ``/Count``.

        Args:
            parent: Node to start from.
            n: Amount added (may be negative).

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the magnitude of ``/Count`` of ``parent``.

        The sign follows the ``/%is_open%`` entry (default True): positive
        when open, negative when closed. Ancestors are updated only while
        the nodes are open.

        Args:
            parent: Node to start from.
            n: Amount added to the absolute count.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Insert ``child`` into this node's list of children.

        Args:
            child: Object to insert; must have an indirect reference.
            before: Indirect reference of the sibling to insert in front
                of. The child is appended when the search runs off the end
                of the list without matching it (e.g. ``None``).
            pdf: Not used by this method.
            inc_parent_counter: Callback ``(node, n)`` updating the
                counts; defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        assert child.indirect_reference is not None, "mypy"
        child_reference: IndirectObject = child.indirect_reference

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child_reference
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child_reference
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child_reference
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child_reference)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child_reference
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child_reference
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child_reference
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # Guarded like the other branches: a child without /Next must
            # not raise KeyError from inside this handler.
            if "/Next" in child_obj:
                del child_obj["/Next"]
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child_reference
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child_reference

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: Node preceding ``cur``, or None if ``cur`` is the first.
            prev_ref: Reference to ``prev``.
            cur: Node being removed.
            last: Current last node; only used for a consistency assert.

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Remove ``child`` from this node and clear its tree links.

        Args:
            child: The child or a reference to it.

        Raises:
            ValueError: If the child has no ``/Parent``, its parent is not
                this node, or it is not found among the children.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """
        Remove the object from the tree it is in.

        Raises:
            ValueError: If this node has no ``/Parent``.

        """
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child, then drop ``/Count``, ``/First`` and ``/Last``."""
        for child in self:
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]

911 

912 

def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Clear the tree links of a node that was taken out of a tree.

    ``/Parent`` is deleted unconditionally (a missing entry raises
    ``KeyError``); ``/Next`` and ``/Prev`` are deleted only when present.

    Args:
        child_obj: The detached node; modified in place.

    """
    del child_obj[NameObject("/Parent")]
    for link in ("/Next", "/Prev"):
        link_name = NameObject(link)
        if link_name in child_obj:
            del child_obj[link_name]

928 

929 

930class StreamObject(DictionaryObject): 

    def __init__(self) -> None:
        """Create a stream with empty data and no decoded companion object."""
        # Raw stream bytes, as returned by get_data().
        self._data: bytes = b""
        # Optional DecodedStreamObject associated with this stream.
        self.decoded_self: Optional[DecodedStreamObject] = None

934 

935 def replicate( 

936 self, 

937 pdf_dest: PdfWriterProtocol, 

938 ) -> "StreamObject": 

939 d__ = cast( 

940 "StreamObject", 

941 self._reference_clone(self.__class__(), pdf_dest, False), 

942 ) 

943 d__._data = self._data 

944 try: 

945 decoded_self = self.decoded_self 

946 if decoded_self is None: 

947 self.decoded_self = None 

948 else: 

949 self.decoded_self = cast( 

950 "DecodedStreamObject", decoded_self.replicate(pdf_dest) 

951 ) 

952 except Exception: 

953 pass 

954 for k, v in self.items(): 

955 d__[k.replicate(pdf_dest)] = ( 

956 v.replicate(pdf_dest) if hasattr(v, "replicate") else v 

957 ) 

958 return d__ 

959 

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Copies the raw data and clones the decoded companion of ``src``
        before delegating the dictionary entries to the parent class.

        Args:
            src: Stream to copy from (accessed as a StreamObject).
            pdf_dest: Writer the clones are created for.
            force_duplicate: Forwarded to the nested clone calls.
            ignore_fields: Forwarded to the nested clone calls.
            visited: Forwarded to the parent implementation.

        """
        self._data = cast("StreamObject", src)._data
        try:
            decoded_self = cast("StreamObject", src).decoded_self
            if decoded_self is None:
                self.decoded_self = None
            else:
                self.decoded_self = cast(
                    "DecodedStreamObject",
                    decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
                )
        except Exception:
            # Best effort: errors while cloning the companion are ignored.
            pass
        super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

991 

992 def hash_bin(self) -> int: 

993 """ 

994 Used to detect modified object. 

995 

996 Returns: 

997 Hash considering type and value. 

998 

999 """ 

1000 # Use _data to prevent errors on non-decoded streams. 

1001 return hash((super().hash_bin(), self._data)) 

1002 

1003 def get_data(self) -> bytes: 

1004 return self._data 

1005 

1006 def set_data(self, data: bytes) -> None: 

1007 self._data = data 

1008 

1009 def hash_value_data(self) -> bytes: 

1010 data = super().hash_value_data() 

1011 data += self.get_data() 

1012 return data 

1013 

1014 def write_to_stream( 

1015 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 

1016 ) -> None: 

1017 if encryption_key is not None: # deprecated 

1018 deprecation_no_replacement( 

1019 "the encryption_key parameter of write_to_stream", "5.0.0" 

1020 ) 

1021 self[NameObject(StreamAttributes.LENGTH)] = NumberObject(len(self._data)) 

1022 DictionaryObject.write_to_stream(self, stream) 

1023 del self[StreamAttributes.LENGTH] 

1024 stream.write(b"\nstream\n") 

1025 stream.write(self._data) 

1026 stream.write(b"\nendstream") 

1027 

1028 @staticmethod 

1029 def initialize_from_dictionary( 

1030 data: dict[str, Any] 

1031 ) -> Union["EncodedStreamObject", "DecodedStreamObject"]: 

1032 retval: Union[EncodedStreamObject, DecodedStreamObject] 

1033 if StreamAttributes.FILTER in data: 

1034 retval = EncodedStreamObject() 

1035 else: 

1036 retval = DecodedStreamObject() 

1037 retval._data = data["__streamdata__"] 

1038 del data["__streamdata__"] 

1039 if StreamAttributes.LENGTH in data: 

1040 del data[StreamAttributes.LENGTH] 

1041 retval.update(data) 

1042 return retval 

1043 

1044 def flate_encode(self, level: int = -1) -> "EncodedStreamObject": 

1045 from ..filters import FlateDecode # noqa: PLC0415 

1046 

1047 if StreamAttributes.FILTER in self: 

1048 f = self[StreamAttributes.FILTER] 

1049 if isinstance(f, ArrayObject): 

1050 f = ArrayObject([NameObject(FT.FLATE_DECODE), *f]) 

1051 try: 

1052 params = ArrayObject( 

1053 [NullObject(), *self.get(StreamAttributes.DECODE_PARMS, ArrayObject())] 

1054 ) 

1055 except TypeError: 

1056 # case of error where the * operator is not working (not an array 

1057 params = ArrayObject( 

1058 [NullObject(), self.get(StreamAttributes.DECODE_PARMS, ArrayObject())] 

1059 ) 

1060 else: 

1061 f = ArrayObject([NameObject(FT.FLATE_DECODE), f]) 

1062 params = ArrayObject( 

1063 [NullObject(), self.get(StreamAttributes.DECODE_PARMS, NullObject())] 

1064 ) 

1065 else: 

1066 f = NameObject(FT.FLATE_DECODE) 

1067 params = None 

1068 retval = EncodedStreamObject() 

1069 retval.update(self) 

1070 retval[NameObject(StreamAttributes.FILTER)] = f 

1071 if params is not None: 

1072 retval[NameObject(StreamAttributes.DECODE_PARMS)] = params 

1073 retval._data = FlateDecode.encode(self._data, level) 

1074 return retval 

1075 

1076 def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any: 

1077 """ 

1078 Try to decode the stream object as an image 

1079 

1080 Args: 

1081 pillow_parameters: parameters provided to Pillow Image.save() method, 

1082 cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save> 

1083 

1084 Returns: 

1085 a PIL image if proper decoding has been found 

1086 Raises: 

1087 Exception: Errors during decoding will be reported. 

1088 It is recommended to catch exceptions to prevent 

1089 stops in your program. 

1090 

1091 """ 

1092 from ._image_xobject import _xobj_to_image # noqa: PLC0415 

1093 

1094 if self.get("/Subtype", "") != "/Image": 

1095 try: 

1096 msg = f"{self.indirect_reference} does not seem to be an Image" # pragma: no cover 

1097 except AttributeError: 

1098 msg = f"{self.__repr__()} object does not seem to be an Image" # pragma: no cover 

1099 logger_warning(msg, __name__) 

1100 extension, _, img = _xobj_to_image(self, pillow_parameters) 

1101 if extension is None: 

1102 return None # pragma: no cover 

1103 return img 

1104 

1105 

class DecodedStreamObject(StreamObject):
    """
    Stream type that adds no behaviour of its own to ``StreamObject``.

    ``StreamObject.initialize_from_dictionary`` creates it for dictionaries
    without a filter entry.
    """

1108 

1109 

class EncodedStreamObject(StreamObject):
    """
    Stream whose stored payload is filtered; decoding happens on demand.

    The decoded form is cached in ``decoded_self`` after the first call to
    :meth:`get_data`.
    """

    def __init__(self) -> None:
        # Run the parent initializer so that ``_data`` exists on a fresh
        # instance as well (it also sets ``decoded_self`` to None). Without
        # it, methods reading ``_data`` fail with AttributeError until some
        # caller assigns the payload.
        super().__init__()

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded payload.

        The first call decodes the stream with ``decode_stream_data`` and
        stores the result, together with all entries except the length,
        filter and decode-parameter ones, in ``decoded_self``. Later calls
        are answered from that cache.
        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        if self.decoded_self is not None:
            # Cached version of decoded object
            return self.decoded_self.get_data()

        # Create decoded object
        decoded = DecodedStreamObject()
        decoded.set_data(decode_stream_data(self))
        for key, value in self.items():
            if key not in (StreamAttributes.LENGTH, StreamAttributes.FILTER, StreamAttributes.DECODE_PARMS):
                decoded[key] = value
        self.decoded_self = decoded
        return decoded.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Store ``data`` as the new decoded payload and re-encode it.

        Args:
            data: The decoded bytes.

        Raises:
            TypeError: If ``data`` is not ``bytes``.
            PdfReadError: If the stream's filter is anything other than a
                single FlateDecode.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(StreamAttributes.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            if not isinstance(data, bytes):
                raise TypeError("Data must be bytes")
            if self.decoded_self is None:
                self.get_data()  # to create self.decoded_self
            assert self.decoded_self is not None, "mypy"
            self.decoded_self.set_data(data)
            super().set_data(FlateDecode.encode(data))
        else:
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )

1147 

1148 

# Upper bound for the number of elements accepted in an array-based content stream.
CONTENT_STREAM_ARRAY_MAX_LENGTH = 10_000


class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, ContentStream object can either have both of those fields defined,
    or one field defined and the other one empty (``b""`` / ``[]``).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is called, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Initialize the content stream from ``stream``.

        Args:
            stream: ``None`` for an empty content stream, otherwise an object
                whose ``get_object()`` yields either a stream or an
                ``ArrayObject`` of streams to be concatenated.
            pdf: Stored as ``self.pdf`` and handed to ``read_object`` while
                reading inline images.
            forced_encoding: Stored and handed to ``read_object`` while
                parsing operands.

        Raises:
            LimitReachedError: If an array-based stream has too many elements
                or its concatenated data grows too large.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
        else:
            stream = stream.get_object()
            if isinstance(stream, ArrayObject):
                from pypdf.filters import MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH  # noqa: PLC0415

                if (stream_length := len(stream)) > CONTENT_STREAM_ARRAY_MAX_LENGTH:
                    raise LimitReachedError(
                        f"Array-based stream has {stream_length} > {CONTENT_STREAM_ARRAY_MAX_LENGTH} elements."
                    )
                data = bytearray()
                length = 0
                for s in stream:
                    s_resolved = s.get_object()
                    if isinstance(s_resolved, NullObject):
                        continue
                    if not isinstance(s_resolved, StreamObject):
                        # No need to emit an exception here for now - the PDF structure
                        # seems to already be broken beforehand in these cases.
                        logger_warning(
                            f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                            __name__
                        )
                    else:
                        new_data = s_resolved.get_data()
                        length += len(new_data)
                        if length > MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH:
                            raise LimitReachedError(
                                f"Array-based stream has at least {length} > "
                                f"{MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH} output bytes."
                            )
                        data += new_data
                        # Separate consecutive parts with a newline.
                        if len(data) == 0 or data[-1:] != b"\n":
                            # There should be no direct need to check for a change of one byte.
                            length += 1
                            data += b"\n"
                super().set_data(bytes(data))
            else:
                stream_data = stream.get_data()
                assert stream_data is not None
                super().set_data(stream_data)
        self.forced_encoding = forced_encoding

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Build a replica of this content stream for ``pdf_dest``.

        Besides the dictionary entries, the replica receives the raw data, a
        copy of the parsed operations and the forced encoding, mirroring what
        :meth:`_clone` transfers.

        Args:
            pdf_dest: Destination the replica is created for.

        Returns:
            The replica.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        try:
            decoded_self = self.decoded_self
            # The replicated cache belongs to the replica, not to the source.
            if decoded_self is None:
                d__.decoded_self = None
            else:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            # Best effort: __init__ above never defines ``decoded_self``, so
            # the attribute is usually absent on content streams.
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        # set_data() empties the operations, hence they are copied afterwards.
        # Without this, a stream that only holds parsed operations (empty
        # ._data) would be replicated as an empty content stream.
        d__.set_data(self._data)
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Destination of the clone.
            force_duplicate: Clone even if this object already belongs to
                ``pdf_dest``.
            ignore_fields: Forwarded to :meth:`_clone`; ``None`` is treated
                as an empty list.

        Returns:
            The cloned ContentStream

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # No usable indirect reference: proceed with cloning.
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the parsed operations and the forced encoding are
        transferred; the dictionary entries are not touched here.

        Args:
            src: Content stream to copy from.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Unused here.
            ignore_fields: Unused here.
            visited: Unused here.

        """
        src_cs = cast("ContentStream", src)
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObject or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` from its beginning and append to ``_operations``.

        Every operation is stored as ``(operands, operator)``; inline images
        are stored as ``(image_dict, b"INLINE IMAGE")``.
        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image; ``stream`` is positioned right after ``BI``.

        Returns:
            A dictionary with the image dictionary under ``"settings"`` and
            the image bytes under ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker follows the data, even after
                the fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remember where the data starts for the fallback extraction below.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline__ascii_hex_decode(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline__ascii85_decode(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline__run_length_decode(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline__dct_decode(stream)
        elif filtr == "not set":
            # Without a filter, the size is derived from the dimensions and
            # the number of bytes per pixel (lcs) where that can be determined.
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            ei = read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """Return the raw data, serializing ``_operations`` first if ``_data`` is empty."""
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first and last two bytes of the written dictionary.
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Store ``data`` and discard the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """Parsed operations; parsing ``_data`` on demand empties ``_data``."""
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """Wrap the content in a ``q`` / ``Q`` pair, in whichever form is currently held."""
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the stream, rebuilding the raw data from the operations if needed.

        Args:
            stream: Output to write to.
            encryption_key: Forwarded to the parent implementation.

        """
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)

1479 

1480 

def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read the object starting at the current position of ``stream``.

    The first byte selects the reader: name, hexadecimal string or
    dictionary, array, boolean, literal string, null, number or indirect
    reference. A leading comment is skipped and the object after it is read.

    Args:
        stream: Seekable input positioned at the start of the object.
        pdf: Passed on to the individual readers; must not be ``None`` when an
            indirect reference is encountered.
        forced_encoding: Passed on to the string, array and dictionary readers.

    Returns:
        The object that was read.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e" and stream.read(6) == b"endobj":
        # An object body that ends immediately is read as null.
        return NullObject()
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Collect some context around the failure for the error message. The start
    # is clamped to offset 0: a relative seek to before the beginning fails on
    # file-backed streams and would hide the PdfReadError raised below.
    stream.seek(max(0, pos - 20), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )

1529 

1530 

class Field(TreeObject):
    """
    Representation of a field dictionary.

    Instances are obtained through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        DictionaryObject.__init__(self)
        self.indirect_reference = data.indirect_reference

        # Copy over every known field attribute that ``data`` provides.
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        for attribute_name in known_attributes:
            try:
                self[NameObject(attribute_name)] = data[attribute_name]
            except KeyError:
                continue

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return

        # A value given as an encoded stream is replaced by its decoded text.
        raw_value = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if raw_value is None:
            text_value = ""
        elif isinstance(raw_value, bytes):
            text_value = raw_value.decode()
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text_value)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        pypdf uses this name as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        The format depends on the field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        It defines the field's behavior in response to trigger events.
        See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)

1628 

1629 

class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: Union[str, bytes],
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the padding below must not modify the caller's
        # ``fit`` object, which may be reused for other destinations.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            # Missing left, top or zoom values (in that order) default to 0.
            while len(args) < 3:
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # No arguments given: nothing to store, whatever the type is.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        The destination as an array: the raw page entry, the type and then
        whichever of left, bottom, right, top and zoom are present.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination as a dictionary with ``/D`` (the destination
        array) and ``/S /GoTo``.

        Args:
            stream: Output to write to.
            encryption_key: Deprecated; any value other than ``None`` triggers
                the deprecation helper.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """
        Read-only property accessing the color in (R, G, B) with values 0.0-1.0.

        Falls back to ``[0, 0, 0]`` when no ``/C`` entry is present.
        """
        return cast(
            "ArrayObject",
            self.get("/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])),
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both
        """
        return OutlineFontFlag(self.get("/F", 0))

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)