1# Copyright (c) 2006, Mathieu Fenniak 
    2# All rights reserved. 
    3# 
    4# Redistribution and use in source and binary forms, with or without 
    5# modification, are permitted provided that the following conditions are 
    6# met: 
    7# 
    8# * Redistributions of source code must retain the above copyright notice, 
    9# this list of conditions and the following disclaimer. 
    10# * Redistributions in binary form must reproduce the above copyright notice, 
    11# this list of conditions and the following disclaimer in the documentation 
    12# and/or other materials provided with the distribution. 
    13# * The name of the author may not be used to endorse or promote products 
    14# derived from this software without specific prior written permission. 
    15# 
    16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
    17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
    18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
    19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
    20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
    21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
    22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
    23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
    24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
    25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
    26# POSSIBILITY OF SUCH DAMAGE. 
    27 
    28 
    29__author__ = "Mathieu Fenniak" 
    30__author_email__ = "biziqe@mathieu.fenniak.net" 
    31 
    32import logging 
    33import re 
    34import sys 
    35from collections.abc import Iterable, Sequence 
    36from io import BytesIO 
    37from math import ceil 
    38from typing import ( 
    39    Any, 
    40    Callable, 
    41    Optional, 
    42    Union, 
    43    cast, 
    44) 
    45 
    46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol 
    47from .._utils import ( 
    48    WHITESPACES, 
    49    StreamType, 
    50    deprecation_no_replacement, 
    51    logger_warning, 
    52    read_non_whitespace, 
    53    read_until_regex, 
    54    read_until_whitespace, 
    55    skip_over_comment, 
    56) 
    57from ..constants import ( 
    58    CheckboxRadioButtonAttributes, 
    59    FieldDictionaryAttributes, 
    60    OutlineFontFlag, 
    61) 
    62from ..constants import FilterTypes as FT 
    63from ..constants import StreamAttributes as SA 
    64from ..constants import TypArguments as TA 
    65from ..constants import TypFitArguments as TF 
    66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError 
    67from ._base import ( 
    68    BooleanObject, 
    69    ByteStringObject, 
    70    FloatObject, 
    71    IndirectObject, 
    72    NameObject, 
    73    NullObject, 
    74    NumberObject, 
    75    PdfObject, 
    76    TextStringObject, 
    77    is_null_or_none, 
    78) 
    79from ._fit import Fit 
    80from ._image_inline import ( 
    81    extract_inline_A85, 
    82    extract_inline_AHx, 
    83    extract_inline_DCT, 
    84    extract_inline_default, 
    85    extract_inline_RL, 
    86) 
    87from ._utils import read_hex_string_from_stream, read_string_from_stream 
    88 
    89if sys.version_info >= (3, 11): 
    90    from typing import Self 
    91else: 
    92    from typing_extensions import Self 
    93 
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Bytes pattern for an indirect reference token: an optional sign, two
# whitespace-separated digit groups (both captured), then ``R`` followed by
# one byte that is not an ASCII letter.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
    97 
    98 
    99class ArrayObject(list[Any], PdfObject): 
    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Return a copy of this array built for ``pdf_dest``.

        The target array is obtained from ``_reference_clone`` (called with
        ``False`` as its third argument). Every element that offers a
        ``replicate`` method is replicated into ``pdf_dest``; all other
        elements are appended unchanged.

        Args:
            pdf_dest: Writer that receives the replicated objects.

        Returns:
            The array holding the replicated elements.

        """
        copied = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for element in self:
            copied.append(
                element.replicate(pdf_dest)
                if hasattr(element, "replicate")
                else element
            )
        return copied
    114 
    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        When this array already belongs to ``pdf_dest`` (judged through its
        ``indirect_reference``) and ``force_duplicate`` is false, the array
        itself is returned. Otherwise a new array is filled element by
        element: stream objects are cloned and stored as their indirect
        reference, other elements with a ``clone`` method are cloned, and
        everything else is appended as is.

        Args:
            pdf_dest: Writer that receives the cloned objects.
            force_duplicate: Passed through to every nested clone call.
            ignore_fields: Passed through to every nested clone call.

        Returns:
            ``self`` or the newly built array.

        """
        # Best effort: any failure while inspecting the reference (missing
        # attribute, None reference, ...) just means "not already in pdf_dest".
        try:
            same_pdf = bool(self.indirect_reference.pdf == pdf_dest)  # type: ignore
        except Exception:
            same_pdf = False
        if same_pdf and not force_duplicate:
            return self

        cloned = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for item in self:
            if isinstance(item, StreamObject):
                stream_copy = item.clone(pdf_dest, force_duplicate, ignore_fields)
                registered = item._reference_clone(
                    stream_copy, pdf_dest, force_duplicate
                )
                # Streams are stored by reference, never inline.
                cloned.append(registered.indirect_reference)
                continue
            if hasattr(item, "clone"):
                cloned.append(item.clone(pdf_dest, force_duplicate, ignore_fields))
                continue
            cloned.append(item)
        return cloned
    144 
    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        The hash combines the concrete class with the ``hash_bin()`` of every
        element, in order.

        Returns:
            Hash considering type and value.

        """
        element_hashes = tuple(element.hash_bin() for element in self)
        return hash((self.__class__, element_hashes))
    154 
    def items(self) -> Iterable[Any]:
        """
        Emulate DictionaryObject.items for a list.

        Returns:
            An iterator of ``(index, object)`` pairs, indices starting at 0.

        """
        indexed_elements = enumerate(self, 0)
        return indexed_elements
    158 
    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalize the right-hand operand of ``+``, ``+=`` and ``-=``.

        Args:
            lst: Operand to normalize.

                * ``list``, ``tuple`` or ``set``: returned unchanged
                  (the very same object).
                * ``PdfObject``: wrapped in a one-element list.
                * ``str``: converted to a ``NameObject`` when it starts with
                  ``"/"``, otherwise to a ``TextStringObject``; an empty
                  string becomes an empty ``TextStringObject``.
                * ``bytes``: converted to a ``ByteStringObject``.
                * anything else (numbers, ...): wrapped in a one-element list.

        Returns:
            The collection of elements to add or remove.

        """
        # Convert to list, internal
        if isinstance(lst, (list, tuple, set)):
            return lst  # type: ignore[return-value]
        # Checked before ``str``/``bytes`` so PdfObject subclasses are kept as is.
        if isinstance(lst, PdfObject):
            return [lst]
        if isinstance(lst, str):
            # ``startswith`` rather than ``lst[0]``: indexing an empty string
            # raises IndexError.
            if lst.startswith("/"):
                return [NameObject(lst)]
            return [TextStringObject(lst)]
        if isinstance(lst, bytes):
            return [ByteStringObject(lst)]
        # for numbers,...
        return [lst]
    175 
    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Build a new array made of this array's elements followed by ``lst``.

        Args:
            lst: A list, tuple or set extends the result with its elements.
                Other types (numbers, ...) are appended as a single element.
                A str is converted into a TextStringObject, or into a
                NameObject if it starts with "/".
                A bytes value is converted into a ByteStringObject.

        Returns:
            ArrayObject with all elements

        """
        addition = self._to_lst(lst)
        combined = ArrayObject(self)
        combined.extend(addition)
        return combined
    194 
    def __iadd__(self, lst: Any) -> Self:
        """
        Extend this array in place with ``lst``.

        Args:
            lst: A list, tuple or set extends the array with its elements.
                Other types (numbers, ...) are appended as a single element.
                A str is converted into a TextStringObject, or into a
                NameObject if it starts with "/".
                A bytes value is converted into a ByteStringObject.

        Returns:
            This array, after extension.

        """
        addition = self._to_lst(lst)
        self.extend(addition)
        return self
    209 
    def __isub__(self, lst: Any) -> Self:
        """
        Remove items from this array in place.

        For each element of the normalized operand, the first equal element
        of the array is removed; operand elements that are not present are
        ignored.

        Args:
            lst: Item or collection of items to remove.

        Returns:
            This array, after removal.

        """
        for unwanted in self._to_lst(lst):
            try:
                self.remove(unwanted)
            except ValueError:
                # Not in the array: nothing to remove.
                continue
        return self
    219 
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialize the array to ``stream``.

        Writes ``[``, then every element preceded by a single space, then
        `` ]``. Elements serialize themselves through their own
        ``write_to_stream``.

        Args:
            stream: Destination the bytes are written to.
            encryption_key: Deprecated; a non-None value is reported through
                ``deprecation_no_replacement``.

        """
        key_was_supplied = encryption_key is not None
        if key_was_supplied:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        emit = stream.write
        emit(b"[")
        for element in self:
            emit(b" ")
            element.write_to_stream(stream)
        emit(b" ]")
    232 
    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        The stream must be positioned on ``[``. Objects are read with
        ``read_object`` until the closing ``]`` is consumed; ``%`` comments
        between elements are skipped. Reaching the end of the stream before
        ``]`` ends parsing and returns the elements read so far.

        Args:
            stream: Seekable byte stream to read from.
            pdf: Reader passed on to ``read_object``.
            forced_encoding: Passed on to ``read_object``.

        Returns:
            The parsed array.

        Raises:
            PdfReadError: If the first byte is not ``[``.

        """
        parsed = ArrayObject()
        if stream.read(1) != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            byte = stream.read(1)
            while byte.isspace():
                byte = stream.read(1)
            # end of data, or array ending (the "]" stays consumed)
            if byte in (b"", b"]"):
                break
            # un-read the byte so the next reader sees the whole token
            stream.seek(-1, 1)
            if byte == b"%":
                skip_over_comment(stream)
                continue
            # read and append object
            parsed.append(read_object(stream, pdf, forced_encoding))
        return parsed
    263 
    264 
    265class DictionaryObject(dict[Any, Any], PdfObject): 
    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Return a copy of this dictionary built for ``pdf_dest``.

        The target is a fresh instance of this object's own class obtained
        through ``_reference_clone``. Keys are always replicated; values are
        replicated when they offer a ``replicate`` method and stored
        unchanged otherwise.

        Args:
            pdf_dest: Writer that receives the replicated objects.

        Returns:
            The dictionary holding the replicated entries.

        """
        target = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for key, value in self.items():
            # Value first, then key: same evaluation order as a subscript
            # assignment.
            new_value = (
                value.replicate(pdf_dest) if hasattr(value, "replicate") else value
            )
            new_key = key.replicate(pdf_dest)
            target[new_key] = new_value
        return target
    279 
    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        When this dictionary already belongs to ``pdf_dest`` (judged through
        its ``indirect_reference``) and ``force_duplicate`` is false, the
        dictionary itself is returned. Otherwise the target comes from
        ``_reference_clone`` and is filled by ``_clone`` only if it is still
        empty.

        Args:
            pdf_dest: Writer that receives the cloned objects.
            force_duplicate: Passed through to the nested clone calls.
            ignore_fields: Keys to leave out; ``None`` is treated as empty.

        Returns:
            ``self`` or the dictionary registered in ``pdf_dest``.

        """
        # Best effort: any failure while inspecting the reference just means
        # "not already in pdf_dest".
        try:
            same_pdf = bool(self.indirect_reference.pdf == pdf_dest)  # type: ignore
        except Exception:
            same_pdf = False
        if same_pdf and not force_duplicate:
            return self

        target = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        fields_to_skip = [] if ignore_fields is None else ignore_fields
        if not target.keys():
            seen: set[tuple[int, int]] = set()  # (idnum, generation)
            target._clone(self, pdf_dest, force_duplicate, fields_to_skip, seen)
        return target
    303 
    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Linked chains reachable through ``/Next``/``/Prev`` or ``/N``/``/V``
        are walked with a loop instead of recursion: every object of the
        chain first gets its clone registered, and only afterwards are the
        clones filled. The remaining entries of ``src`` are then copied.

        Args:
            src: Dictionary whose entries are cloned into ``self``.
            pdf_dest: Writer that receives the cloned objects.
            force_duplicate: Passed through to every nested clone call.
            ignore_fields: Keys that are not copied. An ``int`` entry is a
                level counter for the entry following it: it is decremented
                on each call, and once it is ``<= 0`` both the counter and
                the following entry are dropped. Must not be ``None``.
            visited: ``(idnum, generation)`` pairs of chain clones already
                registered; updated in place and used to stop on cycles.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Drop the counter and the field name after it. A slice
                    # deletion does not raise when the counter is the last
                    # element of the list.
                    del ignore_fields[x : x + 2]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1

        def _links_same_type(field: str) -> bool:
            # True when src[field] is an indirect reference to a dictionary
            # whose /Type is compatible with the one of src (equal, or
            # missing on either side).
            if field not in src:
                return False
            if not isinstance(src.raw_get(field), IndirectObject):
                return False
            linked = src[field]
            if not isinstance(linked, DictionaryObject):
                return False
            src_type = src.get("/Type", None)
            linked_type = linked.get("/Type", None)
            return bool(
                src_type is None or linked_type is None or src_type == linked_type
            )

        #  Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields and _links_same_type(field)
            for field in ("/Next", "/Prev", "/N", "/V")
        ):
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if k not in self and _links_same_type(k):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # Missing or unreadable link: end of the chain.
                                cur_obj = None
                        # Fill the clones only once the whole chain is linked.
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # Streams are stored by reference, never inline.
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )
    416 
    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        The hash combines the concrete class with one ``(key, hash_bin of
        value)`` pair per entry, in iteration order.

        Returns:
            Hash considering type and value.

        """
        entry_hashes = tuple(
            (key, value.hash_bin()) for key, value in self.items()
        )
        return hash((self.__class__, entry_hashes))
    428 
    def raw_get(self, key: Any) -> Any:
        """
        Return the value stored under ``key`` exactly as stored.

        Unlike ``self[key]``, ``get_object()`` is not called on the value.

        Raises:
            KeyError: If ``key`` is not present.

        """
        stored = dict.__getitem__(self, key)
        return stored
    431 
    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        The lookup follows the ``/Parent`` entries upwards until the key is
        found or a dictionary without ``/Parent`` is reached.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        parent = cast("DictionaryObject", self["/Parent"].get_object())
        return parent.get_inherited(key, default)
    456 
    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``; both must be ``PdfObject`` instances.

        Raises:
            ValueError: If the key or the value is not a ``PdfObject``
                (the key is checked first).

        """
        key_is_valid = isinstance(key, PdfObject)
        if not key_is_valid:
            raise ValueError("Key must be a PdfObject")
        value_is_valid = isinstance(value, PdfObject)
        if not value_is_valid:
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)
    463 
    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        Insert ``value`` under ``key`` if absent and return the stored value.

        Both ``key`` and ``value`` are validated before the dictionary is
        consulted, so a non-``PdfObject`` value is rejected even when ``key``
        already exists. The result is what ``dict.setdefault`` returns, i.e.
        ``get_object()`` is not applied.

        Raises:
            ValueError: If the key or the value is not a ``PdfObject``
                (the key is checked first).

        """
        key_is_valid = isinstance(key, PdfObject)
        if not key_is_valid:
            raise ValueError("Key must be a PdfObject")
        value_is_valid = isinstance(value, PdfObject)
        if not value_is_valid:
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)
    470 
    def __getitem__(self, key: Any) -> PdfObject:
        """
        Return ``get_object()`` of the value stored under ``key``.

        Raises:
            KeyError: If ``key`` is not present.

        """
        stored = dict.__getitem__(self, key)
        return stored.get_object()
    473 
    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
          Returns a :class:`~pypdf.xmp.XmpInformation` instance built from
          the ``/Metadata`` entry of this dictionary, or None if that entry
          is missing or null.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        raw_metadata = self.get("/Metadata", None)
        if not is_null_or_none(raw_metadata):
            assert raw_metadata is not None, "mypy"
            return XmpInformation(raw_metadata.get_object())
        return None
    496 
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Serialize the dictionary to ``stream``.

        Writes ``<<`` and a newline, then one ``key value`` line per entry,
        then ``>>``. Keys longer than two characters whose second and last
        characters are ``%`` (for example ``/%is_open%``) are not written.

        Args:
            stream: Destination the bytes are written to.
            encryption_key: Deprecated; a non-None value is reported through
                ``deprecation_no_replacement``.

        """
        key_was_supplied = encryption_key is not None
        if key_was_supplied:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        emit = stream.write
        emit(b"<<\n")
        for name, entry in self.items():
            is_internal = len(name) > 2 and name[1] == "%" and name[-1] == "%"
            if not is_internal:
                name.write_to_stream(stream, encryption_key)
                emit(b" ")
                entry.write_to_stream(stream)
                emit(b"\n")
        emit(b">>")
    513 
    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream following it if any.

        The stream must be positioned on ``<<``. Key/value pairs are read
        with ``read_object`` until ``>>``. If the ``stream`` keyword follows,
        the stream data is read as well (using ``/Length`` when usable, with
        fallbacks for missing or wrong lengths) and the result is built by
        ``StreamObject.initialize_from_dictionary``.

        When ``pdf`` is ``None`` or not strict, recoverable problems are
        logged as warnings and a value that cannot be read ends parsing with
        the entries read so far.

        Args:
            stream: Seekable byte stream to read from.
            pdf: Reader used to resolve an indirect ``/Length``, to locate
                following objects and to decide on strictness.
            forced_encoding: Passed on to ``read_object`` for values.

        Returns:
            The parsed dictionary, or a stream object when stream data
            followed the dictionary.

        Raises:
            PdfReadError: If the data does not start with ``<<``, if the
                ``endstream`` marker cannot be found, or in strict mode on
                malformed or duplicated entries.
            PdfStreamError: If the data ends inside the dictionary, if the
                ``stream`` keyword is not followed by a newline, or in
                strict mode when the stream length is missing.

        """

        def get_next_obj_pos(
            p: int, p1: int, rem_gens: list[int], pdf: PdfReaderProtocol
        ) -> int:
            # Smallest xref offset in the interval (p, p1] over the given
            # generations; p1 itself when there is none.
            out = p1
            for gen in rem_gens:
                loc = pdf.xref[gen]
                try:
                    values = [x for x in loc.values() if p < x <= p1]
                    if values:
                        out = min(out, *values)
                except ValueError:
                    pass
            return out

        def read_unsized_from_stream(
            stream: StreamType, pdf: PdfReaderProtocol
        ) -> bytes:
            # we are just pointing at beginning of the stream
            eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1
            curr = stream.tell()
            rw = stream.read(eon - stream.tell())
            p = rw.find(b"endstream")
            if p < 0:
                raise PdfReadError(
                    f"Unable to find 'endstream' marker for obj starting at {curr}."
                )
            stream.seek(curr + p + 9)
            # Drop the byte preceding the marker. Clamp at 0: with the marker
            # at offset 0 the slice end would be -1 and return nearly the
            # whole buffer instead of empty data.
            return rw[: max(p - 1, 0)]

        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # consume the second ">" of the closing ">>"
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                # position the stream on the first byte of the value
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    # keep the original exception as the cause
                    raise PdfReadError(exc.__repr__()) from exc
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # NOTE(review): the test is on truthiness, so a falsy first value
            # is replaced by a later duplicate without any warning - confirm
            # this is intended.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            # accept "\r\n" as well as a lone "\r"
            if eol == b"\r" and stream.read(1) != b"\n":
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                # -1 selects the "search for endstream" path below
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # resolving the reference may move the stream: save/restore
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # ignore the declared length and search for the marker
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # no stream follows: rewind to just after the dictionary
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval
    675 
    676 
    677class TreeObject(DictionaryObject): 
    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Create the tree node, optionally pre-filled from ``dct``.

        Args:
            dct: Entries copied in with ``update``; nothing is copied when
                it is ``None`` or empty.

        """
        DictionaryObject.__init__(self)
        if not dct:
            return
        self.update(dct)
    682 
    def has_children(self) -> bool:
        """Tell whether this node has a ``/First`` entry."""
        has_first_entry = "/First" in self
        return has_first_entry
    685 
    def __iter__(self) -> Any:
        """Iterate over the children of this node (see ``children``), not over its keys."""
        child_iterator = self.children()
        return child_iterator
    688 
    def children(self) -> Iterable[Any]:
        """
        Yield the children of this node in order.

        Starts with the object under ``/First`` and follows the ``/Next``
        entries. Iteration stops after yielding the child equal to the
        ``/Last`` entry, or when a child has no (or a null) ``/Next``.
        Nothing is yielded when the node has no ``/First`` entry.
        """
        if self.has_children():
            node = self[NameObject("/First")].get_object()
            while True:
                yield node
                if node == self[NameObject("/Last")]:
                    break
                next_ref = node.get(NameObject("/Next"))  # type: ignore
                if is_null_or_none(next_ref):
                    break
                node = next_ref.get_object()
    703 
    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """
        Add ``child`` to this node.

        Delegates to ``insert_child`` with ``None`` as the ``before``
        argument; the value returned by ``insert_child`` is discarded.

        Args:
            child: Object to add.
            pdf: Writer passed on to ``insert_child``.

        """
        no_anchor = None
        self.insert_child(child, no_anchor, pdf)
    706 
    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the ``/Count`` of ``parent`` and of its ancestors.

        The walk goes up through the ``/Parent`` entries and stops at the
        first node that is null/None or has no ``/Count`` entry. A count
        never goes below 0.

        Args:
            parent: Node (or reference to it) where the update starts.
            n: Amount to add; may be negative.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        node = cast("TreeObject", parent.get_object())
        if "/Count" not in node:
            return
        current = cast(int, node[NameObject("/Count")])
        node[NameObject("/Count")] = NumberObject(max(0, current + n))
        self.inc_parent_counter_default(node.get("/Parent", None), n)
    719 
    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the ``/Count`` of ``parent``, outline style.

        The magnitude of the current count (0 when missing) is increased by
        ``n``. The result is stored as is when the node's ``/%is_open%``
        entry equals ``True`` (the default when missing), and negated
        otherwise. The update continues with ``/Parent`` only for open nodes.

        Args:
            parent: Node (or reference to it) where the update starts.
            n: Amount to add to the magnitude of the count.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        node = cast("TreeObject", parent.get_object())
        #  BooleanObject requires comparison with == not is
        is_open = node.get("/%is_open%", True) == True  # noqa: E712
        count = cast(int, node.get("/Count", 0))
        if count < 0:
            count = abs(count)
        sign = 1 if is_open else -1
        node[NameObject("/Count")] = NumberObject((count + n) * sign)
        if is_open:
            self.inc_parent_counter_outline(node.get("/Parent", None), n)
    736 
    737    def insert_child( 
    738        self, 
    739        child: Any, 
    740        before: Any, 
    741        pdf: PdfWriterProtocol, 
    742        inc_parent_counter: Optional[Callable[..., Any]] = None, 
    743    ) -> IndirectObject: 
    744        if inc_parent_counter is None: 
    745            inc_parent_counter = self.inc_parent_counter_default 
    746        child_obj = child.get_object() 
    747        child = child.indirect_reference  # get_reference(child_obj) 
    748 
    749        prev: Optional[DictionaryObject] 
    750        if "/First" not in self:  # no child yet 
    751            self[NameObject("/First")] = child 
    752            self[NameObject("/Count")] = NumberObject(0) 
    753            self[NameObject("/Last")] = child 
    754            child_obj[NameObject("/Parent")] = self.indirect_reference 
    755            inc_parent_counter(self, child_obj.get("/Count", 1)) 
    756            if "/Next" in child_obj: 
    757                del child_obj["/Next"] 
    758            if "/Prev" in child_obj: 
    759                del child_obj["/Prev"] 
    760            return child 
    761        prev = cast("DictionaryObject", self["/Last"]) 
    762 
    763        while prev.indirect_reference != before: 
    764            if "/Next" in prev: 
    765                prev = cast("TreeObject", prev["/Next"]) 
    766            else:  # append at the end 
    767                prev[NameObject("/Next")] = cast("TreeObject", child) 
    768                child_obj[NameObject("/Prev")] = prev.indirect_reference 
    769                child_obj[NameObject("/Parent")] = self.indirect_reference 
    770                if "/Next" in child_obj: 
    771                    del child_obj["/Next"] 
    772                self[NameObject("/Last")] = child 
    773                inc_parent_counter(self, child_obj.get("/Count", 1)) 
    774                return child 
    775        try:  # insert as first or in the middle 
    776            assert isinstance(prev["/Prev"], DictionaryObject) 
    777            prev["/Prev"][NameObject("/Next")] = child 
    778            child_obj[NameObject("/Prev")] = prev["/Prev"] 
    779        except Exception:  # it means we are inserting in first position 
    780            del child_obj["/Next"] 
    781        child_obj[NameObject("/Next")] = prev 
    782        prev[NameObject("/Prev")] = child 
    783        child_obj[NameObject("/Parent")] = self.indirect_reference 
    784        inc_parent_counter(self, child_obj.get("/Count", 1)) 
    785        return child 
    786 
    787    def _remove_node_from_tree( 
    788        self, prev: Any, prev_ref: Any, cur: Any, last: Any 
    789    ) -> None: 
    790        """ 
    791        Adjust the pointers of the linked list and tree node count. 
    792 
    793        Args: 
    794            prev: 
    795            prev_ref: 
    796            cur: 
    797            last: 
    798 
    799        """ 
    800        next_ref = cur.get(NameObject("/Next"), None) 
    801        if prev is None: 
    802            if next_ref: 
    803                # Removing first tree node 
    804                next_obj = next_ref.get_object() 
    805                del next_obj[NameObject("/Prev")] 
    806                self[NameObject("/First")] = next_ref 
    807                self[NameObject("/Count")] = NumberObject( 
    808                    self[NameObject("/Count")] - 1  # type: ignore 
    809                ) 
    810 
    811            else: 
    812                # Removing only tree node 
    813                self[NameObject("/Count")] = NumberObject(0) 
    814                del self[NameObject("/First")] 
    815                if NameObject("/Last") in self: 
    816                    del self[NameObject("/Last")] 
    817        else: 
    818            if next_ref: 
    819                # Removing middle tree node 
    820                next_obj = next_ref.get_object() 
    821                next_obj[NameObject("/Prev")] = prev_ref 
    822                prev[NameObject("/Next")] = next_ref 
    823            else: 
    824                # Removing last tree node 
    825                assert cur == last 
    826                del prev[NameObject("/Next")] 
    827                self[NameObject("/Last")] = prev_ref 
    828            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore 
    829 
    830    def remove_child(self, child: Any) -> None: 
    831        child_obj = child.get_object() 
    832        child = child_obj.indirect_reference 
    833 
    834        if NameObject("/Parent") not in child_obj: 
    835            raise ValueError("Removed child does not appear to be a tree item") 
    836        if child_obj[NameObject("/Parent")] != self: 
    837            raise ValueError("Removed child is not a member of this tree") 
    838 
    839        found = False 
    840        prev_ref = None 
    841        prev = None 
    842        cur_ref: Optional[Any] = self[NameObject("/First")] 
    843        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore 
    844        last_ref = self[NameObject("/Last")] 
    845        last = last_ref.get_object() 
    846        while cur is not None: 
    847            if cur == child_obj: 
    848                self._remove_node_from_tree(prev, prev_ref, cur, last) 
    849                found = True 
    850                break 
    851 
    852            # Go to the next node 
    853            prev_ref = cur_ref 
    854            prev = cur 
    855            if NameObject("/Next") in cur: 
    856                cur_ref = cur[NameObject("/Next")] 
    857                cur = cur_ref.get_object() 
    858            else: 
    859                cur_ref = None 
    860                cur = None 
    861 
    862        if not found: 
    863            raise ValueError("Removal couldn't find item in tree") 
    864 
    865        _reset_node_tree_relationship(child_obj) 
    866 
    867    def remove_from_tree(self) -> None: 
    868        """Remove the object from the tree it is in.""" 
    869        if NameObject("/Parent") not in self: 
    870            raise ValueError("Removed child does not appear to be a tree item") 
    871        cast("TreeObject", self["/Parent"]).remove_child(self) 
    872 
    873    def empty_tree(self) -> None: 
    874        for child in self: 
    875            child_obj = child.get_object() 
    876            _reset_node_tree_relationship(child_obj) 
    877 
    878        if NameObject("/Count") in self: 
    879            del self[NameObject("/Count")] 
    880        if NameObject("/First") in self: 
    881            del self[NameObject("/First")] 
    882        if NameObject("/Last") in self: 
    883            del self[NameObject("/Last")] 
    884 
    885 
    886def _reset_node_tree_relationship(child_obj: Any) -> None: 
    887    """ 
    888    Call this after a node has been removed from a tree. 
    889 
    890    This resets the nodes attributes in respect to that tree. 
    891 
    892    Args: 
    893        child_obj: 
    894 
    895    """ 
    896    del child_obj[NameObject("/Parent")] 
    897    if NameObject("/Next") in child_obj: 
    898        del child_obj[NameObject("/Next")] 
    899    if NameObject("/Prev") in child_obj: 
    900        del child_obj[NameObject("/Prev")] 
    901 
    902 
    903class StreamObject(DictionaryObject): 
    904    def __init__(self) -> None: 
    905        self._data: bytes = b"" 
    906        self.decoded_self: Optional[DecodedStreamObject] = None 
    907 
    908    def replicate( 
    909        self, 
    910        pdf_dest: PdfWriterProtocol, 
    911    ) -> "StreamObject": 
    912        d__ = cast( 
    913            "StreamObject", 
    914            self._reference_clone(self.__class__(), pdf_dest, False), 
    915        ) 
    916        d__._data = self._data 
    917        try: 
    918            decoded_self = self.decoded_self 
    919            if decoded_self is None: 
    920                self.decoded_self = None 
    921            else: 
    922                self.decoded_self = cast( 
    923                    "DecodedStreamObject", decoded_self.replicate(pdf_dest) 
    924                ) 
    925        except Exception: 
    926            pass 
    927        for k, v in self.items(): 
    928            d__[k.replicate(pdf_dest)] = ( 
    929                v.replicate(pdf_dest) if hasattr(v, "replicate") else v 
    930            ) 
    931        return d__ 
    932 
    933    def _clone( 
    934        self, 
    935        src: DictionaryObject, 
    936        pdf_dest: PdfWriterProtocol, 
    937        force_duplicate: bool, 
    938        ignore_fields: Optional[Sequence[Union[str, int]]], 
    939        visited: set[tuple[int, int]], 
    940    ) -> None: 
    941        """ 
    942        Update the object from src. 
    943 
    944        Args: 
    945            src: 
    946            pdf_dest: 
    947            force_duplicate: 
    948            ignore_fields: 
    949 
    950        """ 
    951        self._data = cast("StreamObject", src)._data 
    952        try: 
    953            decoded_self = cast("StreamObject", src).decoded_self 
    954            if decoded_self is None: 
    955                self.decoded_self = None 
    956            else: 
    957                self.decoded_self = cast( 
    958                    "DecodedStreamObject", 
    959                    decoded_self.clone(pdf_dest, force_duplicate, ignore_fields), 
    960                ) 
    961        except Exception: 
    962            pass 
    963        super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) 
    964 
    965    def hash_bin(self) -> int: 
    966        """ 
    967        Used to detect modified object. 
    968 
    969        Returns: 
    970            Hash considering type and value. 
    971 
    972        """ 
    973        # Use _data to prevent errors on non-decoded streams. 
    974        return hash((super().hash_bin(), self._data)) 
    975 
    976    def get_data(self) -> bytes: 
    977        return self._data 
    978 
    979    def set_data(self, data: bytes) -> None: 
    980        self._data = data 
    981 
    982    def hash_value_data(self) -> bytes: 
    983        data = super().hash_value_data() 
    984        data += self.get_data() 
    985        return data 
    986 
    987    def write_to_stream( 
    988        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 
    989    ) -> None: 
    990        if encryption_key is not None:  # deprecated 
    991            deprecation_no_replacement( 
    992                "the encryption_key parameter of write_to_stream", "5.0.0" 
    993            ) 
    994        self[NameObject(SA.LENGTH)] = NumberObject(len(self._data)) 
    995        DictionaryObject.write_to_stream(self, stream) 
    996        del self[SA.LENGTH] 
    997        stream.write(b"\nstream\n") 
    998        stream.write(self._data) 
    999        stream.write(b"\nendstream") 
    1000 
    1001    @staticmethod 
    1002    def initialize_from_dictionary( 
    1003        data: dict[str, Any] 
    1004    ) -> Union["EncodedStreamObject", "DecodedStreamObject"]: 
    1005        retval: Union[EncodedStreamObject, DecodedStreamObject] 
    1006        if SA.FILTER in data: 
    1007            retval = EncodedStreamObject() 
    1008        else: 
    1009            retval = DecodedStreamObject() 
    1010        retval._data = data["__streamdata__"] 
    1011        del data["__streamdata__"] 
    1012        if SA.LENGTH in data: 
    1013            del data[SA.LENGTH] 
    1014        retval.update(data) 
    1015        return retval 
    1016 
    1017    def flate_encode(self, level: int = -1) -> "EncodedStreamObject": 
    1018        from ..filters import FlateDecode  # noqa: PLC0415 
    1019 
    1020        if SA.FILTER in self: 
    1021            f = self[SA.FILTER] 
    1022            if isinstance(f, ArrayObject): 
    1023                f = ArrayObject([NameObject(FT.FLATE_DECODE), *f]) 
    1024                try: 
    1025                    params = ArrayObject( 
    1026                        [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())] 
    1027                    ) 
    1028                except TypeError: 
    1029                    # case of error where the * operator is not working (not an array 
    1030                    params = ArrayObject( 
    1031                        [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())] 
    1032                    ) 
    1033            else: 
    1034                f = ArrayObject([NameObject(FT.FLATE_DECODE), f]) 
    1035                params = ArrayObject( 
    1036                    [NullObject(), self.get(SA.DECODE_PARMS, NullObject())] 
    1037                ) 
    1038        else: 
    1039            f = NameObject(FT.FLATE_DECODE) 
    1040            params = None 
    1041        retval = EncodedStreamObject() 
    1042        retval.update(self) 
    1043        retval[NameObject(SA.FILTER)] = f 
    1044        if params is not None: 
    1045            retval[NameObject(SA.DECODE_PARMS)] = params 
    1046        retval._data = FlateDecode.encode(self._data, level) 
    1047        return retval 
    1048 
    1049    def decode_as_image(self) -> Any: 
    1050        """ 
    1051        Try to decode the stream object as an image 
    1052 
    1053        Returns: 
    1054            a PIL image if proper decoding has been found 
    1055        Raises: 
    1056            Exception: Errors during decoding will be reported. 
    1057                It is recommended to catch exceptions to prevent 
    1058                stops in your program. 
    1059 
    1060        """ 
    1061        from ..filters import _xobj_to_image  # noqa: PLC0415 
    1062 
    1063        if self.get("/Subtype", "") != "/Image": 
    1064            try: 
    1065                msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover 
    1066            except AttributeError: 
    1067                msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover 
    1068            logger_warning(msg, __name__) 
    1069        extension, _, img = _xobj_to_image(self) 
    1070        if extension is None: 
    1071            return None  # pragma: no cover 
    1072        return img 
    1073 
    1074 
    1075class DecodedStreamObject(StreamObject): 
    1076    pass 
    1077 
    1078 
    1079class EncodedStreamObject(StreamObject): 
    1080    def __init__(self) -> None: 
    1081        self.decoded_self: Optional[DecodedStreamObject] = None 
    1082 
    1083    # This overrides the parent method 
    1084    def get_data(self) -> bytes: 
    1085        from ..filters import decode_stream_data  # noqa: PLC0415 
    1086 
    1087        if self.decoded_self is not None: 
    1088            # Cached version of decoded object 
    1089            return self.decoded_self.get_data() 
    1090 
    1091        # Create decoded object 
    1092        decoded = DecodedStreamObject() 
    1093        decoded.set_data(decode_stream_data(self)) 
    1094        for key, value in self.items(): 
    1095            if key not in (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS): 
    1096                decoded[key] = value 
    1097        self.decoded_self = decoded 
    1098        return decoded.get_data() 
    1099 
    1100    # This overrides the parent method: 
    1101    def set_data(self, data: bytes) -> None: 
    1102        from ..filters import FlateDecode  # noqa: PLC0415 
    1103 
    1104        if self.get(SA.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE]): 
    1105            if not isinstance(data, bytes): 
    1106                raise TypeError("Data must be bytes") 
    1107            if self.decoded_self is None: 
    1108                self.get_data()  # to create self.decoded_self 
    1109            assert self.decoded_self is not None, "mypy" 
    1110            self.decoded_self.set_data(data) 
    1111            super().set_data(FlateDecode.encode(data)) 
    1112        else: 
    1113            raise PdfReadError( 
    1114                "Streams encoded with a filter different from FlateDecode are not supported" 
    1115            ) 
    1116 
    1117 
    1118class ContentStream(DecodedStreamObject): 
    1119    """ 
    1120    In order to be fast, this data structure can contain either: 
    1121 
    1122    * raw data in ._data 
    1123    * parsed stream operations in ._operations. 
    1124 
    1125    At any time, ContentStream object can either have both of those fields defined, 
    1126    or one field defined and the other set to None. 
    1127 
    1128    These fields are "rebuilt" lazily, when accessed: 
    1129 
    1130    * when .get_data() is called, if ._data is None, it is rebuilt from ._operations. 
    1131    * when .operations is called, if ._operations is None, it is rebuilt from ._data. 
    1132 
    1133    Conversely, these fields can be invalidated: 
    1134 
    1135    * when .set_data() is called, ._operations is set to None. 
    1136    * when .operations is set, ._data is set to None. 
    1137    """ 
    1138 
    1139    def __init__( 
    1140        self, 
    1141        stream: Any, 
    1142        pdf: Any, 
    1143        forced_encoding: Union[None, str, list[str], dict[int, str]] = None, 
    1144    ) -> None: 
    1145        self.pdf = pdf 
    1146        self._operations: list[tuple[Any, bytes]] = [] 
    1147 
    1148        # stream may be a StreamObject or an ArrayObject containing 
    1149        # StreamObjects to be concatenated together. 
    1150        if stream is None: 
    1151            super().set_data(b"") 
    1152        else: 
    1153            stream = stream.get_object() 
    1154            if isinstance(stream, ArrayObject): 
    1155                data = b"" 
    1156                for s in stream: 
    1157                    s_resolved = s.get_object() 
    1158                    if isinstance(s_resolved, NullObject): 
    1159                        continue 
    1160                    if not isinstance(s_resolved, StreamObject): 
    1161                        # No need to emit an exception here for now - the PDF structure 
    1162                        # seems to already be broken beforehand in these cases. 
    1163                        logger_warning( 
    1164                            f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.", 
    1165                            __name__ 
    1166                        ) 
    1167                    else: 
    1168                        data += s_resolved.get_data() 
    1169                    if len(data) == 0 or data[-1] != b"\n": 
    1170                        data += b"\n" 
    1171                super().set_data(bytes(data)) 
    1172            else: 
    1173                stream_data = stream.get_data() 
    1174                assert stream_data is not None 
    1175                super().set_data(stream_data) 
    1176        self.forced_encoding = forced_encoding 
    1177 
    1178    def replicate( 
    1179        self, 
    1180        pdf_dest: PdfWriterProtocol, 
    1181    ) -> "ContentStream": 
    1182        d__ = cast( 
    1183            "ContentStream", 
    1184            self._reference_clone(self.__class__(None, None), pdf_dest, False), 
    1185        ) 
    1186        d__._data = self._data 
    1187        try: 
    1188            decoded_self = self.decoded_self 
    1189            if decoded_self is None: 
    1190                self.decoded_self = None 
    1191            else: 
    1192                self.decoded_self = cast( 
    1193                    "DecodedStreamObject", decoded_self.replicate(pdf_dest) 
    1194                ) 
    1195        except Exception: 
    1196            pass 
    1197        for k, v in self.items(): 
    1198            d__[k.replicate(pdf_dest)] = ( 
    1199                v.replicate(pdf_dest) if hasattr(v, "replicate") else v 
    1200            ) 
    1201        return d__ 
    1202        d__.set_data(self._data) 
    1203        d__.pdf = pdf_dest 
    1204        d__._operations = list(self._operations) 
    1205        d__.forced_encoding = self.forced_encoding 
    1206        return d__ 
    1207 
    1208    def clone( 
    1209        self, 
    1210        pdf_dest: Any, 
    1211        force_duplicate: bool = False, 
    1212        ignore_fields: Optional[Sequence[Union[str, int]]] = (), 
    1213    ) -> "ContentStream": 
    1214        """ 
    1215        Clone object into pdf_dest. 
    1216 
    1217        Args: 
    1218            pdf_dest: 
    1219            force_duplicate: 
    1220            ignore_fields: 
    1221 
    1222        Returns: 
    1223            The cloned ContentStream 
    1224 
    1225        """ 
    1226        try: 
    1227            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore 
    1228                return self 
    1229        except Exception: 
    1230            pass 
    1231 
    1232        visited: set[tuple[int, int]] = set() 
    1233        d__ = cast( 
    1234            "ContentStream", 
    1235            self._reference_clone( 
    1236                self.__class__(None, None), pdf_dest, force_duplicate 
    1237            ), 
    1238        ) 
    1239        if ignore_fields is None: 
    1240            ignore_fields = [] 
    1241        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited) 
    1242        return d__ 
    1243 
    1244    def _clone( 
    1245        self, 
    1246        src: DictionaryObject, 
    1247        pdf_dest: PdfWriterProtocol, 
    1248        force_duplicate: bool, 
    1249        ignore_fields: Optional[Sequence[Union[str, int]]], 
    1250        visited: set[tuple[int, int]], 
    1251    ) -> None: 
    1252        """ 
    1253        Update the object from src. 
    1254 
    1255        Args: 
    1256            src: 
    1257            pdf_dest: 
    1258            force_duplicate: 
    1259            ignore_fields: 
    1260 
    1261        """ 
    1262        src_cs = cast("ContentStream", src) 
    1263        super().set_data(src_cs._data) 
    1264        self.pdf = pdf_dest 
    1265        self._operations = list(src_cs._operations) 
    1266        self.forced_encoding = src_cs.forced_encoding 
    1267        # no need to call DictionaryObjection or anything 
    1268        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) 
    1269 
    1270    def _parse_content_stream(self, stream: StreamType) -> None: 
    1271        # 7.8.2 Content Streams 
    1272        stream.seek(0, 0) 
    1273        operands: list[Union[int, str, PdfObject]] = [] 
    1274        while True: 
    1275            peek = read_non_whitespace(stream) 
    1276            if peek in (b"", 0): 
    1277                break 
    1278            stream.seek(-1, 1) 
    1279            if peek.isalpha() or peek in (b"'", b'"'): 
    1280                operator = read_until_regex(stream, NameObject.delimiter_pattern) 
    1281                if operator == b"BI": 
    1282                    # begin inline image - a completely different parsing 
    1283                    # mechanism is required, of course... thanks buddy... 
    1284                    assert operands == [] 
    1285                    ii = self._read_inline_image(stream) 
    1286                    self._operations.append((ii, b"INLINE IMAGE")) 
    1287                else: 
    1288                    self._operations.append((operands, operator)) 
    1289                    operands = [] 
    1290            elif peek == b"%": 
    1291                # If we encounter a comment in the content stream, we have to 
    1292                # handle it here. Typically, read_object will handle 
    1293                # encountering a comment -- but read_object assumes that 
    1294                # following the comment must be the object we're trying to 
    1295                # read. In this case, it could be an operator instead. 
    1296                while peek not in (b"\r", b"\n", b""): 
    1297                    peek = stream.read(1) 
    1298            else: 
    1299                operands.append(read_object(stream, None, self.forced_encoding)) 
    1300 
    1301    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]: 
    1302        # begin reading just after the "BI" - begin image 
    1303        # first read the dictionary of settings. 
    1304        settings = DictionaryObject() 
    1305        while True: 
    1306            tok = read_non_whitespace(stream) 
    1307            stream.seek(-1, 1) 
    1308            if tok == b"I": 
    1309                # "ID" - begin of image data 
    1310                break 
    1311            key = read_object(stream, self.pdf) 
    1312            tok = read_non_whitespace(stream) 
    1313            stream.seek(-1, 1) 
    1314            value = read_object(stream, self.pdf) 
    1315            settings[key] = value 
    1316        # left at beginning of ID 
    1317        tmp = stream.read(3) 
    1318        assert tmp[:2] == b"ID" 
    1319        filtr = settings.get("/F", settings.get("/Filter", "not set")) 
    1320        savpos = stream.tell() 
    1321        if isinstance(filtr, list): 
    1322            filtr = filtr[0]  # used forencoding 
    1323        if "AHx" in filtr or "ASCIIHexDecode" in filtr: 
    1324            data = extract_inline_AHx(stream) 
    1325        elif "A85" in filtr or "ASCII85Decode" in filtr: 
    1326            data = extract_inline_A85(stream) 
    1327        elif "RL" in filtr or "RunLengthDecode" in filtr: 
    1328            data = extract_inline_RL(stream) 
    1329        elif "DCT" in filtr or "DCTDecode" in filtr: 
    1330            data = extract_inline_DCT(stream) 
    1331        elif filtr == "not set": 
    1332            cs = settings.get("/CS", "") 
    1333            if isinstance(cs, list): 
    1334                cs = cs[0] 
    1335            if "RGB" in cs: 
    1336                lcs = 3 
    1337            elif "CMYK" in cs: 
    1338                lcs = 4 
    1339            else: 
    1340                bits = settings.get( 
    1341                    "/BPC", 
    1342                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1, 
    1343                ) 
    1344                if bits > 0: 
    1345                    lcs = bits / 8.0 
    1346                else: 
    1347                    data = extract_inline_default(stream) 
    1348                    lcs = -1 
    1349            if lcs > 0: 
    1350                data = stream.read( 
    1351                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"]) 
    1352                ) 
    1353            # Move to the `EI` if possible. 
    1354            ei = read_non_whitespace(stream) 
    1355            stream.seek(-1, 1) 
    1356        else: 
    1357            data = extract_inline_default(stream) 
    1358 
    1359        ei = stream.read(3) 
    1360        stream.seek(-1, 1) 
    1361        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES: 
    1362            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above. 
    1363            stream.seek(savpos, 0) 
    1364            data = extract_inline_default(stream) 
    1365            ei = stream.read(3) 
    1366            stream.seek(-1, 1) 
    1367            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover 
    1368                # Check the same condition again. This should never fail as 
    1369                # edge cases are covered by `extract_inline_default` above, 
    1370                # but check this ot make sure that we are behind the `EI` afterwards. 
    1371                raise PdfStreamError( 
    1372                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}" 
    1373                ) 
    1374        return {"settings": settings, "data": data} 
    1375 
    1376    # This overrides the parent method 
    1377    def get_data(self) -> bytes: 
    1378        if not self._data: 
    1379            new_data = BytesIO() 
    1380            for operands, operator in self._operations: 
    1381                if operator == b"INLINE IMAGE": 
    1382                    new_data.write(b"BI") 
    1383                    dict_text = BytesIO() 
    1384                    operands["settings"].write_to_stream(dict_text) 
    1385                    new_data.write(dict_text.getvalue()[2:-2]) 
    1386                    new_data.write(b"ID ") 
    1387                    new_data.write(operands["data"]) 
    1388                    new_data.write(b"EI") 
    1389                else: 
    1390                    for op in operands: 
    1391                        op.write_to_stream(new_data) 
    1392                        new_data.write(b" ") 
    1393                    new_data.write(operator) 
    1394                new_data.write(b"\n") 
    1395            self._data = new_data.getvalue() 
    1396        return self._data 
    1397 
    1398    # This overrides the parent method 
    1399    def set_data(self, data: bytes) -> None: 
    1400        super().set_data(data) 
    1401        self._operations = [] 
    1402 
    1403    @property 
    1404    def operations(self) -> list[tuple[Any, bytes]]: 
    1405        if not self._operations and self._data: 
    1406            self._parse_content_stream(BytesIO(self._data)) 
    1407            self._data = b"" 
    1408        return self._operations 
    1409 
    1410    @operations.setter 
    1411    def operations(self, operations: list[tuple[Any, bytes]]) -> None: 
    1412        self._operations = operations 
    1413        self._data = b"" 
    1414 
    1415    def isolate_graphics_state(self) -> None: 
    1416        if self._operations: 
    1417            self._operations.insert(0, ([], b"q")) 
    1418            self._operations.append(([], b"Q")) 
    1419        elif self._data: 
    1420            self._data = b"q\n" + self._data + b"\nQ\n" 
    1421 
    1422    # This overrides the parent method 
    1423    def write_to_stream( 
    1424        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 
    1425    ) -> None: 
    1426        if not self._data and self._operations: 
    1427            self.get_data()  # this ensures ._data is rebuilt 
    1428        super().write_to_stream(stream, encryption_key) 
    1429 
    1430 
    1431def read_object( 
    1432    stream: StreamType, 
    1433    pdf: Optional[PdfReaderProtocol], 
    1434    forced_encoding: Union[None, str, list[str], dict[int, str]] = None, 
    1435) -> Union[PdfObject, int, str, ContentStream]: 
    1436    tok = stream.read(1) 
    1437    stream.seek(-1, 1)  # reset to start 
    1438    if tok == b"/": 
    1439        return NameObject.read_from_stream(stream, pdf) 
    1440    if tok == b"<": 
    1441        # hexadecimal string OR dictionary 
    1442        peek = stream.read(2) 
    1443        stream.seek(-2, 1)  # reset to start 
    1444        if peek == b"<<": 
    1445            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding) 
    1446        return read_hex_string_from_stream(stream, forced_encoding) 
    1447    if tok == b"[": 
    1448        return ArrayObject.read_from_stream(stream, pdf, forced_encoding) 
    1449    if tok in (b"t", b"f"): 
    1450        return BooleanObject.read_from_stream(stream) 
    1451    if tok == b"(": 
    1452        return read_string_from_stream(stream, forced_encoding) 
    1453    if tok == b"e" and stream.read(6) == b"endobj": 
    1454        return NullObject() 
    1455    if tok == b"n": 
    1456        return NullObject.read_from_stream(stream) 
    1457    if tok == b"%": 
    1458        # comment 
    1459        skip_over_comment(stream) 
    1460        tok = read_non_whitespace(stream) 
    1461        stream.seek(-1, 1) 
    1462        return read_object(stream, pdf, forced_encoding) 
    1463    if tok in b"0123456789+-.": 
    1464        # number object OR indirect reference 
    1465        peek = stream.read(20) 
    1466        stream.seek(-len(peek), 1)  # reset to start 
    1467        if IndirectPattern.match(peek) is not None: 
    1468            assert pdf is not None, "mypy" 
    1469            return IndirectObject.read_from_stream(stream, pdf) 
    1470        return NumberObject.read_from_stream(stream) 
    1471    pos = stream.tell() 
    1472    stream.seek(-20, 1) 
    1473    stream_extract = stream.read(80) 
    1474    stream.seek(pos) 
    1475    read_until_whitespace(stream) 
    1476    raise PdfReadError( 
    1477        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}" 
    1478    ) 
    1479 
    1480 
    1481class Field(TreeObject): 
    1482    """ 
    1483    A class representing a field dictionary. 
    1484 
    1485    This class is accessed through 
    1486    :meth:`get_fields()<pypdf.PdfReader.get_fields>` 
    1487    """ 
    1488 
    1489    def __init__(self, data: DictionaryObject) -> None: 
    1490        DictionaryObject.__init__(self) 
    1491        field_attributes = ( 
    1492            FieldDictionaryAttributes.attributes() 
    1493            + CheckboxRadioButtonAttributes.attributes() 
    1494        ) 
    1495        self.indirect_reference = data.indirect_reference 
    1496        for attr in field_attributes: 
    1497            try: 
    1498                self[NameObject(attr)] = data[attr] 
    1499            except KeyError: 
    1500                pass 
    1501        if isinstance(self.get("/V"), EncodedStreamObject): 
    1502            d = cast(EncodedStreamObject, self[NameObject("/V")]).get_data() 
    1503            if isinstance(d, bytes): 
    1504                d_str = d.decode() 
    1505            elif d is None: 
    1506                d_str = "" 
    1507            else: 
    1508                raise Exception("Should never happen") 
    1509            self[NameObject("/V")] = TextStringObject(d_str) 
    1510 
    1511    # TABLE 8.69 Entries common to all field dictionaries 
    1512    @property 
    1513    def field_type(self) -> Optional[NameObject]: 
    1514        """Read-only property accessing the type of this field.""" 
    1515        return self.get(FieldDictionaryAttributes.FT) 
    1516 
    1517    @property 
    1518    def parent(self) -> Optional[DictionaryObject]: 
    1519        """Read-only property accessing the parent of this field.""" 
    1520        return self.get(FieldDictionaryAttributes.Parent) 
    1521 
    1522    @property 
    1523    def kids(self) -> Optional["ArrayObject"]: 
    1524        """Read-only property accessing the kids of this field.""" 
    1525        return self.get(FieldDictionaryAttributes.Kids) 
    1526 
    1527    @property 
    1528    def name(self) -> Optional[str]: 
    1529        """Read-only property accessing the name of this field.""" 
    1530        return self.get(FieldDictionaryAttributes.T) 
    1531 
    1532    @property 
    1533    def alternate_name(self) -> Optional[str]: 
    1534        """Read-only property accessing the alternate name of this field.""" 
    1535        return self.get(FieldDictionaryAttributes.TU) 
    1536 
    1537    @property 
    1538    def mapping_name(self) -> Optional[str]: 
    1539        """ 
    1540        Read-only property accessing the mapping name of this field. 
    1541 
    1542        This name is used by pypdf as a key in the dictionary returned by 
    1543        :meth:`get_fields()<pypdf.PdfReader.get_fields>` 
    1544        """ 
    1545        return self.get(FieldDictionaryAttributes.TM) 
    1546 
    1547    @property 
    1548    def flags(self) -> Optional[int]: 
    1549        """ 
    1550        Read-only property accessing the field flags, specifying various 
    1551        characteristics of the field (see Table 8.70 of the PDF 1.7 reference). 
    1552        """ 
    1553        return self.get(FieldDictionaryAttributes.Ff) 
    1554 
    1555    @property 
    1556    def value(self) -> Optional[Any]: 
    1557        """ 
    1558        Read-only property accessing the value of this field. 
    1559 
    1560        Format varies based on field type. 
    1561        """ 
    1562        return self.get(FieldDictionaryAttributes.V) 
    1563 
    1564    @property 
    1565    def default_value(self) -> Optional[Any]: 
    1566        """Read-only property accessing the default value of this field.""" 
    1567        return self.get(FieldDictionaryAttributes.DV) 
    1568 
    1569    @property 
    1570    def additional_actions(self) -> Optional[DictionaryObject]: 
    1571        """ 
    1572        Read-only property accessing the additional actions dictionary. 
    1573 
    1574        This dictionary defines the field's behavior in response to trigger 
    1575        events. See Section 8.5.2 of the PDF 1.7 reference. 
    1576        """ 
    1577        return self.get(FieldDictionaryAttributes.AA) 
    1578 
    1579 
    1580class Destination(TreeObject): 
    1581    """ 
    1582    A class representing a destination within a PDF file. 
    1583 
    1584    See section 12.3.2 of the PDF 2.0 reference. 
    1585 
    1586    Args: 
    1587        title: Title of this destination. 
    1588        page: Reference to the page of this destination. Should 
    1589            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`. 
    1590        fit: How the destination is displayed. 
    1591 
    1592    Raises: 
    1593        PdfReadError: If destination type is invalid. 
    1594 
    1595    """ 
    1596 
    1597    node: Optional[ 
    1598        DictionaryObject 
    1599    ] = None  # node provide access to the original Object 
    1600 
    1601    def __init__( 
    1602        self, 
    1603        title: Union[str, bytes], 
    1604        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject], 
    1605        fit: Fit, 
    1606    ) -> None: 
    1607        self._filtered_children: list[Any] = []  # used in PdfWriter 
    1608 
    1609        typ = fit.fit_type 
    1610        args = fit.fit_args 
    1611 
    1612        DictionaryObject.__init__(self) 
    1613        self[NameObject("/Title")] = TextStringObject(title) 
    1614        self[NameObject("/Page")] = page 
    1615        self[NameObject("/Type")] = typ 
    1616 
    1617        # from table 8.2 of the PDF 1.7 reference. 
    1618        if typ == "/XYZ": 
    1619            if len(args) < 1:  # left is missing : should never occur 
    1620                args.append(NumberObject(0.0)) 
    1621            if len(args) < 2:  # top is missing 
    1622                args.append(NumberObject(0.0)) 
    1623            if len(args) < 3:  # zoom is missing 
    1624                args.append(NumberObject(0.0)) 
    1625            ( 
    1626                self[NameObject(TA.LEFT)], 
    1627                self[NameObject(TA.TOP)], 
    1628                self[NameObject("/Zoom")], 
    1629            ) = args 
    1630        elif len(args) == 0: 
    1631            pass 
    1632        elif typ == TF.FIT_R: 
    1633            ( 
    1634                self[NameObject(TA.LEFT)], 
    1635                self[NameObject(TA.BOTTOM)], 
    1636                self[NameObject(TA.RIGHT)], 
    1637                self[NameObject(TA.TOP)], 
    1638            ) = args 
    1639        elif typ in [TF.FIT_H, TF.FIT_BH]: 
    1640            try:  # Prefer to be more robust not only to null parameters 
    1641                (self[NameObject(TA.TOP)],) = args 
    1642            except Exception: 
    1643                (self[NameObject(TA.TOP)],) = (NullObject(),) 
    1644        elif typ in [TF.FIT_V, TF.FIT_BV]: 
    1645            try:  # Prefer to be more robust not only to null parameters 
    1646                (self[NameObject(TA.LEFT)],) = args 
    1647            except Exception: 
    1648                (self[NameObject(TA.LEFT)],) = (NullObject(),) 
    1649        elif typ in [TF.FIT, TF.FIT_B]: 
    1650            pass 
    1651        else: 
    1652            raise PdfReadError(f"Unknown Destination Type: {typ!r}") 
    1653 
    1654    @property 
    1655    def dest_array(self) -> "ArrayObject": 
    1656        return ArrayObject( 
    1657            [self.raw_get("/Page"), self["/Type"]] 
    1658            + [ 
    1659                self[x] 
    1660                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"] 
    1661                if x in self 
    1662            ] 
    1663        ) 
    1664 
    1665    def write_to_stream( 
    1666        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None 
    1667    ) -> None: 
    1668        if encryption_key is not None:  # deprecated 
    1669            deprecation_no_replacement( 
    1670                "the encryption_key parameter of write_to_stream", "5.0.0" 
    1671            ) 
    1672        stream.write(b"<<\n") 
    1673        key = NameObject("/D") 
    1674        key.write_to_stream(stream) 
    1675        stream.write(b" ") 
    1676        value = self.dest_array 
    1677        value.write_to_stream(stream) 
    1678 
    1679        key = NameObject("/S") 
    1680        key.write_to_stream(stream) 
    1681        stream.write(b" ") 
    1682        value_s = NameObject("/GoTo") 
    1683        value_s.write_to_stream(stream) 
    1684 
    1685        stream.write(b"\n") 
    1686        stream.write(b">>") 
    1687 
    1688    @property 
    1689    def title(self) -> Optional[str]: 
    1690        """Read-only property accessing the destination title.""" 
    1691        return self.get("/Title") 
    1692 
    1693    @property 
    1694    def page(self) -> Optional[IndirectObject]: 
    1695        """Read-only property accessing the IndirectObject of the destination page.""" 
    1696        return self.get("/Page") 
    1697 
    1698    @property 
    1699    def typ(self) -> Optional[str]: 
    1700        """Read-only property accessing the destination type.""" 
    1701        return self.get("/Type") 
    1702 
    1703    @property 
    1704    def zoom(self) -> Optional[int]: 
    1705        """Read-only property accessing the zoom factor.""" 
    1706        return self.get("/Zoom", None) 
    1707 
    1708    @property 
    1709    def left(self) -> Optional[FloatObject]: 
    1710        """Read-only property accessing the left horizontal coordinate.""" 
    1711        return self.get("/Left", None) 
    1712 
    1713    @property 
    1714    def right(self) -> Optional[FloatObject]: 
    1715        """Read-only property accessing the right horizontal coordinate.""" 
    1716        return self.get("/Right", None) 
    1717 
    1718    @property 
    1719    def top(self) -> Optional[FloatObject]: 
    1720        """Read-only property accessing the top vertical coordinate.""" 
    1721        return self.get("/Top", None) 
    1722 
    1723    @property 
    1724    def bottom(self) -> Optional[FloatObject]: 
    1725        """Read-only property accessing the bottom vertical coordinate.""" 
    1726        return self.get("/Bottom", None) 
    1727 
    1728    @property 
    1729    def color(self) -> Optional["ArrayObject"]: 
    1730        """Read-only property accessing the color in (R, G, B) with values 0.0-1.0.""" 
    1731        return self.get( 
    1732            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)]) 
    1733        ) 
    1734 
    1735    @property 
    1736    def font_format(self) -> Optional[OutlineFontFlag]: 
    1737        """ 
    1738        Read-only property accessing the font type. 
    1739 
    1740        1=italic, 2=bold, 3=both 
    1741        """ 
    1742        return self.get("/F", 0) 
    1743 
    1744    @property 
    1745    def outline_count(self) -> Optional[int]: 
    1746        """ 
    1747        Read-only property accessing the outline count. 
    1748 
    1749        positive = expanded 
    1750        negative = collapsed 
    1751        absolute value = number of visible descendants at all levels 
    1752        """ 
    1753        return self.get("/Count", None)