Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/generic/_data_structures.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

955 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28 

29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import logging 

33import re 

34import sys 

35from collections.abc import Iterable, Sequence 

36from io import BytesIO 

37from math import ceil 

38from typing import ( 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 

47from .._utils import ( 

48 WHITESPACES, 

49 StreamType, 

50 deprecation_no_replacement, 

51 logger_warning, 

52 read_non_whitespace, 

53 read_until_regex, 

54 read_until_whitespace, 

55 skip_over_comment, 

56) 

57from ..constants import ( 

58 CheckboxRadioButtonAttributes, 

59 FieldDictionaryAttributes, 

60 OutlineFontFlag, 

61) 

62from ..constants import FilterTypes as FT 

63from ..constants import StreamAttributes as SA 

64from ..constants import TypArguments as TA 

65from ..constants import TypFitArguments as TF 

66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError 

67from ._base import ( 

68 BooleanObject, 

69 ByteStringObject, 

70 FloatObject, 

71 IndirectObject, 

72 NameObject, 

73 NullObject, 

74 NumberObject, 

75 PdfObject, 

76 TextStringObject, 

77 is_null_or_none, 

78) 

79from ._fit import Fit 

80from ._image_inline import ( 

81 extract_inline_A85, 

82 extract_inline_AHx, 

83 extract_inline_DCT, 

84 extract_inline_default, 

85 extract_inline_RL, 

86) 

87from ._utils import read_hex_string_from_stream, read_string_from_stream 

88 

89if sys.version_info >= (3, 11): 

90 from typing import Self 

91else: 

92 from typing_extensions import Self 

93 

94logger = logging.getLogger(__name__) 

95 

96IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]") 

97 

98 

99class ArrayObject(list[Any], PdfObject): 

100 def replicate( 

101 self, 

102 pdf_dest: PdfWriterProtocol, 

103 ) -> "ArrayObject": 

104 arr = cast( 

105 "ArrayObject", 

106 self._reference_clone(ArrayObject(), pdf_dest, False), 

107 ) 

108 for data in self: 

109 if hasattr(data, "replicate"): 

110 arr.append(data.replicate(pdf_dest)) 

111 else: 

112 arr.append(data) 

113 return arr 

114 

115 def clone( 

116 self, 

117 pdf_dest: PdfWriterProtocol, 

118 force_duplicate: bool = False, 

119 ignore_fields: Optional[Sequence[Union[str, int]]] = (), 

120 ) -> "ArrayObject": 

121 """Clone object into pdf_dest.""" 

122 try: 

123 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore 

124 return self 

125 except Exception: 

126 pass 

127 arr = cast( 

128 "ArrayObject", 

129 self._reference_clone(ArrayObject(), pdf_dest, force_duplicate), 

130 ) 

131 for data in self: 

132 if isinstance(data, StreamObject): 

133 dup = data._reference_clone( 

134 data.clone(pdf_dest, force_duplicate, ignore_fields), 

135 pdf_dest, 

136 force_duplicate, 

137 ) 

138 arr.append(dup.indirect_reference) 

139 elif hasattr(data, "clone"): 

140 arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields)) 

141 else: 

142 arr.append(data) 

143 return arr 

144 

145 def hash_bin(self) -> int: 

146 """ 

147 Used to detect modified object. 

148 

149 Returns: 

150 Hash considering type and value. 

151 

152 """ 

153 return hash((self.__class__, tuple(x.hash_bin() for x in self))) 

154 

155 def items(self) -> Iterable[Any]: 

156 """Emulate DictionaryObject.items for a list (index, object).""" 

157 return enumerate(self) 

158 

159 def _to_lst(self, lst: Any) -> list[Any]: 

160 # Convert to list, internal 

161 if isinstance(lst, (list, tuple, set)): 

162 pass 

163 elif isinstance(lst, PdfObject): 

164 lst = [lst] 

165 elif isinstance(lst, str): 

166 if lst[0] == "/": 

167 lst = [NameObject(lst)] 

168 else: 

169 lst = [TextStringObject(lst)] 

170 elif isinstance(lst, bytes): 

171 lst = [ByteStringObject(lst)] 

172 else: # for numbers,... 

173 lst = [lst] 

174 return lst 

175 

176 def __add__(self, lst: Any) -> "ArrayObject": 

177 """ 

178 Allow extension by adding list or add one element only 

179 

180 Args: 

181 lst: any list, tuples are extended the list. 

182 other types(numbers,...) will be appended. 

183 if str is passed it will be converted into TextStringObject 

184 or NameObject (if starting with "/") 

185 if bytes is passed it will be converted into ByteStringObject 

186 

187 Returns: 

188 ArrayObject with all elements 

189 

190 """ 

191 temp = ArrayObject(self) 

192 temp.extend(self._to_lst(lst)) 

193 return temp 

194 

195 def __iadd__(self, lst: Any) -> Self: 

196 """ 

197 Allow extension by adding list or add one element only 

198 

199 Args: 

200 lst: any list, tuples are extended the list. 

201 other types(numbers,...) will be appended. 

202 if str is passed it will be converted into TextStringObject 

203 or NameObject (if starting with "/") 

204 if bytes is passed it will be converted into ByteStringObject 

205 

206 """ 

207 self.extend(self._to_lst(lst)) 

208 return self 

209 

210 def __isub__(self, lst: Any) -> Self: 

211 """Allow to remove items""" 

212 for x in self._to_lst(lst): 

213 try: 

214 index = self.index(x) 

215 del self[index] 

216 except ValueError: 

217 pass 

218 return self 

219 

220 def write_to_stream( 

221 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 

222 ) -> None: 

223 if encryption_key is not None: # deprecated 

224 deprecation_no_replacement( 

225 "the encryption_key parameter of write_to_stream", "5.0.0" 

226 ) 

227 stream.write(b"[") 

228 for data in self: 

229 stream.write(b" ") 

230 data.write_to_stream(stream) 

231 stream.write(b" ]") 

232 

233 @staticmethod 

234 def read_from_stream( 

235 stream: StreamType, 

236 pdf: Optional[PdfReaderProtocol], 

237 forced_encoding: Union[None, str, list[str], dict[int, str]] = None, 

238 ) -> "ArrayObject": 

239 arr = ArrayObject() 

240 tmp = stream.read(1) 

241 if tmp != b"[": 

242 raise PdfReadError("Could not read array") 

243 while True: 

244 # skip leading whitespace 

245 tok = stream.read(1) 

246 while tok.isspace(): 

247 tok = stream.read(1) 

248 if tok == b"": 

249 break 

250 if tok == b"%": 

251 stream.seek(-1, 1) 

252 skip_over_comment(stream) 

253 continue 

254 stream.seek(-1, 1) 

255 # check for array ending 

256 peek_ahead = stream.read(1) 

257 if peek_ahead == b"]": 

258 break 

259 stream.seek(-1, 1) 

260 # read and append object 

261 arr.append(read_object(stream, pdf, forced_encoding)) 

262 return arr 

263 

264 

265class DictionaryObject(dict[Any, Any], PdfObject): 

266 def replicate( 

267 self, 

268 pdf_dest: PdfWriterProtocol, 

269 ) -> "DictionaryObject": 

270 d__ = cast( 

271 "DictionaryObject", 

272 self._reference_clone(self.__class__(), pdf_dest, False), 

273 ) 

274 for k, v in self.items(): 

275 d__[k.replicate(pdf_dest)] = ( 

276 v.replicate(pdf_dest) if hasattr(v, "replicate") else v 

277 ) 

278 return d__ 

279 

280 def clone( 

281 self, 

282 pdf_dest: PdfWriterProtocol, 

283 force_duplicate: bool = False, 

284 ignore_fields: Optional[Sequence[Union[str, int]]] = (), 

285 ) -> "DictionaryObject": 

