Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/generic/_data_structures.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

983 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# All rights reserved. 

3# 

4# Redistribution and use in source and binary forms, with or without 

5# modification, are permitted provided that the following conditions are 

6# met: 

7# 

8# * Redistributions of source code must retain the above copyright notice, 

9# this list of conditions and the following disclaimer. 

10# * Redistributions in binary form must reproduce the above copyright notice, 

11# this list of conditions and the following disclaimer in the documentation 

12# and/or other materials provided with the distribution. 

13# * The name of the author may not be used to endorse or promote products 

14# derived from this software without specific prior written permission. 

15# 

16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

26# POSSIBILITY OF SUCH DAMAGE. 

27 

28 

# Module metadata: name and contact address of the original author.
29__author__ = "Mathieu Fenniak" 

30__author_email__ = "biziqe@mathieu.fenniak.net" 

31 

32import logging 

33import re 

34import sys 

35from collections.abc import Iterable, Sequence 

36from io import BytesIO 

37from math import ceil 

38from typing import ( 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 

47from .._utils import ( 

48 WHITESPACES, 

49 BinaryStreamType, 

50 StreamType, 

51 deprecation_no_replacement, 

52 logger_warning, 

53 read_non_whitespace, 

54 read_until_regex, 

55 read_until_whitespace, 

56 skip_over_comment, 

57) 

58from ..constants import ( 

59 CheckboxRadioButtonAttributes, 

60 FieldDictionaryAttributes, 

61 OutlineFontFlag, 

62 StreamAttributes, 

63) 

64from ..constants import FilterTypes as FT 

65from ..constants import TypArguments as TA 

66from ..constants import TypFitArguments as TF 

67from ..errors import STREAM_TRUNCATED_PREMATURELY, LimitReachedError, PdfReadError, PdfStreamError 

68from ._base import ( 

69 BooleanObject, 

70 ByteStringObject, 

71 FloatObject, 

72 IndirectObject, 

73 NameObject, 

74 NullObject, 

75 NumberObject, 

76 PdfObject, 

77 TextStringObject, 

78 is_null_or_none, 

79) 

80from ._fit import Fit 

81from ._image_inline import ( 

82 extract_inline__ascii85_decode, 

83 extract_inline__ascii_hex_decode, 

84 extract_inline__dct_decode, 

85 extract_inline__run_length_decode, 

86 extract_inline_default, 

87) 

88from ._utils import read_hex_string_from_stream, read_string_from_stream 

89 

90if sys.version_info >= (3, 11): 

91 from typing import Self 

92else: 

93 from typing_extensions import Self 

94 

# Module-level logger named after this module.
95logger = logging.getLogger(__name__) 

96 

# Bytes pattern: optional sign, two whitespace-separated digit groups (both
# captured), whitespace, the letter "R", then one non-letter byte.
# NOTE(review): presumably matches an indirect reference such as b"12 0 R" -
# confirm against the code that uses it.
97IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]") 

98 

99 

