1import io
2import warnings
3from typing import Any, Iterable, List, Optional
4from urllib.parse import urlencode
5
6from multidict import MultiDict, MultiDictProxy
7
8from . import hdrs, multipart, payload
9from .helpers import guess_filename
10from .payload import Payload
11
12__all__ = ("FormData",)
13
14
15class FormData:
16 """Helper class for form body generation.
17
18 Supports multipart/form-data and application/x-www-form-urlencoded.
19 """
20
21 def __init__(
22 self,
23 fields: Iterable[Any] = (),
24 quote_fields: bool = True,
25 charset: Optional[str] = None,
26 *,
27 default_to_multipart: bool = False,
28 ) -> None:
29 self._writer = multipart.MultipartWriter("form-data")
30 self._fields: List[Any] = []
31 self._is_multipart = default_to_multipart
32 self._quote_fields = quote_fields
33 self._charset = charset
34
35 if isinstance(fields, dict):
36 fields = list(fields.items())
37 elif not isinstance(fields, (list, tuple)):
38 fields = (fields,)
39 self.add_fields(*fields)
40
41 @property
42 def is_multipart(self) -> bool:
43 return self._is_multipart
44
45 def add_field(
46 self,
47 name: str,
48 value: Any,
49 *,
50 content_type: Optional[str] = None,
51 filename: Optional[str] = None,
52 content_transfer_encoding: Optional[str] = None,
53 ) -> None:
54
55 if isinstance(value, io.IOBase):
56 self._is_multipart = True
57 elif isinstance(value, (bytes, bytearray, memoryview)):
58 msg = (
59 "In v4, passing bytes will no longer create a file field. "
60 "Please explicitly use the filename parameter or pass a BytesIO object."
61 )
62 if filename is None and content_transfer_encoding is None:
63 warnings.warn(msg, DeprecationWarning)
64 filename = name
65
66 type_options: MultiDict[str] = MultiDict({"name": name})
67 if filename is not None and not isinstance(filename, str):
68 raise TypeError("filename must be an instance of str. Got: %s" % filename)
69 if filename is None and isinstance(value, io.IOBase):
70 filename = guess_filename(value, name)
71 if filename is not None:
72 type_options["filename"] = filename
73 self._is_multipart = True
74
75 headers = {}
76 if content_type is not None:
77 if not isinstance(content_type, str):
78 raise TypeError(
79 "content_type must be an instance of str. Got: %s" % content_type
80 )
81 headers[hdrs.CONTENT_TYPE] = content_type
82 self._is_multipart = True
83 if content_transfer_encoding is not None:
84 if not isinstance(content_transfer_encoding, str):
85 raise TypeError(
86 "content_transfer_encoding must be an instance"
87 " of str. Got: %s" % content_transfer_encoding
88 )
89 msg = (
90 "content_transfer_encoding is deprecated. "
91 "To maintain compatibility with v4 please pass a BytesPayload."
92 )
93 warnings.warn(msg, DeprecationWarning)
94 self._is_multipart = True
95
96 self._fields.append((type_options, headers, value))
97
98 def add_fields(self, *fields: Any) -> None:
99 to_add = list(fields)
100
101 while to_add:
102 rec = to_add.pop(0)
103
104 if isinstance(rec, io.IOBase):
105 k = guess_filename(rec, "unknown")
106 self.add_field(k, rec) # type: ignore[arg-type]
107
108 elif isinstance(rec, (MultiDictProxy, MultiDict)):
109 to_add.extend(rec.items())
110
111 elif isinstance(rec, (list, tuple)) and len(rec) == 2:
112 k, fp = rec
113 self.add_field(k, fp) # type: ignore[arg-type]
114
115 else:
116 raise TypeError(
117 "Only io.IOBase, multidict and (name, file) "
118 "pairs allowed, use .add_field() for passing "
119 "more complex parameters, got {!r}".format(rec)
120 )
121
122 def _gen_form_urlencoded(self) -> payload.BytesPayload:
123 # form data (x-www-form-urlencoded)
124 data = []
125 for type_options, _, value in self._fields:
126 data.append((type_options["name"], value))
127
128 charset = self._charset if self._charset is not None else "utf-8"
129
130 if charset == "utf-8":
131 content_type = "application/x-www-form-urlencoded"
132 else:
133 content_type = "application/x-www-form-urlencoded; charset=%s" % charset
134
135 return payload.BytesPayload(
136 urlencode(data, doseq=True, encoding=charset).encode(),
137 content_type=content_type,
138 )
139
140 def _gen_form_data(self) -> multipart.MultipartWriter:
141 """Encode a list of fields using the multipart/form-data MIME format"""
142 for dispparams, headers, value in self._fields:
143 try:
144 if hdrs.CONTENT_TYPE in headers:
145 part = payload.get_payload(
146 value,
147 content_type=headers[hdrs.CONTENT_TYPE],
148 headers=headers,
149 encoding=self._charset,
150 )
151 else:
152 part = payload.get_payload(
153 value, headers=headers, encoding=self._charset
154 )
155 except Exception as exc:
156 raise TypeError(
157 "Can not serialize value type: %r\n "
158 "headers: %r\n value: %r" % (type(value), headers, value)
159 ) from exc
160
161 if dispparams:
162 part.set_content_disposition(
163 "form-data", quote_fields=self._quote_fields, **dispparams
164 )
165 # FIXME cgi.FieldStorage doesn't likes body parts with
166 # Content-Length which were sent via chunked transfer encoding
167 assert part.headers is not None
168 part.headers.popall(hdrs.CONTENT_LENGTH, None)
169
170 self._writer.append_payload(part)
171
172 self._fields.clear()
173 return self._writer
174
175 def __call__(self) -> Payload:
176 if self._is_multipart:
177 return self._gen_form_data()
178 else:
179 return self._gen_form_urlencoded()