1import base64
2import binascii
3from typing import TYPE_CHECKING
4
5from .exceptions import DecodeError
6
if TYPE_CHECKING:  # pragma: no cover
    # Everything in this branch exists only for static type checkers:
    # typing.TYPE_CHECKING is False at runtime, so these names are never
    # defined when the module is actually imported. That is why the decoder
    # classes below refer to SupportsWrite through string annotations.
    from typing import Protocol, TypeVar

    # Contravariant: the type parameter only appears as the argument that is
    # passed *into* write().
    _T_contra = TypeVar("_T_contra", contravariant=True)

    # Structural type for the "underlying object" the decoders write to: any
    # object with a write() method accepting a single positional argument.
    class SupportsWrite(Protocol[_T_contra]):
        def write(self, __b: _T_contra) -> object: ...

        # No way to specify optional methods. See
        # https://github.com/python/typing/issues/601
        # close() [Optional]
        # finalize() [Optional]
19
20
class Base64Decoder:
    """Decode a stream of Base64 data and forward it to an underlying object.

    The decoder is instantiated with an "underlying object". Every call to
    :meth:`write` decodes the incoming data as Base64 and calls ``write()`` on
    the underlying object with the decoded bytes. This is primarily used for
    decoding form data encoded as Base64, but can be used for other purposes::

        from python_multipart.decoders import Base64Decoder
        fd = open("notb64.txt", "wb")
        decoder = Base64Decoder(fd)
        try:
            decoder.write(b"Zm9vYmFy")  # "foobar" in Base64
            decoder.finalize()
        finally:
            decoder.close()

        # The contents of "notb64.txt" should be "foobar".

    This object will also pass all finalize() and close() calls to the
    underlying object, if the underlying object supports them.

    Note that this class maintains a cache of base64 chunks, so that a write of
    arbitrary size can be performed. You must call :meth:`finalize` on this
    object after all writes are completed to ensure that all data is flushed
    to the underlying object.

    :param underlying: the underlying object to pass writes to
    """

    def __init__(self, underlying: "SupportsWrite[bytes]") -> None:
        # Trailing input bytes (at most 3) that do not yet form a complete
        # 4-byte Base64 group; they are prepended to the next write().
        self.cache = bytearray()
        self.underlying = underlying

    def write(self, data: bytes) -> int:
        """Decode ``data`` as Base64 and pass the result to the underlying object.

        Only whole 4-byte groups are decoded; any trailing remainder is kept
        in the cache and combined with the data of the next call.

        :param data: base64 data to decode
        :returns: the number of bytes of ``data`` that were accepted, i.e.
                  ``len(data)`` as passed by the caller (bytes carried over
                  from earlier calls are not counted again)
        :raises python_multipart.exceptions.DecodeError: if the data is not
                valid Base64
        """
        # Capture the caller's length before the cache is prepended, so the
        # return value never exceeds what was actually handed to us.
        accepted = len(data)

        # Prepend any bytes held back by a previous call.
        if self.cache:
            data = bytes(self.cache) + data

        # Only a multiple of 4 bytes can be decoded right now.
        decode_len = (len(data) // 4) * 4

        if decode_len:
            # NOTE(review): b64decode() is called without validate=True, so
            # per the stdlib docs characters outside the Base64 alphabet are
            # discarded before decoding rather than rejected.
            try:
                decoded = base64.b64decode(data[:decode_len])
            except binascii.Error as exc:
                raise DecodeError("There was an error raised while decoding base64-encoded data.") from exc

            self.underlying.write(decoded)

        # Keep whatever did not fit into a whole group for the next call.
        remainder = data[decode_len:]
        if remainder:
            self.cache[:] = remainder
        else:
            self.cache[:] = b""

        return accepted

    def close(self) -> None:
        """Close this decoder.

        If the underlying object has a ``close()`` method, it is called.
        """
        if hasattr(self.underlying, "close"):
            self.underlying.close()

    def finalize(self) -> None:
        """Finalize this object once no more data will be written.

        If the underlying object has a ``finalize()`` method, it is called.

        :raises python_multipart.exceptions.DecodeError: if undecoded bytes
                remain in the cache (the total input was not a multiple of 4
                bytes long)
        """
        if self.cache:
            raise DecodeError(
                f"There are {len(self.cache)} bytes remaining in the Base64Decoder cache when finalize() is called"
            )

        if hasattr(self.underlying, "finalize"):
            self.underlying.finalize()

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(underlying={self.underlying!r})"
116
117
class QuotedPrintableDecoder:
    """Decode a stream of quoted-printable data and forward it to an underlying object.

    The decoder is instantiated with an "underlying object", in the same
    manner as the :class:`python_multipart.decoders.Base64Decoder` class.
    Every call to :meth:`write` decodes the incoming data and calls
    ``write()`` on the underlying object. A small cache holds back a trailing,
    possibly incomplete ``=XX`` escape so that escapes split across two writes
    are still decoded correctly.

    :param underlying: the underlying object to pass writes to
    """

    def __init__(self, underlying: "SupportsWrite[bytes]") -> None:
        # Trailing bytes (at most 2) starting at an "=" that may be the
        # beginning of an escape completed by the next write().
        self.cache = b""
        self.underlying = underlying

    def write(self, data: bytes) -> int:
        """Decode ``data`` as quoted-printable and pass it to the underlying object.

        :param data: quoted-printable data to decode
        :returns: the number of bytes of ``data`` that were accepted, i.e.
                  ``len(data)`` as passed by the caller (bytes carried over
                  from earlier calls are not counted again)
        """
        # Capture the caller's length before the cache is prepended, so the
        # return value never exceeds what was actually handed to us.
        accepted = len(data)

        # Prepend any bytes held back by a previous call.
        if self.cache:
            data = self.cache + data

        # An escape is "=" plus two more bytes, so only an "=" within the last
        # two bytes can start a sequence that is still incomplete. Split at
        # that "=" itself: cutting a fixed two bytes off the end would break a
        # complete escape in input such as b"=3D=" (leaving b"=3" behind).
        tail = data[-2:]
        marker = tail.find(b"=")
        if marker == -1:
            enc, rest = data, b""
        else:
            cut = len(data) - len(tail) + marker
            enc, rest = data[:cut], data[cut:]

        # Decode and write, if we have data.
        if enc:
            self.underlying.write(binascii.a2b_qp(enc))

        # Save the remainder for the next write() or finalize().
        self.cache = rest
        return accepted

    def close(self) -> None:
        """Close this decoder.

        If the underlying object has a ``close()`` method, it is called.
        """
        if hasattr(self.underlying, "close"):
            self.underlying.close()

    def finalize(self) -> None:
        """Finalize this object once no more data will be written.

        Any bytes still held in the cache are decoded and written to the
        underlying object first. If the underlying object has a
        ``finalize()`` method, it is then called.
        """
        # If we have a cache, write and then remove it.
        if self.cache:  # pragma: no cover
            self.underlying.write(binascii.a2b_qp(self.cache))
            self.cache = b""

        # Finalize our underlying stream.
        if hasattr(self.underlying, "finalize"):
            self.underlying.finalize()

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(underlying={self.underlying!r})"