class ArrayObject(list[Any], PdfObject):
    """
    PDF array: a Python ``list`` that is also a :class:`PdfObject`.

    On top of the ``list`` behaviour it can be replicated/cloned into a
    writer, hashed, extended with ``+``/``+=``, reduced with ``-=``,
    serialized with :meth:`write_to_stream` and parsed with
    :meth:`read_from_stream`.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Copy this array into ``pdf_dest``.

        Items exposing a ``replicate`` method are replicated into
        ``pdf_dest``; every other item is appended unchanged.

        Args:
            pdf_dest: writer receiving the copy.

        Returns:
            The array registered for ``pdf_dest`` by ``_reference_clone``.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: forwarded to the ``clone`` call of every item.

        Returns:
            ``self`` or the clone; stream items (direct, or reached through
            an indirect object) are stored as indirect references.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore[union-attr]
                return self
        except Exception:
            # Best effort: without a usable indirect reference we simply
            # fall through and build a clone.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate=True),
        )
        for data in self:
            if isinstance(data, StreamObject):
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif isinstance(data, IndirectObject) and isinstance(resolved := data.get_object(), StreamObject):
                dup = data._reference_clone(
                    resolved.clone(pdf_dest, force_duplicate=True, ignore_fields=ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalize the operand of ``+``, ``+=`` and ``-=`` to a plain list.

        Args:
            lst: ``list``/``tuple``/``set`` are converted element-wise;
                a ``PdfObject`` is wrapped as a single item; a ``str``
                becomes a ``NameObject`` when it starts with "/" and a
                ``TextStringObject`` otherwise; ``bytes`` become a
                ``ByteStringObject``; anything else (numbers, ...) is
                wrapped unchanged.

        Returns:
            The list of items to add or remove.

        """
        result: list[Any]
        if isinstance(lst, (list, tuple, set)):
            result = list(lst)
        elif isinstance(lst, PdfObject):
            result = [lst]
        elif isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not raise
            # IndexError, it is simply an empty text string.
            if lst.startswith("/"):
                result = [NameObject(lst)]
            else:
                result = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            result = [ByteStringObject(lst)]
        else:  # for numbers,...
            result = [lst]
        return result

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            This array, extended in place.

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For every item of the normalized operand the first equal element is
        removed; items that are not present are ignored.
        """
        for x in self._to_lst(lst):
            try:
                self.remove(x)
            except ValueError:
                # Item not in the array: nothing to remove.
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ item item ... ]`` to ``stream``.

        Args:
            stream: destination stream.
            encryption_key: deprecated; any value other than None is
                reported through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: stream positioned on the opening "[".
            pdf: reader forwarded to ``read_object``.
            forced_encoding: forwarded to ``read_object``.

        Returns:
            The parsed array. If the stream ends before "]" is found, the
            items read so far are returned.

        Raises:
            PdfReadError: the first byte is not "[".

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                # End of data before the closing bracket.
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            stream.seek(-1, 1)
            # check for array ending
            peek_ahead = stream.read(1)
            if peek_ahead == b"]":
                break
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr

272 

273 

class DictionaryObject(dict[Any, Any], PdfObject):
    """
    PDF dictionary: a Python ``dict`` that is also a :class:`PdfObject`.

    Keys and values stored through ``[]`` or :meth:`setdefault` must be
    ``PdfObject`` instances. ``[]`` access returns the value resolved with
    ``get_object()``; :meth:`raw_get` returns the stored value as is.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Copy this dictionary into ``pdf_dest``.

        Keys are replicated; values are replicated when they expose a
        ``replicate`` method and copied unchanged otherwise.

        Args:
            pdf_dest: writer receiving the copy.

        Returns:
            The dictionary registered for ``pdf_dest`` by ``_reference_clone``.

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this dictionary already belongs
                to ``pdf_dest``, the dictionary itself is returned.
            ignore_fields: keys that must not be copied; ``None`` is treated
                as an empty list.

        Returns:
            ``self`` or the clone. The content is only copied when the object
            returned by ``_reference_clone`` is still empty.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore[union-attr]
                return self
        except Exception:
            # Best effort: without a usable indirect reference we simply
            # fall through and build a clone.
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: dictionary whose entries are copied into ``self``.
            pdf_dest: writer receiving the clones.
            force_duplicate: forwarded to every nested clone call.
            ignore_fields: keys to skip. An ``int`` entry is a level counter:
                it is decremented on each call and, once it is <= 0, it is
                dropped together with the entry that follows it.
            visited: ``(idnum, generation)`` pairs of clones already created
                while walking chained objects; shared across recursive calls.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Expired counter: drop it and the entry after it.
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively, creating an empty clone
                        # for each link and wiring it to the previous clone;
                        # the content of each clone is filled in afterwards.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # No further link (or it cannot be resolved):
                                # the chain ends here.
                                cur_obj = None
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # Streams are always stored by reference.
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()``."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        Raises:
            LimitReachedError: the /Parent chain leads back to a dictionary
                that was already visited.

        """
        current = self
        visited: set[int] = set()

        while True:
            # Detect cyclic parent references
            obj_id = id(current)
            if obj_id in visited:
                raise LimitReachedError(f"Detected cycle in /Parent hierarchy when retrieving value for key {key!r}.")
            visited.add(obj_id)

            if key in current:
                return current[key]

            if "/Parent" not in current:
                return default

            # Walk upward
            current = cast(
                "DictionaryObject",
                current["/Parent"].get_object(),
            )

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: ``key`` or ``value`` is not a ``PdfObject``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        ``dict.setdefault`` with the same validation as ``__setitem__``.

        The validation runs before the lookup, so the default ``value=None``
        is rejected as well.

        Raises:
            ValueError: ``key`` or ``value`` is not a ``PdfObject``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return the value for ``key`` resolved through ``get_object()``."""
        return cast(PdfObject, dict.__getitem__(self, key).get_object())

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<<`` / ``key value`` lines / ``>>``.

        Keys longer than two characters whose second and last characters are
        "%" (for example "/%is_open%") are not written.

        Args:
            stream: destination stream.
            encryption_key: deprecated; any value other than None is
                reported through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @classmethod
    def _get_next_object_position(
        cls, position_before: int, position_end: int, generations: list[int], pdf: PdfReaderProtocol
    ) -> int:
        """
        Find the smallest xref position in ``(position_before, position_end]``.

        Args:
            position_before: exclusive lower bound.
            position_end: inclusive upper bound, also the fallback result.
            generations: keys of ``pdf.xref`` to inspect.
            pdf: reader providing the ``xref`` mapping.

        Returns:
            The smallest matching position, or ``position_end`` if none.

        """
        out = position_end
        for generation in generations:
            location = pdf.xref[generation]
            values = [x for x in location.values() if position_before < x <= position_end]
            if values:
                out = min(out, *values)
        return out

    @classmethod
    def _read_unsized_from_stream(
        cls, stream: BinaryStreamType, pdf: PdfReaderProtocol
    ) -> bytes:
        """
        Read stream data whose length is unknown or unusable.

        Reads up to just before the next position listed in ``pdf.xref``,
        searches that buffer for "endstream" and leaves ``stream`` positioned
        right after the marker.

        Args:
            stream: stream positioned at the start of the data.
            pdf: reader providing the ``xref`` mapping.

        Returns:
            The bytes before the marker, without the single byte that
            directly precedes it.

        Raises:
            PdfReadError: no "endstream" marker was found.

        """
        object_position = cls._get_next_object_position(
            position_before=stream.tell(), position_end=2 ** 32, generations=list(pdf.xref), pdf=pdf
        ) - 1
        current_position = stream.tell()
        # Read until the next object position.
        read_value = stream.read(object_position - stream.tell())
        endstream_position = read_value.find(b"endstream")
        if endstream_position < 0:
            raise PdfReadError(
                f"Unable to find 'endstream' marker for obj starting at {current_position}."
            )
        # 9 = len(b"endstream")
        stream.seek(current_position + endstream_position + 9)
        # Clamp at 0: with the marker at offset 0 a bare "- 1" would turn the
        # slice into [:-1] and return almost the whole read-ahead buffer.
        return read_value[: max(endstream_position - 1, 0)]

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream that may follow it.

        Args:
            stream: stream positioned on the opening "<<".
            pdf: reader used to resolve indirect objects; when it is given
                and ``pdf.strict`` is true, recoverable problems raise
                instead of being logged.
            forced_encoding: forwarded to ``read_object`` for values.

        Returns:
            A ``DictionaryObject``, or the object built by
            ``StreamObject.initialize_from_dictionary`` when the dictionary
            is followed by the ``stream`` keyword. In non-strict mode a value
            that cannot be read yields the entries parsed so far.

        Raises:
            PdfReadError: missing "<<", unreadable content in strict mode,
                recursion/limit errors, or a missing "endstream" marker.
            PdfStreamError: truncated input, missing newline after
                ``stream``, or missing length in strict mode.
            LimitReachedError: declared length above
                ``MAX_DECLARED_STREAM_LENGTH``.

        """
        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # Consume the second ">" of the closing ">>".
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning("%(exception)r", source=__name__, exception=exc)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except (RecursionError, LimitReachedError) as exc:
                raise PdfReadError(repr(exc)) from exc
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(repr(exc)) from exc
                logger_warning("%(exception)r", source=__name__, exception=exc)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # NOTE(review): this is a truthiness test, so an existing falsy
            # value is replaced without a duplicate-key report - confirm
            # whether that is intended.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    "Multiple definitions in dictionary at byte "
                    "%(position)s for key %(key)s"
                )
                values = {"position": hex(stream.tell()), "key": key}
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg % values)
                logger_warning(msg, source=__name__, **values)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # Lone "\r": the byte just read already belongs to the data.
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if StreamAttributes.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    "Stream length not defined @pos=%(position)d",
                    source=__name__,
                    position=stream.tell(),
                )
                data[NameObject(StreamAttributes.LENGTH)] = NumberObject(-1)
            length = data[StreamAttributes.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolving the reference may move the stream position;
                # remember it and restore it afterwards.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                from ..filters import MAX_DECLARED_STREAM_LENGTH  # noqa: PLC0415
                if length > MAX_DECLARED_STREAM_LENGTH:
                    raise LimitReachedError(f"Declared stream length of {length} exceeds maximum allowed length.")

                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # Declared length unusable: rescan from the start of the
                    # data for the marker.
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = DictionaryObject._read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # Not a stream: rewind to just after the dictionary.
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval

706 

707 

class TreeObject(DictionaryObject):
    """
    Dictionary acting as a tree node.

    Children are kept as a linked list: the node stores "/First", "/Last"
    and "/Count"; each child stores "/Parent" and is linked to its siblings
    through "/Next" and "/Prev". Iterating a ``TreeObject`` yields its
    children, not its keys.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """Create the node, optionally pre-filled with the entries of ``dct``."""
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the node has a "/First" entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (see :meth:`children`)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children lazily, following "/Next" from "/First".

        The walk stops after the child equal to "/Last", when a child has no
        usable "/Next", or (with a warning) when a child object is met twice.
        Because links are read lazily, modifying them while iterating
        changes what is yielded next.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        last = self[NameObject("/Last")]
        child = child_ref.get_object()
        visited: set[int] = set()
        while True:
            child_id = id(child)
            if child_id in visited:
                logger_warning("Detected cycle in outline structure for %(child)s", source=__name__, child=child)
                return
            visited.add(child_id)

            yield child

            if child == last:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore[union-attr]
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Append ``child`` via ``insert_child(child, None, pdf)``."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to "/Count" of ``parent`` and of its ancestors.

        The count never drops below 0. The walk stops at the first node
        that is null/None or has no "/Count".

        Args:
            parent: node (or reference to it) to start from.
            n: amount to add; may be negative.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the magnitude of "/Count" of ``parent``.

        The stored count is positive when "/%is_open%" compares equal to
        True (the default) and negative otherwise. Ancestors are only
        updated while the current node is open.

        Args:
            parent: node (or reference to it) to start from.
            n: amount to add to the absolute count.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Link ``child`` into this node's list of children.

        Args:
            child: object to insert; must have an indirect reference.
            before: indirect reference of the sibling to insert in front of;
                when no sibling matches, the child is appended at the end.
            pdf: not used by this method.
            inc_parent_counter: callable ``(node, n)`` updating the counts;
                defaults to :meth:`inc_parent_counter_default`.

        Returns:
            The indirect reference of ``child``.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        assert child.indirect_reference is not None, "mypy"
        child_reference: IndirectObject = child.indirect_reference

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child_reference
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child_reference
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child_reference
        # NOTE(review): the search for `before` starts at "/Last" and only
        # moves forward through "/Next" - confirm this is the intended
        # starting point.
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child_reference)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child_reference
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child_reference
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child_reference
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            child_obj.pop("/Next", None)
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child_reference
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child_reference

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: sibling preceding ``cur``, or None when ``cur`` is first.
            prev_ref: value to store where a link to ``prev`` is needed.
            cur: node being removed.
            last: current last child; only used to assert that a node
                without "/Next" is the last one.

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore[operator]
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore[operator]

    def remove_child(self, child: Any) -> None:
        """
        Unlink ``child`` from this node and clear its tree attributes.

        Args:
            child: child object, or something whose ``get_object()``
                returns it.

        Raises:
            ValueError: ``child`` has no "/Parent", its parent is not this
                node, or it is not found among the children.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore[union-attr]
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """Remove the object from the tree it is in."""
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """
        Detach every child and drop "/Count", "/First" and "/Last".

        The children are collected before any of them is modified:
        iteration follows each child's "/Next" lazily, and resetting a child
        deletes that very link, which would end the walk after the first
        child.
        """
        for child in list(self):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]

925 

926 

def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Call this after a node has been removed from a tree.

    This resets the nodes attributes in respect to that tree.

    Args:
        child_obj: node to detach. Its "/Parent" entry is deleted
            unconditionally (a missing entry raises ``KeyError``);
            "/Next" and "/Prev" are deleted only when present.

    """
    del child_obj[NameObject("/Parent")]
    # The sibling links are optional, so each is checked before deletion.
    for link_name in ("/Next", "/Prev"):
        if NameObject(link_name) in child_obj:
            del child_obj[NameObject(link_name)]

