Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multipart/decoders.py: 25%

59 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import base64 

2import binascii 

3 

4from .exceptions import DecodeError 

5 

6 

class Base64Decoder:
    """Decode a stream of Base64 data and forward it to an underlying object.

    The decoder is instantiated with an "underlying object".  Every call to
    :meth:`write` decodes the incoming data as Base64 and calls ``write()``
    on the underlying object with the decoded bytes.  This is primarily used
    for decoding form data encoded as Base64, but can be used for other
    purposes::

        from multipart.decoders import Base64Decoder
        fd = open("notb64.txt", "wb")
        decoder = Base64Decoder(fd)
        try:
            decoder.write(b"Zm9vYmFy")  # "foobar" in Base64
            decoder.finalize()
        finally:
            decoder.close()

        # The contents of "notb64.txt" should be "foobar".

    This object will also pass all finalize() and close() calls to the
    underlying object, if the underlying object supports them.

    Note that this class maintains a cache of base64 chunks, so that a write
    of arbitrary size can be performed.  You must call :meth:`finalize` on
    this object after all writes are completed to ensure that all data is
    flushed to the underlying object.

    :param underlying: the underlying object to pass writes to
    """

    def __init__(self, underlying):
        # Holds the trailing 1-3 input bytes that do not yet form a complete
        # 4-byte Base64 quantum; they are prepended to the next write().
        self.cache = bytearray()
        self.underlying = underlying

    def write(self, data):
        """Decode ``data`` as Base64 and pass the result to the underlying
        object.

        Input whose length (together with any cached bytes) is not a multiple
        of 4 is decoded up to the last complete 4-byte group; the remainder
        is cached until the next call.

        :param data: base64 data to decode
        :returns: the number of bytes of ``data`` accepted by this call
                  (cached bytes from earlier calls are not counted again)
        :raises multipart.exceptions.DecodeError: if ``base64.b64decode``
            rejects the data
        """
        # Record the caller's length *before* the cache is prepended, so the
        # return value reflects only what was passed to this call.
        accepted = len(data)

        # Prepend any cache info to our data.
        if self.cache:
            data = self.cache + data

        # Slice off the longest prefix whose length is a multiple of 4.
        decode_len = (len(data) // 4) * 4
        val = data[:decode_len]

        # Decode and write, if we have any.
        if val:
            try:
                decoded = base64.b64decode(val)
            except binascii.Error as exc:
                # Chain the low-level error so the root cause stays visible.
                raise DecodeError('There was an error raised while decoding '
                                  'base64-encoded data.') from exc

            self.underlying.write(decoded)

        # Keep whatever did not fit into a full quantum for the next call.
        if len(data) > decode_len:
            self.cache = data[decode_len:]
        else:
            self.cache = b''

        return accepted

    def close(self):
        """Close this decoder.  If the underlying object has a `close()`
        method, this function will call it.
        """
        if hasattr(self.underlying, 'close'):
            self.underlying.close()

    def finalize(self):
        """Finalize this object.  This should be called when no more data
        should be written to the stream.

        If the underlying object has a `finalize()` method, this function
        will call it.

        :raises multipart.exceptions.DecodeError: if there is still undecoded
            data remaining in the cache
        """
        if self.cache:
            raise DecodeError('There are %d bytes remaining in the '
                              'Base64Decoder cache when finalize() is called'
                              % len(self.cache))

        if hasattr(self.underlying, 'finalize'):
            self.underlying.finalize()

    def __repr__(self):
        return f"{self.__class__.__name__}(underlying={self.underlying!r})"

103 

104 

class QuotedPrintableDecoder:
    """Decode a stream of quoted-printable data and forward it to an
    underlying object.

    It is instantiated with an "underlying object", in the same manner as
    the :class:`multipart.decoders.Base64Decoder` class, and likewise keeps a
    small cache of input that cannot be decoded yet.

    :param underlying: the underlying object to pass writes to
    """

    def __init__(self, underlying):
        # Holds up to two trailing input bytes that may be the start of an
        # incomplete "=XX" escape; they are prepended to the next write().
        self.cache = b''
        self.underlying = underlying

    def write(self, data):
        """Decode ``data`` as quoted-printable and pass the result to the
        underlying object.

        :param data: quoted-printable data to decode
        :returns: the number of bytes of ``data`` accepted by this call
                  (cached bytes from earlier calls are not counted again)
        """
        # Record the caller's length *before* the cache is prepended, so the
        # return value reflects only what was passed to this call.
        accepted = len(data)

        # Prepend any cache info to our data.
        if self.cache:
            data = self.cache + data

        # If the last 2 characters contain an '=' sign, the escape sequence
        # may be split across writes, so hold those characters back for the
        # next decoding step.
        tail = data[-2:]
        if b'=' in tail:
            enc, rest = data[:-2], tail
        else:
            enc, rest = data, b''

        # Decode and write, if we have data.
        if enc:
            self.underlying.write(binascii.a2b_qp(enc))

        # Save remaining in cache.
        self.cache = rest
        return accepted

    def close(self):
        """Close this decoder.  If the underlying object has a `close()`
        method, this function will call it.
        """
        if hasattr(self.underlying, 'close'):
            self.underlying.close()

    def finalize(self):
        """Finalize this object.  This should be called when no more data
        should be written to the stream.  Any data still held in the cache is
        decoded and written to the underlying object first.

        If the underlying object has a `finalize()` method, this function
        will call it.
        """
        # If we have a cache, write and then remove it.
        if self.cache:
            self.underlying.write(binascii.a2b_qp(self.cache))
            self.cache = b''

        # Finalize our underlying stream.
        if hasattr(self.underlying, 'finalize'):
            self.underlying.finalize()

    def __repr__(self):
        return f"{self.__class__.__name__}(underlying={self.underlying!r})"