Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/generic/_data_structures.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

954 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28 

# Module authorship metadata.
__author__ = "Mathieu Fenniak"

__author_email__ = "biziqe@mathieu.fenniak.net"

31 

32import logging 

33import re 

34import sys 

35from collections.abc import Iterable, Sequence 

36from io import BytesIO 

37from math import ceil 

38from typing import ( 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 

47from .._utils import ( 

48 WHITESPACES, 

49 StreamType, 

50 deprecation_no_replacement, 

51 logger_warning, 

52 read_non_whitespace, 

53 read_until_regex, 

54 read_until_whitespace, 

55 skip_over_comment, 

56) 

57from ..constants import ( 

58 CheckboxRadioButtonAttributes, 

59 FieldDictionaryAttributes, 

60 OutlineFontFlag, 

61) 

62from ..constants import FilterTypes as FT 

63from ..constants import StreamAttributes as SA 

64from ..constants import TypArguments as TA 

65from ..constants import TypFitArguments as TF 

66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError 

67from ._base import ( 

68 BooleanObject, 

69 ByteStringObject, 

70 FloatObject, 

71 IndirectObject, 

72 NameObject, 

73 NullObject, 

74 NumberObject, 

75 PdfObject, 

76 TextStringObject, 

77 is_null_or_none, 

78) 

79from ._fit import Fit 

80from ._image_inline import ( 

81 extract_inline__ascii85_decode, 

82 extract_inline__ascii_hex_decode, 

83 extract_inline__dct_decode, 

84 extract_inline__run_length_decode, 

85 extract_inline_default, 

86) 

87from ._utils import read_hex_string_from_stream, read_string_from_stream 

88 

89if sys.version_info >= (3, 11): 

90 from typing import Self 

91else: 

92 from typing_extensions import Self 

93 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Bytes pattern: optional sign, two whitespace-separated digit groups (both
# captured), the letter "R", then one byte that is not an ASCII letter.
# Presumably used to recognise indirect references ("12 0 R") - confirm at
# the call sites, which are not visible in this part of the file.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")

97 

98 

class ArrayObject(list[Any], PdfObject):
    """A PDF array: a Python ``list`` that is also a :class:`PdfObject`."""

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a copy of this array for ``pdf_dest``.

        The new array is obtained through ``_reference_clone``; every element
        exposing ``replicate`` is replicated, other elements are appended
        unchanged.

        Args:
            pdf_dest: writer receiving the copy.

        Returns:
            The new ArrayObject.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this array already references
                ``pdf_dest``, the array itself is returned.
            ignore_fields: forwarded unchanged to the elements' ``clone``.

        Returns:
            ``self`` or the cloned ArrayObject.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: no usable indirect reference, so clone below.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored in the clone by indirect reference.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalise an operand of ``+``, ``+=`` or ``-=`` into an iterable.

        Lists, tuples and sets are returned as-is. A ``str`` becomes a
        one-element list holding a NameObject (when it starts with "/") or a
        TextStringObject; ``bytes`` becomes a ByteStringObject; anything else
        is wrapped in a one-element list.
        """
        if isinstance(lst, (list, tuple, set)):
            return lst  # type: ignore[return-value]
        if isinstance(lst, PdfObject):
            return [lst]
        if isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not raise
            # IndexError, it is simply an (empty) text string.
            if lst.startswith("/"):
                return [NameObject(lst)]
            return [TextStringObject(lst)]
        if isinstance(lst, bytes):
            return [ByteStringObject(lst)]
        return [lst]  # for numbers,...

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """Remove the first occurrence of each given item; missing items are ignored."""
        for x in self._to_lst(lst):
            try:
                self.remove(x)
            except ValueError:
                # Item not present: nothing to remove.
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ item item ... ]`` to ``stream``.

        Args:
            stream: output stream.
            encryption_key: deprecated; a non-None value only triggers
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: input positioned on the opening ``[``.
            pdf: reader forwarded to ``read_object``.
            forced_encoding: forwarded to ``read_object``.

        Returns:
            The parsed ArrayObject. Reaching the end of the stream before
            ``]`` ends parsing and returns the elements read so far.

        Raises:
            PdfReadError: the first byte is not ``[``.

        """
        arr = ArrayObject()
        if stream.read(1) != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            # check for array ending
            if tok == b"]":
                break
            # step back so read_object sees the first byte of the element
            stream.seek(-1, 1)
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr

263 

264 

class DictionaryObject(dict[Any, Any], PdfObject):
    """
    A PDF dictionary: a Python ``dict`` that is also a :class:`PdfObject`.

    ``__setitem__`` and ``setdefault`` only accept PdfObject keys and values;
    ``__getitem__`` returns ``get_object()`` of the stored value, while
    ``raw_get`` returns the stored value itself.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a copy of this dictionary for ``pdf_dest``.

        Keys are replicated; values are replicated when they expose
        ``replicate`` and stored unchanged otherwise.
        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this object already references
                ``pdf_dest``, the object itself is returned.
            ignore_fields: keys (and int level counters, see ``_clone``) to
                skip; None is treated as an empty list.

        Returns:
            ``self`` or the object returned by ``_reference_clone``.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: no usable indirect reference, so clone below.
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only populate when _reference_clone handed back an empty object.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: dictionary whose entries are copied into ``self``.
            pdf_dest: writer receiving the clones.
            force_duplicate: forwarded to ``clone`` / ``_reference_clone``.
            ignore_fields: keys to skip; must not be None. An int entry is
                decremented on each call, and once it is <= 0 it is removed
                together with the entry that follows it.
            visited: (idnum, generation) pairs of chain clones already
                processed; updated in place.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Drop the exhausted counter and the entry after it.
                    # NOTE(review): raises IndexError if the counter is the
                    # last entry - confirm callers always pair it with a key.
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively, creating the empty
                        # clones first; they are filled in afterwards.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # No further link: end of the chain.
                                cur_obj = None
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # Streams are stored by indirect reference.
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()``."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        parent = cast("DictionaryObject", self["/Parent"].get_object())
        return parent.get_inherited(key, default)

    def __setitem__(self, key: Any, value: Any) -> Any:
        """Store ``value`` under ``key``; both must be PdfObject instances (else ValueError)."""
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        ``dict.setdefault`` with the same PdfObject validation as ``__setitem__``.

        The default ``value=None`` is not a PdfObject, so calling without an
        explicit value raises ValueError.
        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return ``get_object()`` of the value stored under ``key``."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<< key value ... >>`` to ``stream``.

        Keys longer than two characters whose second and last characters are
        "%" (for example "/%is_open%") are skipped.

        Args:
            stream: output stream.
            encryption_key: deprecated; a non-None value triggers
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @classmethod
    def _get_next_object_position(
        cls, position_before: int, position_end: int, generations: list[int], pdf: PdfReaderProtocol
    ) -> int:
        """
        Return the smallest ``pdf.xref`` position in ``(position_before, position_end]``.

        Only the given generations are searched; ``position_end`` is returned
        when none of them has a position in that range.
        """
        out = position_end
        for generation in generations:
            location = pdf.xref[generation]
            values = [x for x in location.values() if position_before < x <= position_end]
            if values:
                out = min(out, *values)
        return out

    @classmethod
    def _read_unsized_from_stream(
        cls, stream: StreamType, pdf: PdfReaderProtocol
    ) -> bytes:
        """
        Read stream data of unknown length, ending at the ``endstream`` marker.

        Reads from the current position up to one byte before the next
        position listed in ``pdf.xref`` and searches that data for
        ``endstream``. On success the stream is left just after the marker.

        Returns:
            The bytes before the marker, minus the single byte that
            immediately precedes it.

        Raises:
            PdfReadError: no ``endstream`` marker in the data read.

        """
        object_position = cls._get_next_object_position(
            position_before=stream.tell(), position_end=2 ** 32, generations=list(pdf.xref), pdf=pdf
        ) - 1
        current_position = stream.tell()
        # Read until the next object position.
        read_value = stream.read(object_position - stream.tell())
        endstream_position = read_value.find(b"endstream")
        if endstream_position < 0:
            raise PdfReadError(
                f"Unable to find 'endstream' marker for obj starting at {current_position}."
            )
        # 9 = len(b"endstream")
        stream.seek(current_position + endstream_position + 9)
        # Clamp at 0: with the marker at offset 0 an end index of -1 would
        # return nearly the whole buffer instead of empty data.
        return read_value[: max(endstream_position - 1, 0)]

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, or a stream object, at the current position.

        Args:
            stream: input positioned on the opening ``<<``.
            pdf: reader used for nested objects and indirect lengths; its
                ``strict`` flag turns recoverable problems into errors.
            forced_encoding: forwarded to ``read_object`` for values.

        Returns:
            A DictionaryObject, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary
            is followed by the ``stream`` keyword. When an entry cannot be
            read and ``pdf`` is missing or not strict, the entries read so
            far are returned.

        Raises:
            PdfReadError: bad opening token, strict-mode parse problems, or
                a missing ``endstream`` marker.
            PdfStreamError: truncated input, ``stream`` keyword not followed
                by a newline, or (strict) missing stream length.

        """
        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # consume the second byte of the closing ">>"
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__()) from exc
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # NOTE(review): this tests truthiness, not presence, so a
            # duplicate key whose first value is falsy is overwritten
            # without a warning - confirm whether that is intended.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # lone "\r": give back the byte that was not "\n"
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolving the length may move the stream; restore it.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # Fall back to scanning for the marker from the data start.
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = DictionaryObject._read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # Plain dictionary: rewind to just after the closing ">>".
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval

677 

678 

class TreeObject(DictionaryObject):
    """
    Dictionary acting as a tree node.

    Children form a linked list: the node holds ``/First``, ``/Last`` and
    ``/Count``; each child holds ``/Parent`` and optional ``/Next`` /
    ``/Prev`` links. Iterating the object yields its children, not its keys.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """Create the node, copying the entries of ``dct`` when it is non-empty."""
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the node has a ``/First`` entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (see ``children``)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield each child, following ``/Next`` from ``/First``.

        Iteration stops after the child equal to ``/Last``, or at the first
        child without a usable ``/Next``.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        child = child_ref.get_object()
        while True:
            yield child
            if child == self[NameObject("/Last")]:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Append ``child`` (``insert_child`` with ``before=None``)."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to ``/Count`` of ``parent`` and of its ancestors.

        The count never goes below 0. Propagation stops at a null/None
        parent or at the first ancestor without a ``/Count`` entry.
        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the magnitude of ``/Count`` of ``parent``.

        The stored count is positive when ``/%is_open%`` is true (the default
        when absent) and negative otherwise. Propagation to the ancestors
        only continues through open nodes.
        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Insert ``child`` into this node's child list.

        Args:
            child: object to insert; its ``indirect_reference`` is what gets
                stored in the links.
            before: indirect reference of the child to insert before; the
                child is appended when it is not found.
            pdf: not used by this method.
            inc_parent_counter: called as ``f(self, count)`` to update the
                counters; defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        # NOTE(review): the search starts at /Last, so `before` can only
        # match the last child; any other value falls through to the append
        # branch - confirm this is intended.
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # NOTE(review): raises KeyError when child_obj has no /Next, and
            # /First is not updated on this path - verify against callers.
            del child_obj["/Next"]
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: child preceding ``cur``, or None when ``cur`` is first.
            prev_ref: reference to ``prev``.
            cur: child being removed.
            last: object referenced by ``/Last``.

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Remove ``child`` from this node and reset its tree links.

        Raises:
            ValueError: the child has no ``/Parent``, its parent is not this
                node, or it is not found in the child list.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """Remove the object from the tree it is in."""
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child and delete ``/Count``, ``/First`` and ``/Last``."""
        # Collect the children before touching them: resetting a child
        # deletes its /Next, which would end a lazy walk after the first one.
        for child in list(self.children()):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]

