Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/formdata.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import io
2from typing import Any, Iterable, List, Optional
3from urllib.parse import urlencode
5from multidict import MultiDict, MultiDictProxy
7from . import hdrs, multipart, payload
8from .helpers import guess_filename
9from .payload import Payload
11__all__ = ("FormData",)
14class FormData:
15 """Helper class for form body generation.
17 Supports multipart/form-data and application/x-www-form-urlencoded.
18 """
20 def __init__(
21 self,
22 fields: Iterable[Any] = (),
23 quote_fields: bool = True,
24 charset: Optional[str] = None,
25 boundary: Optional[str] = None,
26 *,
27 default_to_multipart: bool = False,
28 ) -> None:
29 self._boundary = boundary
30 self._writer = multipart.MultipartWriter("form-data", boundary=self._boundary)
31 self._fields: List[Any] = []
32 self._is_multipart = default_to_multipart
33 self._is_processed = False
34 self._quote_fields = quote_fields
35 self._charset = charset
37 if isinstance(fields, dict):
38 fields = list(fields.items())
39 elif not isinstance(fields, (list, tuple)):
40 fields = (fields,)
41 self.add_fields(*fields)
43 @property
44 def is_multipart(self) -> bool:
45 return self._is_multipart
47 def add_field(
48 self,
49 name: str,
50 value: Any,
51 *,
52 content_type: Optional[str] = None,
53 filename: Optional[str] = None,
54 ) -> None:
55 if isinstance(value, (io.IOBase, bytes, bytearray, memoryview)):
56 self._is_multipart = True
58 type_options: MultiDict[str] = MultiDict({"name": name})
59 if filename is not None and not isinstance(filename, str):
60 raise TypeError("filename must be an instance of str. Got: %s" % filename)
61 if filename is None and isinstance(value, io.IOBase):
62 filename = guess_filename(value, name)
63 if filename is not None:
64 type_options["filename"] = filename
65 self._is_multipart = True
67 headers = {}
68 if content_type is not None:
69 if not isinstance(content_type, str):
70 raise TypeError(
71 "content_type must be an instance of str. Got: %s" % content_type
72 )
73 headers[hdrs.CONTENT_TYPE] = content_type
74 self._is_multipart = True
76 self._fields.append((type_options, headers, value))
78 def add_fields(self, *fields: Any) -> None:
79 to_add = list(fields)
81 while to_add:
82 rec = to_add.pop(0)
84 if isinstance(rec, io.IOBase):
85 k = guess_filename(rec, "unknown")
86 self.add_field(k, rec) # type: ignore[arg-type]
88 elif isinstance(rec, (MultiDictProxy, MultiDict)):
89 to_add.extend(rec.items())
91 elif isinstance(rec, (list, tuple)) and len(rec) == 2:
92 k, fp = rec
93 self.add_field(k, fp) # type: ignore[arg-type]
95 else:
96 raise TypeError(
97 "Only io.IOBase, multidict and (name, file) "
98 "pairs allowed, use .add_field() for passing "
99 "more complex parameters, got {!r}".format(rec)
100 )
102 def _gen_form_urlencoded(self) -> payload.BytesPayload:
103 # form data (x-www-form-urlencoded)
104 data = []
105 for type_options, _, value in self._fields:
106 if not isinstance(value, str):
107 raise TypeError(f"expected str, got {value!r}")
108 data.append((type_options["name"], value))
110 charset = self._charset if self._charset is not None else "utf-8"
112 if charset == "utf-8":
113 content_type = "application/x-www-form-urlencoded"
114 else:
115 content_type = "application/x-www-form-urlencoded; charset=%s" % charset
117 return payload.BytesPayload(
118 urlencode(data, doseq=True, encoding=charset).encode(),
119 content_type=content_type,
120 )
122 def _gen_form_data(self) -> multipart.MultipartWriter:
123 """Encode a list of fields using the multipart/form-data MIME format"""
124 if self._is_processed:
125 raise RuntimeError("Form data has been processed already")
126 for dispparams, headers, value in self._fields:
127 try:
128 if hdrs.CONTENT_TYPE in headers:
129 part = payload.get_payload(
130 value,
131 content_type=headers[hdrs.CONTENT_TYPE],
132 headers=headers,
133 encoding=self._charset,
134 )
135 else:
136 part = payload.get_payload(
137 value, headers=headers, encoding=self._charset
138 )
139 except Exception as exc:
140 raise TypeError(
141 "Can not serialize value type: %r\n "
142 "headers: %r\n value: %r" % (type(value), headers, value)
143 ) from exc
145 if dispparams:
146 part.set_content_disposition(
147 "form-data", quote_fields=self._quote_fields, **dispparams
148 )
149 # FIXME cgi.FieldStorage doesn't likes body parts with
150 # Content-Length which were sent via chunked transfer encoding
151 assert part.headers is not None
152 part.headers.popall(hdrs.CONTENT_LENGTH, None)
154 self._writer.append_payload(part)
156 self._is_processed = True
157 return self._writer
159 def __call__(self) -> Payload:
160 if self._is_multipart:
161 return self._gen_form_data()
162 else:
163 return self._gen_form_urlencoded()