Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyasn1/codec/streaming.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

#
# This file is part of pyasn1 software.
#
# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
# License: http://snmplabs.com/pyasn1/license.html
#

7import io 

8import os 

9import sys 

10 

11from pyasn1 import error 

12from pyasn1.type import univ 

13 

# True when running under Python 2. Used by asSeekableStream() to guard the
# special handling of the legacy ``file`` built-in, which exists only there.
_PY2 = sys.version_info < (3,)

15 

16 

class CachingStreamWrapper(io.IOBase):
    """Wrapper around non-seekable streams.

    Note that the implementation is tied to the decoder,
    not checking for dangerous arguments for the sake
    of performance.

    The read bytes are kept in an internal cache until
    setting _markedPosition which may reset the cache.
    """
    def __init__(self, raw):
        # Underlying stream; only its ``read`` method is ever called.
        self._raw = raw
        # Everything read from ``raw`` so far; seeking happens in here.
        self._cache = io.BytesIO()
        self._markedPosition = 0

    def peek(self, n):
        """Return up to `n` bytes without advancing the position.

        Returns ``None`` when the underlying stream has no data
        available right now (non-blocking stream) and nothing is cached.
        """
        result = self.read(n)
        if result is None:
            # Nothing was consumed, so there is nothing to rewind.
            return None

        # Step back over what was just read; the data stays in the cache.
        self._cache.seek(-len(result), os.SEEK_CUR)
        return result

    def seekable(self):
        """Report the wrapper as seekable (seeks operate on the cache)."""
        return True

    def seek(self, n=-1, whence=os.SEEK_SET):
        """Move the position within the cached data.

        Returns whatever the cache's ``seek`` returns.
        """
        # Note that this not safe for seeking forward.
        return self._cache.seek(n, whence)

    def read(self, n=-1):
        """Read up to `n` bytes, serving cached data first.

        ``None`` or a negative `n` means "read everything". Data fetched
        from the underlying stream is appended to the cache so it can be
        re-read after seeking back.

        Returns ``None`` when nothing is cached and the underlying
        stream returned ``None`` (non-blocking stream without data).
        """
        if n is None or n < 0:
            n = -1

        read_from_cache = self._cache.read(n)
        if n != -1:
            n -= len(read_from_cache)
            if not n:  # 0 bytes left to read
                return read_from_cache

        read_from_raw = self._raw.read(n)
        if read_from_raw is None:
            # Non-blocking stream has nothing for us yet: hand back
            # whatever the cache supplied, or None if that is empty too.
            return read_from_cache or None

        self._cache.write(read_from_raw)

        return read_from_cache + read_from_raw

    @property
    def markedPosition(self):
        """Position where the currently processed element starts.

        This is used for back-tracking in SingleItemDecoder.__call__
        and (indefLen)ValueDecoder and should not be used for other purposes.
        The client is not supposed to ever seek before this position.
        """
        return self._markedPosition

    @markedPosition.setter
    def markedPosition(self, value):
        # By setting the value, we ensure we won't seek back before it.
        # `value` should be the same as the current position
        # We don't check for this for performance reasons.
        self._markedPosition = value

        # Whenever we set _marked_position, we know for sure
        # that we will not return back, and thus it is
        # safe to drop all cached data.
        if self._cache.tell() > io.DEFAULT_BUFFER_SIZE:
            # Keep only the not-yet-consumed tail; the new cache starts
            # at position 0, so the mark is reset to 0 as well.
            self._cache = io.BytesIO(self._cache.read())
            self._markedPosition = 0

    def tell(self):
        """Return the current position within the cache."""
        return self._cache.tell()

83 

84 

