Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/streaming.py: 23%
91 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import io
8import os
9import sys
11from pyasn1 import error
12from pyasn1.type import univ
14_PY2 = sys.version_info < (3,)
class CachingStreamWrapper(io.IOBase):
    """Wrapper around non-seekable streams.

    Everything read from the wrapped stream is copied into an in-memory
    cache so that the decoder can seek backwards over data that it has
    already consumed.

    Note that the implementation is tied to the decoder and does not
    check for dangerous arguments, for the sake of performance.

    The read bytes are kept in the internal cache until `markedPosition`
    is set, which may reset the cache.
    """
    def __init__(self, raw):
        self._raw = raw
        self._cache = io.BytesIO()
        self._markedPosition = 0

    def peek(self, n):
        """Return up to `n` bytes without advancing the position.

        Returns `None` when :meth:`read` returns `None`, i.e. when the
        wrapped stream reported that no data is available right now.
        """
        result = self.read(n)
        if result is None:
            # read() consumed nothing, so there is nothing to rewind.
            return None

        # read() left everything in the cache; just step back over it.
        self._cache.seek(-len(result), os.SEEK_CUR)
        return result

    def seekable(self):
        return True

    def seek(self, n=-1, whence=os.SEEK_SET):
        # Note that this is not safe for seeking forward: only the
        # cache is repositioned, the wrapped stream is never touched.
        return self._cache.seek(n, whence)

    def read(self, n=-1):
        """Read up to `n` bytes (-1 = everything available).

        Data is served from the cache first; whatever is missing is read
        from the wrapped stream and appended to the cache.

        Returns `None` if nothing could be served from the cache and the
        wrapped stream returned `None`. If the cache supplied some bytes
        before the wrapped stream returned `None`, only those bytes are
        returned.
        """
        read_from_cache = self._cache.read(n)
        if n != -1:
            n -= len(read_from_cache)
            if not n:  # 0 bytes left to read
                return read_from_cache

        read_from_raw = self._raw.read(n)
        if read_from_raw is None:
            # The wrapped stream has no data at the moment. Do not try
            # to cache or concatenate `None`; hand back what the cache
            # provided, or `None` if that was nothing at all.
            return read_from_cache or None

        # The cache position is at its end here, so this appends.
        self._cache.write(read_from_raw)

        return read_from_cache + read_from_raw

    @property
    def markedPosition(self):
        """Position where the currently processed element starts.

        This is used for back-tracking in SingleItemDecoder.__call__
        and (indefLen)ValueDecoder and should not be used for other purposes.
        The client is not supposed to ever seek before this position.
        """
        return self._markedPosition

    @markedPosition.setter
    def markedPosition(self, value):
        # By setting the value, we ensure we won't seek back before it.
        # `value` should be the same as the current position.
        # We don't check for this for performance reasons.
        self._markedPosition = value

        # Whenever we set _markedPosition, we know for sure
        # that we will not return back, and thus it is
        # safe to drop all cached data. Only the not-yet-read tail of
        # the cache is kept, and positions restart from zero.
        if self._cache.tell() > io.DEFAULT_BUFFER_SIZE:
            self._cache = io.BytesIO(self._cache.read())
            self._markedPosition = 0

    def tell(self):
        # Position within the cache, not within the wrapped stream.
        return self._cache.tell()
def asSeekableStream(substrate):
    """Turn the given object into a seekable byte stream.

    An `io.BytesIO` is returned unchanged, `bytes` and
    `univ.OctetString` values are wrapped into a new `io.BytesIO`, an
    object whose `seekable()` returns a true value is returned
    unchanged, and anything else is wrapped into a
    `CachingStreamWrapper`.

    Parameters
    ----------
    substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString`

    Returns
    -------
    : :py:class:`io.IOBase`

    Raises
    ------
    : :py:class:`~pyasn1.error.PyAsn1Error`
        If the supplied substrate cannot be converted to a seekable stream.
    """
    # In-memory inputs are handled first; none of them needs probing.
    if isinstance(substrate, io.BytesIO):
        return substrate

    if isinstance(substrate, bytes):
        return io.BytesIO(substrate)

    if isinstance(substrate, univ.OctetString):
        return io.BytesIO(substrate.asOctets())

    try:
        # Special case: impossible to set attributes on `file` built-in
        # XXX: broken, BufferedReader expects a "readable" attribute.
        if _PY2 and isinstance(substrate, file):
            return io.BufferedReader(substrate)

        # Will fail with AttributeError for most invalid types
        if substrate.seekable():
            return substrate

        return CachingStreamWrapper(substrate)

    except AttributeError:
        type_name = substrate.__class__.__name__
        raise error.UnsupportedSubstrateError(
            "Cannot convert " + type_name +
            " to a seekable bit stream.")
