Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/generic/_data_structures.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

961 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import logging 

33import re 

34import sys 

35from collections.abc import Iterable, Sequence 

36from io import BytesIO 

37from math import ceil 

38from typing import ( 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 

47from .._utils import ( 

48 WHITESPACES, 

49 StreamType, 

50 deprecation_no_replacement, 

51 logger_warning, 

52 read_non_whitespace, 

53 read_until_regex, 

54 read_until_whitespace, 

55 skip_over_comment, 

56) 

57from ..constants import ( 

58 CheckboxRadioButtonAttributes, 

59 FieldDictionaryAttributes, 

60 OutlineFontFlag, 

61) 

62from ..constants import FilterTypes as FT 

63from ..constants import StreamAttributes as SA 

64from ..constants import TypArguments as TA 

65from ..constants import TypFitArguments as TF 

66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError 

67from ._base import ( 

68 BooleanObject, 

69 ByteStringObject, 

70 FloatObject, 

71 IndirectObject, 

72 NameObject, 

73 NullObject, 

74 NumberObject, 

75 PdfObject, 

76 TextStringObject, 

77 is_null_or_none, 

78) 

79from ._fit import Fit 

80from ._image_inline import ( 

81 extract_inline__ascii85_decode, 

82 extract_inline__ascii_hex_decode, 

83 extract_inline__dct_decode, 

84 extract_inline__run_length_decode, 

85 extract_inline_default, 

86) 

87from ._utils import read_hex_string_from_stream, read_string_from_stream 

88 

89if sys.version_info >= (3, 11): 

90 from typing import Self 

91else: 

92 from typing_extensions import Self 

93 

94logger = logging.getLogger(__name__) 

95 

96IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]") 

97 

98 

