Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/python_multipart/decoders.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1import base64 

2import binascii 

3from typing import TYPE_CHECKING 

4 

5from .exceptions import DecodeError 

6 

7if TYPE_CHECKING: # pragma: no cover 

8 from typing import Protocol, TypeVar 

9 

10 _T_contra = TypeVar("_T_contra", contravariant=True) 

11 

12 class SupportsWrite(Protocol[_T_contra]): 

13 def write(self, __b: _T_contra) -> object: ... 

14 

15 # No way to specify optional methods. See 

16 # https://github.com/python/typing/issues/601 

17 # close() [Optional] 

18 # finalize() [Optional] 

19 

20 

21class Base64Decoder: 

22 """This object provides an interface to decode a stream of Base64 data. It 

23 is instantiated with an "underlying object", and whenever a write() 

24 operation is performed, it will decode the incoming data as Base64, and 

25 call write() on the underlying object. This is primarily used for decoding 

26 form data encoded as Base64, but can be used for other purposes:: 

27 

28 from python_multipart.decoders import Base64Decoder 

29 fd = open("notb64.txt", "wb") 

30 decoder = Base64Decoder(fd) 

31 try: 

32 decoder.write("Zm9vYmFy") # "foobar" in Base64 

33 decoder.finalize() 

34 finally: 

35 decoder.close() 

36 

37 # The contents of "notb64.txt" should be "foobar". 

38 

39 This object will also pass all finalize() and close() calls to the 

40 underlying object, if the underlying object supports them. 

41 

42 Note that this class maintains a cache of base64 chunks, so that a write of 

43 arbitrary size can be performed. You must call :meth:`finalize` on this 

44 object after all writes are completed to ensure that all data is flushed 

45 to the underlying object. 

46 

47 :param underlying: the underlying object to pass writes to 

48 """ 

49 

50 def __init__(self, underlying: "SupportsWrite[bytes]") -> None: 

51 self.cache = bytearray() 

52 self.underlying = underlying 

53 

54 def write(self, data: bytes) -> int: 

55 """Takes any input data provided, decodes it as base64, and passes it 

56 on to the underlying object. If the data provided is invalid base64 

57 data, then this method will raise 

58 a :class:`python_multipart.exceptions.DecodeError` 

59 

60 :param data: base64 data to decode 

61 """ 

62 

63 # Prepend any cache info to our data. 

64 if len(self.cache) > 0: 

65 data = self.cache + data 

66 

67 # Slice off a string that's a multiple of 4. 

68 decode_len = (len(data) // 4) * 4 

69 val = data[:decode_len] 

70 

71 # Decode and write, if we have any. 

72 if len(val) > 0: 

73 try: 

74 decoded = base64.b64decode(val) 

75 except binascii.Error: 

76 raise DecodeError("There was an error raised while decoding base64-encoded data.") 

77 

78 self.underlying.write(decoded) 

79 

80 # Get the remaining bytes and save in our cache. 

81 remaining_len = len(data) % 4 

82 if remaining_len > 0: 

83 self.cache[:] = data[-remaining_len:] 

84 else: 

85 self.cache[:] = b"" 

86 

87 # Return the length of the data to indicate no error. 

88 return len(data) 

89 

90 def close(self) -> None: 

91 """Close this decoder. If the underlying object has a `close()` 

92 method, this function will call it. 

93 """ 

94 if hasattr(self.underlying, "close"): 

95 self.underlying.close() 

96 

97 def finalize(self) -> None: 

98 """Finalize this object. This should be called when no more data 

99 should be written to the stream. This function can raise a 

100 :class:`python_multipart.exceptions.DecodeError` if there is some remaining 

101 data in the cache. 

102 

103 If the underlying object has a `finalize()` method, this function will 

104 call it. 

105 """ 

106 if len(self.cache) > 0: 

107 raise DecodeError( 

108 "There are %d bytes remaining in the Base64Decoder cache when finalize() is called" % len(self.cache) 

109 ) 

110 

111 if hasattr(self.underlying, "finalize"): 

112 self.underlying.finalize() 

113 

114 def __repr__(self) -> str: 

115 return f"{self.__class__.__name__}(underlying={self.underlying!r})" 

116 

117 

118class QuotedPrintableDecoder: 

119 """This object provides an interface to decode a stream of quoted-printable 

120 data. It is instantiated with an "underlying object", in the same manner 

121 as the :class:`python_multipart.decoders.Base64Decoder` class. This class behaves 

122 in exactly the same way, including maintaining a cache of quoted-printable 

123 chunks. 

124 

125 :param underlying: the underlying object to pass writes to 

126 """ 

127 

128 def __init__(self, underlying: "SupportsWrite[bytes]") -> None: 

129 self.cache = b"" 

130 self.underlying = underlying 

131 

132 def write(self, data: bytes) -> int: 

133 """Takes any input data provided, decodes it as quoted-printable, and 

134 passes it on to the underlying object. 

135 

136 :param data: quoted-printable data to decode 

137 """ 

138 # Prepend any cache info to our data. 

139 if len(self.cache) > 0: 

140 data = self.cache + data 

141 

142 # If the last 2 characters have an '=' sign in it, then we won't be 

143 # able to decode the encoded value and we'll need to save it for the 

144 # next decoding step. 

145 if data[-2:].find(b"=") != -1: 

146 enc, rest = data[:-2], data[-2:] 

147 else: 

148 enc = data 

149 rest = b"" 

150 

151 # Encode and write, if we have data. 

152 if len(enc) > 0: 

153 self.underlying.write(binascii.a2b_qp(enc)) 

154 

155 # Save remaining in cache. 

156 self.cache = rest 

157 return len(data) 

158 

159 def close(self) -> None: 

160 """Close this decoder. If the underlying object has a `close()` 

161 method, this function will call it. 

162 """ 

163 if hasattr(self.underlying, "close"): 

164 self.underlying.close() 

165 

166 def finalize(self) -> None: 

167 """Finalize this object. This should be called when no more data 

168 should be written to the stream. This function will not raise any 

169 exceptions, but it may write more data to the underlying object if 

170 there is data remaining in the cache. 

171 

172 If the underlying object has a `finalize()` method, this function will 

173 call it. 

174 """ 

175 # If we have a cache, write and then remove it. 

176 if len(self.cache) > 0: # pragma: no cover 

177 self.underlying.write(binascii.a2b_qp(self.cache)) 

178 self.cache = b"" 

179 

180 # Finalize our underlying stream. 

181 if hasattr(self.underlying, "finalize"): 

182 self.underlying.finalize() 

183 

184 def __repr__(self) -> str: 

185 return f"{self.__class__.__name__}(underlying={self.underlying!r})"