286 """Clone object into pdf_dest.""" 

287 try: 

288 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore 

289 return self 

290 except Exception: 

291 pass 

292 

293 visited: set[tuple[int, int]] = set() # (idnum, generation) 

294 d__ = cast( 

295 "DictionaryObject", 

296 self._reference_clone(self.__class__(), pdf_dest, force_duplicate), 

297 ) 

298 if ignore_fields is None: 

299 ignore_fields = [] 

300 if len(d__.keys()) == 0: 

301 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited) 

302 return d__ 

303 

304 def _clone( 

305 self, 

306 src: "DictionaryObject", 

307 pdf_dest: PdfWriterProtocol, 

308 force_duplicate: bool, 

309 ignore_fields: Optional[Sequence[Union[str, int]]], 

310 visited: set[tuple[int, int]], # (idnum, generation) 

311 ) -> None: 

312 """ 

313 Update the object from src. 

314 

315 Args: 

316 src: "DictionaryObject": 

317 pdf_dest: 

318 force_duplicate: 

319 ignore_fields: 

320 

321 """ 

322 # First we remove the ignore_fields 

323 # that are for a limited number of levels 

324 assert ignore_fields is not None 

325 ignore_fields = list(ignore_fields) 

326 x = 0 

327 while x < len(ignore_fields): 

328 if isinstance(ignore_fields[x], int): 

329 if cast(int, ignore_fields[x]) <= 0: 

330 del ignore_fields[x] 

331 del ignore_fields[x] 

332 continue 

333 ignore_fields[x] -= 1 # type:ignore 

334 x += 1 

335 # Check if this is a chain list, we need to loop to prevent recur 

336 if any( 

337 field not in ignore_fields 

338 and field in src 

339 and isinstance(src.raw_get(field), IndirectObject) 

340 and isinstance(src[field], DictionaryObject) 

341 and ( 

342 src.get("/Type", None) is None 

343 or cast(DictionaryObject, src[field]).get("/Type", None) is None 

344 or src.get("/Type", None) 

345 == cast(DictionaryObject, src[field]).get("/Type", None) 

346 ) 

347 for field in ["/Next", "/Prev", "/N", "/V"] 

348 ): 

349 ignore_fields = list(ignore_fields) 

350 for lst in (("/Next", "/Prev"), ("/N", "/V")): 

351 for k in lst: 

352 objs = [] 

353 if ( 

354 k in src 

355 and k not in self 

356 and isinstance(src.raw_get(k), IndirectObject) 

357 and isinstance(src[k], DictionaryObject) 

358 # If need to go further the idea is to check 

359 # that the types are the same 

360 and ( 

361 src.get("/Type", None) is None 

362 or cast(DictionaryObject, src[k]).get("/Type", None) is None 

363 or src.get("/Type", None) 

364 == cast(DictionaryObject, src[k]).get("/Type", None) 

365 ) 

366 ): 

367 cur_obj: Optional[DictionaryObject] = cast( 

368 "DictionaryObject", src[k] 

369 ) 

370 prev_obj: Optional[DictionaryObject] = self 

371 while cur_obj is not None: 

372 clon = cast( 

373 "DictionaryObject", 

374 cur_obj._reference_clone( 

375 cur_obj.__class__(), pdf_dest, force_duplicate 

376 ), 

377 ) 

378 # Check to see if we've previously processed our item 

379 if clon.indirect_reference is not None: 

380 idnum = clon.indirect_reference.idnum 

381 generation = clon.indirect_reference.generation 

382 if (idnum, generation) in visited: 

383 cur_obj = None 

384 break 

385 visited.add((idnum, generation)) 

386 objs.append((cur_obj, clon)) 

387 assert prev_obj is not None 

388 prev_obj[NameObject(k)] = clon.indirect_reference 

389 prev_obj = clon 

390 try: 

391 if cur_obj == src: 

392 cur_obj = None 

393 else: 

394 cur_obj = cast("DictionaryObject", cur_obj[k]) 

395 except Exception: 

396 cur_obj = None 

397 for s, c in objs: 

398 c._clone( 

399 s, pdf_dest, force_duplicate, ignore_fields, visited 

400 ) 

401 

402 for k, v in src.items(): 

403 if k not in ignore_fields: 

404 if isinstance(v, StreamObject): 

405 if not hasattr(v, "indirect_reference"): 

406 v.indirect_reference = None 

407 vv = v.clone(pdf_dest, force_duplicate, ignore_fields) 

408 assert vv.indirect_reference is not None 

409 self[k.clone(pdf_dest)] = vv.indirect_reference 

410 elif k not in self: 

411 self[NameObject(k)] = ( 

412 v.clone(pdf_dest, force_duplicate, ignore_fields) 

413 if hasattr(v, "clone") 

414 else v 

415 ) 

416 

417 def hash_bin(self) -> int: 

418 """ 

419 Used to detect modified object. 

420 

421 Returns: 

422 Hash considering type and value. 

423 

424 """ 

425 return hash( 

426 (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items()))) 

427 ) 

428 

429 def raw_get(self, key: Any) -> Any: 

430 return dict.__getitem__(self, key) 

431 

432 def get_inherited(self, key: str, default: Any = None) -> Any: 

433 """ 

434 Returns the value of a key or from the parent if not found. 

435 If not found returns default. 

436 

437 Args: 

438 key: string identifying the field to return 

439 

440 default: default value to return 

441 

442 Returns: 

443 Current key or inherited one, otherwise default value. 

444 

445 """ 

446 if key in self: 

447 return self[key] 

448 try: 

449 if "/Parent" not in self: 

450 return default 

451 raise KeyError("Not present") 

452 except KeyError: 

453 return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited( 

454 key, default 

455 ) 

456 

457 def __setitem__(self, key: Any, value: Any) -> Any: 

458 if not isinstance(key, PdfObject): 

459 raise ValueError("Key must be a PdfObject") 

460 if not isinstance(value, PdfObject): 

461 raise ValueError("Value must be a PdfObject") 

462 return dict.__setitem__(self, key, value) 

463 

464 def setdefault(self, key: Any, value: Optional[Any] = None) -> Any: 

465 if not isinstance(key, PdfObject): 

466 raise ValueError("Key must be a PdfObject") 

467 if not isinstance(value, PdfObject): 

468 raise ValueError("Value must be a PdfObject") 

469 return dict.setdefault(self, key, value) 

470 

471 def __getitem__(self, key: Any) -> PdfObject: 

472 return dict.__getitem__(self, key).get_object() 

473 

474 @property 

475 def xmp_metadata(self) -> Optional[XmpInformationProtocol]: 

476 """ 

477 Retrieve XMP (Extensible Metadata Platform) data relevant to this 

478 object, if available. 

479 

480 See Table 347 — Additional entries in a metadata stream dictionary. 

481 

482 Returns: 

483 Returns a :class:`~pypdf.xmp.XmpInformation` instance 

484 that can be used to access XMP metadata from the document. Can also 

485 return None if no metadata was found on the document root. 

486 

487 """ 

488 from ..xmp import XmpInformation # noqa: PLC0415 

489 

490 metadata = self.get("/Metadata", None) 

491 if is_null_or_none(metadata): 

492 return None 

493 assert metadata is not None, "mypy" 

494 metadata = metadata.get_object() 

495 return XmpInformation(metadata) 

496 

497 def write_to_stream( 

498 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 

499 ) -> None: 

500 if encryption_key is not None: # deprecated 

501 deprecation_no_replacement( 

502 "the encryption_key parameter of write_to_stream", "5.0.0" 

503 ) 

504 stream.write(b"<<\n") 

505 for key, value in self.items(): 

