Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/formdata.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import io
2from collections import deque
3from collections.abc import Iterable
4from typing import Any
5from urllib.parse import urlencode
7from multidict import MultiDict, MultiDictProxy
9from . import hdrs, multipart, payload
10from .helpers import guess_filename
11from .payload import Payload
13__all__ = ("FormData",)
class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected through ``add_field()`` / ``add_fields()``. Calling
    the instance returns the payload: a multipart writer once any field (or
    the ``default_to_multipart`` flag) requires multipart, otherwise a
    url-encoded bytes payload.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: str | None = None,
        boundary: str | None = None,
        *,
        default_to_multipart: bool = False,
    ) -> None:
        """Create the form and register the initial *fields*.

        Args:
            fields: Initial fields. A ``dict`` is expanded to its items, a
                ``list``/``tuple`` is used element by element, and any other
                object is treated as one single field record. Every record
                is handed to ``add_fields()``.
            quote_fields: Forwarded as ``quote_fields`` when the
                Content-Disposition of each multipart part is set.
            charset: Encoding used for url-encoding and passed as
                ``encoding`` when multipart parts are built. ``None`` means
                UTF-8 for the url-encoded form.
            boundary: Boundary handed to the multipart writer.
            default_to_multipart: Start in multipart mode even if no field
                requires it.
        """
        self._boundary = boundary
        self._writer = multipart.MultipartWriter("form-data", boundary=self._boundary)
        # Each entry is a (disposition params, headers, value) triple.
        self._fields: list[Any] = []
        self._is_multipart = default_to_multipart
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether calling the form will produce a multipart body."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: str | None = None,
        filename: str | None = None,
    ) -> None:
        """Queue a single field for serialization.

        The form switches to multipart mode when *value* is a file-like
        object or a bytes-like object, or when a filename or content type
        is attached to the field.

        Args:
            name: Field name, stored as the ``name`` disposition parameter.
            value: Field value.
            content_type: Optional Content-Type header for this part.
            filename: Optional file name for this part. When omitted and
                *value* is an ``io.IOBase``, ``guess_filename(value, name)``
                supplies it.

        Raises:
            TypeError: If *filename* or *content_type* is given but is not
                a ``str``.
            ValueError: If *content_type* contains a carriage return or a
                newline.
        """
        # Validate everything before touching instance state so that a
        # rejected call leaves the form exactly as it was.
        if filename is not None and not isinstance(filename, str):
            # ``!r`` instead of ``"%s" % filename``: %-formatting would
            # unpack a tuple argument and fail with an unrelated message.
            raise TypeError(
                f"filename must be an instance of str. Got: {filename!r}"
            )
        if content_type is not None:
            if not isinstance(content_type, str):
                raise TypeError(
                    f"content_type must be an instance of str. Got: {content_type!r}"
                )
            if "\r" in content_type or "\n" in content_type:
                raise ValueError(
                    "Newline or carriage return detected in headers. "
                    "Potential header injection attack."
                )

        if isinstance(value, (io.IOBase, bytes, bytearray, memoryview)):
            self._is_multipart = True

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is None and isinstance(value, io.IOBase):
            filename = guess_filename(value, name)
        if filename is not None:
            type_options["filename"] = filename
            self._is_multipart = True

        headers = {}
        if content_type is not None:
            headers[hdrs.CONTENT_TYPE] = content_type
            self._is_multipart = True

        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Queue several fields given in one of the accepted shorthand forms.

        Each record may be an ``io.IOBase`` object (its field name comes
        from ``guess_filename(rec, "unknown")``), a ``MultiDict`` or
        ``MultiDictProxy`` (its items are queued as further records), or a
        two-element ``list``/``tuple`` of ``(name, value)``.

        Raises:
            TypeError: If a record has none of the accepted forms.
        """
        # A deque lets multidict items be appended while the queue drains.
        to_add: deque[Any] = deque(fields)

        while to_add:
            rec = to_add.popleft()

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                to_add.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    f"more complex parameters, got {rec!r}"
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Encode the queued fields as application/x-www-form-urlencoded.

        Raises:
            TypeError: If any queued value is not a ``str``.
        """
        # form data (x-www-form-urlencoded)
        data = []
        for type_options, _, value in self._fields:
            if not isinstance(value, str):
                raise TypeError(f"expected str, got {value!r}")
            data.append((type_options["name"], value))

        charset = self._charset if self._charset is not None else "utf-8"

        # The charset parameter is only advertised when it is not UTF-8.
        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format.

        Every queued field is converted to a payload and appended to the
        writer created in ``__init__``. The queue is cleared afterwards, so
        a later call returns the same writer without appending the same
        fields again.

        Raises:
            TypeError: If a value cannot be converted to a payload; the
                underlying exception is chained as the cause.
        """
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't likes body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._fields.clear()
        return self._writer

    def __call__(self) -> Payload:
        """Return the payload for the fields: multipart or url-encoded."""
        if self._is_multipart:
            return self._gen_form_data()
        else:
            return self._gen_form_urlencoded()