def isEndOfStream(substrate):
    """Check whether we have reached the end of a stream.

    This is a generator. For an `io.BytesIO` the current position is
    compared with the end position and then restored. For any other
    stream one byte is read and, if one was obtained, the stream is
    moved back by one byte.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to check

    Yields
    ------
    : :py:class:`bool` or `None`
        `True` if no more data was found, `False` otherwise. If the
        probing read returns `None`, a bare `None` is yielded first.
    """
    if isinstance(substrate, io.BytesIO):
        # Jump to the end to learn its offset, then restore the position.
        position = substrate.tell()
        substrate.seek(0, os.SEEK_END)
        at_end = position == substrate.tell()
        substrate.seek(position, os.SEEK_SET)
        yield at_end
        return

    probe = substrate.read(1)
    if probe is None:
        yield

    if probe:
        # Give the probed byte back to the stream.
        substrate.seek(-1, os.SEEK_CUR)

    yield not probe
def peekIntoStream(substrate, size=-1):
    """Peek into stream.

    This is a generator. If `substrate` has a `peek` method, it is
    called with `size`. While it returns `None` or fewer than `size`
    bytes, `None` is yielded and the peek is repeated once the caller
    resumes the generator. If `substrate` has no `peek` method, the
    data is obtained through `readFromStream` and the stream position
    is restored afterwards.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    size: :py:class:`int`
        How many bytes to peek (-1 = all available)

    Yields
    ------
    : :py:class:`bytes` or :py:class:`str` or `None`
        The peeked data (its type depends on Python major version), or
        whatever `readFromStream` yields when `substrate` has no `peek`
        method. `None` is yielded while the `peek` method cannot yet
        supply `size` bytes.
    """
    if hasattr(substrate, "peek"):
        while True:
            received = substrate.peek(size)

            # Peek again on every resume: the data can only grow if we
            # ask the stream once more. `None` means "no data right now".
            if received is None or len(received) < size:
                yield
                continue

            break

        yield received

    else:
        current_position = substrate.tell()
        try:
            for chunk in readFromStream(substrate, size):
                yield chunk

        finally:
            # Peeking must not move the stream, whatever happened above.
            substrate.seek(current_position)
def readFromStream(substrate, size=-1, context=None):
    """Read from the stream.

    This is a generator. It keeps calling `substrate.read(size)` until
    the call returns at least `size` bytes, yields that data and then
    finishes. A short read is undone by seeking back over the bytes
    that were received.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    Keyword parameters
    ------------------
    size: :py:class:`int`
        How many bytes to read (-1 = all available)

    context: :py:class:`dict`
        Opaque caller context will be attached to exception objects created
        by this function.

    Yields
    ------
    : :py:class:`bytes` or :py:class:`str` or :py:class:`SubstrateUnderrunError`
        Read data or :py:class:`~pyasn1.error.SubstrateUnderrunError`
        object if no `size` bytes is readily available in the stream. The
        data type depends on Python major version

    Raises
    ------
    : :py:class:`~pyasn1.error.EndOfStreamError`
        Input stream is exhausted
    """
    while True:
        # this will block unless stream is non-blocking
        chunk = substrate.read(size)

        if chunk is None:  # non-blocking stream can do this
            yield error.SubstrateUnderrunError(context=context)
            continue

        if not chunk and size != 0:  # end-of-stream
            raise error.EndOfStreamError(context=context)

        if len(chunk) >= size:
            # Got everything that was asked for.
            yield chunk
            return

        # Short read: put the bytes back and behave like a
        # non-blocking stream.
        substrate.seek(-len(chunk), os.SEEK_CUR)
        yield error.SubstrateUnderrunError(context=context)