def asSeekableStream(substrate):
    """Convert object to seekable byte-stream.

    Parameters
    ----------
    substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString`

    Returns
    -------
    : :py:class:`io.IOBase`
        `substrate` itself when it is already an :py:class:`io.BytesIO` or
        reports itself seekable; otherwise a stream wrapping its content.

    Raises
    ------
    : :py:class:`~pyasn1.error.PyAsn1Error`
        If the supplied substrate cannot be converted to a seekable stream.
    """
    if isinstance(substrate, io.BytesIO):
        return substrate

    elif isinstance(substrate, bytes):
        return io.BytesIO(substrate)

    elif isinstance(substrate, univ.OctetString):
        return io.BytesIO(substrate.asOctets())

    try:
        # Special case: impossible to set attributes on `file` built-in
        # (`file` is only evaluated when _PY2 is true, i.e. on Python 2)
        if _PY2 and isinstance(substrate, file):  # noqa: F821
            return io.BufferedReader(substrate)

        elif substrate.seekable():  # Will fail for most invalid types
            return substrate

        else:
            # Non-seekable stream: emulate seeking by caching what is read.
            return CachingStreamWrapper(substrate)

    except AttributeError:
        # Objects without a `seekable` method are not streams at all.
        raise error.UnsupportedSubstrateError(
            "Cannot convert " + substrate.__class__.__name__ +
            " to a seekable bit stream.")

125 

126 

def isEndOfStream(substrate):
    """Check whether we have reached the end of a stream.

    Although it is more effective to read and catch exceptions, this
    function checks for the end of the stream without consuming data:
    the stream position is restored before the answer is yielded.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to check

    Yields
    ------
    : :py:class:`bool` or :py:class:`None`
        `None` each time the stream has no data available yet
        (`read` returned `None`), then `True` if the stream is
        exhausted or `False` if more data can be read.
    """
    if isinstance(substrate, io.BytesIO):
        # In-memory stream: compare the current offset with the end offset.
        cp = substrate.tell()
        substrate.seek(0, os.SEEK_END)
        result = substrate.tell() == cp
        substrate.seek(cp, os.SEEK_SET)
        yield result

    else:
        received = substrate.read(1)
        while received is None:
            # No data available right now: report "unknown" and retry on
            # the next iteration rather than claiming end-of-stream.
            yield
            received = substrate.read(1)

        if received:
            # Put the probed byte back.
            substrate.seek(-1, os.SEEK_CUR)

        yield not received

158 

159 

def peekIntoStream(substrate, size=-1):
    """Peek into stream.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    size: :py:class:`int`
        How many bytes to peek (-1 = all available)

    Yields
    ------
    : :py:class:`bytes` or :py:class:`str` or :py:class:`None`
        For streams with a `peek` method: `None` while fewer than `size`
        bytes can be peeked, then the peeked data. For other streams:
        whatever :py:func:`readFromStream` yields; the stream position is
        restored when the generator finishes or is closed.
        The data type depends on Python major version
    """
    if hasattr(substrate, "peek"):
        received = substrate.peek(size)
        # Re-peek after every yield: the consumer resumes us once more
        # data may have arrived. `None` means "nothing available yet".
        while received is None or len(received) < size:
            yield
            received = substrate.peek(size)

        yield received

    else:
        current_position = substrate.tell()
        try:
            for chunk in readFromStream(substrate, size):
                yield chunk

        finally:
            # Undo the read so the call behaves like a peek.
            substrate.seek(current_position)

194 

195 

def readFromStream(substrate, size=-1, context=None):
    """Read from the stream.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    Keyword parameters
    ------------------
    size: :py:class:`int`
        How many bytes to read (-1 = all available)

    context: :py:class:`dict`
        Opaque caller context will be attached to exception objects created
        by this function.

    Yields
    ------
    : :py:class:`bytes` or :py:class:`str` or :py:class:`SubstrateUnderrunError`
        Read data or :py:class:`~pyasn1.error.SubstrateUnderrunError`
        object if no `size` bytes is readily available in the stream. The
        data type depends on Python major version

    Raises
    ------
    : :py:class:`~pyasn1.error.EndOfStreamError`
        Input stream is exhausted
    """
    while True:
        # this will block unless stream is non-blocking
        received = substrate.read(size)
        if received is None:  # non-blocking stream can do this
            yield error.SubstrateUnderrunError(context=context)

        elif not received and size != 0:  # end-of-stream
            raise error.EndOfStreamError(context=context)

        elif len(received) < size:
            # Short read: rewind so the next attempt re-reads the whole
            # `size`-byte chunk from the same position.
            substrate.seek(-len(received), os.SEEK_CUR)

            # behave like a non-blocking stream
            yield error.SubstrateUnderrunError(context=context)

        else:
            break

    yield received