Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/streaming.py: 23%

91 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import io 

8import os 

9import sys 

10 

11from pyasn1 import error 

12from pyasn1.type import univ 

13 

14_PY2 = sys.version_info < (3,) 

15 

16 

class CachingStreamWrapper(io.IOBase):
    """Wrapper around non-seekable streams.

    Note that the implementation is tied to the decoder,
    not checking for dangerous arguments for the sake
    of performance.

    Every byte read from the wrapped stream is appended to an internal
    :py:class:`io.BytesIO` cache, so seeking backwards is served from
    that cache. Assigning to ``markedPosition`` may discard the part of
    the cache that has already been consumed.
    """
    def __init__(self, raw):
        # The wrapped stream; only its ``read`` method is used here.
        self._raw = raw
        # Holds everything read from ``raw`` since the last cache reset.
        self._cache = io.BytesIO()
        self._markedPosition = 0

    def peek(self, n):
        """Return up to `n` bytes without advancing the position.

        Returns ``None`` when :py:meth:`read` reports that no data is
        available yet.
        """
        result = self.read(n)
        if result is None:
            # Nothing was consumed, so there is nothing to rewind.
            return None

        self._cache.seek(-len(result), os.SEEK_CUR)
        return result

    def seekable(self):
        """Report the wrapper as seekable (within the cached data)."""
        return True

    def seek(self, n=-1, whence=os.SEEK_SET):
        """Move the position inside the cache and return the new position.

        The arguments are handed to the cache's own ``seek`` unchecked.
        """
        # Note that this is not safe for seeking forward.
        return self._cache.seek(n, whence)

    def read(self, n=-1):
        """Read up to `n` bytes, from the cache first, then from the raw stream.

        Returns
        -------
        : :py:class:`bytes` or :py:class:`None`
            The bytes read. If the raw stream returns ``None`` (no data
            available on a non-blocking stream), whatever the cache could
            supply is returned, or ``None`` if the cache supplied nothing.
        """
        read_from_cache = self._cache.read(n)
        if n != -1:
            n -= len(read_from_cache)
            if not n:  # 0 bytes left to read
                return read_from_cache

        read_from_raw = self._raw.read(n)
        if read_from_raw is None:
            # A non-blocking raw stream has nothing for us right now.
            # Writing None into the cache would raise TypeError, so
            # report a short read (or None) and leave the cache as is.
            return read_from_cache or None

        # The cache position is at its end here, so this appends.
        self._cache.write(read_from_raw)

        return read_from_cache + read_from_raw

    @property
    def markedPosition(self):
        """Position where the currently processed element starts.

        This is used for back-tracking in SingleItemDecoder.__call__
        and (indefLen)ValueDecoder and should not be used for other purposes.
        The client is not supposed to ever seek before this position.
        """
        return self._markedPosition

    @markedPosition.setter
    def markedPosition(self, value):
        # By setting the value, we ensure we won't seek back before it.
        # `value` should be the same as the current position
        # We don't check for this for performance reasons.
        self._markedPosition = value

        # Whenever we set _markedPosition, we know for sure
        # that we will not return back, and thus it is
        # safe to drop all cached data.
        if self._cache.tell() > io.DEFAULT_BUFFER_SIZE:
            # Keep only the unread tail; the new cache starts at offset 0,
            # so the marked position is reset to match.
            self._cache = io.BytesIO(self._cache.read())
            self._markedPosition = 0

    def tell(self):
        """Return the current position within the cache."""
        return self._cache.tell()

83 

84 

def asSeekableStream(substrate):
    """Return a seekable byte stream for the given substrate.

    Parameters
    ----------
    substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString`
        In-memory data is wrapped into :py:class:`io.BytesIO`; a stream
        reporting itself as seekable is returned unchanged; any other
        stream is wrapped into :py:class:`CachingStreamWrapper`.

    Returns
    -------
    : :py:class:`io.IOBase`

    Raises
    ------
    : :py:class:`~pyasn1.error.UnsupportedSubstrateError`
        If the supplied substrate cannot be converted to a seekable stream.
    """
    # Already an in-memory stream: nothing to do.
    if isinstance(substrate, io.BytesIO):
        return substrate

    # Raw octets, either native or wrapped in an ASN.1 OCTET STRING.
    if isinstance(substrate, bytes):
        return io.BytesIO(substrate)

    if isinstance(substrate, univ.OctetString):
        return io.BytesIO(substrate.asOctets())

    try:
        # Special case: impossible to set attributes on `file` built-in
        # XXX: broken, BufferedReader expects a "readable" attribute.
        if _PY2 and isinstance(substrate, file):
            return io.BufferedReader(substrate)

        # Objects without a `seekable` method raise AttributeError here,
        # which is how most invalid types get rejected.
        if substrate.seekable():
            return substrate

        return CachingStreamWrapper(substrate)

    except AttributeError:
        typeName = substrate.__class__.__name__
        raise error.UnsupportedSubstrateError(
            "Cannot convert " + typeName +
            " to a seekable bit stream.")

