Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyasn1/codec/streaming.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import io 

8import os 

9 

10from pyasn1 import error 

11from pyasn1.type import univ 

12 

class CachingStreamWrapper(io.IOBase):
    """Wrapper around non-seekable streams.

    Note that the implementation is tied to the decoder,
    not checking for dangerous arguments for the sake
    of performance.

    The read bytes are kept in an internal cache until
    setting _markedPosition which may reset the cache.

    Parameters
    ----------
    raw:
        Underlying stream object; only its ``read(n)`` method is used.
    """
    def __init__(self, raw):
        self._raw = raw
        self._cache = io.BytesIO()
        self._markedPosition = 0

    def peek(self, n):
        """Read up to `n` bytes and rewind the cache position by that amount.

        Returns ``None`` when :meth:`read` returns ``None`` (no cached bytes
        and the raw stream had nothing to offer).
        """
        result = self.read(n)
        if result is None:
            # Nothing was consumed, so there is nothing to rewind.
            return None

        self._cache.seek(-len(result), os.SEEK_CUR)
        return result

    def seekable(self):
        return True

    def seek(self, n=-1, whence=os.SEEK_SET):
        """Reposition within the cache and return the new position."""
        # Note that this is not safe for seeking forward.
        return self._cache.seek(n, whence)

    def read(self, n=-1):
        """Read `n` bytes (-1 = everything), serving cached bytes first.

        Bytes fetched from the raw stream are appended to the cache so that
        a later backward :meth:`seek` can replay them.

        Returns the bytes read, or ``None`` when nothing was cached at the
        current position and the raw stream's ``read`` returned ``None``.
        """
        read_from_cache = self._cache.read(n)
        if n != -1:
            n -= len(read_from_cache)
            if not n:  # 0 bytes left to read
                return read_from_cache

        read_from_raw = self._raw.read(n)
        if read_from_raw is None:
            # The raw stream returned None instead of bytes (a non-blocking
            # stream can do this). Hand back whatever the cache supplied, or
            # None if it supplied nothing, rather than writing None into the
            # cache.
            return read_from_cache or None

        self._cache.write(read_from_raw)

        return read_from_cache + read_from_raw

    @property
    def markedPosition(self):
        """Position where the currently processed element starts.

        This is used for back-tracking in SingleItemDecoder.__call__
        and (indefLen)ValueDecoder and should not be used for other purposes.
        The client is not supposed to ever seek before this position.
        """
        return self._markedPosition

    @markedPosition.setter
    def markedPosition(self, value):
        # By setting the value, we ensure we won't seek back before it.
        # `value` should be the same as the current position
        # We don't check for this for performance reasons.
        self._markedPosition = value

        # Whenever we set _markedPosition, we know for sure
        # that we will not return back, and thus it is
        # safe to drop all cached data.
        if self._cache.tell() > io.DEFAULT_BUFFER_SIZE:
            # Keep only the bytes after the current position; the new cache
            # starts at offset 0, so the mark is reset to 0 as well.
            self._cache = io.BytesIO(self._cache.read())
            self._markedPosition = 0

    def tell(self):
        """Return the current position within the cache."""
        return self._cache.tell()

79 

80 

def asSeekableStream(substrate):
    """Return a seekable byte-stream for the given substrate.

    Parameters
    ----------
    substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString`
        Object to expose as a seekable stream.

    Returns
    -------
    : :py:class:`io.IOBase`
        `substrate` itself when it is an :py:class:`io.BytesIO` or reports
        itself seekable; a new :py:class:`io.BytesIO` for bytes and
        OctetString input; a :py:class:`CachingStreamWrapper` otherwise.

    Raises
    ------
    : :py:class:`~pyasn1.error.PyAsn1Error`
        If the supplied substrate cannot be converted to a seekable stream.
    """
    if isinstance(substrate, io.BytesIO):
        return substrate

    if isinstance(substrate, bytes):
        return io.BytesIO(substrate)

    if isinstance(substrate, univ.OctetString):
        octets = substrate.asOctets()
        return io.BytesIO(octets)

    try:
        # Objects without a `seekable` attribute raise AttributeError here,
        # which is how unsupported types are detected.
        return (
            substrate if substrate.seekable()
            else CachingStreamWrapper(substrate)
        )

    except AttributeError:
        type_name = substrate.__class__.__name__
        raise error.UnsupportedSubstrateError(
            "Cannot convert " + type_name +
            " to a seekable bit stream.")