class ArrayObject(list[Any], PdfObject):
    """
    PDF array object backed by a Python list.

    Elements are written in order between ``[`` and ``]`` by
    :meth:`write_to_stream` and parsed back by :meth:`read_from_stream`.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a copy of this array for ``pdf_dest``.

        Args:
            pdf_dest: writer receiving the copy.

        Returns:
            The array obtained from ``_reference_clone``, filled with the
            replicated elements (elements without a ``replicate`` method
            are appended unchanged).

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: forwarded unchanged to the ``clone`` call of
                every element.

        Returns:
            ``self`` (see ``force_duplicate``) or the cloned array.

        """
        # Best-effort ownership check: any failure while reading
        # ``indirect_reference.pdf`` (e.g. no reference) simply means that
        # a clone has to be produced.
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate=True),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored through their indirect reference.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalize the operand of ``+``, ``+=`` and ``-=`` into an iterable.

        Args:
            lst: list/tuple/set (returned as is), or a single value.

        Returns:
            The collection itself, or a one-element list: PdfObject and
            other values unchanged, ``str`` as NameObject (leading "/") or
            TextStringObject, ``bytes`` as ByteStringObject.

        """
        if isinstance(lst, (list, tuple, set)):
            return lst  # type: ignore[return-value]
        # PdfObject is tested before str/bytes so that objects which are
        # also strings (e.g. names) are kept untouched.
        if isinstance(lst, PdfObject):
            return [lst]
        if isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not
            # raise IndexError; it becomes an empty TextStringObject.
            if lst.startswith("/"):
                return [NameObject(lst)]
            return [TextStringObject(lst)]
        if isinstance(lst, bytes):
            return [ByteStringObject(lst)]
        return [lst]  # for numbers,...

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For each value produced by :meth:`_to_lst`, the first equal element
        is removed; values that are not present are silently ignored.
        """
        for x in self._to_lst(lst):
            try:
                self.remove(x)
            except ValueError:
                pass  # not in the array: nothing to remove
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialize the array as ``[ item item ... ]``.

        Args:
            stream: output stream.
            encryption_key: deprecated; a non-None value is reported through
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current stream position.

        Args:
            stream: input positioned on the opening ``[``.
            pdf: reader forwarded to ``read_object``.
            forced_encoding: forwarded to ``read_object``.

        Returns:
            The parsed array. If the input ends before ``]`` is found, the
            elements read so far are returned.

        Raises:
            PdfReadError: the first byte is not ``[``.

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                break  # end of input
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            # check for array ending
            if tok == b"]":
                break
            # give the byte back, then read and append the object
            stream.seek(-1, 1)
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr

263 

264 

class DictionaryObject(dict[Any, Any], PdfObject):
    """
    PDF dictionary object backed by a Python dict.

    ``__setitem__`` and ``setdefault`` only accept PdfObject keys and
    values; ``__getitem__`` resolves the stored value through
    ``get_object()`` while :meth:`raw_get` returns it untouched.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a copy of this dictionary for ``pdf_dest``.

        Args:
            pdf_dest: writer receiving the copy.

        Returns:
            The object obtained from ``_reference_clone`` filled with the
            replicated keys and values (values without a ``replicate``
            method are stored unchanged).

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: keys that must not be copied; ``None`` is treated
                as an empty list. See :meth:`_clone` for integer entries.

        Returns:
            ``self`` (see ``force_duplicate``) or the cloned dictionary.

        """
        # Best-effort ownership check: any failure while reading
        # ``indirect_reference.pdf`` means a clone has to be produced.
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only populate when _reference_clone handed back an empty object.
        if len(d__) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: dictionary whose entries are copied into ``self``.
            pdf_dest: writer receiving the clones.
            force_duplicate: forwarded to every nested clone call.
            ignore_fields: keys to skip. An ``int`` entry is a level counter
                for the entry that follows it: it is decremented on each
                call, and once it is <= 0 both the counter and the
                following entry are dropped.
            visited: ``(idnum, generation)`` pairs of clones already
                produced while walking linked chains; updated in place.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Drop the exhausted counter and the field it guards.
                    # A slice delete is used so that a counter sitting at
                    # the very end of the list does not raise IndexError.
                    del ignore_fields[x : x + 2]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        # Walk the chain iteratively: first create every
                        # (still empty) clone and link it to its
                        # predecessor, then fill the clones afterwards.
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None  # back at the start
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                cur_obj = None  # end of the chain
                    for s, c in objs:
                        c._clone(
                            s, pdf_dest, force_duplicate, ignore_fields, visited
                        )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()``."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        parent = cast("DictionaryObject", self["/Parent"].get_object())
        return parent.get_inherited(key, default)

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: ``key`` or ``value`` is not a PdfObject.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        Same as ``dict.setdefault`` with the checks of ``__setitem__``.

        Raises:
            ValueError: ``key`` or ``value`` is not a PdfObject (this
                includes the default ``value=None``).

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return the value for ``key`` after resolving it with ``get_object()``."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialize the dictionary between ``<<`` and ``>>``, one entry per line.

        Keys of the form ``/%...%`` are skipped and never written.

        Args:
            stream: output stream.
            encryption_key: deprecated; a non-None value is reported through
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @classmethod
    def _get_next_object_position(
        cls, position_before: int, position_end: int, generations: list[int], pdf: PdfReaderProtocol
    ) -> int:
        """
        Find the smallest xref position inside ``(position_before, position_end]``.

        Args:
            position_before: exclusive lower bound.
            position_end: inclusive upper bound, also the fallback result.
            generations: keys of ``pdf.xref`` to inspect.
            pdf: reader providing ``xref``.

        Returns:
            The smallest matching position, or ``position_end`` if none.

        """
        out = position_end
        for generation in generations:
            location = pdf.xref[generation]
            values = [x for x in location.values() if position_before < x <= position_end]
            if values:
                out = min(out, *values)
        return out

    @classmethod
    def _read_unsized_from_stream(
        cls, stream: StreamType, pdf: PdfReaderProtocol
    ) -> bytes:
        """
        Read stream data whose length is unknown or unreliable.

        Everything up to the byte before the next xref position is read and
        searched for ``endstream``; the stream is left right after that
        keyword.

        Args:
            stream: input positioned at the start of the stream data.
            pdf: reader providing ``xref``.

        Returns:
            The bytes preceding ``endstream``, without the single byte that
            immediately precedes the keyword.

        Raises:
            PdfReadError: no ``endstream`` found in the region read.

        """
        object_position = cls._get_next_object_position(
            position_before=stream.tell(), position_end=2 ** 32, generations=list(pdf.xref), pdf=pdf
        ) - 1
        current_position = stream.tell()
        # Read until the next object position.
        read_value = stream.read(object_position - stream.tell())
        endstream_position = read_value.find(b"endstream")
        if endstream_position < 0:
            raise PdfReadError(
                f"Unable to find 'endstream' marker for obj starting at {current_position}."
            )
        # 9 = len(b"endstream")
        stream.seek(current_position + endstream_position + 9)
        # Clamp at 0: with the keyword at offset 0 a plain "- 1" would give
        # the slice [:-1] and return almost the whole buffer instead of b"".
        return read_value[: max(endstream_position - 1, 0)]

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream that may follow it.

        Args:
            stream: input positioned on the opening ``<<``.
            pdf: reader used for ``strict`` handling, for resolving an
                indirect ``/Length`` and for locating the next object.
            forced_encoding: forwarded to ``read_object`` for values.

        Returns:
            A DictionaryObject, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary
            is followed by the ``stream`` keyword. When a value cannot be
            read and the reader is not strict, the entries parsed so far
            are returned.

        Raises:
            PdfReadError: missing ``<<``; in strict mode, invalid key,
                unreadable value or duplicated key; ``endstream`` not found.
            PdfStreamError: truncated input, missing EOL after ``stream``,
                or (strict mode) missing stream length.

        """
        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                stream.read(1)  # second ">" of the closing ">>"
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                # Called for its side effect only: skip the whitespace in
                # front of the value, then give the byte back.
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__()) from exc
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # NOTE(review): a key whose stored value is falsy is treated
            # like a missing key here and gets overwritten silently.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolving the reference may move the stream: restore it.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = DictionaryObject._read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # Not a stream: undo the look-ahead.
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval

677 

678 

class TreeObject(DictionaryObject):
    """
    Dictionary acting as the parent of a doubly linked list of children.

    The node keeps ``/First``, ``/Last`` and ``/Count``; each child keeps
    ``/Parent``, ``/Next`` and ``/Prev``. Iterating a TreeObject yields its
    children (see :meth:`children`), not its keys.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Create the node, optionally pre-filled from ``dct``.

        Args:
            dct: dictionary whose entries are copied with ``update``.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the node has a ``/First`` entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children instead of the dictionary keys."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Lazily yield the children, following ``/Next`` from ``/First``.

        Iteration stops after the ``/Last`` child, when ``/Next`` is missing
        or null, or (with a warning) when a child is met a second time.
        Because ``/Next`` is read after each ``yield``, callers that unlink
        children must iterate over a snapshot such as
        ``list(tree.children())``.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        last = self[NameObject("/Last")]
        child = child_ref.get_object()
        visited: set[int] = set()
        while True:
            child_id = id(child)
            if child_id in visited:
                logger_warning(f"Detected cycle in outline structure for {child}", __name__)
                return
            visited.add(child_id)

            yield child

            if child == last:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Append ``child`` at the end (``insert_child`` with ``before=None``)."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to ``/Count`` of ``parent`` and of all its ancestors.

        Nodes without ``/Count`` are left untouched; the stored value never
        goes below 0.

        Args:
            parent: node to start from; null/None stops the recursion.
            n: increment (may be negative).

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to ``/Count`` of ``parent``, honouring ``/%is_open%``.

        The absolute value of the count is incremented; the result is
        stored negative when the node is closed, and in that case the
        ancestors are not updated.

        Args:
            parent: node to start from; null/None stops the recursion.
            n: increment.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Link ``child`` into this node's list of children.

        Args:
            child: object to insert; its ``indirect_reference`` is what gets
                stored in the links.
            before: indirect reference of the sibling in front of which the
                child goes. If the search never finds it, the child is
                appended at the end.
            pdf: unused by this method.
            inc_parent_counter: callable ``(node, n)`` updating the counts;
                defaults to :meth:`inc_parent_counter_default`.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # Guarded like the other branches: a child without "/Next"
            # must not make the insertion fail with KeyError.
            if "/Next" in child_obj:
                del child_obj["/Next"]
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: node in front of ``cur``; None when ``cur`` is the first.
            prev_ref: reference to ``prev``.
            cur: node being removed.
            last: last node of the list (only used for a sanity assert).

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Unlink ``child`` from this node and clear its tree entries.

        Args:
            child: the child, or a reference to it.

        Raises:
            ValueError: the child has no ``/Parent``, its parent is not this
                node, or it cannot be found among the children.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """
        Remove the object from the tree it is in.

        Raises:
            ValueError: this object has no ``/Parent`` entry.

        """
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child, then drop ``/Count``, ``/First`` and ``/Last``."""
        # Iterate over a snapshot: children() reads "/Next" lazily after
        # each yield, so unlinking a child while the generator is running
        # would end the iteration right after the first child.
        for child in list(self.children()):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]

895 

896 

def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Call this after a node has been removed from a tree.

    This resets the nodes attributes in respect to that tree.

    Args:
        child_obj: node that was removed. Its "/Parent" entry is deleted
            unconditionally (a missing entry raises from the ``del``);
            "/Next" and "/Prev" are deleted only when present.

    """
    del child_obj[NameObject("/Parent")]
    # The sibling links are optional, e.g. for the first/last node.
    for sibling_key in ("/Next", "/Prev"):
        link_name = NameObject(sibling_key)
        if link_name in child_obj:
            del child_obj[link_name]

912 

913 

class StreamObject(DictionaryObject):
    """
    A PDF stream: a dictionary of entries plus a byte payload.

    The payload is kept in ``_data`` and is written verbatim between the
    ``stream`` and ``endstream`` keywords by :meth:`write_to_stream`.
    """

    def __init__(self) -> None:
        # Raw payload of the stream.
        self._data: bytes = b""
        # Optional decoded counterpart of this stream; None until populated.
        self.decoded_self: Optional[DecodedStreamObject] = None

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "StreamObject":
        """
        Build a replica of this stream for ``pdf_dest``.

        Args:
            pdf_dest: The writer that will own the replica.

        Returns:
            The replica, carrying the same payload and replicated entries.

        """
        d__ = cast(
            "StreamObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        d__._data = self._data
        try:
            decoded_self = self.decoded_self
            # The decoded counterpart belongs to the replica; the source
            # object must keep its own cached decoded stream untouched
            # (this mirrors what _clone() does for its destination).
            if decoded_self is None:
                d__.decoded_self = None
            else:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            # Deliberately best-effort: the decoded counterpart is optional
            # and subclasses may not define ``decoded_self`` at all.
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Copies the payload and (best-effort) a clone of the decoded
        counterpart, then delegates the dictionary entries to the parent.

        Args:
            src: Stream to copy from.
            pdf_dest: The writer receiving the clone.
            force_duplicate: Forwarded to the nested ``clone`` calls.
            ignore_fields: Forwarded to the nested ``clone`` calls.
            visited: Forwarded to the parent ``_clone``.

        """
        self._data = cast("StreamObject", src)._data
        try:
            decoded_self = cast("StreamObject", src).decoded_self
            if decoded_self is None:
                self.decoded_self = None
            else:
                self.decoded_self = cast(
                    "DecodedStreamObject",
                    decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
                )
        except Exception:
            # Best-effort: a missing or uncloneable decoded counterpart must
            # not prevent cloning of the stream itself.
            pass
        super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        # Use _data to prevent errors on non-decoded streams.
        return hash((super().hash_bin(), self._data))

    def get_data(self) -> bytes:
        """Return the stream payload."""
        return self._data

    def set_data(self, data: bytes) -> None:
        """Replace the stream payload with ``data``."""
        self._data = data

    def hash_value_data(self) -> bytes:
        """Return the parent's hash data followed by the result of get_data()."""
        data = super().hash_value_data()
        data += self.get_data()
        return data

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialise the dictionary and the payload to ``stream``.

        Args:
            stream: Output stream.
            encryption_key: Deprecated; any non-None value is reported through
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        # /Length is only present while the dictionary is being written; it
        # is removed again even if serialisation fails, so the object is
        # never left with a stale entry.
        self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
        try:
            DictionaryObject.write_to_stream(self, stream)
        finally:
            del self[SA.LENGTH]
        stream.write(b"\nstream\n")
        stream.write(self._data)
        stream.write(b"\nendstream")

    @staticmethod
    def initialize_from_dictionary(
        data: dict[str, Any]
    ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
        """
        Build a stream object from a plain dictionary.

        The payload is taken from the ``"__streamdata__"`` entry. That entry
        and any length entry are removed from ``data`` (the argument is
        modified in place) before the remaining items are copied.

        Args:
            data: Dictionary holding the stream entries and ``"__streamdata__"``.

        Returns:
            An ``EncodedStreamObject`` if a filter entry is present,
            otherwise a ``DecodedStreamObject``.

        """
        retval: Union[EncodedStreamObject, DecodedStreamObject]
        if SA.FILTER in data:
            retval = EncodedStreamObject()
        else:
            retval = DecodedStreamObject()
        retval._data = data["__streamdata__"]
        del data["__streamdata__"]
        if SA.LENGTH in data:
            del data[SA.LENGTH]
        retval.update(data)
        return retval

    def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
        """
        Return a copy of this stream with its payload Flate-compressed.

        The Flate filter is prepended to any filter already present, and a
        null entry is prepended to the decode parameters accordingly.

        Args:
            level: Compression level handed to ``FlateDecode.encode``.

        Returns:
            A new ``EncodedStreamObject``; this object is left unchanged.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if SA.FILTER in self:
            f = self[SA.FILTER]
            if isinstance(f, ArrayObject):
                f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
                try:
                    params = ArrayObject(
                        [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
                except TypeError:
                    # The decode parameters are not iterable (i.e. not an
                    # array), so they cannot be unpacked with ``*``.
                    params = ArrayObject(
                        [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
            else:
                f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
                params = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
                )
        else:
            f = NameObject(FT.FLATE_DECODE)
            params = None
        retval = EncodedStreamObject()
        retval.update(self)
        retval[NameObject(SA.FILTER)] = f
        if params is not None:
            retval[NameObject(SA.DECODE_PARMS)] = params
        retval._data = FlateDecode.encode(self._data, level)
        return retval

    def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any:
        """
        Try to decode the stream object as an image

        Args:
            pillow_parameters: parameters provided to Pillow Image.save() method,
                cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save>

        Returns:
            a PIL image if proper decoding has been found
        Raises:
            Exception: Errors during decoding will be reported.
                It is recommended to catch exceptions to prevent
                stops in your program.

        """
        from ._image_xobject import _xobj_to_image  # noqa: PLC0415

        if self.get("/Subtype", "") != "/Image":
            # Only a warning: decoding is still attempted below.
            try:
                msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
            except AttributeError:
                msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
            logger_warning(msg, __name__)
        extension, _, img = _xobj_to_image(self, pillow_parameters)
        if extension is None:
            return None  # pragma: no cover
        return img

1088 

1089 

class DecodedStreamObject(StreamObject):
    """Stream object that adds nothing to the behaviour of ``StreamObject``."""

1092 

1093 

class EncodedStreamObject(StreamObject):
    """
    Stream whose stored payload is encoded with one or more filters.

    ``get_data()`` returns the decoded payload and caches the decoded stream
    in ``decoded_self``; ``set_data()`` is only supported for streams whose
    filter is FlateDecode.
    """

    def __init__(self) -> None:
        # Run the parent initialiser so that ``_data`` exists (as b"") in
        # addition to ``decoded_self``; without it a fresh instance raises
        # AttributeError on any access to ``_data``.
        super().__init__()

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded payload.

        The first call builds a ``DecodedStreamObject`` holding the decoded
        data and every entry except the length, filter and decode-parameter
        entries; later calls reuse that cached object.
        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        if self.decoded_self is not None:
            # Cached version of decoded object
            return self.decoded_self.get_data()

        # Create decoded object
        decoded = DecodedStreamObject()
        decoded.set_data(decode_stream_data(self))
        for key, value in self.items():
            if key not in (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS):
                decoded[key] = value
        self.decoded_self = decoded
        return decoded.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the payload with ``data``, given in decoded form.

        Args:
            data: The new decoded payload.

        Raises:
            TypeError: If ``data`` is not ``bytes``.
            PdfReadError: If the stream's filter is anything other than
                FlateDecode.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(SA.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            if not isinstance(data, bytes):
                raise TypeError("Data must be bytes")
            if self.decoded_self is None:
                self.get_data()  # to create self.decoded_self
            assert self.decoded_self is not None, "mypy"
            # Keep the cached decoded stream and the encoded payload in sync.
            self.decoded_self.set_data(data)
            super().set_data(FlateDecode.encode(data))
        else:
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )

1131 

1132 

class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, ContentStream object can either have both of those fields
    populated, or one field populated and the other one empty
    (``b""`` for ._data, ``[]`` for ._operations).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is called, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Build a content stream from ``stream``.

        Args:
            stream: None (empty content), or an object whose ``get_object()``
                yields either a stream or an array of streams to be
                concatenated.
            pdf: Stored as ``self.pdf`` and handed to ``read_object`` while
                parsing inline images.
            forced_encoding: Stored and handed to ``read_object`` while
                parsing operands.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
        else:
            stream = stream.get_object()
            if isinstance(stream, ArrayObject):
                # bytearray: cheap appends while concatenating the parts.
                data = bytearray()
                for s in stream:
                    s_resolved = s.get_object()
                    if isinstance(s_resolved, NullObject):
                        continue
                    if not isinstance(s_resolved, StreamObject):
                        # No need to emit an exception here for now - the PDF structure
                        # seems to already be broken beforehand in these cases.
                        logger_warning(
                            f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                            __name__
                        )
                    else:
                        data += s_resolved.get_data()
                        # Separate consecutive streams with a newline, but
                        # only when one is actually missing. (Indexing bytes
                        # yields an int, so comparing ``data[-1]`` with
                        # ``b"\n"`` can never detect an existing newline.)
                        if not data.endswith(b"\n"):
                            data += b"\n"
                super().set_data(bytes(data))
            else:
                stream_data = stream.get_data()
                assert stream_data is not None
                super().set_data(stream_data)
        self.forced_encoding = forced_encoding

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Build a replica of this content stream for ``pdf_dest``.

        The replica receives the raw data, the replicated dictionary
        entries, a copy of the parsed operations and the forced encoding.

        Args:
            pdf_dest: The writer that will own the replica.

        Returns:
            The replica.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        d__._data = self._data
        # ContentStream.__init__ does not create ``decoded_self``, so it is
        # looked up defensively; when present, its replica belongs to the
        # new object rather than to the source.
        decoded_self = getattr(self, "decoded_self", None)
        if decoded_self is not None:
            try:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
            except Exception:
                # Best-effort: the decoded counterpart is optional.
                pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        # Carry over the parsing state as well; otherwise a source whose
        # content only lives in ``_operations`` would replicate as empty.
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: The writer receiving the clone.
            force_duplicate: When false and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: Forwarded to ``_clone``; None is treated as empty.

        Returns:
            The cloned ContentStream

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # No usable indirect reference: fall through and clone.
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the parsed operations and the forced encoding are
        copied; the parent ``_clone`` is intentionally not called.

        Args:
            src: Content stream to copy from.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Unused here.
            ignore_fields: Unused here.
            visited: Unused here.

        """
        src_cs = cast("ContentStream", src)
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObjection or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` from its start and append ``(operands, operator)``
        tuples to ``self._operations``.

        Inline images are appended as ``(image_dict, b"INLINE IMAGE")``.
        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image; ``stream`` is positioned just after ``BI``.

        Returns:
            A dict with the image dictionary under ``"settings"`` and the
            image bytes under ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker follows the data, even after
                the fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            # Skip the whitespace between key and value.
            read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remember where the image data starts for the fallback below.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline__ascii_hex_decode(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline__ascii85_decode(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline__run_length_decode(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline__dct_decode(stream)
        elif filtr == "not set":
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            # lcs: bytes per pixel used to compute the data length,
            # or -1 when the length cannot be derived.
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """Return the raw data, rebuilding it from the operations if empty."""
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first and last two bytes of the serialised
                    # dictionary (presumably its "<<" / ">>" delimiters).
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Replace the raw data and discard the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """
        Parsed operations of the stream.

        When no operations are cached but raw data exists, the data is parsed
        and then cleared, so that the operations become the only
        representation.
        """
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """Wrap the content in a ``q`` ... ``Q`` pair, in whichever form is populated."""
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """Serialise the stream, rebuilding the raw data from the operations first if needed."""
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)

1444 

1445 

def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read one object from ``stream``, dispatching on its first byte.

    Args:
        stream: Stream positioned at the first byte of the object.
        pdf: Handed to the individual readers; must not be None when an
            indirect reference is encountered.
        forced_encoding: Handed to the string, array and dictionary readers.

    Returns:
        The object that was read.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    first = stream.read(1)
    stream.seek(-1, 1)  # back to the start of the object

    if first == b"/":
        return NameObject.read_from_stream(stream, pdf)

    if first == b"<":
        # Either a dictionary ("<<") or a hexadecimal string ("<").
        opening = stream.read(2)
        stream.seek(-2, 1)  # back to the start of the object
        if opening != b"<<":
            return read_hex_string_from_stream(stream, forced_encoding)
        return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)

    if first == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)

    if first in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)

    if first == b"(":
        return read_string_from_stream(stream, forced_encoding)

    # Note: the six bytes stay consumed even when they are not "endobj".
    if first == b"e" and stream.read(6) == b"endobj":
        return NullObject()

    if first == b"n":
        return NullObject.read_from_stream(stream)

    if first == b"%":
        # A comment: skip it and the whitespace after it, then read the
        # object that follows.
        skip_over_comment(stream)
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)

    if first in b"0123456789+-.":
        # Either a number or an indirect reference.
        lookahead = stream.read(20)
        stream.seek(-len(lookahead), 1)  # back to the start of the object
        if IndirectPattern.match(lookahead) is None:
            return NumberObject.read_from_stream(stream)
        assert pdf is not None, "mypy"
        return IndirectObject.read_from_stream(stream, pdf)

    # Nothing matched: collect some surrounding bytes for the error message,
    # then advance past the offending token before raising.
    pos = stream.tell()
    stream.seek(-20, 1)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {first!r} @{pos}: {stream_extract!r}"
    )

1494 

1495 

class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes found in ``data``.

        A ``/V`` entry that is an encoded stream is replaced by the text of
        its decoded data.
        """
        DictionaryObject.__init__(self)
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        self.indirect_reference = data.indirect_reference
        for attribute in known_attributes:
            try:
                self[NameObject(attribute)] = data[attribute]
            except KeyError:
                # Attribute not present in the source dictionary.
                continue
        if isinstance(self.get("/V"), EncodedStreamObject):
            raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
            if raw is not None and not isinstance(raw, bytes):
                raise Exception("Should never happen")
            text = "" if raw is None else raw.decode()
            self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)

1593 

1594 

class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: Union[str, bytes],
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: missing /XYZ arguments are padded below, and that
        # padding must not leak into the caller's ``fit`` object.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # No coordinates supplied: nothing more to store, whatever the type.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        Destination as an array: the raw page entry, the type, then whichever
        of /Left, /Bottom, /Right, /Top and /Zoom are present, in that order.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination as a dictionary with a ``/D`` entry holding
        :attr:`dest_array` and an ``/S`` entry of ``/GoTo``.

        Args:
            stream: Output stream.
            encryption_key: Deprecated; any non-None value is reported through
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """
        Read-only property accessing the color in (R, G, B) with values 0.0-1.0.

        Falls back to (0, 0, 0) when no ``/C`` entry is present.
        """
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both; 0 when no ``/F`` entry is present.
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)