886 

887 

def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Strip a node's tree links once it has been taken out of a tree.

    ``/Parent`` is deleted unconditionally (a missing entry raises
    ``KeyError``); ``/Next`` and ``/Prev`` are deleted only when present.

    Args:
        child_obj: the node that was removed from its tree.

    """
    del child_obj[NameObject("/Parent")]
    for link_name in ("/Next", "/Prev"):
        link_key = NameObject(link_name)
        if link_key in child_obj:
            del child_obj[link_key]

903 

904 

905class StreamObject(DictionaryObject): 

906 def __init__(self) -> None: 

907 self._data: bytes = b"" 

908 self.decoded_self: Optional[DecodedStreamObject] = None 

909 

910 def replicate( 

911 self, 

912 pdf_dest: PdfWriterProtocol, 

913 ) -> "StreamObject": 

914 d__ = cast( 

915 "StreamObject", 

916 self._reference_clone(self.__class__(), pdf_dest, False), 

917 ) 

918 d__._data = self._data 

919 try: 

920 decoded_self = self.decoded_self 

921 if decoded_self is None: 

922 self.decoded_self = None 

923 else: 

924 self.decoded_self = cast( 

925 "DecodedStreamObject", decoded_self.replicate(pdf_dest) 

926 ) 

927 except Exception: 

928 pass 

929 for k, v in self.items(): 

930 d__[k.replicate(pdf_dest)] = ( 

931 v.replicate(pdf_dest) if hasattr(v, "replicate") else v 

932 ) 

933 return d__ 

934 

935 def _clone( 

936 self, 

937 src: DictionaryObject, 

938 pdf_dest: PdfWriterProtocol, 

939 force_duplicate: bool, 

940 ignore_fields: Optional[Sequence[Union[str, int]]], 

941 visited: set[tuple[int, int]], 

942 ) -> None: 

943 """ 