506 if len(key) > 2 and key[1] == "%" and key[-1] == "%": 

507 continue 

508 key.write_to_stream(stream, encryption_key) 

509 stream.write(b" ") 

510 value.write_to_stream(stream) 

511 stream.write(b"\n") 

512 stream.write(b">>") 

513 

514 @staticmethod 

515 def read_from_stream( 

516 stream: StreamType, 

517 pdf: Optional[PdfReaderProtocol], 

518 forced_encoding: Union[None, str, list[str], dict[int, str]] = None, 

519 ) -> "DictionaryObject": 

520 def get_next_obj_pos( 

521 p: int, p1: int, rem_gens: list[int], pdf: PdfReaderProtocol 

522 ) -> int: 

523 out = p1 

524 for gen in rem_gens: 

525 loc = pdf.xref[gen] 

526 try: 

527 values = [x for x in loc.values() if p < x <= p1] 

528 if values: 

529 out = min(out, *values) 

530 except ValueError: 

531 pass 

532 return out 

533 

534 def read_unsized_from_stream( 

535 stream: StreamType, pdf: PdfReaderProtocol 

536 ) -> bytes: 

537 # we are just pointing at beginning of the stream 

538 eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1 

539 curr = stream.tell() 

540 rw = stream.read(eon - stream.tell()) 

541 p = rw.find(b"endstream") 

542 if p < 0: 

543 raise PdfReadError( 

544 f"Unable to find 'endstream' marker for obj starting at {curr}." 

545 ) 

546 stream.seek(curr + p + 9) 

547 return rw[: p - 1] 

548 

549 tmp = stream.read(2) 

550 if tmp != b"<<": 

551 raise PdfReadError( 

552 f"Dictionary read error at byte {hex(stream.tell())}: " 

553 "stream must begin with '<<'" 

554 ) 

555 data: dict[Any, Any] = {} 

556 while True: 

557 tok = read_non_whitespace(stream) 

558 if tok == b"\x00": 

559 continue 

560 if tok == b"%": 

561 stream.seek(-1, 1) 

562 skip_over_comment(stream) 

563 continue 

564 if not tok: 

565 raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY) 

566 

567 if tok == b">": 

568 stream.read(1) 

569 break 

570 stream.seek(-1, 1) 

571 try: 

572 try: 

573 key = read_object(stream, pdf) 

574 if isinstance(key, NullObject): 

575 break 

576 if not isinstance(key, NameObject): 

577 raise PdfReadError( 

578 f"Expecting a NameObject for key but found {key!r}" 

579 ) 

580 except PdfReadError as exc: 

581 if pdf is not None and pdf.strict: 

582 raise 

583 logger_warning(exc.__repr__(), __name__) 

584 continue 

585 tok = read_non_whitespace(stream) 

586 stream.seek(-1, 1) 

587 value = read_object(stream, pdf, forced_encoding) 

588 except Exception as exc: 

589 if pdf is not None and pdf.strict: 

590 raise PdfReadError(exc.__repr__()) 

591 logger_warning(exc.__repr__(), __name__) 

592 retval = DictionaryObject() 

593 retval.update(data) 

594 return retval # return partial data 

595 

596 if not data.get(key): 

597 data[key] = value 

598 else: 

599 # multiple definitions of key not permitted 

600 msg = ( 

601 f"Multiple definitions in dictionary at byte " 

602 f"{hex(stream.tell())} for key {key}" 

603 ) 

604 if pdf is not None and pdf.strict: 

605 raise PdfReadError(msg) 

606 logger_warning(msg, __name__) 

607 

608 pos = stream.tell() 

609 s = read_non_whitespace(stream) 

610 if s == b"s" and stream.read(5) == b"tream": 

611 eol = stream.read(1) 

612 # Occasional PDF file output has spaces after 'stream' keyword but before EOL. 

613 # patch provided by Danial Sandler 

614 while eol == b" ": 

615 eol = stream.read(1) 

616 if eol not in (b"\n", b"\r"): 

617 raise PdfStreamError("Stream data must be followed by a newline") 

618 if eol == b"\r" and stream.read(1) != b"\n": 

619 stream.seek(-1, 1) 

620 # this is a stream object, not a dictionary 

621 if SA.LENGTH not in data: 

622 if pdf is not None and pdf.strict: 

623 raise PdfStreamError("Stream length not defined") 

624 logger_warning( 

625 f"Stream length not defined @pos={stream.tell()}", __name__ 

626 ) 

627 data[NameObject(SA.LENGTH)] = NumberObject(-1) 

628 length = data[SA.LENGTH] 

629 if isinstance(length, IndirectObject): 

630 t = stream.tell() 

631 assert pdf is not None, "mypy" 

632 length = pdf.get_object(length) 

633 stream.seek(t, 0) 

634 if length is None: # if the PDF is damaged 

635 length = -1 

636 pstart = stream.tell() 

637 if length > 0: 

638 data["__streamdata__"] = stream.read(length) 

639 else: 

640 data["__streamdata__"] = read_until_regex( 

641 stream, re.compile(b"endstream") 

642 ) 

643 e = read_non_whitespace(stream) 

644 ndstream = stream.read(8) 

645 if (e + ndstream) != b"endstream": 

646 # the odd PDF file has a length that is too long, so 

647 # we need to read backwards to find the "endstream" ending. 

648 # ReportLab (unknown version) generates files with this bug, 

649 # and Python users into PDF files tend to be our audience. 

650 # we need to do this to correct the streamdata and chop off 

651 # an extra character. 

652 pos = stream.tell() 

653 stream.seek(-10, 1) 

654 end = stream.read(9) 

655 if end == b"endstream": 

656 # we found it by looking back one character further. 

657 data["__streamdata__"] = data["__streamdata__"][:-1] 

658 elif pdf is not None and not pdf.strict: 

659 stream.seek(pstart, 0) 

660 data["__streamdata__"] = read_unsized_from_stream(stream, pdf) 

661 pos = stream.tell() 

662 else: 

663 stream.seek(pos, 0) 

664 raise PdfReadError( 

665 "Unable to find 'endstream' marker after stream at byte " 

666 f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')." 

667 ) 

668 else: 

669 stream.seek(pos, 0) 

670 if "__streamdata__" in data: 

671 return StreamObject.initialize_from_dictionary(data) 

672 retval = DictionaryObject() 

673 retval.update(data) 

674 return retval 

675 

676 

677class TreeObject(DictionaryObject): 

678 def __init__(self, dct: Optional[DictionaryObject] = None) -> None: 

679 DictionaryObject.__init__(self) 

680 if dct: 

681 self.update(dct) 

682 

683 def has_children(self) -> bool: 

684 return "/First" in self 

685 

686 def __iter__(self) -> Any: 

687 return self.children() 

688 

689 def children(self) -> Iterable[Any]: 

690 if not self.has_children(): 

691 return 

692 

693 child_ref = self[NameObject("/First")] 

694 child = child_ref.get_object() 

695 while True: 

696 yield child 

697 if child == self[NameObject("/Last")]: 

698 return 

699 child_ref = child.get(NameObject("/Next")) # type: ignore 

700 if is_null_or_none(child_ref): 

701 return 

702 child = child_ref.get_object() 

703 

704 def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None: 

705 self.insert_child(child, None, pdf) 

706 

707 def inc_parent_counter_default( 

708 self, parent: Union[None, IndirectObject, "TreeObject"], n: int 

709 ) -> None: 

710 if is_null_or_none(parent): 

711 return 

712 assert parent is not None, "mypy" 

713 parent = cast("TreeObject", parent.get_object()) 

714 if "/Count" in parent: 

715 parent[NameObject("/Count")] = NumberObject( 

716 max(0, cast(int, parent[NameObject("/Count")]) + n) 

717 ) 

