Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/formdata.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
import io
import warnings
from collections import deque
from collections.abc import Iterable
from typing import Any
from urllib.parse import urlencode

from multidict import MultiDict, MultiDictProxy

from . import hdrs, multipart, payload
from .helpers import guess_filename
from .http_writer import _safe_header
from .payload import Payload

__all__ = ("FormData",)


class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected with :meth:`add_field` / :meth:`add_fields`; calling
    the instance builds the payload.  The form becomes multipart as soon as a
    field carries a file object, a filename, a content type or a content
    transfer encoding (or when ``default_to_multipart`` is set); otherwise it
    is serialized as a URL-encoded body.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: str | None = None,
        *,
        default_to_multipart: bool = False,
    ) -> None:
        """Create the form and register the initial ``fields``.

        Args:
            fields: Initial records.  A ``dict`` contributes its items, a
                ``list``/``tuple`` is treated as a sequence of records, and
                any other object is treated as one single record.  Records
                are handed to :meth:`add_fields`.
            quote_fields: Forwarded as ``quote_fields`` when the content
                disposition of each multipart part is set.
            charset: Encoding used for the URL-encoded body (``"utf-8"`` when
                ``None``) and passed as ``encoding`` when multipart payloads
                are created.
            default_to_multipart: Initial value of :attr:`is_multipart`.
        """
        self._writer = multipart.MultipartWriter("form-data")
        self._fields: list[Any] = []
        self._is_multipart = default_to_multipart
        self._quote_fields = quote_fields
        self._charset = charset

        # Normalize ``fields`` into a sequence of records for add_fields().
        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether calling the form produces a multipart body."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: str | None = None,
        filename: str | None = None,
        content_transfer_encoding: str | None = None,
    ) -> None:
        """Register one form field.

        Args:
            name: Field name; checked with ``_safe_header``.
            value: Field value.  An ``io.IOBase`` object, or bytes-like data
                passed without ``filename`` and ``content_transfer_encoding``,
                turns the form into a multipart form.
            content_type: Optional ``Content-Type`` header for the part.
            filename: Optional file name for the part.  When omitted it is
                derived with ``guess_filename`` for ``io.IOBase`` values and
                set to ``name`` for bytes-like values (with a
                ``DeprecationWarning``).
            content_transfer_encoding: Deprecated.  It is type-checked,
                triggers a ``DeprecationWarning`` and forces multipart, but
                this method does not add it to the part headers.

        Raises:
            TypeError: If ``filename``, ``content_type`` or
                ``content_transfer_encoding`` is given and is not a ``str``.
        """
        if isinstance(value, io.IOBase):
            self._is_multipart = True
        elif isinstance(value, (bytes, bytearray, memoryview)):
            if filename is None and content_transfer_encoding is None:
                msg = (
                    "In v4, passing bytes will no longer create a file field. "
                    "Please explicitly use the filename parameter or pass a BytesIO object."
                )
                warnings.warn(msg, DeprecationWarning)
                filename = name

        _safe_header(name)
        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is not None and not isinstance(filename, str):
            # f-string instead of ``"%s" % filename``: a tuple operand would
            # otherwise be unpacked by ``%`` and mask this message.
            raise TypeError(f"filename must be an instance of str. Got: {filename!s}")
        if filename is None and isinstance(value, io.IOBase):
            filename = guess_filename(value, name)
        if filename is not None:
            _safe_header(filename)
            type_options["filename"] = filename
            self._is_multipart = True

        headers = {}
        if content_type is not None:
            if not isinstance(content_type, str):
                raise TypeError(
                    f"content_type must be an instance of str. Got: {content_type!s}"
                )
            _safe_header(content_type)
            headers[hdrs.CONTENT_TYPE] = content_type
            self._is_multipart = True
        if content_transfer_encoding is not None:
            if not isinstance(content_transfer_encoding, str):
                raise TypeError(
                    "content_transfer_encoding must be an instance"
                    f" of str. Got: {content_transfer_encoding!s}"
                )
            msg = (
                "content_transfer_encoding is deprecated. "
                "To maintain compatibility with v4 please pass a BytesPayload."
            )
            warnings.warn(msg, DeprecationWarning)
            self._is_multipart = True

        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Register several fields at once.

        Each record may be:

        * an ``io.IOBase`` object, added under the name returned by
          ``guess_filename(record, "unknown")``;
        * a ``MultiDict`` / ``MultiDictProxy``, whose items are queued behind
          the records still waiting to be processed;
        * a two-item ``list`` or ``tuple`` ``(name, value)``.

        Raises:
            TypeError: If a record matches none of the shapes above.
        """
        # A deque gives O(1) pops from the front while keeping FIFO order.
        pending = deque(fields)

        while pending:
            rec = pending.popleft()

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                pending.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    f"more complex parameters, got {rec!r}"
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Serialize the fields as application/x-www-form-urlencoded."""
        # form data (x-www-form-urlencoded)
        data = [(type_options["name"], value) for type_options, _, value in self._fields]

        charset = self._charset if self._charset is not None else "utf-8"

        # The charset parameter is only spelled out when it is not utf-8.
        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = f"application/x-www-form-urlencoded; charset={charset}"

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format.

        Every pending field is converted to a payload and appended to the
        writer created in ``__init__``; the pending list is then emptied and
        that writer is returned.

        Raises:
            TypeError: If a payload cannot be created for a field value (the
                original exception is chained as the cause).
        """
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't like body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        # The parts now live in the writer; drop them here so that another
        # call does not append the same fields a second time.
        self._fields.clear()
        return self._writer

    def __call__(self) -> Payload:
        """Build the payload: multipart if required, URL-encoded otherwise."""
        if self._is_multipart:
            return self._gen_form_data()
        return self._gen_form_urlencoded()