Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/generic/_data_structures.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

955 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import logging 

33import re 

34import sys 

35from collections.abc import Iterable, Sequence 

36from io import BytesIO 

37from math import ceil 

38from typing import ( 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 

47from .._utils import ( 

48 WHITESPACES, 

49 StreamType, 

50 deprecation_no_replacement, 

51 logger_warning, 

52 read_non_whitespace, 

53 read_until_regex, 

54 read_until_whitespace, 

55 skip_over_comment, 

56) 

57from ..constants import ( 

58 CheckboxRadioButtonAttributes, 

59 FieldDictionaryAttributes, 

60 OutlineFontFlag, 

61) 

62from ..constants import FilterTypes as FT 

63from ..constants import StreamAttributes as SA 

64from ..constants import TypArguments as TA 

65from ..constants import TypFitArguments as TF 

66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError 

67from ._base import ( 

68 BooleanObject, 

69 ByteStringObject, 

70 FloatObject, 

71 IndirectObject, 

72 NameObject, 

73 NullObject, 

74 NumberObject, 

75 PdfObject, 

76 TextStringObject, 

77 is_null_or_none, 

78) 

79from ._fit import Fit 

80from ._image_inline import ( 

81 extract_inline_A85, 

82 extract_inline_AHx, 

83 extract_inline_DCT, 

84 extract_inline_default, 

85 extract_inline_RL, 

86) 

87from ._utils import read_hex_string_from_stream, read_string_from_stream 

88 

89if sys.version_info >= (3, 11): 

90 from typing import Self 

91else: 

92 from typing_extensions import Self 

93 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Bytes pattern: an optionally signed run of digits, whitespace, a second run
# of digits, whitespace, then "R" followed by one byte that is not an ASCII
# letter. The two digit runs are captured as groups 1 and 2.
# NOTE(review): presumably object number and generation of an indirect
# reference such as ``12 0 R`` - confirm against the callers.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")

97 

98 

class ArrayObject(list[Any], PdfObject):
    """A PDF array: a Python ``list`` that is also a ``PdfObject``."""

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a new array for ``pdf_dest`` with every item replicated.

        Args:
            pdf_dest: writer the new array is created for.

        Returns:
            A new ArrayObject. Items exposing ``replicate`` are replicated
            into ``pdf_dest``; all other items are appended unchanged.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this array already belongs to
                ``pdf_dest``, ``self`` is returned instead of a copy.
            ignore_fields: forwarded unchanged to the ``clone`` call of
                every item.

        Returns:
            ``self`` or the array produced by ``_reference_clone`` filled
            with the cloned items.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: e.g. ``indirect_reference`` is None or missing,
            # in which case a clone is always produced.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored in the new array through their
                # indirect reference, not as direct objects.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalise the right-hand operand of ``+``, ``+=`` and ``-=``.

        Args:
            lst: list, tuple or set (returned as is), or a single value.

        Returns:
            The collection itself, or a one-element list: a ``str`` becomes
            a NameObject when it starts with "/" and a TextStringObject
            otherwise, ``bytes`` become a ByteStringObject, anything else
            (PdfObject, numbers, ...) is wrapped unchanged.

        """
        if isinstance(lst, (list, tuple, set)):
            pass
        elif isinstance(lst, PdfObject):
            lst = [lst]
        elif isinstance(lst, str):
            # ``startswith`` rather than ``lst[0]`` so that the empty
            # string yields an empty TextStringObject instead of raising
            # IndexError.
            if lst.startswith("/"):
                lst = [NameObject(lst)]
            else:
                lst = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            lst = [ByteStringObject(lst)]
        else:  # for numbers,...
            lst = [lst]
        return lst

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding a list or a single element.

        Args:
            lst: lists, tuples and sets extend the array;
                other types (numbers, ...) are appended.
                A str is converted into a TextStringObject
                or a NameObject (if starting with "/");
                bytes are converted into a ByteStringObject.

        Returns:
            New ArrayObject with all elements; ``self`` is not modified.

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Extend this array in place with a list or a single element.

        Args:
            lst: lists, tuples and sets extend the array;
                other types (numbers, ...) are appended.
                A str is converted into a TextStringObject
                or a NameObject (if starting with "/");
                bytes are converted into a ByteStringObject.

        Returns:
            This array.

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Remove items in place.

        For every value of the normalised operand the first equal item is
        deleted; values that are not present are ignored.
        """
        for x in self._to_lst(lst):
            try:
                index = self.index(x)
                del self[index]
            except ValueError:
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialise the array as ``[ item item ... ]``.

        Args:
            stream: binary stream written to.
            encryption_key: deprecated; any value other than None is
                reported through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: stream positioned on the opening ``[``.
            pdf: reader forwarded to ``read_object``.
            forced_encoding: forwarded to ``read_object``.

        Returns:
            The parsed array. When the stream ends before ``]`` the items
            read so far are returned.

        Raises:
            PdfReadError: the first byte is not ``[``.

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                # end of data: stop without a closing bracket
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            stream.seek(-1, 1)
            # check for array ending
            peek_ahead = stream.read(1)
            if peek_ahead == b"]":
                break
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr

263 

264 

class DictionaryObject(dict[Any, Any], PdfObject):
    """
    A PDF dictionary: a Python ``dict`` that is also a ``PdfObject``.

    ``__setitem__`` and ``setdefault`` only accept PdfObject keys and
    values; ``__getitem__`` returns ``get_object()`` of the stored value,
    while ``raw_get`` returns the stored value itself.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a new dictionary for ``pdf_dest`` with replicated entries.

        Args:
            pdf_dest: writer the new dictionary is created for.

        Returns:
            New object of the same class; keys are replicated, values are
            replicated when they expose ``replicate``.

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this object already belongs to
                ``pdf_dest``, ``self`` is returned.
            ignore_fields: keys not to copy; None is treated as empty.
                See ``_clone`` for the handling of integer entries.

        Returns:
            ``self`` or the object returned by ``_reference_clone``, filled
            from ``self`` only if it came back empty.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: e.g. ``indirect_reference`` is None or missing.
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # A non-empty result is left as it is and not filled again.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: dictionary whose entries are cloned into ``self``.
            pdf_dest: writer receiving the clones.
            force_duplicate: forwarded to every nested clone call.
            ignore_fields: keys of ``src`` that are not copied. Integer
                entries are level counters: each is decremented on every
                call, and one that is already <= 0 is removed together
                with the entry following it. Must not be None.
            visited: (idnum, generation) pairs of clones already created
                while walking chains; shared across the recursion.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Drop the expired counter and the entry after it. A
                    # slice delete also copes with a counter in last
                    # position, where two single deletes raise IndexError.
                    del ignore_fields[x : x + 2]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        # Walk the chain iteratively: first create an
                        # empty clone for every link and wire the ``k``
                        # references, then fill the clones afterwards.
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    # back at the start: the chain is a loop
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # no further link (or it cannot be resolved)
                                cur_obj = None
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # streams are always stored through their reference
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()``."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Not defined here: ask the parent, which recurses up its own chain.
        parent = cast("DictionaryObject", self["/Parent"].get_object())
        return parent.get_inherited(key, default)

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: key or value is not a PdfObject.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        Same as ``dict.setdefault`` for PdfObject keys and values.

        Raises:
            ValueError: key or value is not a PdfObject. This includes the
                default ``value=None``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return ``get_object()`` of the value stored under ``key``."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialise the dictionary as ``<<``, one ``key value`` line per
        entry, then ``>>``.

        Args:
            stream: binary stream written to.
            encryption_key: deprecated; any value other than None is
                reported through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            # Keys of the form "/%...%" (e.g. "/%is_open%") are not written.
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream data following it if any.

        Args:
            stream: stream positioned on the opening ``<<``.
            pdf: reader used to resolve an indirect length and to read the
                ``strict`` flag; may be None.
            forced_encoding: forwarded to ``read_object`` for values.

        Returns:
            A DictionaryObject, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary
            is followed by the ``stream`` keyword. When a key/value pair
            cannot be read and the reader is not strict, the entries read
            so far are returned.

        Raises:
            PdfReadError: missing ``<<``; in strict mode also unreadable
                entries and duplicate keys; ``endstream`` cannot be found.
            PdfStreamError: data ends inside the dictionary, ``stream`` is
                not followed by a newline, or (strict mode) the stream
                length is not defined.

        """

        def get_next_obj_pos(
            p: int, p1: int, rem_gens: list[int], pdf: PdfReaderProtocol
        ) -> int:
            # Smallest offset x in ``pdf.xref`` with p < x <= p1, for the
            # given generations; p1 when there is none.
            out = p1
            for gen in rem_gens:
                loc = pdf.xref[gen]
                try:
                    values = [x for x in loc.values() if p < x <= p1]
                    if values:
                        out = min(out, *values)
                except ValueError:
                    pass
            return out

        def read_unsized_from_stream(
            stream: StreamType, pdf: PdfReaderProtocol
        ) -> bytes:
            # we are just pointing at beginning of the stream
            eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1
            curr = stream.tell()
            rw = stream.read(eon - stream.tell())
            p = rw.find(b"endstream")
            if p < 0:
                raise PdfReadError(
                    f"Unable to find 'endstream' marker for obj starting at {curr}."
                )
            stream.seek(curr + p + 9)
            # Drop the byte just before the marker. Clamp at 0: with the
            # marker at offset 0, ``rw[:-1]`` would return nearly the
            # whole buffer instead of empty data.
            return rw[: max(p - 1, 0)]

        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # consume the second ">" of the closing ">>"
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(repr(exc), __name__)
                    continue
                # position the stream on the first byte of the value
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(repr(exc))
                logger_warning(repr(exc), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # NOTE(review): truthiness test - a falsy earlier value for
            # the same key is replaced without any warning.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # lone "\r": give back the byte read after it
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # resolving the reference moves the stream: restore it
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # ignore the declared length and search for the marker
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # plain dictionary: undo the look-ahead
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval

675 

676 

class TreeObject(DictionaryObject):
    """
    Dictionary acting as the parent of a doubly linked list of children.

    The parent holds ``/First``, ``/Last`` and ``/Count``; every child
    holds ``/Parent`` and, where applicable, ``/Next`` and ``/Prev``.
    Iterating a TreeObject yields its children, not its keys.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Args:
            dct: optional dictionary whose entries are copied into the
                new object.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the object has a ``/First`` entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (see ``children``)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children from ``/First``, following ``/Next``.

        Iteration stops after the child equal to ``/Last`` or at a child
        whose ``/Next`` is missing or null.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        child = child_ref.get_object()
        while True:
            yield child
            if child == self[NameObject("/Last")]:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Append ``child`` after the current last child."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to ``/Count`` of ``parent`` and of all its ancestors.

        Only nodes that already have ``/Count`` are updated; the result
        never goes below 0. Recursion ends at a missing or null parent.
        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Counter update using a signed ``/Count``.

        The magnitude of ``/Count`` grows by ``n``; it is stored positive
        when ``/%is_open%`` is true (the default) and negative otherwise.
        Ancestors are only updated through open nodes.
        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Link ``child`` into the list of children.

        Args:
            child: object to insert; its ``indirect_reference`` is what
                gets stored in the links.
            before: indirect reference of the sibling the child is put in
                front of. The search starts at ``/Last`` and follows
                ``/Next``; when it runs out of links the child is
                appended at the end.
            pdf: not used by this method.
            inc_parent_counter: callable ``(parent, n)`` updating the
                counters; defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            # drop links left over from a previous position
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child

        # ``prev`` is the sibling the child goes in front of.
        try:
            before_prev = prev["/Prev"]
        except Exception:  # no /Prev, or it cannot be resolved
            before_prev = None
        if isinstance(before_prev, DictionaryObject):
            # insert in the middle
            before_prev[NameObject("/Next")] = child
            # keep the stored value (normally a reference), as the
            # append branch does, rather than the resolved object
            child_obj[NameObject("/Prev")] = prev.raw_get("/Prev")
        else:
            # insert in first position: the child must not keep an old
            # /Prev and the parent has to point at its new first child.
            # (An unguarded ``del child_obj["/Next"]`` used to raise
            # KeyError here for a child without /Next.)
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            self[NameObject("/First")] = child
        prev_ref = prev.indirect_reference
        child_obj[NameObject("/Next")] = prev if prev_ref is None else prev_ref
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: node before ``cur``, or None when ``cur`` is the first.
            prev_ref: reference to ``prev``.
            cur: node being removed.
            last: last node of the list; only used for a consistency
                assertion when ``cur`` has no ``/Next``.

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Unlink ``child`` from this tree and clear its tree entries.

        Raises:
            ValueError: the child has no ``/Parent``, its ``/Parent`` is
                not this object, or it is not found among the children.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """Remove the object from the tree it is in."""
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child and remove ``/Count``, ``/First``, ``/Last``."""
        # Collect the children first: resetting a child deletes the /Next
        # link that ``children()`` needs to reach the following one, so
        # resetting while iterating stops after the first child.
        for child in list(self.children()):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]

884 

885 

def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Clear the tree links of a node that has been taken out of its tree.

    ``/Parent`` is deleted unconditionally, so it must be present;
    ``/Next`` and ``/Prev`` are deleted only when present.

    Args:
        child_obj: the node, already detached from its siblings.

    """
    del child_obj[NameObject("/Parent")]
    for link_name in ("/Next", "/Prev"):
        link_key = NameObject(link_name)
        if link_key in child_obj:
            del child_obj[link_key]

901 

902 

903class StreamObject(DictionaryObject): 

904 def __init__(self) -> None: 

905 self._data: bytes = b"" 

906 self.decoded_self: Optional[DecodedStreamObject] = None 

907 

908 def replicate( 

909 self, 

910 pdf_dest: PdfWriterProtocol, 

911 ) -> "StreamObject": 

912 d__ = cast( 

913 "StreamObject", 

914 self._reference_clone(self.__class__(), pdf_dest, False), 

915 ) 

916 d__._data = self._data 

917 try: 

918 decoded_self = self.decoded_self 

919 if decoded_self is None: 

920 self.decoded_self = None 

921 else: 

922 self.decoded_self = cast( 

923 "DecodedStreamObject", decoded_self.replicate(pdf_dest) 

924 ) 

925 except Exception: 

926 pass 

927 for k, v in self.items(): 

928 d__[k.replicate(pdf_dest)] = ( 

929 v.replicate(pdf_dest) if hasattr(v, "replicate") else v 

930 ) 

931 return d__ 

932 

933 def _clone( 

934 self, 

935 src: DictionaryObject, 

936 pdf_dest: PdfWriterProtocol, 

937 force_duplicate: bool, 

938 ignore_fields: Optional[Sequence[Union[str, int]]], 

939 visited: set[tuple[int, int]], 

940 ) -> None: 

941 """ 

942 Update the object from src. 

943 

944 Args: 

945 src: 

946 pdf_dest: 

947 force_duplicate: 

948 ignore_fields: 

949 

950 """ 

951 self._data = cast("StreamObject", src)._data 

952 try: 

953 decoded_self = cast("StreamObject", src).decoded_self 

954 if decoded_self is None: 

955 self.decoded_self = None 

956 else: 

957 self.decoded_self = cast( 

958 "DecodedStreamObject", 

959 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields), 

960 ) 

961 except Exception: 

962 pass 

963 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) 

964 

965 def hash_bin(self) -> int: 

966 """ 

967 Used to detect modified object. 

968 

969 Returns: 

970 Hash considering type and value. 

971 

972 """ 

973 # Use _data to prevent errors on non-decoded streams. 

974 return hash((super().hash_bin(), self._data)) 

975 

976 def get_data(self) -> bytes: 

977 return self._data 

978 

979 def set_data(self, data: bytes) -> None: 

980 self._data = data 

981 

982 def hash_value_data(self) -> bytes: 

983 data = super().hash_value_data() 

984 data += self.get_data() 

985 return data 

986 

def write_to_stream(
    self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
    """
    Serialize the dictionary part followed by the raw stream bytes.

    The ``SA.LENGTH`` entry is added only for the duration of the dictionary
    serialization and is removed again afterwards, even when that
    serialization fails.

    Args:
        stream: Output the object is written to.
        encryption_key: Deprecated; passing anything but ``None`` only
            triggers ``deprecation_no_replacement``.

    """
    if encryption_key is not None:  # deprecated
        deprecation_no_replacement(
            "the encryption_key parameter of write_to_stream", "5.0.0"
        )
    self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
    try:
        DictionaryObject.write_to_stream(self, stream)
    finally:
        # Always drop the temporary entry so a failed write does not leave
        # it behind in the in-memory object.
        del self[SA.LENGTH]
    stream.write(b"\nstream\n")
    stream.write(self._data)
    stream.write(b"\nendstream")

1000 

@staticmethod
def initialize_from_dictionary(
    data: dict[str, Any]
) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
    """
    Build a stream object from ``data``.

    An ``EncodedStreamObject`` is created when ``SA.FILTER`` is a key of
    ``data``, a ``DecodedStreamObject`` otherwise. ``data`` is modified in
    place: its ``"__streamdata__"`` entry becomes the payload and is removed,
    as is any ``SA.LENGTH`` entry; the remaining entries are copied into the
    result.

    Raises:
        KeyError: If ``data`` has no ``"__streamdata__"`` entry.

    """
    stream_obj: Union[EncodedStreamObject, DecodedStreamObject] = (
        EncodedStreamObject() if SA.FILTER in data else DecodedStreamObject()
    )
    stream_obj._data = data.pop("__streamdata__")
    data.pop(SA.LENGTH, None)
    stream_obj.update(data)
    return stream_obj

1016 

def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
    """
    Return a new ``EncodedStreamObject`` holding this stream's stored bytes
    passed through ``FlateDecode.encode``.

    FlateDecode is placed in front of any filter already present, and a
    ``NullObject`` is placed in front of the matching decode parameters.

    Args:
        level: Passed unchanged to ``FlateDecode.encode``.

    """
    from ..filters import FlateDecode  # noqa: PLC0415

    if SA.FILTER not in self:
        filters = NameObject(FT.FLATE_DECODE)
        parms = None
    else:
        current = self[SA.FILTER]
        if isinstance(current, ArrayObject):
            filters = ArrayObject([NameObject(FT.FLATE_DECODE), *current])
            try:
                parms = ArrayObject(
                    [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                )
            except TypeError:
                # The decode parameters could not be unpacked with * (not an
                # array): keep them as one single element instead.
                parms = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                )
        else:
            filters = ArrayObject([NameObject(FT.FLATE_DECODE), current])
            parms = ArrayObject(
                [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
            )

    encoded = EncodedStreamObject()
    encoded.update(self)
    encoded[NameObject(SA.FILTER)] = filters
    if parms is not None:
        encoded[NameObject(SA.DECODE_PARMS)] = parms
    encoded._data = FlateDecode.encode(self._data, level)
    return encoded

1048 

def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any:
    """
    Attempt to turn this stream into an image.

    A warning is logged, and decoding is still attempted, when ``/Subtype``
    is not ``/Image``.

    Args:
        pillow_parameters: parameters provided to Pillow Image.save() method,
            cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save>

    Returns:
        The decoded image, or ``None`` when no extension was determined.

    Raises:
        Exception: Errors during decoding will be reported.
            It is recommended to catch exceptions to prevent
            stops in your program.

    """
    from ..filters import _xobj_to_image  # noqa: PLC0415

    if self.get("/Subtype", "") != "/Image":
        try:
            msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
        except AttributeError:
            msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
        logger_warning(msg, __name__)

    extension, _, image = _xobj_to_image(self, pillow_parameters)
    return None if extension is None else image  # pragma: no cover

1077 

1078 

class DecodedStreamObject(StreamObject):
    """Stream object that adds no behavior of its own to ``StreamObject``."""

1081 

1082 

class EncodedStreamObject(StreamObject):
    """
    Stream object whose stored bytes are still filter-encoded.

    ``get_data()`` decodes on first use and keeps the decoded object in
    ``decoded_self``; ``set_data()`` only supports FlateDecode streams.
    """

    def __init__(self) -> None:
        # Run the parent initializer as well, so that whatever per-instance
        # state it creates exists on a freshly constructed object.
        # NOTE(review): StreamObject.__init__ is defined outside this
        # excerpt; presumably it initializes ``_data`` - confirm.
        super().__init__()
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """Return the decoded payload, decoding and caching it on first call."""
        from ..filters import decode_stream_data  # noqa: PLC0415

        if self.decoded_self is not None:
            # Cached version of decoded object
            return self.decoded_self.get_data()

        # Create decoded object; the length, filter and decode-parameter
        # entries are left out of the decoded copy.
        decoded = DecodedStreamObject()
        decoded.set_data(decode_stream_data(self))
        for key, value in self.items():
            if key not in (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS):
                decoded[key] = value
        self.decoded_self = decoded
        return decoded.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the payload with ``data`` (decoded form).

        The cached decoded object is updated and the stored bytes become
        ``FlateDecode.encode(data)``.

        Raises:
            TypeError: If ``data`` is not ``bytes``.
            PdfReadError: If the stream's filter is not FlateDecode alone.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(SA.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            if not isinstance(data, bytes):
                raise TypeError("Data must be bytes")
            if self.decoded_self is None:
                self.get_data()  # to create self.decoded_self
            assert self.decoded_self is not None, "mypy"
            self.decoded_self.set_data(data)
            super().set_data(FlateDecode.encode(data))
        else:
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )

1120 

1121 

class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, a ContentStream object can either have both of those fields
    filled, or one field filled and the other one empty (``b""`` for ._data,
    ``[]`` for ._operations).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is read, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is read (and parsed) or set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Args:
            stream: ``None`` for an empty content stream, otherwise an object
                whose ``get_object()`` yields either one stream or an
                ``ArrayObject`` of streams to be concatenated.
            pdf: Stored as ``self.pdf`` and passed to ``read_object`` while
                parsing inline images.
            forced_encoding: Stored and passed to ``read_object`` while
                parsing operands.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
        else:
            stream = stream.get_object()
            if isinstance(stream, ArrayObject):
                data = bytearray()
                for s in stream:
                    s_resolved = s.get_object()
                    if isinstance(s_resolved, NullObject):
                        continue
                    if not isinstance(s_resolved, StreamObject):
                        # No need to emit an exception here for now - the PDF structure
                        # seems to already be broken beforehand in these cases.
                        logger_warning(
                            f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                            __name__
                        )
                    else:
                        data += s_resolved.get_data()
                        # Separate consecutive streams with a newline, but
                        # only when one is missing. Indexing bytes yields an
                        # int, so comparing ``data[-1]`` with ``b"\n"`` would
                        # always be unequal; ``endswith`` does the real check.
                        if not data.endswith(b"\n"):
                            data += b"\n"
                super().set_data(bytes(data))
            else:
                stream_data = stream.get_data()
                assert stream_data is not None
                super().set_data(stream_data)
        self.forced_encoding = forced_encoding

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Create a copy of this content stream for ``pdf_dest``.

        The raw data, the dictionary entries (replicated when they offer a
        ``replicate`` method), the parsed operations and the forced encoding
        are all carried over, so a stream that currently only holds parsed
        operations does not end up empty.

        Args:
            pdf_dest: Destination the copy is created for.

        Returns:
            The replicated ContentStream.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        d__._data = self._data
        # ``decoded_self`` is not set by this class' ``__init__``, so it may
        # be missing altogether.
        decoded_self = getattr(self, "decoded_self", None)
        if decoded_self is not None:
            try:
                # Attach the replicated decoded object to the copy; the
                # source object must not be altered by replication.
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
            except Exception:
                # Deliberate best effort: the decoded cache is optional.
                pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Destination of the clone.
            force_duplicate: When false and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: Passed on to ``_clone``; ``None`` is replaced by
                an empty list.

        Returns:
            The cloned ContentStream

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the parsed operations and the forced encoding are
        copied; the parent ``_clone`` implementations are not called.

        Args:
            src: Content stream to copy from.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Unused here.
            ignore_fields: Unused here.
            visited: Unused here.

        """
        src_cs = cast("ContentStream", src)
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObjection or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """Parse ``stream`` from its start and append the result to ``_operations``."""
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read one inline image, the stream being positioned right after ``BI``.

        Returns:
            A dict with the image dictionary under ``"settings"`` and the
            image bytes under ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker is found even with the
                fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            # Only called to skip the whitespace between key and value.
            read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline_AHx(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline_A85(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline_RL(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline_DCT(stream)
        elif filtr == "not set":
            # Without a filter the size is derived from the color space:
            # ``lcs`` is the number of bytes per pixel, or -1 when unknown.
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """Return the raw data, rebuilding it from ``_operations`` when it is empty."""
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first and last two bytes of the serialized
                    # dictionary; only what lies in between is written.
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Store ``data`` as raw data and discard the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """Parsed operations; parsed from the raw data on demand, which then empties the raw data."""
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """Wrap the content in ``q`` ... ``Q``; nothing is done when the stream is empty."""
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """Rebuild the raw data from the operations if needed, then write via the parent."""
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)

1433 

1434 

def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read the object starting at the current position of ``stream``.

    The reader is selected from the first byte (two bytes for ``<<``);
    comments are skipped before the following object is read.

    Args:
        stream: Seekable input positioned on the first byte of the object.
        pdf: Passed to the readers of names, dictionaries, arrays and
            indirect references.
        forced_encoding: Passed to the string, dictionary and array readers.

    Returns:
        The object that was read. A literal ``endobj`` yields a
        ``NullObject``.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e":
        probe = stream.read(6)
        if probe == b"endobj":
            return NullObject()
        # Not "endobj": undo the probe so the error below reports the
        # position and context of the offending token itself.
        stream.seek(-len(probe), 1)
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Collect up to 20 bytes of leading context. The start is clamped to 0
    # because seeking before the beginning of a file-backed stream raises
    # OSError, which would hide the PdfReadError raised below.
    stream.seek(max(pos - 20, 0), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )

1483 

1484 

class Field(TreeObject):
    """
    Representation of a field dictionary.

    Instances are obtained through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes of ``data`` into this object.

        Attributes missing from ``data`` are skipped. A ``/V`` entry that is
        an ``EncodedStreamObject`` is replaced by its decoded text.
        """
        DictionaryObject.__init__(self)
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        self.indirect_reference = data.indirect_reference
        for attribute in known_attributes:
            try:
                self[NameObject(attribute)] = data[attribute]
            except KeyError:
                continue

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return
        raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if raw is None:
            text = ""
        elif isinstance(raw, bytes):
            text = raw.decode()
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        pypdf uses this name as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        The format depends on the field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        It defines the field's behavior in response to trigger events.
        See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)

1582 

1583 

class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: Union[str, bytes],
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the /XYZ branch below pads the argument list, and
        # that must not modify the ``fit`` object owned by the caller.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # No arguments at all: nothing to store, whatever the type is.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """Array made of the raw page entry, the type and the coordinates that are present."""
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write this destination as a dictionary with a ``/D`` entry holding
        ``dest_array`` and an ``/S`` entry holding ``/GoTo``.

        Args:
            stream: Output the object is written to.
            encryption_key: Deprecated; passing anything but ``None`` only
                triggers ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """Read-only property accessing the color in (R, G, B) with values 0.0-1.0."""
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)