Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/generic/_data_structures.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

955 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import logging 

33import re 

34import sys 

35from collections.abc import Iterable, Sequence 

36from io import BytesIO 

37from math import ceil 

38from typing import ( 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 

47from .._utils import ( 

48 WHITESPACES, 

49 StreamType, 

50 deprecation_no_replacement, 

51 logger_warning, 

52 read_non_whitespace, 

53 read_until_regex, 

54 read_until_whitespace, 

55 skip_over_comment, 

56) 

57from ..constants import ( 

58 CheckboxRadioButtonAttributes, 

59 FieldDictionaryAttributes, 

60 OutlineFontFlag, 

61) 

62from ..constants import FilterTypes as FT 

63from ..constants import StreamAttributes as SA 

64from ..constants import TypArguments as TA 

65from ..constants import TypFitArguments as TF 

66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError 

67from ._base import ( 

68 BooleanObject, 

69 ByteStringObject, 

70 FloatObject, 

71 IndirectObject, 

72 NameObject, 

73 NullObject, 

74 NumberObject, 

75 PdfObject, 

76 TextStringObject, 

77 is_null_or_none, 

78) 

79from ._fit import Fit 

80from ._image_inline import ( 

81 extract_inline_A85, 

82 extract_inline_AHx, 

83 extract_inline_DCT, 

84 extract_inline_default, 

85 extract_inline_RL, 

86) 

87from ._utils import read_hex_string_from_stream, read_string_from_stream 

88 

89if sys.version_info >= (3, 11): 

90 from typing import Self 

91else: 

92 from typing_extensions import Self 

93 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Bytes pattern for an indirect reference: optional sign, two integers
# separated by whitespace (both captured), the letter "R", then one byte
# that is not an ASCII letter.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")

96 

97 

class ArrayObject(list[Any], PdfObject):
    """
    PDF array: a Python list that is also a PdfObject.

    On top of the list behaviour it offers replication/cloning into a
    writer, hashing, ``+`` / ``+=`` / ``-=`` helpers and PDF (de)serialization.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a copy of this array for ``pdf_dest``.

        Items exposing a ``replicate`` method are replicated recursively;
        any other item is appended as is.

        Args:
            pdf_dest: writer that will own the copy.

        Returns:
            The new ArrayObject.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer that will own the clone.
            force_duplicate: when False and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: forwarded unchanged to the items' ``clone``.

        Returns:
            ``self`` or the cloned ArrayObject.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: e.g. no usable indirect_reference -> do a real clone.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored in the new array through their indirect reference.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalize an operand of ``+``, ``+=`` or ``-=`` into an iterable of items.

        Lists, tuples and sets are returned unchanged. A ``str`` becomes a
        one-item list holding a NameObject (when it starts with "/") or a
        TextStringObject; ``bytes`` become a ByteStringObject; anything else
        (PdfObject, numbers, ...) is wrapped in a one-item list.
        """
        if isinstance(lst, (list, tuple, set)):
            pass
        elif isinstance(lst, PdfObject):
            lst = [lst]
        elif isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not raise IndexError.
            if lst.startswith("/"):
                lst = [NameObject(lst)]
            else:
                lst = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            lst = [ByteStringObject(lst)]
        else:  # for numbers,...
            lst = [lst]
        return lst

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For every item of the normalized operand, the first equal element of
        this array is removed; items that are not present are ignored.
        """
        # Iterate over a snapshot: the operand may be this very array
        # (``a -= a``), and removing while iterating would skip elements.
        for x in list(self._to_lst(lst)):
            try:
                self.remove(x)
            except ValueError:
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ item item ... ]`` to ``stream``.

        Args:
            stream: binary stream to write to.
            encryption_key: deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: stream positioned on the opening "[".
            pdf: reader forwarded to ``read_object``.
            forced_encoding: forwarded to ``read_object``.

        Returns:
            The parsed ArrayObject. Reaching the end of the stream before "]"
            ends the parsing and returns the items read so far.

        Raises:
            PdfReadError: the first byte is not "[".

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            stream.seek(-1, 1)
            # check for array ending
            peek_ahead = stream.read(1)
            if peek_ahead == b"]":
                break
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr

262 

263 

class DictionaryObject(dict[Any, Any], PdfObject):
    """
    PDF dictionary: a Python dict that is also a PdfObject.

    Keys and values stored through ``__setitem__`` / ``setdefault`` must be
    PdfObject instances; ``__getitem__`` returns ``value.get_object()``
    while ``raw_get`` returns the stored value itself.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a copy of this dictionary for ``pdf_dest``.

        Keys are replicated; values are replicated when they expose a
        ``replicate`` method and stored as is otherwise.

        Args:
            pdf_dest: writer that will own the copy.

        Returns:
            The new dictionary (same class as ``self``).

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer that will own the clone.
            force_duplicate: when False and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: keys that must not be copied; None is treated as
                an empty list.

        Returns:
            ``self`` or the cloned dictionary.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: e.g. no usable indirect_reference -> do a real clone.
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only fill the object returned by _reference_clone when it is still empty.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: dictionary whose entries are copied into ``self``.
            pdf_dest: writer that owns the clones.
            force_duplicate: forwarded to every nested clone call.
            ignore_fields: keys to skip; integer entries are handled by the
                first loop of the body (decremented, or removed together
                with the entry that follows them once they are <= 0).
            visited: (idnum, generation) pairs of clones already created
                while walking /Next, /Prev, /N and /V chains.

        """
        # first we remove for the ignore_fields
        # that are for a limited number of levels
        x = 0
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # First check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # IF need to go further the idea is to check
                        # that the types are the same:
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively, creating the (still empty)
                        # clones first and linking each one to its predecessor.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                cur_obj = None
                        # Then fill every clone of the chain from its source.
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()`` on it."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Not defined here: delegate to the parent dictionary.
        parent = cast("DictionaryObject", self["/Parent"].get_object())
        return parent.get_inherited(key, default)

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: ``key`` or ``value`` is not a PdfObject.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        ``dict.setdefault`` restricted to PdfObject keys and values.

        Raises:
            ValueError: ``key`` or ``value`` is not a PdfObject (this includes
                the default ``value=None``).

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return ``get_object()`` of the value stored under ``key``."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<< key value ... >>`` to ``stream``.

        Entries whose key looks like ``/%...%`` are not written.

        Args:
            stream: binary stream to write to.
            encryption_key: deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream data that may follow it.

        Args:
            stream: stream positioned on the opening "<<".
            pdf: reader used to parse nested objects and to resolve an
                indirect stream length; its ``strict`` flag turns the
                recoverable problems below into errors.
            forced_encoding: forwarded to ``read_object`` for the values.

        Returns:
            A DictionaryObject, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary is
            followed by the ``stream`` keyword. In non-strict mode a partially
            read dictionary is returned when an entry cannot be parsed.

        Raises:
            PdfReadError: missing "<<", invalid entry in strict mode, or no
                ``endstream`` marker found.
            PdfStreamError: truncated input, missing newline after ``stream``,
                or missing stream length in strict mode.

        """

        def get_next_obj_pos(
            p: int, p1: int, rem_gens: list[int], pdf: PdfReaderProtocol
        ) -> int:
            # Smallest xref offset in ]p, p1] over the given generations, else p1.
            out = p1
            for gen in rem_gens:
                loc = pdf.xref[gen]
                try:
                    values = [x for x in loc.values() if p < x <= p1]
                    if values:
                        out = min(out, *values)
                except ValueError:
                    pass
            return out

        def read_unsized_from_stream(
            stream: StreamType, pdf: PdfReaderProtocol
        ) -> bytes:
            # we are just pointing at beginning of the stream
            eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1
            curr = stream.tell()
            rw = stream.read(eon - stream.tell())
            p = rw.find(b"endstream")
            if p < 0:
                raise PdfReadError(
                    f"Unable to find 'endstream' marker for obj starting at {curr}."
                )
            stream.seek(curr + p + 9)
            # Drop the byte preceding the marker. Clamp at 0: with the marker at
            # offset 0, ``rw[: -1]`` would return nearly the whole buffer
            # instead of empty data.
            return rw[: max(p - 1, 0)]

        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__()) from exc
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length > 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval

