Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/formdata.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import io
2from collections import deque
3from collections.abc import Iterable
4from typing import Any
5from urllib.parse import urlencode
7from multidict import MultiDict, MultiDictProxy
9from . import hdrs, multipart, payload
10from .helpers import guess_filename
11from .http_writer import _safe_header
12from .payload import Payload
14__all__ = ("FormData",)
17class FormData:
18 """Helper class for form body generation.
20 Supports multipart/form-data and application/x-www-form-urlencoded.
21 """
23 def __init__(
24 self,
25 fields: Iterable[Any] = (),
26 quote_fields: bool = True,
27 charset: str | None = None,
28 boundary: str | None = None,
29 *,
30 default_to_multipart: bool = False,
31 ) -> None:
32 self._boundary = boundary
33 self._writer = multipart.MultipartWriter("form-data", boundary=self._boundary)
34 self._fields: list[Any] = []
35 self._is_multipart = default_to_multipart
36 self._quote_fields = quote_fields
37 self._charset = charset
39 if isinstance(fields, dict):
40 fields = list(fields.items())
41 elif not isinstance(fields, (list, tuple)):
42 fields = (fields,)
43 self.add_fields(*fields)
45 @property
46 def is_multipart(self) -> bool:
47 return self._is_multipart
49 def add_field(
50 self,
51 name: str,
52 value: Any,
53 *,
54 content_type: str | None = None,
55 filename: str | None = None,
56 ) -> None:
57 if isinstance(value, (io.IOBase, bytes, bytearray, memoryview)):
58 self._is_multipart = True
60 _safe_header(name)
61 type_options: MultiDict[str] = MultiDict({"name": name})
62 if filename is not None and not isinstance(filename, str):
63 raise TypeError("filename must be an instance of str. Got: %s" % filename)
64 if filename is None and isinstance(value, io.IOBase):
65 filename = guess_filename(value, name)
66 if filename is not None:
67 _safe_header(filename)
68 type_options["filename"] = filename
69 self._is_multipart = True
71 headers = {}
72 if content_type is not None:
73 if not isinstance(content_type, str):
74 raise TypeError(
75 "content_type must be an instance of str. Got: %s" % content_type
76 )
77 _safe_header(content_type)
78 headers[hdrs.CONTENT_TYPE] = content_type
79 self._is_multipart = True
81 self._fields.append((type_options, headers, value))
83 def add_fields(self, *fields: Any) -> None:
84 to_add: deque[Any] = deque(fields)
86 while to_add:
87 rec = to_add.popleft()
89 if isinstance(rec, io.IOBase):
90 k = guess_filename(rec, "unknown")
91 self.add_field(k, rec) # type: ignore[arg-type]
93 elif isinstance(rec, (MultiDictProxy, MultiDict)):
94 to_add.extend(rec.items())
96 elif isinstance(rec, (list, tuple)) and len(rec) == 2:
97 k, fp = rec
98 self.add_field(k, fp)
100 else:
101 raise TypeError(
102 "Only io.IOBase, multidict and (name, file) "
103 "pairs allowed, use .add_field() for passing "
104 f"more complex parameters, got {rec!r}"
105 )
107 def _gen_form_urlencoded(self) -> payload.BytesPayload:
108 # form data (x-www-form-urlencoded)
109 data = []
110 for type_options, _, value in self._fields:
111 if not isinstance(value, str):
112 raise TypeError(f"expected str, got {value!r}")
113 data.append((type_options["name"], value))
115 charset = self._charset if self._charset is not None else "utf-8"
117 if charset == "utf-8":
118 content_type = "application/x-www-form-urlencoded"
119 else:
120 content_type = "application/x-www-form-urlencoded; charset=%s" % charset
122 return payload.BytesPayload(
123 urlencode(data, doseq=True, encoding=charset).encode(),
124 content_type=content_type,
125 )
127 def _gen_form_data(self) -> multipart.MultipartWriter:
128 """Encode a list of fields using the multipart/form-data MIME format"""
129 for dispparams, headers, value in self._fields:
130 try:
131 if hdrs.CONTENT_TYPE in headers:
132 part = payload.get_payload(
133 value,
134 content_type=headers[hdrs.CONTENT_TYPE],
135 headers=headers,
136 encoding=self._charset,
137 )
138 else:
139 part = payload.get_payload(
140 value, headers=headers, encoding=self._charset
141 )
142 except Exception as exc:
143 raise TypeError(
144 "Can not serialize value type: %r\n "
145 "headers: %r\n value: %r" % (type(value), headers, value)
146 ) from exc
148 if dispparams:
149 part.set_content_disposition(
150 "form-data", quote_fields=self._quote_fields, **dispparams
151 )
152 # FIXME cgi.FieldStorage doesn't likes body parts with
153 # Content-Length which were sent via chunked transfer encoding
154 assert part.headers is not None
155 part.headers.popall(hdrs.CONTENT_LENGTH, None)
157 self._writer.append_payload(part)
159 self._fields.clear()
160 return self._writer
162 def __call__(self) -> Payload:
163 if self._is_multipart:
164 return self._gen_form_data()
165 else:
166 return self._gen_form_urlencoded()