1import base64
2import binascii
3from io import BufferedWriter
4
5from .exceptions import DecodeError
6
7
8class Base64Decoder:
9 """This object provides an interface to decode a stream of Base64 data. It
10 is instantiated with an "underlying object", and whenever a write()
11 operation is performed, it will decode the incoming data as Base64, and
12 call write() on the underlying object. This is primarily used for decoding
13 form data encoded as Base64, but can be used for other purposes::
14
15 from multipart.decoders import Base64Decoder
16 fd = open("notb64.txt", "wb")
17 decoder = Base64Decoder(fd)
18 try:
19 decoder.write("Zm9vYmFy") # "foobar" in Base64
20 decoder.finalize()
21 finally:
22 decoder.close()
23
24 # The contents of "notb64.txt" should be "foobar".
25
26 This object will also pass all finalize() and close() calls to the
27 underlying object, if the underlying object supports them.
28
29 Note that this class maintains a cache of base64 chunks, so that a write of
30 arbitrary size can be performed. You must call :meth:`finalize` on this
31 object after all writes are completed to ensure that all data is flushed
32 to the underlying object.
33
34 :param underlying: the underlying object to pass writes to
35 """
36
37 def __init__(self, underlying: BufferedWriter):
38 self.cache = bytearray()
39 self.underlying = underlying
40
41 def write(self, data: bytes) -> int:
42 """Takes any input data provided, decodes it as base64, and passes it
43 on to the underlying object. If the data provided is invalid base64
44 data, then this method will raise
45 a :class:`multipart.exceptions.DecodeError`
46
47 :param data: base64 data to decode
48 """
49
50 # Prepend any cache info to our data.
51 if len(self.cache) > 0:
52 data = self.cache + data
53
54 # Slice off a string that's a multiple of 4.
55 decode_len = (len(data) // 4) * 4
56 val = data[:decode_len]
57
58 # Decode and write, if we have any.
59 if len(val) > 0:
60 try:
61 decoded = base64.b64decode(val)
62 except binascii.Error:
63 raise DecodeError("There was an error raised while decoding base64-encoded data.")
64
65 self.underlying.write(decoded)
66
67 # Get the remaining bytes and save in our cache.
68 remaining_len = len(data) % 4
69 if remaining_len > 0:
70 self.cache = data[-remaining_len:]
71 else:
72 self.cache = b""
73
74 # Return the length of the data to indicate no error.
75 return len(data)
76
77 def close(self) -> None:
78 """Close this decoder. If the underlying object has a `close()`
79 method, this function will call it.
80 """
81 if hasattr(self.underlying, "close"):
82 self.underlying.close()
83
84 def finalize(self) -> None:
85 """Finalize this object. This should be called when no more data
86 should be written to the stream. This function can raise a
87 :class:`multipart.exceptions.DecodeError` if there is some remaining
88 data in the cache.
89
90 If the underlying object has a `finalize()` method, this function will
91 call it.
92 """
93 if len(self.cache) > 0:
94 raise DecodeError(
95 "There are %d bytes remaining in the Base64Decoder cache when finalize() is called" % len(self.cache)
96 )
97
98 if hasattr(self.underlying, "finalize"):
99 self.underlying.finalize()
100
101 def __repr__(self) -> str:
102 return f"{self.__class__.__name__}(underlying={self.underlying!r})"
103
104
105class QuotedPrintableDecoder:
106 """This object provides an interface to decode a stream of quoted-printable
107 data. It is instantiated with an "underlying object", in the same manner
108 as the :class:`multipart.decoders.Base64Decoder` class. This class behaves
109 in exactly the same way, including maintaining a cache of quoted-printable
110 chunks.
111
112 :param underlying: the underlying object to pass writes to
113 """
114
115 def __init__(self, underlying: BufferedWriter) -> None:
116 self.cache = b""
117 self.underlying = underlying
118
119 def write(self, data: bytes) -> int:
120 """Takes any input data provided, decodes it as quoted-printable, and
121 passes it on to the underlying object.
122
123 :param data: quoted-printable data to decode
124 """
125 # Prepend any cache info to our data.
126 if len(self.cache) > 0:
127 data = self.cache + data
128
129 # If the last 2 characters have an '=' sign in it, then we won't be
130 # able to decode the encoded value and we'll need to save it for the
131 # next decoding step.
132 if data[-2:].find(b"=") != -1:
133 enc, rest = data[:-2], data[-2:]
134 else:
135 enc = data
136 rest = b""
137
138 # Encode and write, if we have data.
139 if len(enc) > 0:
140 self.underlying.write(binascii.a2b_qp(enc))
141
142 # Save remaining in cache.
143 self.cache = rest
144 return len(data)
145
146 def close(self) -> None:
147 """Close this decoder. If the underlying object has a `close()`
148 method, this function will call it.
149 """
150 if hasattr(self.underlying, "close"):
151 self.underlying.close()
152
153 def finalize(self) -> None:
154 """Finalize this object. This should be called when no more data
155 should be written to the stream. This function will not raise any
156 exceptions, but it may write more data to the underlying object if
157 there is data remaining in the cache.
158
159 If the underlying object has a `finalize()` method, this function will
160 call it.
161 """
162 # If we have a cache, write and then remove it.
163 if len(self.cache) > 0:
164 self.underlying.write(binascii.a2b_qp(self.cache))
165 self.cache = b""
166
167 # Finalize our underlying stream.
168 if hasattr(self.underlying, "finalize"):
169 self.underlying.finalize()
170
171 def __repr__(self) -> str:
172 return f"{self.__class__.__name__}(underlying={self.underlying!r})"