674 

675 

class TreeObject(DictionaryObject):
    """
    Dictionary acting as the parent of a doubly linked list of children.

    The parent holds /First, /Last and /Count; each child holds /Parent and
    the /Next and /Prev links. Iterating a TreeObject yields its children.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Args:
            dct: optional dictionary whose entries are copied into the tree.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the tree has a /First entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (not over the dictionary keys)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Lazily yield the children, from /First following the /Next links.

        The walk stops after the child equal to /Last, or at the first child
        without a usable /Next entry.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        child = child_ref.get_object()
        while True:
            yield child
            if child == self[NameObject("/Last")]:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Append ``child`` at the end of the children list."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to /Count of ``parent`` and of all its ancestors.

        Only parents that already have a /Count entry are updated, and the
        stored count never goes below 0.
        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the absolute /Count of ``parent``.

        The count is stored negative when the parent's "/%is_open%" entry is
        not true; in that case the ancestors are not updated.
        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Insert ``child`` into the children list.

        Args:
            child: object to insert; its ``indirect_reference`` is what gets
                stored in the tree.
            before: indirect reference of the node to insert in front of;
                when it is not met while walking the list, the child is
                appended at the end.
            pdf: not used by this method.
            inc_parent_counter: callable ``(parent, n)`` updating the counts;
                defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # NOTE(review): this delete raises KeyError when child_obj has no
            # /Next entry, and /First is not updated here - confirm intent.
            del child_obj["/Next"]
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: node preceding ``cur``, or None when ``cur`` is the first node.
            prev_ref: reference to ``prev``.
            cur: node being removed.
            last: last node of the tree (only used in an assertion).

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Remove ``child`` from this tree and reset its /Parent, /Next and /Prev.

        Raises:
            ValueError: the child has no /Parent, its /Parent is not this
                tree, or it is not found among the children.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """Remove the object from the tree it is in."""
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child, then drop /Count, /First and /Last from the tree."""
        # Snapshot the children first: resetting a node deletes its /Next
        # entry, which the lazy children() generator needs to reach the
        # following node. Without the snapshot only the first child is reset.
        for child in list(self.children()):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]

