1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import io
8import os
9
10from pyasn1 import error
11from pyasn1.type import univ
12
class CachingStreamWrapper(io.IOBase):
    """Wrapper around non-seekable streams.

    Note that the implementation is tied to the decoder,
    not checking for dangerous arguments for the sake
    of performance.

    The read bytes are kept in an internal cache until
    setting `markedPosition`, which may reset the cache.
    """
    def __init__(self, raw):
        # Underlying stream; only its read() method is used by this class.
        self._raw = raw
        # Bytes already pulled from `raw`. The cache's own position is
        # the position this wrapper reports through tell().
        self._cache = io.BytesIO()
        self._markedPosition = 0

    def peek(self, n):
        """Return up to `n` bytes without advancing the position.

        Returns `None` when read() returned `None`, i.e. nothing was
        cached and the underlying stream had no data to offer.
        """
        result = self.read(n)
        if result is None:
            # read() only returns None when it consumed nothing from
            # the cache, so there is nothing to rewind.
            return None

        self._cache.seek(-len(result), os.SEEK_CUR)
        return result

    def seekable(self):
        return True

    def seek(self, n=-1, whence=os.SEEK_SET):
        # Note that this is not safe for seeking forward: only the
        # cache is repositioned, the underlying stream is never touched.
        return self._cache.seek(n, whence)

    def read(self, n=-1):
        """Read up to `n` bytes (-1 = everything), serving the cache first.

        Bytes fetched from the underlying stream are appended to the
        cache so that they can be re-read after seeking backwards.

        Returns `None` if the cache had nothing to offer and the
        underlying stream returned `None` (a non-blocking stream with
        no data available). If some bytes did come from the cache,
        those bytes are returned instead.
        """
        read_from_cache = self._cache.read(n)
        if n != -1:
            n -= len(read_from_cache)
            if not n:  # 0 bytes left to read
                return read_from_cache

        read_from_raw = self._raw.read(n)
        if read_from_raw is None:
            # Writing None into the cache would raise TypeError;
            # hand back whatever the cache supplied, or None if nothing.
            return read_from_cache or None

        self._cache.write(read_from_raw)

        return read_from_cache + read_from_raw

    @property
    def markedPosition(self):
        """Position where the currently processed element starts.

        This is used for back-tracking in SingleItemDecoder.__call__
        and (indefLen)ValueDecoder and should not be used for other purposes.
        The client is not supposed to ever seek before this position.
        """
        return self._markedPosition

    @markedPosition.setter
    def markedPosition(self, value):
        # By setting the value, we ensure we won't seek back before it.
        # `value` should be the same as the current position
        # We don't check for this for performance reasons.
        self._markedPosition = value

        # Whenever we set _markedPosition, we know for sure
        # that we will not return back, and thus it is
        # safe to drop all cached data.
        if self._cache.tell() > io.DEFAULT_BUFFER_SIZE:
            # Keep only the not-yet-consumed tail; the fresh cache
            # starts at position 0, so the mark moves to 0 as well.
            self._cache = io.BytesIO(self._cache.read())
            self._markedPosition = 0

    def tell(self):
        return self._cache.tell()
79
80
def asSeekableStream(substrate):
    """Convert object to seekable byte-stream.

    Parameters
    ----------
    substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString`
        Object to turn into a stream.

    Returns
    -------
    : :py:class:`io.IOBase`
        `substrate` itself if it is an :py:class:`io.BytesIO` or reports
        itself as seekable; a new :py:class:`io.BytesIO` for `bytes` and
        `OctetString` input; a `CachingStreamWrapper` around `substrate`
        otherwise.

    Raises
    ------
    : :py:class:`~pyasn1.error.UnsupportedSubstrateError`
        If the supplied substrate cannot be converted to a seekable stream
        (accessing an attribute such as `seekable` on it fails).
    """
    # In-memory streams are handed back untouched.
    if isinstance(substrate, io.BytesIO):
        return substrate

    # Plain octets get wrapped into a fresh in-memory stream.
    if isinstance(substrate, bytes):
        return io.BytesIO(substrate)

    if isinstance(substrate, univ.OctetString):
        octets = substrate.asOctets()
        return io.BytesIO(octets)

    try:
        # Objects without a seekable() method land in the handler below.
        if not substrate.seekable():
            return CachingStreamWrapper(substrate)

        return substrate

    except AttributeError:
        type_name = substrate.__class__.__name__
        raise error.UnsupportedSubstrateError(
            "Cannot convert " + type_name + " to a seekable bit stream.")
