Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/generic/_data_structures.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

959 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28 

# Module authorship metadata.
__author__ = "Mathieu Fenniak"

__author_email__ = "biziqe@mathieu.fenniak.net"

31 

32import logging 

33import re 

34import sys 

35from io import BytesIO 

36from math import ceil 

37from typing import ( 

38 Any, 

39 Callable, 

40 Dict, 

41 Iterable, 

42 List, 

43 Optional, 

44 Sequence, 

45 Set, 

46 Tuple, 

47 Union, 

48 cast, 

49) 

50 

51from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 

52from .._utils import ( 

53 WHITESPACES, 

54 StreamType, 

55 deprecation_no_replacement, 

56 deprecation_with_replacement, 

57 logger_warning, 

58 read_non_whitespace, 

59 read_until_regex, 

60 read_until_whitespace, 

61 skip_over_comment, 

62) 

63from ..constants import ( 

64 CheckboxRadioButtonAttributes, 

65 FieldDictionaryAttributes, 

66 OutlineFontFlag, 

67) 

68from ..constants import FilterTypes as FT 

69from ..constants import StreamAttributes as SA 

70from ..constants import TypArguments as TA 

71from ..constants import TypFitArguments as TF 

72from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError 

73from ._base import ( 

74 BooleanObject, 

75 ByteStringObject, 

76 FloatObject, 

77 IndirectObject, 

78 NameObject, 

79 NullObject, 

80 NumberObject, 

81 PdfObject, 

82 TextStringObject, 

83 is_null_or_none, 

84) 

85from ._fit import Fit 

86from ._image_inline import ( 

87 extract_inline_A85, 

88 extract_inline_AHx, 

89 extract_inline_DCT, 

90 extract_inline_default, 

91 extract_inline_RL, 

92) 

93from ._utils import read_hex_string_from_stream, read_string_from_stream 

94 

95if sys.version_info >= (3, 11): 

96 from typing import Self 

97else: 

98 from typing_extensions import Self 

99 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Bytes pattern: optional sign, two whitespace-separated digit runs (captured),
# whitespace, the letter "R", then one byte that is not an ASCII letter.
# NOTE(review): presumably used to recognise indirect references ("12 0 R");
# its users are outside this part of the file - confirm.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")

102 

103 

class ArrayObject(List[Any], PdfObject):
    """
    PDF array, implemented as a Python list that is also a ``PdfObject``.

    On top of the list API it provides copying into another document
    (``replicate``/``clone``), ``+``/``+=``/``-=`` operators that accept a
    single value as well as a collection, and reading/writing of the
    ``[ ... ]`` PDF syntax.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Copy this array for ``pdf_dest``.

        The new array is obtained through ``_reference_clone``; items that
        expose a ``replicate`` method are replicated recursively, every other
        item is appended unchanged.

        Args:
            pdf_dest: destination the copy is created for.

        Returns:
            The copied array.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: destination the clone is created for.
            force_duplicate: when False and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: forwarded unchanged to the ``clone`` call of every
                item.

        Returns:
            The cloned array (or ``self``, see ``force_duplicate``).

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: without a usable indirect reference a clone is made.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored in the clone through their indirect reference.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> List[Any]:
        """
        Normalise an operand of ``+``/``+=``/``-=`` into a collection.

        Lists, tuples and sets are returned untouched; any other value is
        wrapped in a one-element list. ``str`` becomes a ``NameObject`` when
        it starts with "/" and a ``TextStringObject`` otherwise; ``bytes``
        becomes a ``ByteStringObject``.
        """
        if isinstance(lst, (list, tuple, set)):
            pass
        elif isinstance(lst, PdfObject):
            lst = [lst]
        elif isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not raise
            # IndexError, it is simply an (empty) text string.
            if lst.startswith("/"):
                lst = [NameObject(lst)]
            else:
                lst = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            lst = [ByteStringObject(lst)]
        else:  # for numbers,...
            lst = [lst]
        return lst

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For each value of ``lst`` (normalised like for ``+=``) the first equal
        element is removed; values that are not present are ignored.
        """
        for x in self._to_lst(lst):
            try:
                idx = self.index(x)
                del self[idx]
            except ValueError:
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ item item ... ]`` to ``stream``.

        Args:
            stream: writable binary stream.
            encryption_key: deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, List[str], Dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: binary stream positioned on the opening "[".
            pdf: forwarded to ``read_object`` for every item.
            forced_encoding: forwarded to ``read_object`` for every item.

        Returns:
            The parsed array. Reaching the end of the stream before "]"
            ends parsing and returns the items read so far.

        Raises:
            PdfReadError: the first byte is not "[".

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            stream.seek(-1, 1)
            # check for array ending
            peek_ahead = stream.read(1)
            if peek_ahead == b"]":
                break
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr

268 

269 

class DictionaryObject(Dict[Any, Any], PdfObject):
    """
    PDF dictionary, implemented as a Python dict that is also a ``PdfObject``.

    Item assignment only accepts ``PdfObject`` keys and values, and item
    access (``d[key]``) returns the result of ``get_object()`` on the stored
    value; ``raw_get`` returns the stored value itself.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Copy this dictionary for ``pdf_dest``.

        The copy is obtained through ``_reference_clone``; keys are
        replicated, values are replicated when they expose ``replicate`` and
        stored unchanged otherwise.

        Args:
            pdf_dest: destination the copy is created for.

        Returns:
            The copied dictionary.

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: destination the clone is created for.
            force_duplicate: when False and this dictionary already belongs
                to ``pdf_dest``, the dictionary itself is returned.
            ignore_fields: keys that must not be copied (None means none);
                see ``_clone`` for the meaning of integer entries.

        Returns:
            The cloned dictionary (or ``self``, see ``force_duplicate``).

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: without a usable indirect reference a clone is made.
            pass

        visited: Set[Tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only fill the object returned by _reference_clone when it is empty.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: Set[Tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: dictionary whose entries are copied into ``self``.
            pdf_dest: destination passed to every nested clone call.
            force_duplicate: forwarded to every nested clone call.
            ignore_fields: keys that are not copied. An integer entry is a
                level counter for the entry that follows it: a positive
                counter is decremented, a counter <= 0 is removed together
                with the following entry. Must not be None.
            visited: (idnum, generation) pairs of chain clones that were
                already handled; updated in place.

        """
        # first we remove for the ignore_fields
        # that are for a limited number of levels
        x = 0
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # First check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # IF need to go further the idea is to check
                        # that the types are the same:
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively: create an (empty) clone
                        # of every link first and wire the links together.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # No further link: end of the chain.
                                cur_obj = None
                        # Then fill every clone of the chain from its source.
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()``."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Not defined here: continue the lookup in the parent dictionary.
        return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
            key, default
        )

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: ``key`` or ``value`` is not a ``PdfObject``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        ``dict.setdefault`` restricted to ``PdfObject`` keys and values.

        Raises:
            ValueError: ``key`` or ``value`` is not a ``PdfObject``; this
                includes the default ``value=None``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)  # type: ignore

    def __getitem__(self, key: Any) -> PdfObject:
        """Return ``get_object()`` of the value stored under ``key``."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to the this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()

        if not isinstance(metadata, XmpInformation):
            # Wrap once and store the wrapper back under "/Metadata".
            metadata = XmpInformation(metadata)
            self[NameObject("/Metadata")] = metadata
        return metadata

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<< key value ... >>`` to ``stream``.

        Keys of the form ``/%...%`` (second character and last character are
        "%") are not written.

        Args:
            stream: writable binary stream.
            encryption_key: deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, List[str], Dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream data following it if any.

        Args:
            stream: binary stream positioned on the opening "<<".
            pdf: reader used for ``strict`` mode, for resolving an indirect
                stream length and for locating the next object when the
                stream length is unusable.
            forced_encoding: forwarded to ``read_object`` for the values.

        Returns:
            A ``DictionaryObject``, or the object built by
            ``StreamObject.initialize_from_dictionary`` when the dictionary
            is followed by the ``stream`` keyword. When an entry cannot be
            read and ``pdf`` is not strict, the entries read so far are
            returned.

        Raises:
            PdfReadError: the data does not start with "<<", an entry is
                invalid in strict mode, or "endstream" cannot be found.
            PdfStreamError: the data ends prematurely, the ``stream`` keyword
                is not followed by an end of line, or (strict mode) the
                stream length is missing.

        """

        def get_next_obj_pos(
            p: int, p1: int, rem_gens: List[int], pdf: PdfReaderProtocol
        ) -> int:
            # Smallest xref offset in ]p, p1] over the given generations,
            # or p1 when there is none.
            out = p1
            for gen in rem_gens:
                loc = pdf.xref[gen]
                try:
                    values = [x for x in loc.values() if p < x <= p1]
                    if values:
                        out = min(out, *values)
                except ValueError:
                    pass
            return out

        def read_unsized_from_stream(
            stream: StreamType, pdf: PdfReaderProtocol
        ) -> bytes:
            # we are just pointing at beginning of the stream
            eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1
            curr = stream.tell()
            rw = stream.read(eon - stream.tell())
            p = rw.find(b"endstream")
            if p < 0:
                raise PdfReadError(
                    f"Unable to find 'endstream' marker for obj starting at {curr}."
                )
            stream.seek(curr + p + 9)
            # Drop the byte preceding "endstream". Clamp at 0: with the marker
            # at offset 0 a slice end of -1 would return almost the whole
            # buffer instead of empty data.
            return rw[: max(p - 1, 0)]

        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: Dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # consume the second ">" of the closing ">>"
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__()) from exc
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # NOTE(review): a key whose stored value is falsy is treated like
            # an undefined key here, so a later duplicate replaces it without
            # a warning - confirm this is intended.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolving the reference moves the stream: restore the position.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length > 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # Not a stream: undo the look-ahead.
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval

684 

685 

class TreeObject(DictionaryObject):
    """
    Dictionary node of a tree whose children form a linked list.

    The node references its children through "/First" and "/Last"; children
    are chained with "/Next" and "/Prev", point back with "/Parent", and
    "/Count" holds the child counter maintained by the methods below.
    Iterating a ``TreeObject`` yields its children, not its keys.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """Create the node, optionally filled with the entries of ``dct``."""
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the node has a "/First" entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (see ``children``)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children from "/First", following the "/Next" links.

        Iteration stops after the child equal to "/Last", or when a child
        has no (or a null) "/Next" entry.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        child = child_ref.get_object()
        while True:
            yield child
            if child == self[NameObject("/Last")]:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Insert ``child`` with ``before=None`` (see ``insert_child``)."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the "/Count" of ``parent`` and of all its ancestors.

        Nodes without "/Count" are left untouched; the stored counter never
        goes below 0. Recursion stops at a null/None parent.
        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the magnitude of the "/Count" of ``parent``.

        The counter is stored positive when the "/%is_open%" entry of the
        node is true (default) and negative otherwise; ancestors are only
        updated while the nodes are open.
        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Link ``child`` into the list of children.

        Args:
            child: object to insert; its ``indirect_reference`` is what gets
                stored in the tree.
            before: indirect reference of the child to insert in front of;
                when it is not met while walking the list the child is
                appended at the end.
            pdf: not used by this method.
            inc_parent_counter: callable ``(node, n)`` updating the counters;
                defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        # NOTE(review): the walk starts at "/Last", so a `before` other than
        # the last child is only met if "/Last" has a "/Next" - confirm.
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # `prev` has no usable predecessor, so the child becomes the head
            # of the list: it must not keep a stale "/Prev" and "/First" has
            # to point at it. (A stale "/Next" is overwritten just below, so
            # nothing needs deleting - deleting a missing key used to raise.)
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            self[NameObject("/First")] = child
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: node preceding ``cur``, or None when ``cur`` is the first.
            prev_ref: reference stored for ``prev`` in the list.
            cur: node being removed.
            last: node referenced by "/Last".

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Unlink ``child`` from this node and reset its tree entries.

        Raises:
            ValueError: ``child`` has no "/Parent", its parent is not this
                node, or it is not found among the children.

        """
        child_obj = child.get_object()

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[Dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """Remove the object from the tree it is in."""
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child and drop "/Count", "/First" and "/Last"."""
        # Snapshot the children first: resetting a child deletes the "/Next"
        # link that children() follows, so iterating lazily would stop after
        # the first child and leave the others attached.
        for child in list(self.children()):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]

893 

894 

def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Call this after a node has been removed from a tree.

    Drops the entries tying the node to that tree: "/Parent" (which must be
    present, otherwise the deletion raises) and, when present, "/Next" and
    "/Prev".

    Args:
        child_obj: the node that was removed; modified in place.

    """
    del child_obj[NameObject("/Parent")]
    for link in ("/Next", "/Prev"):
        link_key = NameObject(link)
        if link_key in child_obj:
            del child_obj[link_key]

910 

911 

912class StreamObject(DictionaryObject): 

913 def __init__(self) -> None: 

914 self._data: bytes = b"" 

915 self.decoded_self: Optional[DecodedStreamObject] = None 

916 

917 def replicate( 

918 self, 

919 pdf_dest: PdfWriterProtocol, 

920 ) -> "StreamObject": 

921 d__ = cast( 

922 "StreamObject", 

923 self._reference_clone(self.__class__(), pdf_dest, False), 

924 ) 

925 d__._data = self._data 

926 try: 

927 decoded_self = self.decoded_self 

928 if decoded_self is None: 

929 self.decoded_self = None 

930 else: 

931 self.decoded_self = cast( 

932 "DecodedStreamObject", decoded_self.replicate(pdf_dest) 

933 ) 

934 except Exception: 

935 pass 

936 for k, v in self.items(): 

937 d__[k.replicate(pdf_dest)] = ( 

938 v.replicate(pdf_dest) if hasattr(v, "replicate") else v 

939 ) 

940 return d__ 

941 

942 def _clone( 

943 self, 

944 src: DictionaryObject, 

945 pdf_dest: PdfWriterProtocol, 

946 force_duplicate: bool, 

947 ignore_fields: Optional[Sequence[Union[str, int]]], 

948 visited: Set[Tuple[int, int]], 

949 ) -> None: 

950 """ 

951 Update the object from src. 

952 

953 Args: 

954 src: 

955 pdf_dest: 

956 force_duplicate: 

957 ignore_fields: 

958 

959 """ 

960 self._data = cast("StreamObject", src)._data 

961 try: 

962 decoded_self = cast("StreamObject", src).decoded_self 

963 if decoded_self is None: 

964 self.decoded_self = None 

965 else: 

966 self.decoded_self = cast( 

967 "DecodedStreamObject", 

968 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields), 

969 ) 