883 

884 

def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Call this after a node has been removed from a tree.

    The node's /Parent entry is deleted unconditionally (a missing entry
    raises), then /Next and /Prev are deleted when present.

    Args:
        child_obj: the node that has just been detached.

    """
    del child_obj[NameObject("/Parent")]
    for link_name in ("/Next", "/Prev"):
        link_key = NameObject(link_name)
        if link_key in child_obj:
            del child_obj[link_key]

900 

901 

902class StreamObject(DictionaryObject): 

def __init__(self) -> None:
    """Start with empty payload bytes and no cached decoded stream."""
    # Decoded counterpart of this stream; None until one is attached.
    self.decoded_self: Optional[DecodedStreamObject] = None
    # Raw payload bytes of the stream.
    self._data: bytes = b""

906 

def replicate(
    self,
    pdf_dest: PdfWriterProtocol,
) -> "StreamObject":
    """
    Build a copy of this stream for ``pdf_dest``.

    The payload bytes are shared with the copy. Keys are replicated;
    values are replicated when they expose a ``replicate`` method.

    Args:
        pdf_dest: writer that will own the copy.

    Returns:
        The new stream object (same class as ``self``).

    """
    d__ = cast(
        "StreamObject",
        self._reference_clone(self.__class__(), pdf_dest, False),
    )
    d__._data = self._data
    try:
        decoded_self = self.decoded_self
        # The replicated decoded stream belongs to the copy. Assigning it to
        # ``self`` would overwrite the source's own decoded stream and leave
        # the copy without one.
        if decoded_self is None:
            d__.decoded_self = None
        else:
            d__.decoded_self = cast(
                "DecodedStreamObject", decoded_self.replicate(pdf_dest)
            )
    except Exception:
        # Best effort: the copy keeps whatever decoded_self it already had.
        pass
    for k, v in self.items():
        d__[k.replicate(pdf_dest)] = (
            v.replicate(pdf_dest) if hasattr(v, "replicate") else v
        )
    return d__

931 

def _clone(
    self,
    src: DictionaryObject,
    pdf_dest: PdfWriterProtocol,
    force_duplicate: bool,
    ignore_fields: Optional[Sequence[Union[str, int]]],
    visited: set[tuple[int, int]],
) -> None:
    """
    Update the object from src.

    Copies the payload bytes and a clone of the decoded stream (if any)
    from ``src``, then lets the parent class copy the dictionary entries.

    Args:
        src: stream to copy from.
        pdf_dest: writer that owns the clones.
        force_duplicate: forwarded to the nested clone calls.
        ignore_fields: forwarded to the nested clone calls.
        visited: forwarded to the parent implementation.

    """
    source_stream = cast("StreamObject", src)
    self._data = source_stream._data
    try:
        source_decoded = cast("StreamObject", src).decoded_self
        self.decoded_self = (
            None
            if source_decoded is None
            else cast(
                "DecodedStreamObject",
                source_decoded.clone(pdf_dest, force_duplicate, ignore_fields),
            )
        )
    except Exception:
        # Best effort: on failure decoded_self is left as it was.
        pass
    super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

963 

def hash_bin(self) -> int:
    """
    Used to detect modified object.

    Returns:
        Hash combining the parent class hash with the payload bytes.

    """
    dictionary_hash = super().hash_bin()
    # Use _data to prevent errors on non-decoded streams.
    return hash((dictionary_hash, self._data))

974 

def get_data(self) -> bytes:
    """Return the payload bytes currently stored on the stream."""
    payload = self._data
    return payload

977 

def set_data(self, data: bytes) -> None:
    """Replace the payload bytes stored on the stream with ``data``."""
    setattr(self, "_data", data)

980 

def hash_value_data(self) -> bytes:
    """Return the parent class hash-value data followed by the stream payload."""
    return super().hash_value_data() + self.get_data()

985 

def write_to_stream(
    self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
    """
    Write the stream dictionary followed by the payload to ``stream``.

    The length entry is computed from the payload, written with the
    dictionary and removed again afterwards.

    Args:
        stream: binary stream to write to.
        encryption_key: deprecated; any value other than None is reported
            through ``deprecation_no_replacement``.

    """
    if encryption_key is not None:  # deprecated
        deprecation_no_replacement(
            "the encryption_key parameter of write_to_stream", "5.0.0"
        )
    self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
    try:
        DictionaryObject.write_to_stream(self, stream)
    finally:
        # The length entry is only needed during serialization; remove it
        # even when writing the dictionary fails.
        del self[SA.LENGTH]
    stream.write(b"\nstream\n")
    stream.write(self._data)
    stream.write(b"\nendstream")

999 

@staticmethod
def initialize_from_dictionary(
    data: dict[str, Any]
) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
    """
    Build a stream object from a plain dictionary.

    An EncodedStreamObject is created when ``data`` contains ``SA.FILTER``,
    a DecodedStreamObject otherwise. The ``"__streamdata__"`` entry becomes
    the raw data, and it and any ``SA.LENGTH`` entry are removed from
    ``data`` (the caller's dictionary is modified) before the remaining
    entries are copied over.

    Args:
        data: Dictionary holding the stream entries and ``"__streamdata__"``.

    Returns:
        The newly created stream object.

    """
    retval: Union[EncodedStreamObject, DecodedStreamObject]
    retval = EncodedStreamObject() if SA.FILTER in data else DecodedStreamObject()
    retval._data = data.pop("__streamdata__")
    data.pop(SA.LENGTH, None)
    retval.update(data)
    return retval

1015 

def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
    """
    Return a new EncodedStreamObject holding this stream's raw data
    passed through ``FlateDecode.encode``.

    ``FT.FLATE_DECODE`` is placed in front of any filter already present,
    and a NullObject is placed in front of the decode parameters so both
    arrays stay aligned.

    Args:
        level: Forwarded to ``FlateDecode.encode``.

    Returns:
        The newly created encoded stream; ``self`` is not modified.

    """
    from ..filters import FlateDecode  # noqa: PLC0415

    new_parms = None
    if SA.FILTER not in self:
        # No filter so far: FlateDecode becomes the only one.
        new_filter = NameObject(FT.FLATE_DECODE)
    else:
        current = self[SA.FILTER]
        if not isinstance(current, ArrayObject):
            # A single filter is turned into a two-element chain.
            new_filter = ArrayObject([NameObject(FT.FLATE_DECODE), current])
            new_parms = ArrayObject(
                [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
            )
        else:
            new_filter = ArrayObject([NameObject(FT.FLATE_DECODE), *current])
            try:
                new_parms = ArrayObject(
                    [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                )
            except TypeError:
                # The parameters could not be unpacked with `*` (not an
                # array): keep them as one single element instead.
                new_parms = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                )

    encoded = EncodedStreamObject()
    encoded.update(self)
    encoded[NameObject(SA.FILTER)] = new_filter
    if new_parms is not None:
        encoded[NameObject(SA.DECODE_PARMS)] = new_parms
    encoded._data = FlateDecode.encode(self._data, level)
    return encoded

1047 

def decode_as_image(self) -> Any:
    """
    Try to decode the stream object as an image.

    A warning is logged when the ``/Subtype`` entry is not ``/Image``;
    decoding is attempted regardless.

    Returns:
        The image produced by ``_xobj_to_image``, or None when no
        extension was determined.

    Raises:
        Exception: Errors raised during decoding are not caught here.
            It is recommended to catch exceptions to prevent
            stops in your program.

    """
    from ..filters import _xobj_to_image  # noqa: PLC0415

    looks_like_image = self.get("/Subtype", "") == "/Image"
    if not looks_like_image:
        try:
            msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
        except AttributeError:
            msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
        logger_warning(msg, __name__)
    extension, _byte_stream, img = _xobj_to_image(self)
    return None if extension is None else img  # pragma: no cover

1073 

1074 

class DecodedStreamObject(StreamObject):
    """Stream object that adds nothing to StreamObject's behaviour."""

1077 

1078 

class EncodedStreamObject(StreamObject):
    """
    Stream object whose decoded form is computed on demand through
    ``decode_stream_data`` and cached in ``decoded_self``.
    """

    def __init__(self) -> None:
        # Cache of the decoded stream; filled by the first get_data() call.
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded data.

        The first call decodes the stream and builds a DecodedStreamObject
        holding every entry except ``SA.LENGTH``, ``SA.FILTER`` and
        ``SA.DECODE_PARMS``; later calls reuse that cached object.
        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        cached = self.decoded_self
        if cached is not None:
            # Cached version of decoded object
            return cached.get_data()

        # Create decoded object
        fresh = DecodedStreamObject()
        fresh.set_data(decode_stream_data(self))
        encoding_only_keys = (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS)
        for key, value in self.items():
            if key in encoding_only_keys:
                continue
            fresh[key] = value
        self.decoded_self = fresh
        return fresh.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the stream content with ``data`` (given in decoded form).

        Args:
            data: New decoded content.

        Raises:
            PdfReadError: The filter is neither ``FT.FLATE_DECODE`` nor a
                list containing only it.
            TypeError: ``data`` is not a bytes object.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(SA.FILTER, "") not in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        # Keep the cached decoded object and the encoded raw data in step.
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))