718 self.inc_parent_counter_default(parent.get("/Parent", None), n) 

719 

720 def inc_parent_counter_outline( 

721 self, parent: Union[None, IndirectObject, "TreeObject"], n: int 

722 ) -> None: 

723 if is_null_or_none(parent): 

724 return 

725 assert parent is not None, "mypy" 

726 parent = cast("TreeObject", parent.get_object()) 

727 # BooleanObject requires comparison with == not is 

728 opn = parent.get("/%is_open%", True) == True # noqa: E712 

729 c = cast(int, parent.get("/Count", 0)) 

730 if c < 0: 

731 c = abs(c) 

732 parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1)) 

733 if not opn: 

734 return 

735 self.inc_parent_counter_outline(parent.get("/Parent", None), n) 

736 

737 def insert_child( 

738 self, 

739 child: Any, 

740 before: Any, 

741 pdf: PdfWriterProtocol, 

742 inc_parent_counter: Optional[Callable[..., Any]] = None, 

743 ) -> IndirectObject: 

744 if inc_parent_counter is None: 

745 inc_parent_counter = self.inc_parent_counter_default 

746 child_obj = child.get_object() 

747 child = child.indirect_reference # get_reference(child_obj) 

748 

749 prev: Optional[DictionaryObject] 

750 if "/First" not in self: # no child yet 

751 self[NameObject("/First")] = child 

752 self[NameObject("/Count")] = NumberObject(0) 

753 self[NameObject("/Last")] = child 

754 child_obj[NameObject("/Parent")] = self.indirect_reference 

755 inc_parent_counter(self, child_obj.get("/Count", 1)) 

756 if "/Next" in child_obj: 

757 del child_obj["/Next"] 

758 if "/Prev" in child_obj: 

759 del child_obj["/Prev"] 

760 return child 

761 prev = cast("DictionaryObject", self["/Last"]) 

762 

763 while prev.indirect_reference != before: 

764 if "/Next" in prev: 

765 prev = cast("TreeObject", prev["/Next"]) 

766 else: # append at the end 

767 prev[NameObject("/Next")] = cast("TreeObject", child) 

768 child_obj[NameObject("/Prev")] = prev.indirect_reference 

769 child_obj[NameObject("/Parent")] = self.indirect_reference 

770 if "/Next" in child_obj: 

771 del child_obj["/Next"] 

772 self[NameObject("/Last")] = child 

773 inc_parent_counter(self, child_obj.get("/Count", 1)) 

774 return child 

775 try: # insert as first or in the middle 

776 assert isinstance(prev["/Prev"], DictionaryObject) 

777 prev["/Prev"][NameObject("/Next")] = child 

778 child_obj[NameObject("/Prev")] = prev["/Prev"] 

779 except Exception: # it means we are inserting in first position 

780 del child_obj["/Next"] 

781 child_obj[NameObject("/Next")] = prev 

782 prev[NameObject("/Prev")] = child 

783 child_obj[NameObject("/Parent")] = self.indirect_reference 

784 inc_parent_counter(self, child_obj.get("/Count", 1)) 

785 return child 

786 

787 def _remove_node_from_tree( 

788 self, prev: Any, prev_ref: Any, cur: Any, last: Any 

789 ) -> None: 

790 """ 

791 Adjust the pointers of the linked list and tree node count. 

792 

793 Args: 

794 prev: 

795 prev_ref: 

796 cur: 

797 last: 

798 

799 """ 

800 next_ref = cur.get(NameObject("/Next"), None) 

801 if prev is None: 

802 if next_ref: 

803 # Removing first tree node 

804 next_obj = next_ref.get_object() 

805 del next_obj[NameObject("/Prev")] 

806 self[NameObject("/First")] = next_ref 

807 self[NameObject("/Count")] = NumberObject( 

808 self[NameObject("/Count")] - 1 # type: ignore 

809 ) 

810 

811 else: 

812 # Removing only tree node 

813 self[NameObject("/Count")] = NumberObject(0) 

814 del self[NameObject("/First")] 

815 if NameObject("/Last") in self: 

816 del self[NameObject("/Last")] 

817 else: 

818 if next_ref: 

819 # Removing middle tree node 

820 next_obj = next_ref.get_object() 

821 next_obj[NameObject("/Prev")] = prev_ref 

822 prev[NameObject("/Next")] = next_ref 

823 else: 

824 # Removing last tree node 

825 assert cur == last 

826 del prev[NameObject("/Next")] 

827 self[NameObject("/Last")] = prev_ref 

828 self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1) # type: ignore 

829 

830 def remove_child(self, child: Any) -> None: 

831 child_obj = child.get_object() 

832 child = child_obj.indirect_reference 

833 

834 if NameObject("/Parent") not in child_obj: 

835 raise ValueError("Removed child does not appear to be a tree item") 

836 if child_obj[NameObject("/Parent")] != self: 

837 raise ValueError("Removed child is not a member of this tree") 

838 

839 found = False 

840 prev_ref = None 

841 prev = None 

842 cur_ref: Optional[Any] = self[NameObject("/First")] 

843 cur: Optional[dict[str, Any]] = cur_ref.get_object() # type: ignore 

844 last_ref = self[NameObject("/Last")] 

845 last = last_ref.get_object() 

846 while cur is not None: 

847 if cur == child_obj: 

848 self._remove_node_from_tree(prev, prev_ref, cur, last) 

849 found = True 

850 break 

851 

852 # Go to the next node 

853 prev_ref = cur_ref 

854 prev = cur 

855 if NameObject("/Next") in cur: 

856 cur_ref = cur[NameObject("/Next")] 

857 cur = cur_ref.get_object() 

858 else: 

859 cur_ref = None 

860 cur = None 

861 

862 if not found: 

863 raise ValueError("Removal couldn't find item in tree") 

864 

865 _reset_node_tree_relationship(child_obj) 

866 

867 def remove_from_tree(self) -> None: 

868 """Remove the object from the tree it is in.""" 

869 if NameObject("/Parent") not in self: 

870 raise ValueError("Removed child does not appear to be a tree item") 

871 cast("TreeObject", self["/Parent"]).remove_child(self) 

872 

873 def empty_tree(self) -> None: 

874 for child in self: 

875 child_obj = child.get_object() 

876 _reset_node_tree_relationship(child_obj) 

877 

878 if NameObject("/Count") in self: 

879 del self[NameObject("/Count")] 

880 if NameObject("/First") in self: 

881 del self[NameObject("/First")] 

882 if NameObject("/Last") in self: 

883 del self[NameObject("/Last")] 

884 

885 

886def _reset_node_tree_relationship(child_obj: Any) -> None: 

887 """ 

888 Call this after a node has been removed from a tree. 

889 

890 This resets the nodes attributes in respect to that tree. 

891 

892 Args: 

893 child_obj: 

894 

895 """ 

896 del child_obj[NameObject("/Parent")] 

897 if NameObject("/Next") in child_obj: 

898 del child_obj[NameObject("/Next")] 

899 if NameObject("/Prev") in child_obj: 

900 del child_obj[NameObject("/Prev")] 

901 

902 

903class StreamObject(DictionaryObject): 

904 def __init__(self) -> None: 

905 self._data: bytes = b"" 

906 self.decoded_self: Optional[DecodedStreamObject] = None 

907 

908 def replicate( 

909 self, 

910 pdf_dest: PdfWriterProtocol, 

911 ) -> "StreamObject": 

912 d__ = cast( 

913 "StreamObject", 

914 self._reference_clone(self.__class__(), pdf_dest, False), 

915 ) 

916 d__._data = self._data 

917 try: 

918 decoded_self = self.decoded_self 

919 if decoded_self is None: 

920 self.decoded_self = None 

921 else: 