116
117
def isEndOfStream(substrate):
    """Check whether we have reached the end of a stream.

    This is a generator: iterate over it to obtain the answer. The
    stream position is left where it was found.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to check

    Yields
    ------
    : :py:class:`bool` or `None`
        `None` every time a read on the stream returns `None` (a
        non-blocking stream can do this), after which the read is retried
        when the generator is resumed. The final item is `True` if the
        stream is at its end, `False` otherwise.
    """
    if isinstance(substrate, io.BytesIO):
        # In-memory stream: compare the current position with the
        # end position, then restore the current position.
        cp = substrate.tell()
        substrate.seek(0, os.SEEK_END)
        result = substrate.tell() == cp
        substrate.seek(cp, os.SEEK_SET)
        yield result
        return

    while True:
        received = substrate.read(1)
        if received is not None:
            break

        # No verdict yet: signal the caller and read again on resume,
        # rather than mistaking "no data right now" for end-of-stream.
        yield

    if received:
        # Put the probed byte back.
        substrate.seek(-1, os.SEEK_CUR)

    yield not received
149
150
def peekIntoStream(substrate, size=-1):
    """Peek into stream.

    This is a generator: iterate over it to obtain the data.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    size: :py:class:`int`
        How many bytes to peek (-1 = all available)

    Yields
    ------
    : :py:class:`bytes` or `None` or other
        If `substrate` has a `peek` method: `None` every time the peek
        returns `None` or fewer than `size` bytes (the peek is retried
        when the generator is resumed), then the peeked data.
        Otherwise: whatever `readFromStream` yields for the same
        arguments; the stream position is restored afterwards.
    """
    if hasattr(substrate, "peek"):
        while True:
            received = substrate.peek(size)
            # `None` (no data available) and short results both mean
            # "not enough yet"; anything else is the final answer.
            if received is not None and len(received) >= size:
                break

            # Signal the caller, then peek again on resume so that
            # newly arrived data is actually noticed.
            yield

        yield received

    else:
        current_position = substrate.tell()
        try:
            for chunk in readFromStream(substrate, size):
                yield chunk

        finally:
            # Undo the read, also when the read raises or the
            # generator is closed early.
            substrate.seek(current_position)
185
186
def readFromStream(substrate, size=-1, context=None):
    """Read from the stream.

    This is a generator: the last item it yields is the data read.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    Keyword parameters
    ------------------
    size: :py:class:`int`
        How many bytes to read (-1 = all available)

    context: :py:class:`dict`
        Opaque caller context will be attached to exception objects created
        by this function.

    Yields
    ------
    : :py:class:`bytes` or :py:class:`SubstrateUnderrunError`
        A :py:class:`~pyasn1.error.SubstrateUnderrunError` object for every
        read attempt that did not produce `size` bytes (the read is
        repeated when the generator is resumed), then the data read.

    Raises
    ------
    : :py:class:`~pyasn1.error.EndOfStreamError`
        Input stream is exhausted
    """
    while True:
        # Blocks here unless the stream is non-blocking.
        chunk = substrate.read(size)

        # A non-blocking stream may answer None; skip straight to the
        # underrun signal in that case.
        if chunk is not None:
            if not chunk and size != 0:
                # Nothing came back although something was asked for.
                raise error.EndOfStreamError(context=context)

            if len(chunk) >= size:
                yield chunk
                return

            # Short read: push the bytes back and behave like a
            # non-blocking stream.
            substrate.seek(-len(chunk), os.SEEK_CUR)

        yield error.SubstrateUnderrunError(context=context)