1116 

1117 

class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, ContentStream object can either have both of those fields
    defined, or one field defined and the other one empty (``b""`` for
    ._data, ``[]`` for ._operations).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is called, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Build the content stream from ``stream``.

        Args:
            stream: None for an empty content stream, otherwise an object
                whose ``get_object()`` yields either a stream or an
                ArrayObject of streams to be concatenated.
            pdf: Stored as ``self.pdf`` and handed to ``read_object`` when
                inline image settings are parsed.
            forced_encoding: Handed to ``read_object`` when operands are
                parsed.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []
        # Set unconditionally so that instances built from stream=None
        # also carry the attribute read by _clone, replicate and the parser.
        self.forced_encoding = forced_encoding

        if stream is None:
            super().set_data(b"")
            return

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        stream = stream.get_object()
        if isinstance(stream, ArrayObject):
            # bytearray keeps the concatenation linear in the total size.
            data = bytearray()
            for s in stream:
                s_resolved = s.get_object()
                if isinstance(s_resolved, NullObject):
                    continue
                if not isinstance(s_resolved, StreamObject):
                    # No need to emit an exception here for now - the PDF structure
                    # seems to already be broken beforehand in these cases.
                    logger_warning(
                        f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                        __name__
                    )
                    continue
                data += s_resolved.get_data()
                # Separate consecutive parts with a newline. Indexing bytes
                # yields an int, so `data[-1] != b"\n"` could never be
                # False; endswith() performs the intended check.
                if not data.endswith(b"\n"):
                    data += b"\n"
            super().set_data(bytes(data))
        else:
            stream_data = stream.get_data()
            assert stream_data is not None
            super().set_data(stream_data)

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Create a copy of this content stream registered in ``pdf_dest``.

        Dictionary entries are replicated one by one; raw data, parsed
        operations (as a new list holding the same items) and
        ``forced_encoding`` are carried over.

        Args:
            pdf_dest: Destination of the copy; also stored as its ``pdf``.

        Returns:
            The replicated ContentStream.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        # set_data() empties the operations of the copy, so they are
        # assigned afterwards. These statements used to sit behind an early
        # `return` and were never executed.
        d__.set_data(self._data)
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Destination of the clone.
            force_duplicate: When false and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: Handed to ``_clone``; None is treated as empty.

        Returns:
            The cloned ContentStream

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Any failure of the check above (for instance a missing
            # indirect reference) means: go on and clone.
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the operations list and ``forced_encoding`` are
        copied; the parent ``_clone`` is deliberately not called.

        Args:
            src: Object to copy from; it is accessed as a ContentStream.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Unused here.
            ignore_fields: Unused here.
            visited: Unused here.

        """
        src_cs = cast("ContentStream", src)
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObjection or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` from its beginning and append every operation
        found to ``self._operations`` as ``(operands, operator)``.

        Inline images are stored as ``(image_dict, b"INLINE IMAGE")``.
        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image whose ``BI`` operator has just been consumed.

        Returns:
            A dict with the image dictionary under ``"settings"`` and the
            raw image bytes under ``"data"``.

        Raises:
            PdfStreamError: No ``EI`` followed by whitespace is found, even
                after the fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remembered so the fallback below can restart from the data start.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline_AHx(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline_A85(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline_RL(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline_DCT(stream)
        elif filtr == "not set":
            # No filter: try to derive the data length from the colour
            # space, the bits per component and the image dimensions.
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            ei = read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the raw data, rebuilding it from ``_operations`` (and
        storing the result in ``_data``) when ``_data`` is empty.
        """
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first two and last two bytes of the
                    # serialized settings dictionary.
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Store ``data`` as raw data and drop the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """
        Parsed operations.

        When none are present but raw data is, the data is parsed first
        and ``_data`` is emptied afterwards.
        """
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """
        Wrap the content in a ``q`` / ``Q`` pair.

        The parsed operations are wrapped when there are any; otherwise
        the raw data is wrapped when it is not empty.
        """
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """Rebuild ``_data`` if needed, then write via the parent method."""
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)