942 

943 

944class StreamObject(DictionaryObject): 

def __init__(self) -> None:
    """Start with an empty payload and no decoded counterpart."""
    # Payload bytes, read by get_data() and replaced by set_data().
    self._data: bytes = b""
    # Decoded counterpart of this stream; None until one is assigned.
    self.decoded_self: "Optional[DecodedStreamObject]" = None

948 

def replicate(
    self,
    pdf_dest: "PdfWriterProtocol",
) -> "StreamObject":
    """
    Copy this stream into ``pdf_dest``.

    The payload bytes are shared with the copy. When this stream has a
    decoded counterpart, a replica of it is attached to the copy. Keys are
    replicated; values are replicated when they expose a ``replicate``
    method and copied unchanged otherwise.

    Args:
        pdf_dest: writer receiving the copy.

    Returns:
        The stream registered for ``pdf_dest`` by ``_reference_clone``.

    """
    d__ = cast(
        "StreamObject",
        self._reference_clone(self.__class__(), pdf_dest, False),
    )
    d__._data = self._data
    try:
        decoded_self = self.decoded_self
        # The decoded counterpart belongs on the replica. Assigning it to
        # `self` would leave the replica without one and would replace the
        # source's own decoded stream with an object owned by pdf_dest.
        if decoded_self is None:
            d__.decoded_self = None
        else:
            d__.decoded_self = cast(
                "DecodedStreamObject", decoded_self.replicate(pdf_dest)
            )
    except Exception:
        # Best effort: a stream without a usable decoded counterpart is
        # replicated without one.
        pass
    for k, v in self.items():
        d__[k.replicate(pdf_dest)] = (
            v.replicate(pdf_dest) if hasattr(v, "replicate") else v
        )
    return d__