126 

127 

def isEndOfStream(substrate):
    """Check whether we have reached the end of a stream.

    Although it is more effective to read and catch exceptions, this
    function probes the stream explicitly and leaves its position
    unchanged.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to check

    Yields
    ------
    : :py:class:`bool` or :py:class:`None`
        `None` each time the stream reports that no data is available
        yet (its ``read`` returned `None`); the read is retried when the
        generator is resumed. The final item is `True` at end of stream
        and `False` otherwise.
    """
    if isinstance(substrate, io.BytesIO):
        # In-memory stream: compare the current offset with the end offset.
        cp = substrate.tell()
        substrate.seek(0, os.SEEK_END)
        result = substrate.tell() == cp
        substrate.seek(cp, os.SEEK_SET)
        yield result

    else:
        while True:
            received = substrate.read(1)
            if received is not None:
                break

            # No data available right now. This says nothing about the
            # end of stream, so signal it and probe again on resume
            # instead of falling through and reporting True.
            yield

        if received:
            # Put the probed byte back.
            substrate.seek(-1, os.SEEK_CUR)

        yield not received

159 

160 

def peekIntoStream(substrate, size=-1):
    """Peek into stream.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    size: :py:class:`int`
        How many bytes to peek (-1 = all available)

    Yields
    ------
    : :py:class:`bytes` or :py:class:`str` or :py:class:`None`
        For a stream with a ``peek`` method: `None` every time the peek
        returns `None` or fewer than `size` bytes (the peek is retried
        when the generator is resumed), then the peeked data. The caller
        decides when to stop retrying.

        For other streams: whatever :py:func:`readFromStream` yields;
        the stream position is restored once the generator finishes or
        is closed.
    """
    if hasattr(substrate, "peek"):
        while True:
            received = substrate.peek(size)

            # With size == -1 any non-None result is accepted, because
            # a length is never below -1.
            if received is not None and len(received) >= size:
                break

            # Not enough data yet: signal it and peek again on resume.
            yield

        yield received

    else:
        current_position = substrate.tell()
        try:
            for chunk in readFromStream(substrate, size):
                yield chunk

        finally:
            # Undo whatever the read consumed.
            substrate.seek(current_position)

195 

196 

def readFromStream(substrate, size=-1, context=None):
    """Read `size` bytes from the stream, signalling shortages to the caller.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    Keyword parameters
    ------------------
    size: :py:class:`int`
        How many bytes to read (-1 = all available)

    context: :py:class:`dict`
        Opaque caller context, attached to every exception object
        created by this function.

    Yields
    ------
    : :py:class:`bytes` or :py:class:`str` or :py:class:`SubstrateUnderrunError`
        A :py:class:`~pyasn1.error.SubstrateUnderrunError` object each
        time the stream returns `None` or fewer than `size` bytes (a
        short read is rewound first); the read is retried when the
        generator is resumed. The last item yielded is the data itself.
        The data type depends on Python major version

    Raises
    ------
    : :py:class:`~pyasn1.error.EndOfStreamError`
        The stream returned no data although `size` is not zero.
    """
    while True:
        # this will block unless stream is non-blocking
        chunk = substrate.read(size)

        # None means a non-blocking stream had nothing to offer.
        if chunk is not None:
            if size != 0 and not chunk:  # end-of-stream
                raise error.EndOfStreamError(context=context)

            if len(chunk) >= size:
                yield chunk
                return

            # Short read: give the bytes back and
            # behave like a non-blocking stream
            substrate.seek(-len(chunk), os.SEEK_CUR)

        yield error.SubstrateUnderrunError(context=context)