1# 
    2# This file is part of pyasn1 software. 
    3# 
    4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> 
    5# License: https://pyasn1.readthedocs.io/en/latest/license.html 
    6# 
    7import io 
    8import os 
    9 
    10from pyasn1 import error 
    11from pyasn1.type import univ 
    12 
    13class CachingStreamWrapper(io.IOBase): 
    14    """Wrapper around non-seekable streams. 
    15 
    16    Note that the implementation is tied to the decoder, 
    17    not checking for dangerous arguments for the sake 
    18    of performance. 
    19 
    20    The read bytes are kept in an internal cache until 
    21    setting _markedPosition which may reset the cache. 
    22    """ 
    23    def __init__(self, raw): 
    24        self._raw = raw 
    25        self._cache = io.BytesIO() 
    26        self._markedPosition = 0 
    27 
    28    def peek(self, n): 
    29        result = self.read(n) 
    30        self._cache.seek(-len(result), os.SEEK_CUR) 
    31        return result 
    32 
    33    def seekable(self): 
    34        return True 
    35 
    36    def seek(self, n=-1, whence=os.SEEK_SET): 
    37        # Note that this not safe for seeking forward. 
    38        return self._cache.seek(n, whence) 
    39 
    40    def read(self, n=-1): 
    41        read_from_cache = self._cache.read(n) 
    42        if n != -1: 
    43            n -= len(read_from_cache) 
    44            if not n:  # 0 bytes left to read 
    45                return read_from_cache 
    46 
    47        read_from_raw = self._raw.read(n) 
    48 
    49        self._cache.write(read_from_raw) 
    50 
    51        return read_from_cache + read_from_raw 
    52 
    53    @property 
    54    def markedPosition(self): 
    55        """Position where the currently processed element starts. 
    56 
    57        This is used for back-tracking in SingleItemDecoder.__call__ 
    58        and (indefLen)ValueDecoder and should not be used for other purposes. 
    59        The client is not supposed to ever seek before this position. 
    60        """ 
    61        return self._markedPosition 
    62 
    63    @markedPosition.setter 
    64    def markedPosition(self, value): 
    65        # By setting the value, we ensure we won't seek back before it. 
    66        # `value` should be the same as the current position 
    67        # We don't check for this for performance reasons. 
    68        self._markedPosition = value 
    69 
    70        # Whenever we set _marked_position, we know for sure 
    71        # that we will not return back, and thus it is 
    72        # safe to drop all cached data. 
    73        if self._cache.tell() > io.DEFAULT_BUFFER_SIZE: 
    74            self._cache = io.BytesIO(self._cache.read()) 
    75            self._markedPosition = 0 
    76 
    77    def tell(self): 
    78        return self._cache.tell() 
    79 
    80 
    81def asSeekableStream(substrate): 
    82    """Convert object to seekable byte-stream. 
    83 
    84    Parameters 
    85    ---------- 
    86    substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString` 
    87 
    88    Returns 
    89    ------- 
    90    : :py:class:`io.IOBase` 
    91 
    92    Raises 
    93    ------ 
    94    : :py:class:`~pyasn1.error.PyAsn1Error` 
    95        If the supplied substrate cannot be converted to a seekable stream. 
    96    """ 
    97    if isinstance(substrate, io.BytesIO): 
    98        return substrate 
    99 
    100    elif isinstance(substrate, bytes): 
    101        return io.BytesIO(substrate) 
    102 
    103    elif isinstance(substrate, univ.OctetString): 
    104        return io.BytesIO(substrate.asOctets()) 
    105 
    106    try: 
    107        if substrate.seekable():  # Will fail for most invalid types 
    108            return substrate 
    109        else: 
    110            return CachingStreamWrapper(substrate) 
    111 
    112    except AttributeError: 
    113        raise error.UnsupportedSubstrateError( 
    114            "Cannot convert " + substrate.__class__.__name__ + 
    115            " to a seekable bit stream.") 
    116 
    117 
    118def isEndOfStream(substrate): 
    119    """Check whether we have reached the end of a stream. 
    120 
    121    Although it is more effective to read and catch exceptions, this 
    122    function 
    123 
    124    Parameters 
    125    ---------- 
    126    substrate: :py:class:`IOBase` 
    127        Stream to check 
    128 
    129    Returns 
    130    ------- 
    131    : :py:class:`bool` 
    132    """ 
    133    if isinstance(substrate, io.BytesIO): 
    134        cp = substrate.tell() 
    135        substrate.seek(0, os.SEEK_END) 
    136        result = substrate.tell() == cp 
    137        substrate.seek(cp, os.SEEK_SET) 
    138        yield result 
    139 
    140    else: 
    141        received = substrate.read(1) 
    142        if received is None: 
    143            yield 
    144 
    145        if received: 
    146            substrate.seek(-1, os.SEEK_CUR) 
    147 
    148        yield not received 
    149 
    150 
    151def peekIntoStream(substrate, size=-1): 
    152    """Peek into stream. 
    153 
    154    Parameters 
    155    ---------- 
    156    substrate: :py:class:`IOBase` 
    157        Stream to read from. 
    158 
    159    size: :py:class:`int` 
    160        How many bytes to peek (-1 = all available) 
    161 
    162    Returns 
    163    ------- 
    164    : :py:class:`bytes` or :py:class:`str` 
    165        The return type depends on Python major version 
    166    """ 
    167    if hasattr(substrate, "peek"): 
    168        received = substrate.peek(size) 
    169        if received is None: 
    170            yield 
    171 
    172        while len(received) < size: 
    173            yield 
    174 
    175        yield received 
    176 
    177    else: 
    178        current_position = substrate.tell() 
    179        try: 
    180            for chunk in readFromStream(substrate, size): 
    181                yield chunk 
    182 
    183        finally: 
    184            substrate.seek(current_position) 
    185 
    186 
    187def readFromStream(substrate, size=-1, context=None): 
    188    """Read from the stream. 
    189 
    190    Parameters 
    191    ---------- 
    192    substrate: :py:class:`IOBase` 
    193        Stream to read from. 
    194 
    195    Keyword parameters 
    196    ------------------ 
    197    size: :py:class:`int` 
    198        How many bytes to read (-1 = all available) 
    199 
    200    context: :py:class:`dict` 
    201        Opaque caller context will be attached to exception objects created 
    202        by this function. 
    203 
    204    Yields 
    205    ------ 
    206    : :py:class:`bytes` or :py:class:`str` or :py:class:`SubstrateUnderrunError` 
    207        Read data or :py:class:`~pyasn1.error.SubstrateUnderrunError` 
    208        object if no `size` bytes is readily available in the stream. The 
    209        data type depends on Python major version 
    210 
    211    Raises 
    212    ------ 
    213    : :py:class:`~pyasn1.error.EndOfStreamError` 
    214        Input stream is exhausted 
    215    """ 
    216    while True: 
    217        # this will block unless stream is non-blocking 
    218        received = substrate.read(size) 
    219        if received is None:  # non-blocking stream can do this 
    220            yield error.SubstrateUnderrunError(context=context) 
    221 
    222        elif not received and size != 0:  # end-of-stream 
    223            raise error.EndOfStreamError(context=context) 
    224 
    225        elif len(received) < size: 
    226            substrate.seek(-len(received), os.SEEK_CUR) 
    227 
    228            # behave like a non-blocking stream 
    229            yield error.SubstrateUnderrunError(context=context) 
    230 
    231        else: 
    232            break 
    233 
    234    yield received