Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multipart/decoders.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

59 statements  

1import base64 

2import binascii 

3 

4from .exceptions import DecodeError 

5 

6 

class Base64Decoder:
    """Decode a stream of Base64 data and forward it to an underlying object.

    The decoder is instantiated with an "underlying object". Every call to
    :meth:`write` decodes as much of the incoming data as forms complete
    4-character Base64 groups and calls ``write()`` on the underlying object
    with the decoded bytes. This is primarily used for decoding form data
    encoded as Base64, but can be used for other purposes::

        from multipart.decoders import Base64Decoder
        fd = open("notb64.txt", "wb")
        decoder = Base64Decoder(fd)
        try:
            decoder.write(b"Zm9vYmFy")  # "foobar" in Base64
            decoder.finalize()
        finally:
            decoder.close()

        # The contents of "notb64.txt" should be "foobar".

    ``finalize()`` and ``close()`` calls are passed on to the underlying
    object if it has methods with those names.

    Because Base64 can only be decoded in groups of four characters, up to
    three trailing characters of each write are held in ``self.cache`` until
    the next write completes the group. You must call :meth:`finalize` after
    all writes are completed so that leftover, undecodable data is reported.

    :param underlying: the underlying object to pass writes to
    """

    def __init__(self, underlying):
        # Holds the 0-3 trailing characters that did not form a full group.
        self.cache = bytearray()
        self.underlying = underlying

    def write(self, data):
        """Decode ``data`` as Base64 and pass the result to the underlying
        object.

        :param data: base64 data to decode
        :returns: the length of ``data`` as passed in by the caller
        :raises multipart.exceptions.DecodeError: if ``base64.b64decode``
            rejects the data with ``binascii.Error``
        """
        # Record the caller's chunk size before the cache is prepended, so the
        # return value is not inflated by bytes carried over from an earlier
        # call.
        written = len(data)

        # Prepend any cached remainder to our data.
        if len(self.cache) > 0:
            data = self.cache + data

        # Slice off the longest prefix whose length is a multiple of 4.
        decode_len = (len(data) // 4) * 4
        val = data[:decode_len]

        # Decode and write, if we have any complete groups.
        if len(val) > 0:
            try:
                decoded = base64.b64decode(val)
            except binascii.Error as err:
                raise DecodeError("There was an error raised while decoding base64-encoded data.") from err

            self.underlying.write(decoded)

        # Keep the incomplete trailing group (if any) for the next write.
        remaining_len = len(data) % 4
        if remaining_len > 0:
            self.cache = data[-remaining_len:]
        else:
            self.cache = b""

        # Return the length of the caller's data to indicate no error.
        return written

    def close(self):
        """Close this decoder. If the underlying object has a `close()`
        method, this function will call it.
        """
        if hasattr(self.underlying, "close"):
            self.underlying.close()

    def finalize(self):
        """Finalize this object. This should be called when no more data
        should be written to the stream.

        If the underlying object has a `finalize()` method, this function will
        call it.

        :raises multipart.exceptions.DecodeError: if there is still data in
            the cache, i.e. the total input did not end on a 4-character
            boundary
        """
        if len(self.cache) > 0:
            raise DecodeError(
                "There are %d bytes remaining in the Base64Decoder cache when finalize() is called" % len(self.cache)
            )

        if hasattr(self.underlying, "finalize"):
            self.underlying.finalize()

    def __repr__(self):
        return f"{self.__class__.__name__}(underlying={self.underlying!r})"

102 

103 

class QuotedPrintableDecoder:
    """Decode a stream of quoted-printable data and forward it to an
    underlying object.

    The decoder is instantiated with an "underlying object", in the same
    manner as the :class:`multipart.decoders.Base64Decoder` class. Each
    :meth:`write` decodes the incoming data and calls ``write()`` on the
    underlying object. When the last two bytes of a chunk contain an ``=``
    sign, those two bytes are held in ``self.cache`` until the next write (or
    :meth:`finalize`), because the escape sequence may be incomplete.

    :param underlying: the underlying object to pass writes to
    """

    def __init__(self, underlying):
        # Holds the trailing two bytes of the last write when they contained
        # an "=" and so could not be decoded yet.
        self.cache = b""
        self.underlying = underlying

    def write(self, data):
        """Decode ``data`` as quoted-printable and pass the result to the
        underlying object.

        :param data: quoted-printable data to decode
        :returns: the length of ``data`` as passed in by the caller
        """
        # Record the caller's chunk size before the cache is prepended, so the
        # return value is not inflated by bytes carried over from an earlier
        # call.
        written = len(data)

        # Prepend any cached remainder to our data.
        if self.cache:
            data = self.cache + data

        # If the last 2 characters have an '=' sign in them, then we may be
        # looking at a truncated escape sequence; hold those 2 characters
        # back for the next decoding step.
        tail = data[-2:]
        if b"=" in tail:
            enc, rest = data[:-2], tail
        else:
            enc, rest = data, b""

        # Decode and write, if we have data.
        if enc:
            self.underlying.write(binascii.a2b_qp(enc))

        # Save remaining in cache.
        self.cache = rest
        return written

    def close(self):
        """Close this decoder. If the underlying object has a `close()`
        method, this function will call it.
        """
        if hasattr(self.underlying, "close"):
            self.underlying.close()

    def finalize(self):
        """Finalize this object. This should be called when no more data
        should be written to the stream. Any data still held in the cache is
        decoded and written to the underlying object first.

        If the underlying object has a `finalize()` method, this function will
        call it.
        """
        # If we have a cache, write and then remove it.
        if self.cache:
            self.underlying.write(binascii.a2b_qp(self.cache))
            self.cache = b""

        # Finalize our underlying stream.
        if hasattr(self.underlying, "finalize"):
            self.underlying.finalize()

    def __repr__(self):
        return f"{self.__class__.__name__}(underlying={self.underlying!r})"