922 self.decoded_self = cast( 

923 "DecodedStreamObject", decoded_self.replicate(pdf_dest) 

924 ) 

925 except Exception: 

926 pass 

927 for k, v in self.items(): 

928 d__[k.replicate(pdf_dest)] = ( 

929 v.replicate(pdf_dest) if hasattr(v, "replicate") else v 

930 ) 

931 return d__ 

932 

933 def _clone( 

934 self, 

935 src: DictionaryObject, 

936 pdf_dest: PdfWriterProtocol, 

937 force_duplicate: bool, 

938 ignore_fields: Optional[Sequence[Union[str, int]]], 

939 visited: set[tuple[int, int]], 

940 ) -> None: 

941 """ 

942 Update the object from src. 

943 

944 Args: 

945 src: 

946 pdf_dest: 

947 force_duplicate: 

948 ignore_fields: 

949 

950 """ 

951 self._data = cast("StreamObject", src)._data 

952 try: 

953 decoded_self = cast("StreamObject", src).decoded_self 

954 if decoded_self is None: 

955 self.decoded_self = None 

956 else: 

957 self.decoded_self = cast( 

958 "DecodedStreamObject", 

959 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields), 

960 ) 

961 except Exception: 

962 pass 

963 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) 

964 

965 def hash_bin(self) -> int: 

966 """ 

967 Used to detect modified object. 

968 

969 Returns: 

970 Hash considering type and value. 

971 

972 """ 

973 # Use _data to prevent errors on non-decoded streams. 

974 return hash((super().hash_bin(), self._data)) 

975 

976 def get_data(self) -> bytes: 

977 return self._data 

978 

979 def set_data(self, data: bytes) -> None: 

980 self._data = data 

981 

982 def hash_value_data(self) -> bytes: 

983 data = super().hash_value_data() 

984 data += self.get_data() 

985 return data 

986 

987 def write_to_stream( 

988 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 

989 ) -> None: 

990 if encryption_key is not None: # deprecated 

991 deprecation_no_replacement( 

992 "the encryption_key parameter of write_to_stream", "5.0.0" 

993 ) 

994 self[NameObject(SA.LENGTH)] = NumberObject(len(self._data)) 

995 DictionaryObject.write_to_stream(self, stream) 

996 del self[SA.LENGTH] 

997 stream.write(b"\nstream\n") 

998 stream.write(self._data) 

999 stream.write(b"\nendstream") 

1000 

1001 @staticmethod 

1002 def initialize_from_dictionary( 

1003 data: dict[str, Any] 

1004 ) -> Union["EncodedStreamObject", "DecodedStreamObject"]: 

1005 retval: Union[EncodedStreamObject, DecodedStreamObject] 

1006 if SA.FILTER in data: 

1007 retval = EncodedStreamObject() 

1008 else: 

1009 retval = DecodedStreamObject() 

1010 retval._data = data["__streamdata__"] 

1011 del data["__streamdata__"] 

1012 if SA.LENGTH in data: 

1013 del data[SA.LENGTH] 

1014 retval.update(data) 

1015 return retval 

1016 

1017 def flate_encode(self, level: int = -1) -> "EncodedStreamObject": 

1018 from ..filters import FlateDecode # noqa: PLC0415 

1019 

1020 if SA.FILTER in self: 

1021 f = self[SA.FILTER] 

1022 if isinstance(f, ArrayObject): 

1023 f = ArrayObject([NameObject(FT.FLATE_DECODE), *f]) 

1024 try: 

1025 params = ArrayObject( 

1026 [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())] 

1027 ) 

1028 except TypeError: 

1029 # case of error where the * operator is not working (not an array 

1030 params = ArrayObject( 

1031 [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())] 

1032 ) 

1033 else: 

1034 f = ArrayObject([NameObject(FT.FLATE_DECODE), f]) 

1035 params = ArrayObject( 

1036 [NullObject(), self.get(SA.DECODE_PARMS, NullObject())] 

1037 ) 

1038 else: 

1039 f = NameObject(FT.FLATE_DECODE) 

1040 params = None 

1041 retval = EncodedStreamObject() 

1042 retval.update(self) 

1043 retval[NameObject(SA.FILTER)] = f 

1044 if params is not None: 

1045 retval[NameObject(SA.DECODE_PARMS)] = params 

1046 retval._data = FlateDecode.encode(self._data, level) 

1047 return retval 

1048 

1049 def decode_as_image(self) -> Any: 

1050 """ 

1051 Try to decode the stream object as an image 

1052 

1053 Returns: 

1054 a PIL image if proper decoding has been found 

1055 Raises: 

1056 Exception: Errors during decoding will be reported. 

1057 It is recommended to catch exceptions to prevent 

1058 stops in your program. 

1059 

1060 """ 

1061 from ..filters import _xobj_to_image # noqa: PLC0415 

1062 

1063 if self.get("/Subtype", "") != "/Image": 

1064 try: 

1065 msg = f"{self.indirect_reference} does not seem to be an Image" # pragma: no cover 

1066 except AttributeError: 

1067 msg = f"{self.__repr__()} object does not seem to be an Image" # pragma: no cover 

1068 logger_warning(msg, __name__) 

1069 extension, _, img = _xobj_to_image(self) 

1070 if extension is None: 

1071 return None # pragma: no cover 

1072 return img 

1073 

1074 

1075class DecodedStreamObject(StreamObject): 

1076 pass 

1077 

1078 

1079class EncodedStreamObject(StreamObject): 

1080 def __init__(self) -> None: 

1081 self.decoded_self: Optional[DecodedStreamObject] = None 

1082 

1083 # This overrides the parent method 

1084 def get_data(self) -> bytes: 

1085 from ..filters import decode_stream_data # noqa: PLC0415 

1086 

1087 if self.decoded_self is not None: 

1088 # Cached version of decoded object 

1089 return self.decoded_self.get_data() 

1090 

1091 # Create decoded object 

1092 decoded = DecodedStreamObject() 

1093 decoded.set_data(decode_stream_data(self)) 

1094 for key, value in self.items(): 

1095 if key not in (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS): 

1096 decoded[key] = value 

1097 self.decoded_self = decoded 

1098 return decoded.get_data() 

1099 

1100 # This overrides the parent method: 

1101 def set_data(self, data: bytes) -> None: 

1102 from ..filters import FlateDecode # noqa: PLC0415 

1103 

1104 if self.get(SA.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE]): 

1105 if not isinstance(data, bytes): 

1106 raise TypeError("Data must be bytes") 

1107 if self.decoded_self is None: 

1108 self.get_data() # to create self.decoded_self 

1109 assert self.decoded_self is not None, "mypy" 

1110 self.decoded_self.set_data(data) 

1111 super().set_data(FlateDecode.encode(data)) 

1112 else: 

1113 raise PdfReadError( 

1114 "Streams encoded with a filter different from FlateDecode are not supported" 

1115 ) 

1116 

1117 

1118class ContentStream(DecodedStreamObject): 

1119 """ 

1120 In order to be fast, this data structure can contain either: 

1121 

1122 * raw data in ._data 

1123 * parsed stream operations in ._operations. 

1124 

1125 At any time, ContentStream object can either have both of those fields defined, 

1126 or one field defined and the other set to None. 

1127 

1128 These fields are "rebuilt" lazily, when accessed: 

1129 

1130 * when .get_data() is called, if ._data is None, it is rebuilt from ._operations. 

1131 * when .operations is called, if ._operations is None, it is rebuilt from ._data. 

1132 

1133 Conversely, these fields can be invalidated: 

1134 

1135 * when .set_data() is called, ._operations is set to None. 

1136 * when .operations is set, ._data is set to None. 

1137 """ 

1138 