970 except Exception: 

971 pass 

972 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) 

973 

974 def hash_bin(self) -> int: 

975 """ 

976 Used to detect modified object. 

977 

978 Returns: 

979 Hash considering type and value. 

980 

981 """ 

982 # Use _data to prevent errors on non-decoded streams. 

983 return hash((super().hash_bin(), self._data)) 

984 

    def get_data(self) -> bytes:
        """Return the stream data held in ``_data``."""
        return self._data

987 

    def set_data(self, data: bytes) -> None:
        """Replace the stream data held in ``_data`` with ``data``."""
        self._data = data

990 

991 def hash_value_data(self) -> bytes: 

992 data = super().hash_value_data() 

993 data += self.get_data() 

994 return data 

995 

def write_to_stream(
    self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
    """
    Serialize the stream dictionary followed by the raw stream bytes.

    A ``/Length`` entry matching ``len(self._data)`` is inserted only for
    the duration of the dictionary write and removed afterwards.

    Args:
        stream: Writable binary stream receiving the output.
        encryption_key: Deprecated; passing a value only triggers
            ``deprecation_no_replacement``.

    """
    if encryption_key is not None:  # deprecated
        deprecation_no_replacement(
            "the encryption_key parameter of write_to_stream", "5.0.0"
        )
    self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
    try:
        DictionaryObject.write_to_stream(self, stream)
    finally:
        # Always drop the temporary entry, even when the dictionary write
        # fails, so the object is not left carrying a stale /Length.
        del self[SA.LENGTH]
    stream.write(b"\nstream\n")
    stream.write(self._data)
    stream.write(b"\nendstream")

1009 

@staticmethod
def initializeFromDictionary(data: Dict[str, Any]) -> None:
    """
    Deprecated camelCase alias of ``initialize_from_dictionary``.

    The body only reports the deprecation through
    ``deprecation_with_replacement``; ``data`` is not used.
    """
    deprecation_with_replacement(
        "initializeFromDictionary", "initialize_from_dictionary", "5.0.0"
    )  # pragma: no cover

1015 

@staticmethod
def initialize_from_dictionary(
    data: Dict[str, Any]
) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
    """
    Build a stream object from a parsed dictionary.

    Args:
        data: Dictionary holding the stream entries plus the raw bytes under
            ``"__streamdata__"``. It is modified in place: the
            ``"__streamdata__"`` and ``/Length`` entries are removed.

    Returns:
        An ``EncodedStreamObject`` when a filter entry is present, otherwise
        a ``DecodedStreamObject``, populated with the remaining entries.

    """
    retval: Union[EncodedStreamObject, DecodedStreamObject] = (
        EncodedStreamObject() if SA.FILTER in data else DecodedStreamObject()
    )
    retval._data = data.pop("__streamdata__")
    # The length is recomputed at write time, so a stored value is dropped.
    data.pop(SA.LENGTH, None)
    retval.update(data)
    return retval

1031 

def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
    """
    Return a copy of this stream with an additional FlateDecode layer.

    FlateDecode is put in front of any filter already present and a null
    entry is put in front of the existing decode parameters so both lists
    stay aligned.

    Args:
        level: Compression level handed to ``FlateDecode.encode``.

    Returns:
        A new ``EncodedStreamObject`` holding the compressed ``_data``.

    """
    from ..filters import FlateDecode  # noqa: PLC0415

    if SA.FILTER not in self:
        new_filter = NameObject(FT.FLATE_DECODE)
        new_params = None
    else:
        current = self[SA.FILTER]
        if isinstance(current, ArrayObject):
            new_filter = ArrayObject([NameObject(FT.FLATE_DECODE), *current])
            try:
                new_params = ArrayObject(
                    [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                )
            except TypeError:
                # The parameters could not be unpacked (not an array):
                # keep them as a single element instead.
                new_params = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                )
        else:
            new_filter = ArrayObject([NameObject(FT.FLATE_DECODE), current])
            new_params = ArrayObject(
                [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
            )

    encoded = EncodedStreamObject()
    encoded.update(self)
    encoded[NameObject(SA.FILTER)] = new_filter
    if new_params is not None:
        encoded[NameObject(SA.DECODE_PARMS)] = new_params
    encoded._data = FlateDecode.encode(self._data, level)
    return encoded

1063 

def decode_as_image(self) -> Any:
    """
    Try to decode the stream object as an image.

    A warning is logged when ``/Subtype`` is not ``/Image``; decoding is
    attempted regardless.

    Returns:
        The image produced by ``_xobj_to_image``, or ``None`` when no
        extension was determined.

    Raises:
        Exception: Any error raised during decoding is propagated.
            It is recommended to catch exceptions to prevent
            stops in your program.

    """
    from ..filters import _xobj_to_image  # noqa: PLC0415

    looks_like_image = self.get("/Subtype", "") == "/Image"
    if not looks_like_image:
        try:
            msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
        except AttributeError:
            msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
        logger_warning(msg, __name__)
    extension, _byte_stream, img = _xobj_to_image(self)
    if extension is None:
        return None  # pragma: no cover
    return img

1089 

1090 

class DecodedStreamObject(StreamObject):
    """Stream object that adds nothing to ``StreamObject``."""

1093 

1094 

class EncodedStreamObject(StreamObject):
    """
    Stream whose raw ``_data`` is stored in filter-encoded form.

    The decoded counterpart is built on first access and kept in
    ``decoded_self``.
    """

    def __init__(self) -> None:
        # Cache of the decoded stream; filled by get_data().
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """Return the decoded bytes, decoding and caching them if needed."""
        from ..filters import decode_stream_data  # noqa: PLC0415

        cached = self.decoded_self
        if cached is None:
            cached = DecodedStreamObject()
            cached.set_data(decode_stream_data(self))
            # Copy every entry except the ones describing the encoding.
            encoding_keys = (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS)
            for key, value in list(self.items()):
                if key not in encoding_keys:
                    cached[key] = value
            self.decoded_self = cached
        return cached.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the content with ``data`` (given in decoded form).

        Raises:
            PdfReadError: If the stream filter is not exactly FlateDecode.
            TypeError: If ``data`` is not ``bytes``.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(SA.FILTER, "") not in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))

