1import base64
2import binascii
3
4from .exceptions import DecodeError
5
6
7class Base64Decoder:
8 """This object provides an interface to decode a stream of Base64 data. It
9 is instantiated with an "underlying object", and whenever a write()
10 operation is performed, it will decode the incoming data as Base64, and
11 call write() on the underlying object. This is primarily used for decoding
12 form data encoded as Base64, but can be used for other purposes::
13
14 from multipart.decoders import Base64Decoder
15 fd = open("notb64.txt", "wb")
16 decoder = Base64Decoder(fd)
17 try:
18 decoder.write("Zm9vYmFy") # "foobar" in Base64
19 decoder.finalize()
20 finally:
21 decoder.close()
22
23 # The contents of "notb64.txt" should be "foobar".
24
25 This object will also pass all finalize() and close() calls to the
26 underlying object, if the underlying object supports them.
27
28 Note that this class maintains a cache of base64 chunks, so that a write of
29 arbitrary size can be performed. You must call :meth:`finalize` on this
30 object after all writes are completed to ensure that all data is flushed
31 to the underlying object.
32
33 :param underlying: the underlying object to pass writes to
34 """
35
36 def __init__(self, underlying):
37 self.cache = bytearray()
38 self.underlying = underlying
39
40 def write(self, data):
41 """Takes any input data provided, decodes it as base64, and passes it
42 on to the underlying object. If the data provided is invalid base64
43 data, then this method will raise
44 a :class:`multipart.exceptions.DecodeError`
45
46 :param data: base64 data to decode
47 """
48
49 # Prepend any cache info to our data.
50 if len(self.cache) > 0:
51 data = self.cache + data
52
53 # Slice off a string that's a multiple of 4.
54 decode_len = (len(data) // 4) * 4
55 val = data[:decode_len]
56
57 # Decode and write, if we have any.
58 if len(val) > 0:
59 try:
60 decoded = base64.b64decode(val)
61 except binascii.Error:
62 raise DecodeError("There was an error raised while decoding base64-encoded data.")
63
64 self.underlying.write(decoded)
65
66 # Get the remaining bytes and save in our cache.
67 remaining_len = len(data) % 4
68 if remaining_len > 0:
69 self.cache = data[-remaining_len:]
70 else:
71 self.cache = b""
72
73 # Return the length of the data to indicate no error.
74 return len(data)
75
76 def close(self):
77 """Close this decoder. If the underlying object has a `close()`
78 method, this function will call it.
79 """
80 if hasattr(self.underlying, "close"):
81 self.underlying.close()
82
83 def finalize(self):
84 """Finalize this object. This should be called when no more data
85 should be written to the stream. This function can raise a
86 :class:`multipart.exceptions.DecodeError` if there is some remaining
87 data in the cache.
88
89 If the underlying object has a `finalize()` method, this function will
90 call it.
91 """
92 if len(self.cache) > 0:
93 raise DecodeError(
94 "There are %d bytes remaining in the Base64Decoder cache when finalize() is called" % len(self.cache)
95 )
96
97 if hasattr(self.underlying, "finalize"):
98 self.underlying.finalize()
99
100 def __repr__(self):
101 return f"{self.__class__.__name__}(underlying={self.underlying!r})"
102
103
104class QuotedPrintableDecoder:
105 """This object provides an interface to decode a stream of quoted-printable
106 data. It is instantiated with an "underlying object", in the same manner
107 as the :class:`multipart.decoders.Base64Decoder` class. This class behaves
108 in exactly the same way, including maintaining a cache of quoted-printable
109 chunks.
110
111 :param underlying: the underlying object to pass writes to
112 """
113
114 def __init__(self, underlying):
115 self.cache = b""
116 self.underlying = underlying
117
118 def write(self, data):
119 """Takes any input data provided, decodes it as quoted-printable, and
120 passes it on to the underlying object.
121
122 :param data: quoted-printable data to decode
123 """
124 # Prepend any cache info to our data.
125 if len(self.cache) > 0:
126 data = self.cache + data
127
128 # If the last 2 characters have an '=' sign in it, then we won't be
129 # able to decode the encoded value and we'll need to save it for the
130 # next decoding step.
131 if data[-2:].find(b"=") != -1:
132 enc, rest = data[:-2], data[-2:]
133 else:
134 enc = data
135 rest = b""
136
137 # Encode and write, if we have data.
138 if len(enc) > 0:
139 self.underlying.write(binascii.a2b_qp(enc))
140
141 # Save remaining in cache.
142 self.cache = rest
143 return len(data)
144
145 def close(self):
146 """Close this decoder. If the underlying object has a `close()`
147 method, this function will call it.
148 """
149 if hasattr(self.underlying, "close"):
150 self.underlying.close()
151
152 def finalize(self):
153 """Finalize this object. This should be called when no more data
154 should be written to the stream. This function will not raise any
155 exceptions, but it may write more data to the underlying object if
156 there is data remaining in the cache.
157
158 If the underlying object has a `finalize()` method, this function will
159 call it.
160 """
161 # If we have a cache, write and then remove it.
162 if len(self.cache) > 0:
163 self.underlying.write(binascii.a2b_qp(self.cache))
164 self.cache = b""
165
166 # Finalize our underlying stream.
167 if hasattr(self.underlying, "finalize"):
168 self.underlying.finalize()
169
170 def __repr__(self):
171 return f"{self.__class__.__name__}(underlying={self.underlying!r})"