Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/formdata.py: 18%
94 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import io
2from typing import Any, Iterable, List, Optional
3from urllib.parse import urlencode
5from multidict import MultiDict, MultiDictProxy
7from . import hdrs, multipart, payload
8from .helpers import guess_filename
9from .payload import Payload
11__all__ = ("FormData",)
class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected through ``add_field()`` / ``add_fields()``.  Calling
    the instance serializes them: as a multipart body once any field has
    required it (file-like value, filename, explicit content type or content
    transfer encoding), otherwise as a urlencoded body.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: Optional[str] = None,
        boundary: Optional[str] = None,
    ) -> None:
        """Create the form and register the initial ``fields``.

        Args:
            fields: Initial fields.  A ``dict`` is expanded to its items, a
                list/tuple is used as-is, and any other object is treated as
                a single field record.  Each record is handed to
                ``add_fields()``.
            quote_fields: Forwarded as ``quote_fields`` when the content
                disposition of each multipart part is set.
            charset: Encoding used when serializing values; ``None`` means
                "utf-8" for the urlencoded form.
            boundary: Boundary passed to the underlying multipart writer.
        """
        self._boundary = boundary
        self._writer = multipart.MultipartWriter("form-data", boundary=self._boundary)
        # Each entry is a (disposition params, headers, value) triple.
        self._fields: List[Any] = []
        self._is_multipart = False
        self._is_processed = False
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            # A lone record (e.g. a single file object or multidict).
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether the form will be serialized as multipart/form-data."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        content_transfer_encoding: Optional[str] = None,
    ) -> None:
        """Register a single field.

        Args:
            name: Field name, stored as the ``name`` disposition parameter.
            value: Field value.  An ``io.IOBase`` value switches the form to
                multipart; a bytes-like value given without ``filename`` and
                without ``content_transfer_encoding`` uses ``name`` as its
                filename (which also switches the form to multipart).
            content_type: Optional Content-Type header for the part.
            filename: Optional filename disposition parameter.
            content_transfer_encoding: Optional Content-Transfer-Encoding
                header for the part.

        Raises:
            TypeError: If ``filename``, ``content_type`` or
                ``content_transfer_encoding`` is given but is not a ``str``.
        """
        if isinstance(value, io.IOBase):
            self._is_multipart = True
        elif isinstance(value, (bytes, bytearray, memoryview)):
            if filename is None and content_transfer_encoding is None:
                filename = name

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is not None and not isinstance(filename, str):
            # The operand is wrapped in a 1-tuple so that a tuple value is
            # reported verbatim instead of being unpacked by the % operator.
            raise TypeError(
                "filename must be an instance of str. Got: %s" % (filename,)
            )
        if filename is None and isinstance(value, io.IOBase):
            filename = guess_filename(value, name)
        if filename is not None:
            type_options["filename"] = filename
            self._is_multipart = True

        headers = {}
        if content_type is not None:
            if not isinstance(content_type, str):
                raise TypeError(
                    "content_type must be an instance of str. Got: %s"
                    % (content_type,)
                )
            headers[hdrs.CONTENT_TYPE] = content_type
            self._is_multipart = True
        if content_transfer_encoding is not None:
            if not isinstance(content_transfer_encoding, str):
                raise TypeError(
                    "content_transfer_encoding must be an instance"
                    " of str. Got: %s" % (content_transfer_encoding,)
                )
            headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding
            self._is_multipart = True

        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Register several fields at once.

        Each record may be an ``io.IOBase`` object (its field name comes from
        ``guess_filename`` with "unknown" as the fallback argument), a
        ``MultiDict``/``MultiDictProxy`` (its items are queued as further
        records), or a two-element list/tuple ``(name, value)``.

        Raises:
            TypeError: If a record has any other shape.
        """
        to_add = list(fields)

        while to_add:
            rec = to_add.pop(0)

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                # Items go to the back of the queue, i.e. they are processed
                # after the records that are already pending.
                to_add.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)  # type: ignore[arg-type]

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    "more complex parameters, got {!r}".format(rec)
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Serialize the fields as application/x-www-form-urlencoded.

        Only the field names and values are used; per-field headers are
        ignored.  The charset is appended to the content type unless it is
        "utf-8".
        """
        # form data (x-www-form-urlencoded)
        data = [(type_options["name"], value) for type_options, _, value in self._fields]

        charset = self._charset if self._charset is not None else "utf-8"

        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format.

        Every registered field is turned into a payload and appended to the
        writer created in ``__init__``.  Because that writer is filled in
        place, the form can be processed only once.

        Raises:
            RuntimeError: If the form has already been processed.
            TypeError: If a payload cannot be created for a field's value;
                the original exception is chained as the cause.
        """
        if self._is_processed:
            raise RuntimeError("Form data has been processed already")
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't like body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._is_processed = True
        return self._writer

    def __call__(self) -> Payload:
        """Serialize the form, choosing multipart or urlencoded output."""
        if self._is_multipart:
            return self._gen_form_data()
        else:
            return self._gen_form_urlencoded()