1139 def __init__( 

1140 self, 

1141 stream: Any, 

1142 pdf: Any, 

1143 forced_encoding: Union[None, str, list[str], dict[int, str]] = None, 

1144 ) -> None: 

1145 self.pdf = pdf 

1146 self._operations: list[tuple[Any, bytes]] = [] 

1147 

1148 # stream may be a StreamObject or an ArrayObject containing 

1149 # StreamObjects to be concatenated together. 

1150 if stream is None: 

1151 super().set_data(b"") 

1152 else: 

1153 stream = stream.get_object() 

1154 if isinstance(stream, ArrayObject): 

1155 data = b"" 

1156 for s in stream: 

1157 s_resolved = s.get_object() 

1158 if isinstance(s_resolved, NullObject): 

1159 continue 

1160 if not isinstance(s_resolved, StreamObject): 

1161 # No need to emit an exception here for now - the PDF structure 

1162 # seems to already be broken beforehand in these cases. 

1163 logger_warning( 

1164 f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.", 

1165 __name__ 

1166 ) 

1167 else: 

1168 data += s_resolved.get_data() 

1169 if len(data) == 0 or data[-1] != b"\n": 

1170 data += b"\n" 

1171 super().set_data(bytes(data)) 

1172 else: 

1173 stream_data = stream.get_data() 

1174 assert stream_data is not None 

1175 super().set_data(stream_data) 

1176 self.forced_encoding = forced_encoding 

1177 

1178 def replicate( 

1179 self, 

1180 pdf_dest: PdfWriterProtocol, 

1181 ) -> "ContentStream": 

1182 d__ = cast( 

1183 "ContentStream", 

1184 self._reference_clone(self.__class__(None, None), pdf_dest, False), 

1185 ) 

1186 d__._data = self._data 

1187 try: 

1188 decoded_self = self.decoded_self 

1189 if decoded_self is None: 

1190 self.decoded_self = None 

1191 else: 

1192 self.decoded_self = cast( 

1193 "DecodedStreamObject", decoded_self.replicate(pdf_dest) 

1194 ) 

1195 except Exception: 

1196 pass 

1197 for k, v in self.items(): 

1198 d__[k.replicate(pdf_dest)] = ( 

1199 v.replicate(pdf_dest) if hasattr(v, "replicate") else v 

1200 ) 

1201 return d__ 

1202 d__.set_data(self._data) 

1203 d__.pdf = pdf_dest 

1204 d__._operations = list(self._operations) 

1205 d__.forced_encoding = self.forced_encoding 

1206 return d__ 

1207 

1208 def clone( 

1209 self, 

1210 pdf_dest: Any, 

1211 force_duplicate: bool = False, 

1212 ignore_fields: Optional[Sequence[Union[str, int]]] = (), 

1213 ) -> "ContentStream": 

1214 """ 

1215 Clone object into pdf_dest. 

1216 

1217 Args: 

1218 pdf_dest: 

1219 force_duplicate: 

1220 ignore_fields: 

1221 

1222 Returns: 

1223 The cloned ContentStream 

1224 

1225 """ 

1226 try: 

1227 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore 

1228 return self 

1229 except Exception: 

1230 pass 

1231 

1232 visited: set[tuple[int, int]] = set() 

1233 d__ = cast( 

1234 "ContentStream", 

1235 self._reference_clone( 

1236 self.__class__(None, None), pdf_dest, force_duplicate 

1237 ), 

1238 ) 

1239 if ignore_fields is None: 

1240 ignore_fields = [] 

1241 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited) 

1242 return d__ 

1243 

1244 def _clone( 

1245 self, 

1246 src: DictionaryObject, 

1247 pdf_dest: PdfWriterProtocol, 

1248 force_duplicate: bool, 

1249 ignore_fields: Optional[Sequence[Union[str, int]]], 

1250 visited: set[tuple[int, int]], 

1251 ) -> None: 

1252 """ 

1253 Update the object from src. 

1254 

1255 Args: 

1256 src: 

1257 pdf_dest: 

1258 force_duplicate: 

1259 ignore_fields: 

1260 

1261 """ 

1262 src_cs = cast("ContentStream", src) 

1263 super().set_data(src_cs._data) 

1264 self.pdf = pdf_dest 

1265 self._operations = list(src_cs._operations) 

1266 self.forced_encoding = src_cs.forced_encoding 

1267 # no need to call DictionaryObjection or anything 

1268 # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) 

1269 

1270 def _parse_content_stream(self, stream: StreamType) -> None: 

1271 # 7.8.2 Content Streams 

1272 stream.seek(0, 0) 

1273 operands: list[Union[int, str, PdfObject]] = [] 

1274 while True: 

1275 peek = read_non_whitespace(stream) 

1276 if peek in (b"", 0): 

1277 break 

1278 stream.seek(-1, 1) 

1279 if peek.isalpha() or peek in (b"'", b'"'): 

1280 operator = read_until_regex(stream, NameObject.delimiter_pattern) 

1281 if operator == b"BI": 

1282 # begin inline image - a completely different parsing 

1283 # mechanism is required, of course... thanks buddy... 

1284 assert operands == [] 

1285 ii = self._read_inline_image(stream) 

1286 self._operations.append((ii, b"INLINE IMAGE")) 

1287 else: 

1288 self._operations.append((operands, operator)) 

1289 operands = [] 

1290 elif peek == b"%": 

1291 # If we encounter a comment in the content stream, we have to 

1292 # handle it here. Typically, read_object will handle 

1293 # encountering a comment -- but read_object assumes that 

1294 # following the comment must be the object we're trying to 

1295 # read. In this case, it could be an operator instead. 

1296 while peek not in (b"\r", b"\n", b""): 

1297 peek = stream.read(1) 

1298 else: 

1299 operands.append(read_object(stream, None, self.forced_encoding)) 

1300 

1301 def _read_inline_image(self, stream: StreamType) -> dict[str, Any]: 

1302 # begin reading just after the "BI" - begin image 

1303 # first read the dictionary of settings. 

1304 settings = DictionaryObject() 

1305 while True: 

1306 tok = read_non_whitespace(stream) 

1307 stream.seek(-1, 1) 

1308 if tok == b"I": 

1309 # "ID" - begin of image data 

1310 break 

1311 key = read_object(stream, self.pdf) 

1312 tok = read_non_whitespace(stream) 

1313 stream.seek(-1, 1) 

1314 value = read_object(stream, self.pdf) 

1315 settings[key] = value 

1316 # left at beginning of ID 

1317 tmp = stream.read(3) 

1318 assert tmp[:2] == b"ID" 

1319 filtr = settings.get("/F", settings.get("/Filter", "not set")) 

1320 savpos = stream.tell() 

1321 if isinstance(filtr, list): 

1322 filtr = filtr[0] # used forencoding 

1323 if "AHx" in filtr or "ASCIIHexDecode" in filtr: 

1324 data = extract_inline_AHx(stream) 

1325 elif "A85" in filtr or "ASCII85Decode" in filtr: 

1326 data = extract_inline_A85(stream) 

1327 elif "RL" in filtr or "RunLengthDecode" in filtr: 

1328 data = extract_inline_RL(stream) 

1329 elif "DCT" in filtr or "DCTDecode" in filtr: 

1330 data = extract_inline_DCT(stream) 

1331 elif filtr == "not set": 

1332 cs = settings.get("/CS", "") 

1333 if isinstance(cs, list): 

1334 cs = cs[0] 

1335 if "RGB" in cs: 

1336 lcs = 3 

1337 elif "CMYK" in cs: 

1338 lcs = 4 

1339 else: 

1340 bits = settings.get( 

1341 "/BPC", 

1342 8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1, 

1343 ) 

1344 if bits > 0: 

1345 lcs = bits / 8.0 

1346 else: 

1347 data = extract_inline_default(stream) 

