Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multipart/decoders.py: 82%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

60 statements  

1import base64 

2import binascii 

3from io import BufferedWriter 

4 

5from .exceptions import DecodeError 

6 

7 

8class Base64Decoder: 

9 """This object provides an interface to decode a stream of Base64 data. It 

10 is instantiated with an "underlying object", and whenever a write() 

11 operation is performed, it will decode the incoming data as Base64, and 

12 call write() on the underlying object. This is primarily used for decoding 

13 form data encoded as Base64, but can be used for other purposes:: 

14 

15 from multipart.decoders import Base64Decoder 

16 fd = open("notb64.txt", "wb") 

17 decoder = Base64Decoder(fd) 

18 try: 

19 decoder.write("Zm9vYmFy") # "foobar" in Base64 

20 decoder.finalize() 

21 finally: 

22 decoder.close() 

23 

24 # The contents of "notb64.txt" should be "foobar". 

25 

26 This object will also pass all finalize() and close() calls to the 

27 underlying object, if the underlying object supports them. 

28 

29 Note that this class maintains a cache of base64 chunks, so that a write of 

30 arbitrary size can be performed. You must call :meth:`finalize` on this 

31 object after all writes are completed to ensure that all data is flushed 

32 to the underlying object. 

33 

34 :param underlying: the underlying object to pass writes to 

35 """ 

36 

37 def __init__(self, underlying: BufferedWriter): 

38 self.cache = bytearray() 

39 self.underlying = underlying 

40 

41 def write(self, data: bytes) -> int: 

42 """Takes any input data provided, decodes it as base64, and passes it 

43 on to the underlying object. If the data provided is invalid base64 

44 data, then this method will raise 

45 a :class:`multipart.exceptions.DecodeError` 

46 

47 :param data: base64 data to decode 

48 """ 

49 

50 # Prepend any cache info to our data. 

51 if len(self.cache) > 0: 

52 data = self.cache + data 

53 

54 # Slice off a string that's a multiple of 4. 

55 decode_len = (len(data) // 4) * 4 

56 val = data[:decode_len] 

57 

58 # Decode and write, if we have any. 

59 if len(val) > 0: 

60 try: 

61 decoded = base64.b64decode(val) 

62 except binascii.Error: 

63 raise DecodeError("There was an error raised while decoding base64-encoded data.") 

64 

65 self.underlying.write(decoded) 

66 

67 # Get the remaining bytes and save in our cache. 

68 remaining_len = len(data) % 4 

69 if remaining_len > 0: 

70 self.cache = data[-remaining_len:] 

71 else: 

72 self.cache = b"" 

73 

74 # Return the length of the data to indicate no error. 

75 return len(data) 

76 

77 def close(self) -> None: 

78 """Close this decoder. If the underlying object has a `close()` 

79 method, this function will call it. 

80 """ 

81 if hasattr(self.underlying, "close"): 

82 self.underlying.close() 

83 

84 def finalize(self) -> None: 

85 """Finalize this object. This should be called when no more data 

86 should be written to the stream. This function can raise a 

87 :class:`multipart.exceptions.DecodeError` if there is some remaining 

88 data in the cache. 

89 

90 If the underlying object has a `finalize()` method, this function will 

91 call it. 

92 """ 

93 if len(self.cache) > 0: 

94 raise DecodeError( 

95 "There are %d bytes remaining in the Base64Decoder cache when finalize() is called" % len(self.cache) 

96 ) 

97 

98 if hasattr(self.underlying, "finalize"): 

99 self.underlying.finalize() 

100 

101 def __repr__(self) -> str: 

102 return f"{self.__class__.__name__}(underlying={self.underlying!r})" 

103 

104 

105class QuotedPrintableDecoder: 

106 """This object provides an interface to decode a stream of quoted-printable 

107 data. It is instantiated with an "underlying object", in the same manner 

108 as the :class:`multipart.decoders.Base64Decoder` class. This class behaves 

109 in exactly the same way, including maintaining a cache of quoted-printable 

110 chunks. 

111 

112 :param underlying: the underlying object to pass writes to 

113 """ 

114 

115 def __init__(self, underlying: BufferedWriter) -> None: 

116 self.cache = b"" 

117 self.underlying = underlying 

118 

119 def write(self, data: bytes) -> int: 

120 """Takes any input data provided, decodes it as quoted-printable, and 

121 passes it on to the underlying object. 

122 

123 :param data: quoted-printable data to decode 

124 """ 

125 # Prepend any cache info to our data. 

126 if len(self.cache) > 0: 

127 data = self.cache + data 

128 

129 # If the last 2 characters have an '=' sign in it, then we won't be 

130 # able to decode the encoded value and we'll need to save it for the 

131 # next decoding step. 

132 if data[-2:].find(b"=") != -1: 

133 enc, rest = data[:-2], data[-2:] 

134 else: 

135 enc = data 

136 rest = b"" 

137 

138 # Encode and write, if we have data. 

139 if len(enc) > 0: 

140 self.underlying.write(binascii.a2b_qp(enc)) 

141 

142 # Save remaining in cache. 

143 self.cache = rest 

144 return len(data) 

145 

146 def close(self) -> None: 

147 """Close this decoder. If the underlying object has a `close()` 

148 method, this function will call it. 

149 """ 

150 if hasattr(self.underlying, "close"): 

151 self.underlying.close() 

152 

153 def finalize(self) -> None: 

154 """Finalize this object. This should be called when no more data 

155 should be written to the stream. This function will not raise any 

156 exceptions, but it may write more data to the underlying object if 

157 there is data remaining in the cache. 

158 

159 If the underlying object has a `finalize()` method, this function will 

160 call it. 

161 """ 

162 # If we have a cache, write and then remove it. 

163 if len(self.cache) > 0: 

164 self.underlying.write(binascii.a2b_qp(self.cache)) 

165 self.cache = b"" 

166 

167 # Finalize our underlying stream. 

168 if hasattr(self.underlying, "finalize"): 

169 self.underlying.finalize() 

170 

171 def __repr__(self) -> str: 

172 return f"{self.__class__.__name__}(underlying={self.underlying!r})"