973 

974 def _clone( 

975 self, 

976 src: DictionaryObject, 

977 pdf_dest: PdfWriterProtocol, 

978 force_duplicate: bool, 

979 ignore_fields: Optional[Sequence[Union[str, int]]], 

980 visited: set[tuple[int, int]], 

981 ) -> None: 

982 """ 

983 Update the object from src. 

984 

985 Args: 

986 src: 

987 pdf_dest: 

988 force_duplicate: 

989 ignore_fields: 

990 

991 """ 

992 self._data = cast("StreamObject", src)._data 

993 try: 

994 decoded_self = cast("StreamObject", src).decoded_self 

995 if decoded_self is None: 

996 self.decoded_self = None 

997 else: 

998 self.decoded_self = cast( 

999 "DecodedStreamObject", 

1000 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields), 

1001 ) 

1002 except Exception: 

1003 pass 

1004 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) 

1005 

1006 def hash_bin(self) -> int: 

1007 """ 

1008 Used to detect modified object. 

1009 

1010 Returns: 

1011 Hash considering type and value. 

1012 

1013 """ 

1014 # Use _data to prevent errors on non-decoded streams. 

1015 return hash((super().hash_bin(), self._data)) 

1016 

1017 def get_data(self) -> bytes: 

1018 return self._data 