1348 lcs = -1 

1349 if lcs > 0: 

1350 data = stream.read( 

1351 ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"]) 

1352 ) 

1353 # Move to the `EI` if possible. 

1354 ei = read_non_whitespace(stream) 

1355 stream.seek(-1, 1) 

1356 else: 

1357 data = extract_inline_default(stream) 

1358 

1359 ei = stream.read(3) 

1360 stream.seek(-1, 1) 

1361 if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES: 

1362 # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above. 

1363 stream.seek(savpos, 0) 

1364 data = extract_inline_default(stream) 

1365 ei = stream.read(3) 

1366 stream.seek(-1, 1) 

1367 if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES: # pragma: no cover 

1368 # Check the same condition again. This should never fail as 

1369 # edge cases are covered by `extract_inline_default` above, 

1370 # but check this ot make sure that we are behind the `EI` afterwards. 

1371 raise PdfStreamError( 

1372 f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}" 

1373 ) 

1374 return {"settings": settings, "data": data} 

1375 

1376 # This overrides the parent method 

1377 def get_data(self) -> bytes: 

1378 if not self._data: 

1379 new_data = BytesIO() 

1380 for operands, operator in self._operations: 

1381 if operator == b"INLINE IMAGE": 

1382 new_data.write(b"BI") 

1383 dict_text = BytesIO() 

1384 operands["settings"].write_to_stream(dict_text) 

1385 new_data.write(dict_text.getvalue()[2:-2]) 

1386 new_data.write(b"ID ") 

1387 new_data.write(operands["data"]) 

1388 new_data.write(b"EI") 

1389 else: 

1390 for op in operands: 

1391 op.write_to_stream(new_data) 

1392 new_data.write(b" ") 

1393 new_data.write(operator) 

1394 new_data.write(b"\n") 

1395 self._data = new_data.getvalue() 

1396 return self._data 

1397 

1398 # This overrides the parent method 

1399 def set_data(self, data: bytes) -> None: 

1400 super().set_data(data) 

1401 self._operations = [] 

1402 

1403 @property 

1404 def operations(self) -> list[tuple[Any, bytes]]: 

1405 if not self._operations and self._data: 

1406 self._parse_content_stream(BytesIO(self._data)) 

1407 self._data = b"" 

1408 return self._operations 

1409 

1410 @operations.setter 

1411 def operations(self, operations: list[tuple[Any, bytes]]) -> None: 

1412 self._operations = operations 

1413 self._data = b"" 

1414 

1415 def isolate_graphics_state(self) -> None: 

1416 if self._operations: 

1417 self._operations.insert(0, ([], b"q")) 

1418 self._operations.append(([], b"Q")) 

1419 elif self._data: 

1420 self._data = b"q\n" + self._data + b"\nQ\n" 

1421 

1422 # This overrides the parent method 

1423 def write_to_stream( 

1424 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 

1425 ) -> None: 

1426 if not self._data and self._operations: 

1427 self.get_data() # this ensures ._data is rebuilt 

1428 super().write_to_stream(stream, encryption_key) 

1429 

1430 

1431def read_object( 

1432 stream: StreamType, 

1433 pdf: Optional[PdfReaderProtocol], 

1434 forced_encoding: Union[None, str, list[str], dict[int, str]] = None, 

1435) -> Union[PdfObject, int, str, ContentStream]: 

1436 tok = stream.read(1) 

1437 stream.seek(-1, 1) # reset to start 

1438 if tok == b"/": 

1439 return NameObject.read_from_stream(stream, pdf) 

1440 if tok == b"<": 

1441 # hexadecimal string OR dictionary 

1442 peek = stream.read(2) 

1443 stream.seek(-2, 1) # reset to start 

1444 if peek == b"<<": 

1445 return DictionaryObject.read_from_stream(stream, pdf, forced_encoding) 

1446 return read_hex_string_from_stream(stream, forced_encoding) 

1447 if tok == b"[": 

1448 return ArrayObject.read_from_stream(stream, pdf, forced_encoding) 

1449 if tok in (b"t", b"f"): 

1450 return BooleanObject.read_from_stream(stream) 

1451 if tok == b"(": 

1452 return read_string_from_stream(stream, forced_encoding) 

1453 if tok == b"e" and stream.read(6) == b"endobj": 

1454 return NullObject() 

1455 if tok == b"n": 

1456 return NullObject.read_from_stream(stream) 

1457 if tok == b"%": 

1458 # comment 

1459 skip_over_comment(stream) 

1460 tok = read_non_whitespace(stream) 

1461 stream.seek(-1, 1) 

1462 return read_object(stream, pdf, forced_encoding) 

1463 if tok in b"0123456789+-.": 

1464 # number object OR indirect reference 

1465 peek = stream.read(20) 

1466 stream.seek(-len(peek), 1) # reset to start 

1467 if IndirectPattern.match(peek) is not None: 

1468 assert pdf is not None, "mypy" 

1469 return IndirectObject.read_from_stream(stream, pdf) 

1470 return NumberObject.read_from_stream(stream) 

1471 pos = stream.tell() 

1472 stream.seek(-20, 1) 

1473 stream_extract = stream.read(80) 

1474 stream.seek(pos) 

1475 read_until_whitespace(stream) 

1476 raise PdfReadError( 

1477 f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}" 

1478 ) 

1479 

1480 

1481class Field(TreeObject): 

1482 """ 

1483 A class representing a field dictionary. 

1484 

1485 This class is accessed through 

1486 :meth:`get_fields()<pypdf.PdfReader.get_fields>` 

1487 """ 

1488 

1489 def __init__(self, data: DictionaryObject) -> None: 

1490 DictionaryObject.__init__(self) 

1491 field_attributes = ( 

1492 FieldDictionaryAttributes.attributes() 

1493 + CheckboxRadioButtonAttributes.attributes() 

1494 ) 

1495 self.indirect_reference = data.indirect_reference 

1496 for attr in field_attributes: 

1497 try: 

1498 self[NameObject(attr)] = data[attr] 

1499 except KeyError: 

1500 pass 

1501 if isinstance(self.get("/V"), EncodedStreamObject): 

1502 d = cast(EncodedStreamObject, self[NameObject("/V")]).get_data() 

1503 if isinstance(d, bytes): 

1504 d_str = d.decode() 

1505 elif d is None: 

1506 d_str = "" 

1507 else: 

1508 raise Exception("Should never happen") 

1509 self[NameObject("/V")] = TextStringObject(d_str) 

1510 

1511 # TABLE 8.69 Entries common to all field dictionaries 

1512 @property 

1513 def field_type(self) -> Optional[NameObject]: 

1514 """Read-only property accessing the type of this field.""" 

1515 return self.get(FieldDictionaryAttributes.FT) 

1516 

1517 @property 

1518 def parent(self) -> Optional[DictionaryObject]: 

1519 """Read-only property accessing the parent of this field.""" 

1520 return self.get(FieldDictionaryAttributes.Parent) 

1521 

1522 @property 

1523 def kids(self) -> Optional["ArrayObject"]: 

1524 """Read-only property accessing the kids of this field.""" 

1525 return self.get(FieldDictionaryAttributes.Kids) 

1526 

1527 @property 

1528 def name(self) -> Optional[str]: 

1529 """Read-only property accessing the name of this field.""" 

1530 return self.get(FieldDictionaryAttributes.T) 

1531 

1532 @property 

1533 def alternate_name(self) -> Optional[str]: 

1534 """Read-only property accessing the alternate name of this field.""" 

1535 return self.get(FieldDictionaryAttributes.TU) 

1536 

1537 @property 

1538 def mapping_name(self) -> Optional[str]: 