1132 

1133 

class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, ContentStream object can either have both of those fields
    filled, or one field filled and the other one empty (``b""`` for
    ``._data``, ``[]`` for ``._operations``).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is called, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, List[str], Dict[int, str]] = None,
    ) -> None:
        """
        Build a content stream from ``stream``.

        Args:
            stream: ``None`` (empty content), an object resolving to a
                single stream, or an object resolving to an ``ArrayObject``
                of streams which are concatenated.
            pdf: Stored as ``self.pdf`` and handed to ``read_object`` when
                inline image settings are parsed.
            forced_encoding: Stored and handed to ``read_object`` when
                operands are parsed.

        """
        self.pdf = pdf
        self._operations: List[Tuple[Any, bytes]] = []

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
        else:
            stream = stream.get_object()
            if isinstance(stream, ArrayObject):
                data = bytearray()
                for s in stream:
                    s_resolved = s.get_object()
                    if isinstance(s_resolved, NullObject):
                        continue
                    if not isinstance(s_resolved, StreamObject):
                        # No need to emit an exception here for now - the PDF structure
                        # seems to already be broken beforehand in these cases.
                        logger_warning(
                            f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                            __name__
                        )
                    else:
                        data += s_resolved.get_data()
                        # Separate consecutive streams by a newline. Indexing
                        # bytes yields an int, so the check is done with
                        # endswith() rather than comparing data[-1] to b"\n".
                        if not data.endswith(b"\n"):
                            data += b"\n"
                super().set_data(bytes(data))
            else:
                stream_data = stream.get_data()
                assert stream_data is not None
                super().set_data(stream_data)
        self.forced_encoding = forced_encoding

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Create a replica of this content stream for ``pdf_dest``.

        The replica receives the replicated dictionary entries, the raw
        data, a shallow copy of the parsed operations and the forced
        encoding; its ``pdf`` attribute is set to ``pdf_dest``.

        Args:
            pdf_dest: Destination the replica is registered with.

        Returns:
            The replicated ContentStream.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        try:
            decoded_self = self.decoded_self
            # The replicated cache belongs to the replica, not to the source.
            d__.decoded_self = (
                None
                if decoded_self is None
                else cast("DecodedStreamObject", decoded_self.replicate(pdf_dest))
            )
        except Exception:
            # Best effort: a missing ``decoded_self`` attribute or a failing
            # replication of it must not prevent replicating the stream.
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        # set_data() empties ._operations, so it has to run before the
        # operations are copied over.
        d__.set_data(self._data)
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Destination of the clone.
            force_duplicate: When false and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: Forwarded to ``_clone``; ``None`` is treated as
                an empty list.

        Returns:
            The cloned ContentStream

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # No usable indirect reference: fall through and clone.
            pass

        visited: Set[Tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: Set[Tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the parsed operations (shallow copy) and the
        forced encoding are taken over; the parent ``_clone`` is
        deliberately not called.

        Args:
            src: ContentStream to copy from.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Unused here.
            ignore_fields: Unused here.
            visited: Unused here.

        """
        src_cs = cast("ContentStream", src)
        # The parent set_data() is used so that ._operations is not reset.
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObjection or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` into ``(operands, operator)`` tuples.

        The tuples are appended to ``self._operations``; inline images are
        stored as ``(image_dict, b"INLINE IMAGE")``.
        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: List[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> Dict[str, Any]:
        """
        Read an inline image; ``stream`` is positioned just after ``BI``.

        Returns:
            A dict with the parsed ``"settings"`` dictionary and the raw
            image ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker is found even after the
                fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remembered so the fallback extraction can restart from here.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline_AHx(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline_A85(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline_RL(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline_DCT(stream)
        elif filtr == "not set":
            # Unfiltered data: derive the size from the image dimensions.
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            ei = read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """Return the raw bytes, rebuilding them from the operations if empty."""
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first and last two bytes of the serialized
                    # dictionary (its enclosing delimiters).
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Store ``data`` as raw content and drop the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> List[Tuple[Any, bytes]]:
        """Parsed operations; parsed from the raw data on first access."""
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: List[Tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """Wrap the content in a ``q`` / ``Q`` pair."""
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """Serialize the stream, rebuilding the raw data first when needed."""
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)

1445 

1446 

def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, List[str], Dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read the next object from ``stream``, dispatching on its first byte.

    Args:
        stream: Seekable binary stream positioned at the start of the object.
        pdf: Reader handed to the object parsers; must not be ``None`` when
            an indirect reference is encountered.
        forced_encoding: Forwarded to the string, array and dictionary
            parsers.

    Returns:
        The parsed object. A literal ``endobj`` yields a ``NullObject``.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e":
        token_start = stream.tell()
        if stream.read(6) == b"endobj":
            return NullObject()
        # Not "endobj": rewind so the error below reports the real start of
        # the offending token and skips it as a whole.
        stream.seek(token_start, 0)
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    # Unknown token: capture some surrounding bytes for the error message,
    # then skip the token before raising.
    pos = stream.tell()
    stream.seek(-20, 1)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )

1495 

1496 

class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes out of ``data``.

        An encoded stream found under ``/V`` is replaced by its decoded
        content as a text string.
        """
        DictionaryObject.__init__(self)
        self.indirect_reference = data.indirect_reference
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        for attribute in known_attributes:
            try:
                self[NameObject(attribute)] = data[attribute]
            except KeyError:
                # Attribute absent from the source dictionary: skip it.
                pass

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return
        raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if isinstance(raw, bytes):
            text = raw.decode()
        elif raw is None:
            text = ""
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)

1594 

1595 

class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: str,
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: List[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: missing /XYZ arguments are padded below and the
        # caller's Fit object must not be modified by that.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # Nothing to store for any other type without arguments.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        Destination as an array: the raw page entry, the type, then the
        coordinates that are present, in the order left, bottom, right,
        top, zoom.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination as a dictionary with ``/D`` and ``/S /GoTo``.

        Args:
            stream: Writable binary stream receiving the output.
            encryption_key: Deprecated; passing a value only triggers
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[int]:
        """Read-only property accessing the destination page number."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """Read-only property accessing the color in (R, G, B) with values 0.0-1.0."""
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)

1769 return self.get("/Count", None)