1019 

1020 def set_data(self, data: bytes) -> None: 

1021 self._data = data 

1022 

1023 def hash_value_data(self) -> bytes: 

1024 data = super().hash_value_data() 

1025 data += self.get_data() 

1026 return data 

1027 

1028 def write_to_stream( 

1029 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 

1030 ) -> None: 

1031 if encryption_key is not None: # deprecated 

1032 deprecation_no_replacement( 

1033 "the encryption_key parameter of write_to_stream", "5.0.0" 

1034 ) 

1035 self[NameObject(StreamAttributes.LENGTH)] = NumberObject(len(self._data)) 

1036 DictionaryObject.write_to_stream(self, stream) 

1037 del self[StreamAttributes.LENGTH] 

1038 stream.write(b"\nstream\n") 

1039 stream.write(self._data) 

1040 stream.write(b"\nendstream") 

1041 

1042 @staticmethod 

1043 def initialize_from_dictionary( 

1044 data: dict[str, Any] 

1045 ) -> Union["EncodedStreamObject", "DecodedStreamObject"]: 

1046 retval: Union[EncodedStreamObject, DecodedStreamObject] 

1047 if StreamAttributes.FILTER in data: 

1048 retval = EncodedStreamObject() 

1049 else: 

1050 retval = DecodedStreamObject() 

1051 retval._data = data["__streamdata__"] 

