Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/formdata.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import io
2import warnings
3from typing import Any, Iterable, List, Optional
4from urllib.parse import urlencode
6from multidict import MultiDict, MultiDictProxy
8from . import hdrs, multipart, payload
9from .helpers import guess_filename
10from .payload import Payload
12__all__ = ("FormData",)
class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected with :meth:`add_field` / :meth:`add_fields`.
    Calling the instance builds the body: a multipart writer as soon as any
    field required multipart encoding (file object, ``filename``,
    ``content_type`` or ``content_transfer_encoding``), otherwise a
    URL-encoded bytes payload.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: Optional[str] = None,
    ) -> None:
        """Create the form and register the initial ``fields``.

        Args:
            fields: Initial fields. A ``dict`` is expanded into its
                ``(key, value)`` items, a ``list``/``tuple`` is used as is,
                and any other object is treated as one single field record.
                Every record is handed to :meth:`add_fields`.
            quote_fields: Forwarded as ``quote_fields`` when the
                Content-Disposition of each multipart part is set.
            charset: Encoding used for the generated payloads; ``None``
                means UTF-8 for the URL-encoded form.
        """
        self._writer = multipart.MultipartWriter("form-data")
        # Each entry is a (disposition params, headers, value) triple.
        self._fields: List[Any] = []
        self._is_multipart = False
        self._is_processed = False
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether the form will be encoded as multipart/form-data."""
        return self._is_multipart

    @staticmethod
    def _require_str(param: str, value: Any) -> None:
        """Raise ``TypeError`` unless ``value`` is ``None`` or a ``str``.

        The offending value is interpolated with an f-string rather than
        ``"%s" % value``: the latter unpacks tuples, which either garbles the
        message or raises an unrelated formatting error.
        """
        if value is not None and not isinstance(value, str):
            raise TypeError(f"{param} must be an instance of str. Got: {value}")

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        content_transfer_encoding: Optional[str] = None,
    ) -> None:
        """Register a single form field.

        Args:
            name: Field name, stored as the ``name`` disposition parameter.
            value: Field value. ``io.IOBase`` objects always switch the form
                to multipart. ``bytes``/``bytearray``/``memoryview`` values
                passed without ``filename`` and ``content_transfer_encoding``
                get ``name`` as their file name (deprecated behaviour).
            content_type: Optional Content-Type header for the part; forces
                multipart encoding.
            filename: Optional file name for the part; forces multipart
                encoding. For file objects it is guessed when omitted.
            content_transfer_encoding: Deprecated; only validated and used to
                force multipart encoding.

        Raises:
            TypeError: If ``filename``, ``content_type`` or
                ``content_transfer_encoding`` is neither ``None`` nor ``str``.
        """
        is_file = isinstance(value, io.IOBase)
        if not is_file and isinstance(value, (bytes, bytearray, memoryview)):
            msg = (
                "In v4, passing bytes will no longer create a file field. "
                "Please explicitly use the filename parameter or pass a BytesIO object."
            )
            if filename is None and content_transfer_encoding is None:
                # stacklevel=2 attributes the warning to the add_field caller.
                warnings.warn(msg, DeprecationWarning, stacklevel=2)
                filename = name

        # Validate everything before touching instance state so that a
        # rejected call leaves the form exactly as it was.
        self._require_str("filename", filename)
        self._require_str("content_type", content_type)
        self._require_str("content_transfer_encoding", content_transfer_encoding)

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is None and is_file:
            filename = guess_filename(value, name)
        if filename is not None:
            type_options["filename"] = filename

        headers = {}
        if content_type is not None:
            headers[hdrs.CONTENT_TYPE] = content_type

        if content_transfer_encoding is not None:
            msg = (
                "content_transfer_encoding is deprecated. "
                "To maintain compatibility with v4 please pass a BytesPayload."
            )
            warnings.warn(msg, DeprecationWarning, stacklevel=2)

        # Any of these features can only be expressed in multipart/form-data.
        if (
            is_file
            or filename is not None
            or content_type is not None
            or content_transfer_encoding is not None
        ):
            self._is_multipart = True

        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Register several fields at once.

        Each record may be an ``io.IOBase`` object (its name is guessed, with
        ``"unknown"`` as the fallback), a ``MultiDict``/``MultiDictProxy``
        (its items are queued behind the records still pending), or a
        two-element ``list``/``tuple`` of ``(name, value)``.

        Raises:
            TypeError: If a record has any other shape.
        """
        pending = list(fields)
        # Walk the queue with a cursor instead of ``pop(0)``: popping from the
        # front of a list is O(n) per record, the cursor keeps it linear while
        # still letting multidict items be appended during the walk.
        cursor = 0
        while cursor < len(pending):
            rec = pending[cursor]
            cursor += 1

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                pending.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)  # type: ignore[arg-type]

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    "more complex parameters, got {!r}".format(rec)
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Encode the fields as application/x-www-form-urlencoded.

        Only the ``name`` disposition parameter and the value of each field
        are used. The charset is advertised in the content type only when it
        is not UTF-8.
        """
        # form data (x-www-form-urlencoded)
        data = [
            (type_options["name"], value) for type_options, _, value in self._fields
        ]

        charset = self._charset if self._charset is not None else "utf-8"

        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; " "charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format

        Every field is turned into a payload, given its Content-Disposition
        and appended to the writer created in ``__init__``.

        Raises:
            RuntimeError: If the form has already been processed; the parts
                were appended to the shared writer by the first call.
            TypeError: If a value cannot be converted into a payload.
        """
        if self._is_processed:
            raise RuntimeError("Form data has been processed already")
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't likes body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._is_processed = True
        return self._writer

    def __call__(self) -> Payload:
        """Build and return the request body payload for the collected fields."""
        if self._is_multipart:
            return self._gen_form_data()
        else:
            return self._gen_form_urlencoded()