944 Update the object from src. 

945 

946 Args: 

947 src: 

948 pdf_dest: 

949 force_duplicate: 

950 ignore_fields: 

951 

952 """ 

953 self._data = cast("StreamObject", src)._data 

954 try: 

955 decoded_self = cast("StreamObject", src).decoded_self 

956 if decoded_self is None: 

957 self.decoded_self = None 

958 else: 

959 self.decoded_self = cast( 

960 "DecodedStreamObject", 

961 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields), 

962 ) 

963 except Exception: 

964 pass 

965 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) 

966 

967 def hash_bin(self) -> int: 

968 """ 

969 Used to detect modified object. 

970 

971 Returns: 

972 Hash considering type and value. 

973 

974 """ 

975 # Use _data to prevent errors on non-decoded streams. 

976 return hash((super().hash_bin(), self._data)) 

977 

978 def get_data(self) -> bytes: 

979 return self._data 

980 

981 def set_data(self, data: bytes) -> None: 

982 self._data = data 

983 

984 def hash_value_data(self) -> bytes: 

985 data = super().hash_value_data() 

986 data += self.get_data() 

987 return data 

988 

def write_to_stream(
    self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
    """
    Serialize the stream dictionary followed by the raw stream bytes.

    A ``/Length`` entry matching ``len(self._data)`` is inserted only for the
    duration of the dictionary serialization and removed again afterwards.

    Args:
        stream: Writable target receiving the serialized object.
        encryption_key: Deprecated; any value other than ``None`` is reported
            through ``deprecation_no_replacement``.

    """
    if encryption_key is not None:  # deprecated
        deprecation_no_replacement(
            "the encryption_key parameter of write_to_stream", "5.0.0"
        )
    self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
    try:
        DictionaryObject.write_to_stream(self, stream)
    finally:
        # Remove the temporary entry even if writing the dictionary failed,
        # so the object is not left with a stale /Length.
        del self[SA.LENGTH]
    stream.write(b"\nstream\n")
    stream.write(self._data)
    stream.write(b"\nendstream")

1002 

@staticmethod
def initialize_from_dictionary(
    data: dict[str, Any]
) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
    """
    Build a stream object from a parsed dictionary.

    An ``EncodedStreamObject`` is created when ``data`` has a filter entry,
    a ``DecodedStreamObject`` otherwise. The ``"__streamdata__"`` entry
    becomes the raw data of the result; it and any length entry are removed
    from ``data`` (the passed dictionary is modified) before the remaining
    entries are copied over.

    Args:
        data: Dictionary holding the stream entries plus ``"__streamdata__"``.

    Returns:
        The newly created stream object.

    """
    retval: Union[EncodedStreamObject, DecodedStreamObject] = (
        EncodedStreamObject() if SA.FILTER in data else DecodedStreamObject()
    )
    retval._data = data["__streamdata__"]
    del data["__streamdata__"]
    if SA.LENGTH in data:
        del data[SA.LENGTH]
    retval.update(data)
    return retval

1018 

def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
    """
    Return a copy of this stream with an additional Flate encoding applied.

    The Flate filter name is put in front of any filter already present, and
    a ``NullObject`` is put in front of the existing decode parameters so
    both entries stay aligned.

    Args:
        level: Passed through to ``FlateDecode.encode``.

    Returns:
        A new ``EncodedStreamObject`` holding the encoded ``_data``.

    """
    from ..filters import FlateDecode  # noqa: PLC0415

    if SA.FILTER not in self:
        filters = NameObject(FT.FLATE_DECODE)
        params = None
    else:
        current = self[SA.FILTER]
        if isinstance(current, ArrayObject):
            filters = ArrayObject([NameObject(FT.FLATE_DECODE), *current])
            try:
                params = ArrayObject(
                    [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                )
            except TypeError:
                # The decode parameters cannot be unpacked with `*`
                # (they are not an array): keep them as a single entry.
                params = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                )
        else:
            filters = ArrayObject([NameObject(FT.FLATE_DECODE), current])
            params = ArrayObject(
                [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
            )

    encoded = EncodedStreamObject()
    encoded.update(self)
    encoded[NameObject(SA.FILTER)] = filters
    if params is not None:
        encoded[NameObject(SA.DECODE_PARMS)] = params
    encoded._data = FlateDecode.encode(self._data, level)
    return encoded

1050 

def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any:
    """
    Try to decode the stream object as an image

    A warning is logged when the ``/Subtype`` entry is not ``/Image``;
    decoding is attempted regardless.

    Args:
        pillow_parameters: parameters provided to Pillow Image.save() method,
            cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save>

    Returns:
        a PIL image if proper decoding has been found
    Raises:
        Exception: Errors during decoding will be reported.
            It is recommended to catch exceptions to prevent
            stops in your program.

    """
    from .._xobj_image_helpers import _xobj_to_image  # noqa: PLC0415

    looks_like_image = self.get("/Subtype", "") == "/Image"
    if not looks_like_image:
        try:
            msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
        except AttributeError:
            msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
        logger_warning(msg, __name__)

    extension, _, img = _xobj_to_image(self, pillow_parameters)
    return None if extension is None else img  # pragma: no cover

1079 

1080 

class DecodedStreamObject(StreamObject):
    """Stream object that adds nothing to ``StreamObject``; it serves as a distinct type."""

1083 

1084 

class EncodedStreamObject(StreamObject):
    """Stream object whose raw ``_data`` is filter-encoded; the decoded form is built lazily and cached."""

    def __init__(self) -> None:
        # Cache for the decoded counterpart, filled by get_data().
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded stream content.

        On first use, the content is decoded with ``decode_stream_data`` and
        stored in a ``DecodedStreamObject`` that also receives every
        dictionary entry except length, filter and decode parameters. Later
        calls are served from that cached object.
        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        cached = self.decoded_self
        if cached is not None:
            return cached.get_data()

        fresh = DecodedStreamObject()
        fresh.set_data(decode_stream_data(self))
        encoding_keys = (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS)
        for key, value in self.items():
            if key in encoding_keys:
                continue
            fresh[key] = value
        self.decoded_self = fresh
        return fresh.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the stream content with ``data`` (given in decoded form).

        Only streams whose filter is exactly FlateDecode are supported: the
        cached decoded object receives ``data`` and the raw data is replaced
        by its Flate-encoded form.

        Raises:
            PdfReadError: If the stream uses any other filter setup.
            TypeError: If ``data`` is not ``bytes``.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(SA.FILTER, "") not in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))

1122 

1123 

class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, ContentStream object can either have both of those fields defined,
    or one field defined and the other left empty (``b""`` / ``[]``).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is read, if ._operations is empty, it is rebuilt from ._data
      (and ._data is emptied).

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Create a content stream from ``stream``.

        Args:
            stream: ``None`` for an empty content stream, otherwise an object
                whose ``get_object()`` yields either a single stream or an
                ``ArrayObject`` of streams to be concatenated.
            pdf: Stored as ``self.pdf`` and handed to ``read_object`` when
                inline image settings are parsed.
            forced_encoding: Stored and handed to ``read_object`` when
                operands are parsed.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
        else:
            stream = stream.get_object()
            if isinstance(stream, ArrayObject):
                data = bytearray()
                for s in stream:
                    s_resolved = s.get_object()
                    if isinstance(s_resolved, NullObject):
                        continue
                    if not isinstance(s_resolved, StreamObject):
                        # No need to emit an exception here for now - the PDF structure
                        # seems to already be broken beforehand in these cases.
                        logger_warning(
                            f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                            __name__
                        )
                    else:
                        data += s_resolved.get_data()
                        # Separate consecutive streams with a newline. Note that
                        # indexing bytes yields an int, so `data[-1] != b"\n"`
                        # would always be true; endswith() performs the intended
                        # check and is also False for empty data.
                        if not data.endswith(b"\n"):
                            data += b"\n"
                super().set_data(bytes(data))
            else:
                stream_data = stream.get_data()
                assert stream_data is not None
                super().set_data(stream_data)
        self.forced_encoding = forced_encoding

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Create a replica of this content stream for ``pdf_dest``.

        Besides the raw data and the dictionary entries, the parsed
        operations and the forced encoding are carried over, and the replica
        is attached to ``pdf_dest``, in the same way as ``_clone`` does.

        Args:
            pdf_dest: Destination the replica is created for.

        Returns:
            The replicated ContentStream.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        d__._data = self._data
        # NOTE(review): this block assigns to `self`, not to the replica `d__`,
        # which looks unintended. It is kept unchanged here; any failure
        # (including a missing `decoded_self` attribute) is ignored.
        try:
            decoded_self = self.decoded_self
            if decoded_self is None:
                self.decoded_self = None
            else:
                self.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        # These assignments used to sit behind an early `return d__` and were
        # never executed, so a replica of a parsed stream lost its operations.
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Destination of the clone.
            force_duplicate: When false and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: Forwarded to ``_clone``; ``None`` is treated as an
                empty list.

        Returns:
            The cloned ContentStream

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the operations (as a new list) and the forced
        encoding are taken from ``src``; the parent ``_clone`` is not called.

        Args:
            src: ContentStream to copy from.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Unused here.
            ignore_fields: Unused here.
            visited: Unused here.

        """
        src_cs = cast("ContentStream", src)
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObjection or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` from its start and append the result to ``_operations``.

        Each entry is ``(operands, operator)``; inline images are stored as
        ``(dict, b"INLINE IMAGE")``.
        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image whose ``BI`` operator has just been consumed.

        Returns:
            A dictionary with the image ``"settings"`` and its raw ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker is found, even with the fallback.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            # Skip the whitespace between key and value.
            read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline__ascii_hex_decode(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline__ascii85_decode(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline__run_length_decode(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline__dct_decode(stream)
        elif filtr == "not set":
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            # lcs is the number of bytes per pixel; a value <= 0 means unknown.
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """Return the raw data, rebuilding it from ``_operations`` when it is empty."""
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first and last two bytes of the serialized dictionary.
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Store ``data`` as raw data and drop the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """Parsed operations; parsed from the raw data on first access, which then empties the raw data."""
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """Wrap the content in a ``q`` / ``Q`` pair, using whichever representation is populated."""
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
            # get_data() may have cached a serialization of the operations
            # before this call; drop it so it is rebuilt with the wrapper.
            self._data = b""
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """Make sure the raw data is available, then defer to the parent implementation."""
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)

1435 

1436 

def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read the next object from ``stream``, dispatching on its first byte.

    Args:
        stream: Seekable stream positioned at the start of the object.
        pdf: Passed on to the readers that accept it; must not be ``None``
            when an indirect reference is encountered.
        forced_encoding: Passed on to the string, array and dictionary readers.

    Returns:
        The parsed object.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e" and stream.read(6) == b"endobj":
        return NullObject()
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Collect some context for the error message. An absolute, clamped seek is
    # used because a relative seek before the start of the stream raises on
    # real files, which would hide the PdfReadError raised below.
    stream.seek(max(pos - 20, 0), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )

1485 

1486 

class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Build the field from ``data``.

        Only the attributes listed by ``FieldDictionaryAttributes`` and
        ``CheckboxRadioButtonAttributes`` are copied; attributes missing from
        ``data`` are skipped. A ``/V`` value stored as an encoded stream is
        replaced by its decoded text.
        """
        DictionaryObject.__init__(self)
        self.indirect_reference = data.indirect_reference
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        for attr in known_attributes:
            try:
                self[NameObject(attr)] = data[attr]
            except KeyError:
                pass

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return
        raw_value = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if raw_value is None:
            text = ""
        elif isinstance(raw_value, bytes):
            text = raw_value.decode()
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)

1584 

1585 

class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: Union[str, bytes],
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the padding below must not modify the caller's
        # Fit object.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # Nothing to store; note that this branch is taken before the
            # type is validated, so an unknown type is not rejected here.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """Destination as an array: page, type, then the stored coordinates in a fixed order."""
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination as a dictionary with ``/D`` (the destination
        array) and ``/S /GoTo`` entries.

        Args:
            stream: Writable target.
            encryption_key: Deprecated; any value other than ``None`` is
                reported through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """Read-only property accessing the color in (R, G, B) with values 0.0-1.0."""
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)