Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/formdata.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

97 statements  

1import io 

2import warnings 

3from typing import Any, Iterable, List, Optional 

4from urllib.parse import urlencode 

5 

6from multidict import MultiDict, MultiDictProxy 

7 

8from . import hdrs, multipart, payload 

9from .helpers import guess_filename 

10from .payload import Payload 

11 

12__all__ = ("FormData",) 

13 

14 

class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected with :meth:`add_field` / :meth:`add_fields`.
    Calling the instance returns the multipart writer when any field
    required multipart encoding, otherwise a urlencoded bytes payload.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: Optional[str] = None,
    ) -> None:
        """Create the form and register the initial *fields*.

        :param fields: a ``dict`` (its items are added), a ``list``/``tuple``
            of records accepted by :meth:`add_fields`, or any other single
            object, which is treated as one record.
        :param quote_fields: forwarded as ``quote_fields`` when the
            Content-Disposition of each multipart part is set.
        :param charset: encoding passed to the payloads and to ``urlencode``;
            ``None`` means "utf-8" for the urlencoded form.
        """
        self._writer = multipart.MultipartWriter("form-data")
        self._fields: List[Any] = []
        self._is_multipart = False
        self._is_processed = False
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            # A lone record (file object, multidict, ...) is wrapped so it
            # can be unpacked into add_fields() like a sequence of records.
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether the form will be encoded as multipart/form-data."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        content_transfer_encoding: Optional[str] = None,
    ) -> None:
        """Register a single field.

        The form switches to multipart encoding when *value* is an
        ``io.IOBase`` instance, or when a filename, *content_type* or
        *content_transfer_encoding* is in effect for the field.

        :param name: field name, stored as the ``name`` disposition parameter.
        :param value: field value; stored as-is and serialized later.
        :param content_type: optional Content-Type header for the part.
        :param filename: optional file name for the part. For ``io.IOBase``
            values it defaults to ``guess_filename(value, name)``; for
            bytes-like values passed without *filename* and without
            *content_transfer_encoding* it defaults to *name* and a
            ``DeprecationWarning`` is emitted.
        :param content_transfer_encoding: deprecated; it is validated and
            triggers a ``DeprecationWarning`` but is not recorded on the field.
        :raises TypeError: if *filename*, *content_type* or
            *content_transfer_encoding* is given and is not a ``str``.
        """
        if isinstance(value, io.IOBase):
            self._is_multipart = True
        elif isinstance(value, (bytes, bytearray, memoryview)):
            msg = (
                "In v4, passing bytes will no longer create a file field. "
                "Please explicitly use the filename parameter or pass a BytesIO object."
            )
            if filename is None and content_transfer_encoding is None:
                warnings.warn(msg, DeprecationWarning)
                filename = name

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is not None and not isinstance(filename, str):
            # The operand is wrapped in a 1-tuple so that a tuple value is
            # reported verbatim instead of being unpacked by the % operator.
            raise TypeError(
                "filename must be an instance of str. Got: %s" % (filename,)
            )
        if filename is None and isinstance(value, io.IOBase):
            filename = guess_filename(value, name)
        if filename is not None:
            type_options["filename"] = filename
            self._is_multipart = True

        headers = {}
        if content_type is not None:
            if not isinstance(content_type, str):
                raise TypeError(
                    "content_type must be an instance of str. Got: %s"
                    % (content_type,)
                )
            headers[hdrs.CONTENT_TYPE] = content_type
            self._is_multipart = True
        if content_transfer_encoding is not None:
            if not isinstance(content_transfer_encoding, str):
                raise TypeError(
                    "content_transfer_encoding must be an instance"
                    " of str. Got: %s" % (content_transfer_encoding,)
                )
            msg = (
                "content_transfer_encoding is deprecated. "
                "To maintain compatibility with v4 please pass a BytesPayload."
            )
            warnings.warn(msg, DeprecationWarning)
            self._is_multipart = True

        # Each stored record is (disposition params, headers, raw value).
        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Register several fields at once.

        Each record may be:

        * an ``io.IOBase`` instance -- added under the name returned by
          ``guess_filename(rec, "unknown")``;
        * a ``MultiDict`` / ``MultiDictProxy`` -- its items are queued and
          processed after the records already pending;
        * a 2-element ``list`` or ``tuple`` -- taken as ``(name, value)``.

        :raises TypeError: for any other kind of record.
        """
        to_add = list(fields)

        while to_add:
            rec = to_add.pop(0)

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                to_add.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)  # type: ignore[arg-type]

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    "more complex parameters, got {!r}".format(rec)
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Encode the fields as application/x-www-form-urlencoded.

        Only the field name and value are used; per-field headers are
        ignored. The charset is appended to the content type only when it
        is not "utf-8".
        """
        # form data (x-www-form-urlencoded)
        data = [
            (type_options["name"], value) for type_options, _, value in self._fields
        ]

        charset = self._charset if self._charset is not None else "utf-8"

        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; " "charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format.

        Every stored field is converted with ``payload.get_payload`` and
        appended to the writer created in ``__init__``.

        :raises RuntimeError: if the form was already processed; the parts
            are appended to a single shared writer, so it can be built once.
        :raises TypeError: if a value cannot be converted to a payload (the
            original exception is chained as the cause).
        """
        if self._is_processed:
            raise RuntimeError("Form data has been processed already")
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't like body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._is_processed = True
        return self._writer

    def __call__(self) -> Payload:
        """Return the encoded form body.

        Multipart encoding is used when any added field required it,
        otherwise the fields are urlencoded.
        """
        if self._is_multipart:
            return self._gen_form_data()
        return self._gen_form_urlencoded()