Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/formdata.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1import io 

2from collections import deque 

3from collections.abc import Iterable 

4from typing import Any 

5from urllib.parse import urlencode 

6 

7from multidict import MultiDict, MultiDictProxy 

8 

9from . import hdrs, multipart, payload 

10from .helpers import guess_filename 

11from .payload import Payload 

12 

13__all__ = ("FormData",) 

14 

15 

class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected with :meth:`add_field` / :meth:`add_fields`; calling
    the instance produces the payload. The form is encoded as multipart as
    soon as any field carries a bytes-like or file-like value, a filename or
    an explicit content type, or when ``default_to_multipart`` is true;
    otherwise it is URL-encoded.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: str | None = None,
        boundary: str | None = None,
        *,
        default_to_multipart: bool = False,
    ) -> None:
        """Create the form and register the initial ``fields``.

        ``fields`` may be a dict (its items are added), a list/tuple of
        records accepted by :meth:`add_fields`, or a single such record.
        ``quote_fields`` is forwarded when setting each part's
        Content-Disposition, ``charset`` is used as the encoding for both
        output formats, and ``boundary`` is handed to the multipart writer.
        """
        self._boundary = boundary
        self._writer = multipart.MultipartWriter("form-data", boundary=self._boundary)
        self._fields: list[Any] = []
        self._is_multipart = default_to_multipart
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            # A single record (e.g. one file object or a multidict) was given.
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether calling the form will produce a multipart body."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: str | None = None,
        filename: str | None = None,
    ) -> None:
        """Queue a single field for encoding.

        Raises:
            TypeError: if ``filename`` or ``content_type`` is given but is
                not a ``str``.
            ValueError: if ``content_type`` contains a carriage return or
                newline.
        """
        if isinstance(value, (io.IOBase, bytes, bytearray, memoryview)):
            self._is_multipart = True

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is not None and not isinstance(filename, str):
            # An f-string is used instead of ``"%s" % filename``: with the
            # ``%`` operator a tuple argument would be unpacked as the format
            # arguments and yield a misleading formatting error instead of
            # this message.
            raise TypeError(f"filename must be an instance of str. Got: {filename}")
        if filename is None and isinstance(value, io.IOBase):
            filename = guess_filename(value, name)
        if filename is not None:
            type_options["filename"] = filename
            self._is_multipart = True

        headers = {}
        if content_type is not None:
            if not isinstance(content_type, str):
                # See the note on ``filename`` above regarding tuple values.
                raise TypeError(
                    f"content_type must be an instance of str. Got: {content_type}"
                )
            if "\r" in content_type or "\n" in content_type:
                raise ValueError(
                    "Newline or carriage return detected in headers. "
                    "Potential header injection attack."
                )
            headers[hdrs.CONTENT_TYPE] = content_type
            self._is_multipart = True

        # Stored as (disposition parameters, extra headers, raw value).
        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Queue several fields at once.

        Each record may be an ``io.IOBase`` object, a ``MultiDict`` /
        ``MultiDictProxy`` (its items are queued for processing), or a
        two-element list/tuple ``(name, value)``.

        Raises:
            TypeError: for any other kind of record.
        """
        to_add: deque[Any] = deque(fields)

        while to_add:
            rec = to_add.popleft()

            if isinstance(rec, io.IOBase):
                # The field name is whatever guess_filename returns for the
                # file object, with "unknown" passed as the default.
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                # Expand into (name, value) pairs handled by later iterations.
                to_add.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    f"more complex parameters, got {rec!r}"
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Encode the fields as application/x-www-form-urlencoded.

        Raises:
            TypeError: if any field value is not a ``str``.
        """
        # form data (x-www-form-urlencoded)
        data = []
        for type_options, _, value in self._fields:
            if not isinstance(value, str):
                raise TypeError(f"expected str, got {value!r}")
            data.append((type_options["name"], value))

        charset = self._charset if self._charset is not None else "utf-8"

        # The charset parameter is only spelled out when it is not "utf-8".
        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = f"application/x-www-form-urlencoded; charset={charset}"

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format.

        Every pending field is converted to a payload and appended to the
        writer created in ``__init__``; the pending list is then cleared and
        that same writer is returned.

        Raises:
            TypeError: if a value cannot be converted to a payload (the
                underlying exception is chained as the cause).
        """
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't like body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        # The parts now live in the writer; drop the pending records so a
        # later call does not append them a second time.
        self._fields.clear()
        return self._writer

    def __call__(self) -> Payload:
        """Return the encoded body: multipart if required, else URL-encoded."""
        if self._is_multipart:
            return self._gen_form_data()
        else:
            return self._gen_form_urlencoded()

166 else: 

167 return self._gen_form_urlencoded()