1052 del data["__streamdata__"] 

1053 if StreamAttributes.LENGTH in data: 

1054 del data[StreamAttributes.LENGTH] 

1055 retval.update(data) 

1056 return retval 

1057 

1058 def flate_encode(self, level: int = -1) -> "EncodedStreamObject": 

1059 from ..filters import FlateDecode # noqa: PLC0415 

1060 

1061 if StreamAttributes.FILTER in self: 

1062 f = self[StreamAttributes.FILTER] 

1063 if isinstance(f, ArrayObject): 

1064 f = ArrayObject([NameObject(FT.FLATE_DECODE), *f]) 

1065 try: 

1066 params = ArrayObject( 

1067 [NullObject(), *self.get(StreamAttributes.DECODE_PARMS, ArrayObject())] 

1068 ) 

1069 except TypeError: 

1070 # case of error where the * operator is not working (not an array 

1071 params = ArrayObject( 

1072 [NullObject(), self.get(StreamAttributes.DECODE_PARMS, ArrayObject())] 

1073 ) 

1074 else: 

1075 f = ArrayObject([NameObject(FT.FLATE_DECODE), f]) 

1076 params = ArrayObject( 

1077 [NullObject(), self.get(StreamAttributes.DECODE_PARMS, NullObject())] 

1078 ) 

1079 else: 

1080 f = NameObject(FT.FLATE_DECODE) 

1081 params = None 

1082 retval = EncodedStreamObject() 

1083 retval.update(self) 

1084 retval[NameObject(StreamAttributes.FILTER)] = f 

1085 if params is not None: 

1086 retval[NameObject(StreamAttributes.DECODE_PARMS)] = params 

1087 retval._data = FlateDecode.encode(self._data, level) 

1088 return retval 

1089 

1090 def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any: 

1091 """ 

1092 Try to decode the stream object as an image 

1093 

1094 Args: 

1095 pillow_parameters: parameters provided to Pillow Image.save() method, 

1096 cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save> 

1097 

1098 Returns: 

1099 a PIL image if proper decoding has been found 

1100 Raises: 

1101 Exception: Errors during decoding will be reported. 

1102 It is recommended to catch exceptions to prevent 

1103 stops in your program. 

1104 

1105 """ 

1106 from ._image_xobject import _xobj_to_image # noqa: PLC0415 

1107 

1108 if self.get("/Subtype", "") != "/Image": 

1109 try: 

1110 logger_warning( # pragma: no cover 

1111 "%(indirect_reference)s does not seem to be an Image", 

1112 source=__name__, 

1113 indirect_reference=self.indirect_reference, 

1114 ) 

1115 except AttributeError: 

1116 logger_warning( # pragma: no cover 

1117 "%(obj)r object does not seem to be an Image", 

1118 source=__name__, 

1119 obj=self, 

1120 ) 

1121 extension, _, img = _xobj_to_image(self, pillow_parameters) 

1122 if extension is None: 

1123 return None # pragma: no cover 

1124 return img 

1125 

1126 

class DecodedStreamObject(StreamObject):
    """Stream object that adds no behaviour of its own to StreamObject."""

1129 

1130 

class EncodedStreamObject(StreamObject):
    """
    Stream whose raw bytes are stored in filter-encoded form.

    The decoded content is produced on first access and cached in
    ``decoded_self``.
    """

    def __init__(self) -> None:
        # Let the parent initialise ``_data`` as well, so that a freshly
        # constructed object can be hashed, written or decoded without an
        # AttributeError.
        super().__init__()
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded stream content.

        The first call decodes the raw data and caches the result, together
        with all entries except length, filter and decode parameters, as a
        DecodedStreamObject; later calls return the cached data.
        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        if self.decoded_self is not None:
            # Cached version of decoded object
            return self.decoded_self.get_data()

        # Create decoded object
        decoded = DecodedStreamObject()
        decoded.set_data(decode_stream_data(self))
        for key, value in self.items():
            if key not in (StreamAttributes.LENGTH, StreamAttributes.FILTER, StreamAttributes.DECODE_PARMS):
                decoded[key] = value
        self.decoded_self = decoded
        return decoded.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the decoded content and re-encode it.

        Args:
            data: The new, decoded content.

        Raises:
            TypeError: If ``data`` is not a bytes object.
            PdfReadError: If the stream's filter is anything other than a
                single FlateDecode.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(StreamAttributes.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            if not isinstance(data, bytes):
                raise TypeError("Data must be bytes")
            if self.decoded_self is None:
                self.get_data()  # to create self.decoded_self
            assert self.decoded_self is not None, "mypy"
            self.decoded_self.set_data(data)
            super().set_data(FlateDecode.encode(data))
        else:
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )

1168 

1169 

# Maximum number of elements accepted in an array-based content stream;
# ContentStream raises LimitReachedError for longer arrays.
CONTENT_STREAM_ARRAY_MAX_LENGTH = 10_000

1171 

1172 

class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, ContentStream object can either have both of those fields defined,
    or one field defined and the other left empty (``b""`` / ``[]``).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is called, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Initialise the content stream from ``stream``.

        Args:
            stream: ``None`` for an empty content stream, otherwise an object
                whose ``get_object()`` yields either a single stream or an
                array of streams that are concatenated.
            pdf: Stored as ``self.pdf`` and handed to ``read_object`` while
                reading inline image settings.
            forced_encoding: Handed to ``read_object`` while parsing operands.

        Raises:
            LimitReachedError: If an array-based stream has too many elements
                or produces too many bytes.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []
        # Recorded up front so that the attribute exists for every instance,
        # including the empty ones created with ``stream=None``.
        self.forced_encoding = forced_encoding

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
        else:
            stream = stream.get_object()
            if isinstance(stream, ArrayObject):
                from pypdf.filters import MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH  # noqa: PLC0415

                if (stream_length := len(stream)) > CONTENT_STREAM_ARRAY_MAX_LENGTH:
                    raise LimitReachedError(
                        f"Array-based stream has {stream_length} > {CONTENT_STREAM_ARRAY_MAX_LENGTH} elements."
                    )
                data = bytearray()
                length = 0
                for s in stream:
                    s_resolved = s.get_object()
                    if isinstance(s_resolved, NullObject):
                        continue
                    if not isinstance(s_resolved, StreamObject):
                        # No need to emit an exception here for now - the PDF structure
                        # seems to already be broken beforehand in these cases.
                        logger_warning(
                            "Expected StreamObject, got %(type_name)s instead. Data might be wrong.",
                            source=__name__,
                            type_name=type(s_resolved).__name__,
                        )
                    else:
                        new_data = s_resolved.get_data()
                        length += len(new_data)
                        if length > MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH:
                            raise LimitReachedError(
                                f"Array-based stream has at least {length} > "
                                f"{MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH} output bytes."
                            )
                        data += new_data
                        # Make sure the parts are separated by a newline.
                        if len(data) == 0 or data[-1:] != b"\n":
                            # There should be no direct need to check for a change of one byte.
                            length += 1
                            data += b"\n"
                super().set_data(bytes(data))
            else:
                stream_data = stream.get_data()
                assert stream_data is not None
                super().set_data(stream_data)

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Create a copy of this content stream registered in ``pdf_dest``.

        Raw data, dictionary entries, parsed operations and the forced
        encoding are all carried over, so a stream that currently only holds
        parsed operations does not replicate as an empty stream.

        Args:
            pdf_dest: The writer that receives the copy.

        Returns:
            The copy.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        d__._data = self._data
        try:
            decoded_self = self.decoded_self
            # The replicated decoded stream belongs to the copy, not to the
            # source object.
            if decoded_self is None:
                d__.decoded_self = None
            else:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            # Best effort: ``decoded_self`` is optional (it is not set by this
            # class' constructor), so failing here must not abort the copy.
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        d__.pdf = pdf_dest
        # Shallow copy, same as in _clone().
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: The target the clone is created for.
            force_duplicate: Create a new object even if this one already
                belongs to ``pdf_dest``.
            ignore_fields: Handed to ``_clone``; ``None`` is treated as an
                empty list.

        Returns:
            The cloned ContentStream

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore[union-attr]
                return self
        except Exception:
            # No usable indirect reference: fall through and create a clone.
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the parsed operations (shallow copy) and the forced
        encoding are taken over; the dictionary entries are not cloned here.

        Args:
            src: The content stream to copy from.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Unused here.
            ignore_fields: Unused here.
            visited: Unused here.

        """
        src_cs = cast("ContentStream", src)
        # The parent's set_data() is used on purpose: it stores the bytes
        # without emptying the operations.
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObjection or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` and append the operations found to ``_operations``.

        Each operation is stored as ``(operands, operator)``; an inline image
        is stored as ``(image_dict, b"INLINE IMAGE")``.

        Args:
            stream: The seekable content stream data; parsing starts at
                offset 0.

        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # Begin inline image - this requires a completely
                    # different parsing mechanism.
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image; ``stream`` is positioned just after ``BI``.

        Args:
            stream: The content stream being parsed.

        Returns:
            A dictionary with the image dictionary under ``"settings"`` and
            the image bytes under ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker can be found, even with the
                fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remembered so that the fallback extraction below can restart here.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline__ascii_hex_decode(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline__ascii85_decode(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline__run_length_decode(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline__dct_decode(stream)
        elif filtr == "not set":
            # No filter: derive the number of bytes per pixel (lcs) from the
            # colour space and bit depth, then read width * height pixels.
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
                # Move to the `EI` if possible.
                ei = read_non_whitespace(stream)
                stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the raw content stream.

        If no raw data is stored, it is first rebuilt from the parsed
        operations and cached.
        """
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first and last two bytes of the serialised
                    # dictionary: only its entries belong between BI and ID.
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Store ``data`` as raw content and discard the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """
        The parsed operations of this content stream.

        When no operations are stored but raw data is, the raw data is parsed
        first and then discarded.
        """
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """
        Wrap the content in a ``q`` / ``Q`` operator pair.

        Works on the parsed operations when there are any, otherwise on the
        raw data; an empty content stream is left unchanged.
        """
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialise the content stream to ``stream``.

        Args:
            stream: The output to write to.
            encryption_key: Passed on unchanged to the parent implementation.

        """
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)

1501 

1502 

def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read the object starting at the current position of ``stream``.

    The first byte selects the reader: name, hexadecimal string or
    dictionary, array, boolean, literal string, null, comment (skipped, then
    the following object is read) or number / indirect reference.

    Args:
        stream: The seekable input.
        pdf: Handed to the readers that need it; must not be ``None`` when an
            indirect reference is encountered.
        forced_encoding: Handed to the string, dictionary and array readers.

    Returns:
        The object that was read. A literal ``endobj`` yields a NullObject.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e":
        token_start = stream.tell()
        if stream.read(6) == b"endobj":
            return NullObject()
        # Not `endobj`: rewind, so that the error raised below reports the
        # position and excerpt of the offending token instead of a place up
        # to six bytes behind it.
        stream.seek(token_start, 0)
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Collect some context for the error message. An absolute, clamped seek
    # is used because seeking to before the start of the stream is an error
    # for some stream types when fewer than 20 bytes precede the token.
    stream.seek(max(pos - 20, 0), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )

1551 

1552 

class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Build the field from the known field attributes found in ``data``.

        Args:
            data: The dictionary to take the attributes and the indirect
                reference from. A ``/V`` value that is an encoded stream is
                replaced by its decoded text.

        """
        DictionaryObject.__init__(self)
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        self.indirect_reference = data.indirect_reference
        for attribute in known_attributes:
            try:
                self[NameObject(attribute)] = data[attribute]
            except KeyError:
                # Attribute not present in the source dictionary.
                continue

        current_value = self.get("/V")
        if not isinstance(current_value, EncodedStreamObject):
            return
        raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if isinstance(raw, bytes):
            text = raw.decode()
        elif raw is None:
            text = ""
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)

1650 

1651 

class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: Union[str, bytes],
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: missing /XYZ arguments are appended below, and that
        # must not modify the caller's Fit object.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # Any other type without arguments: nothing more to store.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        The destination as an array.

        Consists of the raw ``/Page`` entry and the type, followed by those of
        ``/Left``, ``/Bottom``, ``/Right``, ``/Top`` and ``/Zoom`` that are
        present, in that order.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination to ``stream`` as a dictionary with two entries:
        ``/D`` holding :attr:`dest_array` and ``/S`` holding ``/GoTo``.

        Args:
            stream: The output to write to.
            encryption_key: Deprecated; passing a value triggers the
                deprecation helper.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """
        Read-only property accessing the color in (R, G, B) with values 0.0-1.0.

        Falls back to ``[0, 0, 0]`` when no ``/C`` entry is present.
        """
        return cast(
            "ArrayObject",
            self.get("/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])),
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both
        """
        return OutlineFontFlag(self.get("/F", 0))

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)