116 

117 

def isEndOfStream(substrate):
    """Check whether we have reached the end of a stream.

    Although it is more effective to read and catch exceptions, this
    function checks for the end of the stream without changing the
    stream position.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to check

    Yields
    ------
    : :py:class:`bool` or :py:class:`None`
        `None` each time a one-byte read returns `None` (no data available
        yet); then, as the final item, `True` if the stream is at its end
        and `False` otherwise.
    """
    if isinstance(substrate, io.BytesIO):
        # In-memory buffer: compare the current offset with the end offset,
        # then restore the position.
        cp = substrate.tell()
        substrate.seek(0, os.SEEK_END)
        result = substrate.tell() == cp
        substrate.seek(cp, os.SEEK_SET)
        yield result

    else:
        received = substrate.read(1)
        while received is None:
            # No answer yet: report that and try again when resumed, instead
            # of falling through and reporting end-of-stream.
            yield
            received = substrate.read(1)

        if received:
            # Put the probed byte back.
            substrate.seek(-1, os.SEEK_CUR)

        yield not received

149 

150 

def peekIntoStream(substrate, size=-1):
    """Peek into stream.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    size: :py:class:`int`
        How many bytes to peek (-1 = all available)

    Yields
    ------
    : :py:class:`bytes` or :py:class:`None`
        When `substrate` has a `peek` method: `None` each time the peek
        returns `None` or fewer than `size` bytes, then the peeked data.
        Otherwise: whatever :py:func:`readFromStream` yields, with the
        stream position restored once the generator finishes or is closed.
    """
    if hasattr(substrate, "peek"):
        received = substrate.peek(size)
        # Peek again after every yield: the stream may have more data by the
        # time the caller resumes. The None check must come first because
        # len(None) would raise TypeError.
        while received is None or len(received) < size:
            yield
            received = substrate.peek(size)

        yield received

    else:
        current_position = substrate.tell()
        try:
            for chunk in readFromStream(substrate, size):
                yield chunk

        finally:
            # Undo the read so the call behaves like a peek.
            substrate.seek(current_position)

185 

186 

def readFromStream(substrate, size=-1, context=None):
    """Read a chunk of data from the stream.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    Keyword parameters
    ------------------
    size: :py:class:`int`
        How many bytes to read (-1 = all available)

    context: :py:class:`dict`
        Opaque caller context will be attached to exception objects created
        by this function.

    Yields
    ------
    : :py:class:`bytes` or :py:class:`SubstrateUnderrunError`
        Read data or :py:class:`~pyasn1.error.SubstrateUnderrunError`
        object if no `size` bytes is readily available in the stream.

    Raises
    ------
    : :py:class:`~pyasn1.error.EndOfStreamError`
        Input stream is exhausted
    """
    while True:
        # this will block unless stream is non-blocking
        chunk = substrate.read(size)

        if chunk is None:  # non-blocking stream can do this
            yield error.SubstrateUnderrunError(context=context)
            continue

        if not chunk and size != 0:  # end-of-stream
            raise error.EndOfStreamError(context=context)

        if len(chunk) >= size:
            # Full read (or size == -1): hand the data over and finish.
            yield chunk
            return

        # Short read: push the partial data back and
        # behave like a non-blocking stream
        substrate.seek(-len(chunk), os.SEEK_CUR)
        yield error.SubstrateUnderrunError(context=context)