1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import io
8import os
9import sys
10
11from pyasn1 import error
12from pyasn1.type import univ
13
14_PY2 = sys.version_info < (3,)
15
16
17class CachingStreamWrapper(io.IOBase):
18 """Wrapper around non-seekable streams.
19
20 Note that the implementation is tied to the decoder,
21 not checking for dangerous arguments for the sake
22 of performance.
23
24 The read bytes are kept in an internal cache until
25 setting _markedPosition which may reset the cache.
26 """
27 def __init__(self, raw):
28 self._raw = raw
29 self._cache = io.BytesIO()
30 self._markedPosition = 0
31
32 def peek(self, n):
33 result = self.read(n)
34 self._cache.seek(-len(result), os.SEEK_CUR)
35 return result
36
37 def seekable(self):
38 return True
39
40 def seek(self, n=-1, whence=os.SEEK_SET):
41 # Note that this not safe for seeking forward.
42 return self._cache.seek(n, whence)
43
44 def read(self, n=-1):
45 read_from_cache = self._cache.read(n)
46 if n != -1:
47 n -= len(read_from_cache)
48 if not n: # 0 bytes left to read
49 return read_from_cache
50
51 read_from_raw = self._raw.read(n)
52
53 self._cache.write(read_from_raw)
54
55 return read_from_cache + read_from_raw
56
57 @property
58 def markedPosition(self):
59 """Position where the currently processed element starts.
60
61 This is used for back-tracking in SingleItemDecoder.__call__
62 and (indefLen)ValueDecoder and should not be used for other purposes.
63 The client is not supposed to ever seek before this position.
64 """
65 return self._markedPosition
66
67 @markedPosition.setter
68 def markedPosition(self, value):
69 # By setting the value, we ensure we won't seek back before it.
70 # `value` should be the same as the current position
71 # We don't check for this for performance reasons.
72 self._markedPosition = value
73
74 # Whenever we set _marked_position, we know for sure
75 # that we will not return back, and thus it is
76 # safe to drop all cached data.
77 if self._cache.tell() > io.DEFAULT_BUFFER_SIZE:
78 self._cache = io.BytesIO(self._cache.read())
79 self._markedPosition = 0
80
81 def tell(self):
82 return self._cache.tell()
83
84
85def asSeekableStream(substrate):
86 """Convert object to seekable byte-stream.
87
88 Parameters
89 ----------
90 substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString`
91
92 Returns
93 -------
94 : :py:class:`io.IOBase`
95
96 Raises
97 ------
98 : :py:class:`~pyasn1.error.PyAsn1Error`
99 If the supplied substrate cannot be converted to a seekable stream.
100 """
101 if isinstance(substrate, io.BytesIO):
102 return substrate
103
104 elif isinstance(substrate, bytes):
105 return io.BytesIO(substrate)
106
107 elif isinstance(substrate, univ.OctetString):
108 return io.BytesIO(substrate.asOctets())
109
110 try:
111 # Special case: impossible to set attributes on `file` built-in
112 # XXX: broken, BufferedReader expects a "readable" attribute.
113 if _PY2 and isinstance(substrate, file):
114 return io.BufferedReader(substrate)
115
116 elif substrate.seekable(): # Will fail for most invalid types
117 return substrate
118
119 else:
120 return CachingStreamWrapper(substrate)
121
122 except AttributeError:
123 raise error.UnsupportedSubstrateError(
124 "Cannot convert " + substrate.__class__.__name__ +
125 " to a seekable bit stream.")
126
127
128def isEndOfStream(substrate):
129 """Check whether we have reached the end of a stream.
130
131 Although it is more effective to read and catch exceptions, this
132 function
133
134 Parameters
135 ----------
136 substrate: :py:class:`IOBase`
137 Stream to check
138
139 Returns
140 -------
141 : :py:class:`bool`
142 """
143 if isinstance(substrate, io.BytesIO):
144 cp = substrate.tell()
145 substrate.seek(0, os.SEEK_END)
146 result = substrate.tell() == cp
147 substrate.seek(cp, os.SEEK_SET)
148 yield result
149
150 else:
151 received = substrate.read(1)
152 if received is None:
153 yield
154
155 if received:
156 substrate.seek(-1, os.SEEK_CUR)
157
158 yield not received
159
160
161def peekIntoStream(substrate, size=-1):
162 """Peek into stream.
163
164 Parameters
165 ----------
166 substrate: :py:class:`IOBase`
167 Stream to read from.
168
169 size: :py:class:`int`
170 How many bytes to peek (-1 = all available)
171
172 Returns
173 -------
174 : :py:class:`bytes` or :py:class:`str`
175 The return type depends on Python major version
176 """
177 if hasattr(substrate, "peek"):
178 received = substrate.peek(size)
179 if received is None:
180 yield
181
182 while len(received) < size:
183 yield
184
185 yield received
186
187 else:
188 current_position = substrate.tell()
189 try:
190 for chunk in readFromStream(substrate, size):
191 yield chunk
192
193 finally:
194 substrate.seek(current_position)
195
196
197def readFromStream(substrate, size=-1, context=None):
198 """Read from the stream.
199
200 Parameters
201 ----------
202 substrate: :py:class:`IOBase`
203 Stream to read from.
204
205 Keyword parameters
206 ------------------
207 size: :py:class:`int`
208 How many bytes to read (-1 = all available)
209
210 context: :py:class:`dict`
211 Opaque caller context will be attached to exception objects created
212 by this function.
213
214 Yields
215 ------
216 : :py:class:`bytes` or :py:class:`str` or :py:class:`SubstrateUnderrunError`
217 Read data or :py:class:`~pyasn1.error.SubstrateUnderrunError`
218 object if no `size` bytes is readily available in the stream. The
219 data type depends on Python major version
220
221 Raises
222 ------
223 : :py:class:`~pyasn1.error.EndOfStreamError`
224 Input stream is exhausted
225 """
226 while True:
227 # this will block unless stream is non-blocking
228 received = substrate.read(size)
229 if received is None: # non-blocking stream can do this
230 yield error.SubstrateUnderrunError(context=context)
231
232 elif not received and size != 0: # end-of-stream
233 raise error.EndOfStreamError(context=context)
234
235 elif len(received) < size:
236 substrate.seek(-len(received), os.SEEK_CUR)
237
238 # behave like a non-blocking stream
239 yield error.SubstrateUnderrunError(context=context)
240
241 else:
242 break
243
244 yield received