Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/formdata.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1import io 

2from collections import deque 

3from collections.abc import Iterable 

4from typing import Any 

5from urllib.parse import urlencode 

6 

7from multidict import MultiDict, MultiDictProxy 

8 

9from . import hdrs, multipart, payload 

10from .helpers import guess_filename 

11from .http_writer import _safe_header 

12from .payload import Payload 

13 

14__all__ = ("FormData",) 

15 

16 

17class FormData: 

18 """Helper class for form body generation. 

19 

20 Supports multipart/form-data and application/x-www-form-urlencoded. 

21 """ 

22 

23 def __init__( 

24 self, 

25 fields: Iterable[Any] = (), 

26 quote_fields: bool = True, 

27 charset: str | None = None, 

28 boundary: str | None = None, 

29 *, 

30 default_to_multipart: bool = False, 

31 ) -> None: 

32 self._boundary = boundary 

33 self._writer = multipart.MultipartWriter("form-data", boundary=self._boundary) 

34 self._fields: list[Any] = [] 

35 self._is_multipart = default_to_multipart 

36 self._quote_fields = quote_fields 

37 self._charset = charset 

38 

39 if isinstance(fields, dict): 

40 fields = list(fields.items()) 

41 elif not isinstance(fields, (list, tuple)): 

42 fields = (fields,) 

43 self.add_fields(*fields) 

44 

45 @property 

46 def is_multipart(self) -> bool: 

47 return self._is_multipart 

48 

49 def add_field( 

50 self, 

51 name: str, 

52 value: Any, 

53 *, 

54 content_type: str | None = None, 

55 filename: str | None = None, 

56 ) -> None: 

57 if isinstance(value, (io.IOBase, bytes, bytearray, memoryview)): 

58 self._is_multipart = True 

59 

60 _safe_header(name) 

61 type_options: MultiDict[str] = MultiDict({"name": name}) 

62 if filename is not None and not isinstance(filename, str): 

63 raise TypeError("filename must be an instance of str. Got: %s" % filename) 

64 if filename is None and isinstance(value, io.IOBase): 

65 filename = guess_filename(value, name) 

66 if filename is not None: 

67 _safe_header(filename) 

68 type_options["filename"] = filename 

69 self._is_multipart = True 

70 

71 headers = {} 

72 if content_type is not None: 

73 if not isinstance(content_type, str): 

74 raise TypeError( 

75 "content_type must be an instance of str. Got: %s" % content_type 

76 ) 

77 _safe_header(content_type) 

78 headers[hdrs.CONTENT_TYPE] = content_type 

79 self._is_multipart = True 

80 

81 self._fields.append((type_options, headers, value)) 

82 

83 def add_fields(self, *fields: Any) -> None: 

84 to_add: deque[Any] = deque(fields) 

85 

86 while to_add: 

87 rec = to_add.popleft() 

88 

89 if isinstance(rec, io.IOBase): 

90 k = guess_filename(rec, "unknown") 

91 self.add_field(k, rec) # type: ignore[arg-type] 

92 

93 elif isinstance(rec, (MultiDictProxy, MultiDict)): 

94 to_add.extend(rec.items()) 

95 

96 elif isinstance(rec, (list, tuple)) and len(rec) == 2: 

97 k, fp = rec 

98 self.add_field(k, fp) 

99 

100 else: 

101 raise TypeError( 

102 "Only io.IOBase, multidict and (name, file) " 

103 "pairs allowed, use .add_field() for passing " 

104 f"more complex parameters, got {rec!r}" 

105 ) 

106 

107 def _gen_form_urlencoded(self) -> payload.BytesPayload: 

108 # form data (x-www-form-urlencoded) 

109 data = [] 

110 for type_options, _, value in self._fields: 

111 if not isinstance(value, str): 

112 raise TypeError(f"expected str, got {value!r}") 

113 data.append((type_options["name"], value)) 

114 

115 charset = self._charset if self._charset is not None else "utf-8" 

116 

117 if charset == "utf-8": 

118 content_type = "application/x-www-form-urlencoded" 

119 else: 

120 content_type = "application/x-www-form-urlencoded; charset=%s" % charset 

121 

122 return payload.BytesPayload( 

123 urlencode(data, doseq=True, encoding=charset).encode(), 

124 content_type=content_type, 

125 ) 

126 

127 def _gen_form_data(self) -> multipart.MultipartWriter: 

128 """Encode a list of fields using the multipart/form-data MIME format""" 

129 for dispparams, headers, value in self._fields: 

130 try: 

131 if hdrs.CONTENT_TYPE in headers: 

132 part = payload.get_payload( 

133 value, 

134 content_type=headers[hdrs.CONTENT_TYPE], 

135 headers=headers, 

136 encoding=self._charset, 

137 ) 

138 else: 

139 part = payload.get_payload( 

140 value, headers=headers, encoding=self._charset 

141 ) 

142 except Exception as exc: 

143 raise TypeError( 

144 "Can not serialize value type: %r\n " 

145 "headers: %r\n value: %r" % (type(value), headers, value) 

146 ) from exc 

147 

148 if dispparams: 

149 part.set_content_disposition( 

150 "form-data", quote_fields=self._quote_fields, **dispparams 

151 ) 

152 # FIXME cgi.FieldStorage doesn't likes body parts with 

153 # Content-Length which were sent via chunked transfer encoding 

154 assert part.headers is not None 

155 part.headers.popall(hdrs.CONTENT_LENGTH, None) 

156 

157 self._writer.append_payload(part) 

158 

159 self._fields.clear() 

160 return self._writer 

161 

162 def __call__(self) -> Payload: 

163 if self._is_multipart: 

164 return self._gen_form_data() 

165 else: 

166 return self._gen_form_urlencoded()