1429 

1430 

def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read one object from ``stream``, choosing the reader from its first byte.

    Args:
        stream: Positioned at the first byte of the object.
        pdf: Handed to the readers that take it; must not be None when an
            indirect reference is encountered.
        forced_encoding: Handed to the string, array and dictionary readers.

    Returns:
        The object that was read.

    Raises:
        PdfReadError: The first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e" and stream.read(6) == b"endobj":
        return NullObject()
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment: skip it and read whatever follows
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Collect some context for the error message. The start is clamped at 0:
    # a relative seek to before the beginning fails on file-backed streams
    # and would hide the PdfReadError raised below.
    stream.seek(max(pos - 20, 0), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )

1479 

1480 

class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes out of ``data``.

        Args:
            data: Dictionary to take the indirect reference and the
                attribute values from.

        """
        DictionaryObject.__init__(self)
        field_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        self.indirect_reference = data.indirect_reference
        for attr in field_attributes:
            try:
                self[NameObject(attr)] = data[attr]
            except KeyError:
                # Attribute not present in data: nothing to copy.
                continue

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return
        # A value stored as an encoded stream is replaced by its decoded text.
        raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if raw is None:
            text = ""
        elif isinstance(raw, bytes):
            text = raw.decode()
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Read-only property accessing the type of this field."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Read-only property accessing the parent of this field."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Read-only property accessing the kids of this field."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Read-only property accessing the name of this field."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Read-only property accessing the alternate name of this field."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Read-only property accessing the mapping name of this field.

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Read-only property accessing the field flags, specifying various
        characteristics of the field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Read-only property accessing the value of this field.

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Read-only property accessing the default value of this field."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Read-only property accessing the additional actions dictionary.

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)

1578 

1579 

class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: str,
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the /XYZ branch pads the argument list, which
        # must not modify the Fit object handed in by the caller.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            # Missing left, top and zoom values (in this order) are
            # replaced by 0.
            while len(args) < 3:
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        Destination as an array: the raw page entry, the type, then the
        ``/Left``, ``/Bottom``, ``/Right``, ``/Top`` and ``/Zoom`` values
        that are present, in this order.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination as a dictionary with a ``/D`` entry holding
        ``dest_array`` and an ``/S`` entry holding ``/GoTo``.

        Args:
            stream: Output to write to.
            encryption_key: Deprecated; any value other than None is
                reported through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """Read-only property accessing the color in (R, G, B) with values 0.0-1.0."""
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)