1539 """ 

1540 Read-only property accessing the mapping name of this field. 

1541 

1542 This name is used by pypdf as a key in the dictionary returned by 

1543 :meth:`get_fields()<pypdf.PdfReader.get_fields>` 

1544 """ 

1545 return self.get(FieldDictionaryAttributes.TM) 

1546 

1547 @property 

1548 def flags(self) -> Optional[int]: 

1549 """ 

1550 Read-only property accessing the field flags, specifying various 

1551 characteristics of the field (see Table 8.70 of the PDF 1.7 reference). 

1552 """ 

1553 return self.get(FieldDictionaryAttributes.Ff) 

1554 

1555 @property 

1556 def value(self) -> Optional[Any]: 

1557 """ 

1558 Read-only property accessing the value of this field. 

1559 

1560 Format varies based on field type. 

1561 """ 

1562 return self.get(FieldDictionaryAttributes.V) 

1563 

1564 @property 

1565 def default_value(self) -> Optional[Any]: 

1566 """Read-only property accessing the default value of this field.""" 

1567 return self.get(FieldDictionaryAttributes.DV) 

1568 

1569 @property 

1570 def additional_actions(self) -> Optional[DictionaryObject]: 

1571 """ 

1572 Read-only property accessing the additional actions dictionary. 

1573 

1574 This dictionary defines the field's behavior in response to trigger 

1575 events. See Section 8.5.2 of the PDF 1.7 reference. 

1576 """ 

1577 return self.get(FieldDictionaryAttributes.AA) 

1578 

1579 

1580class Destination(TreeObject): 

1581 """ 

1582 A class representing a destination within a PDF file. 

1583 

1584 See section 12.3.2 of the PDF 2.0 reference. 

1585 

1586 Args: 

1587 title: Title of this destination. 

1588 page: Reference to the page of this destination. Should 

1589 be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`. 

1590 fit: How the destination is displayed. 

1591 

1592 Raises: 

1593 PdfReadError: If destination type is invalid. 

1594 

1595 """ 

1596 

1597 node: Optional[ 

1598 DictionaryObject 

1599 ] = None # node provide access to the original Object 

1600 

1601 def __init__( 

1602 self, 

1603 title: Union[str, bytes], 

1604 page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject], 

1605 fit: Fit, 

1606 ) -> None: 

1607 self._filtered_children: list[Any] = [] # used in PdfWriter 

1608 

1609 typ = fit.fit_type 

1610 args = fit.fit_args 

1611 

1612 DictionaryObject.__init__(self) 

1613 self[NameObject("/Title")] = TextStringObject(title) 

1614 self[NameObject("/Page")] = page 

1615 self[NameObject("/Type")] = typ 

1616 

1617 # from table 8.2 of the PDF 1.7 reference. 

1618 if typ == "/XYZ": 

1619 if len(args) < 1: # left is missing : should never occur 

1620 args.append(NumberObject(0.0)) 

1621 if len(args) < 2: # top is missing 

1622 args.append(NumberObject(0.0)) 

1623 if len(args) < 3: # zoom is missing 

1624 args.append(NumberObject(0.0)) 

1625 ( 

1626 self[NameObject(TA.LEFT)], 

1627 self[NameObject(TA.TOP)], 

1628 self[NameObject("/Zoom")], 

1629 ) = args 

1630 elif len(args) == 0: 

1631 pass 

1632 elif typ == TF.FIT_R: 

1633 ( 

1634 self[NameObject(TA.LEFT)], 

1635 self[NameObject(TA.BOTTOM)], 

1636 self[NameObject(TA.RIGHT)], 

1637 self[NameObject(TA.TOP)], 

1638 ) = args 

1639 elif typ in [TF.FIT_H, TF.FIT_BH]: 

1640 try: # Prefer to be more robust not only to null parameters 

1641 (self[NameObject(TA.TOP)],) = args 

1642 except Exception: 

1643 (self[NameObject(TA.TOP)],) = (NullObject(),) 

1644 elif typ in [TF.FIT_V, TF.FIT_BV]: 

1645 try: # Prefer to be more robust not only to null parameters 

1646 (self[NameObject(TA.LEFT)],) = args 

1647 except Exception: 

1648 (self[NameObject(TA.LEFT)],) = (NullObject(),) 

1649 elif typ in [TF.FIT, TF.FIT_B]: 

1650 pass 

1651 else: 

1652 raise PdfReadError(f"Unknown Destination Type: {typ!r}") 

1653 

1654 @property 

1655 def dest_array(self) -> "ArrayObject": 

1656 return ArrayObject( 

1657 [self.raw_get("/Page"), self["/Type"]] 

1658 + [ 

1659 self[x] 

1660 for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"] 

1661 if x in self 

1662 ] 

1663 ) 

1664 

1665 def write_to_stream( 

1666 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 

1667 ) -> None: 

1668 if encryption_key is not None: # deprecated 

1669 deprecation_no_replacement( 

1670 "the encryption_key parameter of write_to_stream", "5.0.0" 

1671 ) 

1672 stream.write(b"<<\n") 

1673 key = NameObject("/D") 

1674 key.write_to_stream(stream) 

1675 stream.write(b" ") 

1676 value = self.dest_array 

1677 value.write_to_stream(stream) 

1678 

1679 key = NameObject("/S") 

1680 key.write_to_stream(stream) 

1681 stream.write(b" ") 

1682 value_s = NameObject("/GoTo") 

1683 value_s.write_to_stream(stream) 

1684 

1685 stream.write(b"\n") 

1686 stream.write(b">>") 

1687 

1688 @property 

1689 def title(self) -> Optional[str]: 

1690 """Read-only property accessing the destination title.""" 

1691 return self.get("/Title") 

1692 

1693 @property 

1694 def page(self) -> Optional[IndirectObject]: 

1695 """Read-only property accessing the IndirectObject of the destination page.""" 

1696 return self.get("/Page") 

1697 

1698 @property 

1699 def typ(self) -> Optional[str]: 

1700 """Read-only property accessing the destination type.""" 

1701 return self.get("/Type") 

1702 

1703 @property 

1704 def zoom(self) -> Optional[int]: 

1705 """Read-only property accessing the zoom factor.""" 

1706 return self.get("/Zoom", None) 

1707 

1708 @property 

1709 def left(self) -> Optional[FloatObject]: 

1710 """Read-only property accessing the left horizontal coordinate.""" 

1711 return self.get("/Left", None) 

1712 

1713 @property 

1714 def right(self) -> Optional[FloatObject]: 

1715 """Read-only property accessing the right horizontal coordinate.""" 

1716 return self.get("/Right", None) 

1717 

1718 @property 

1719 def top(self) -> Optional[FloatObject]: 

1720 """Read-only property accessing the top vertical coordinate.""" 

1721 return self.get("/Top", None) 

1722 

1723 @property 

1724 def bottom(self) -> Optional[FloatObject]: 

1725 """Read-only property accessing the bottom vertical coordinate.""" 

1726 return self.get("/Bottom", None) 

1727 

1728 @property 

1729 def color(self) -> Optional["ArrayObject"]: 

1730 """Read-only property accessing the color in (R, G, B) with values 0.0-1.0.""" 

1731 return self.get( 

1732 "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)]) 

1733 ) 

1734 

1735 @property 

1736 def font_format(self) -> Optional[OutlineFontFlag]: 

1737 """ 

1738 Read-only property accessing the font type. 

1739 

1740 1=italic, 2=bold, 3=both 

1741 """ 

1742 return self.get("/F", 0) 

1743 

1744 @property 

1745 def outline_count(self) -> Optional[int]: 

1746 """ 

1747 Read-only property accessing the outline count. 

1748 

1749 positive = expanded 

1750 negative = collapsed 

1751 absolute value = number of visible descendants at all levels 

1752 """ 

1753 return self.get("/Count", None)