Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/formdata.py: 18%

94 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import io 

2from typing import Any, Iterable, List, Optional 

3from urllib.parse import urlencode 

4 

5from multidict import MultiDict, MultiDictProxy 

6 

7from . import hdrs, multipart, payload 

8from .helpers import guess_filename 

9from .payload import Payload 

10 

11__all__ = ("FormData",) 

12 

13 

14class FormData: 

15 """Helper class for form body generation. 

16 

17 Supports multipart/form-data and application/x-www-form-urlencoded. 

18 """ 

19 

20 def __init__( 

21 self, 

22 fields: Iterable[Any] = (), 

23 quote_fields: bool = True, 

24 charset: Optional[str] = None, 

25 boundary: Optional[str] = None, 

26 ) -> None: 

27 self._boundary = boundary 

28 self._writer = multipart.MultipartWriter("form-data", boundary=self._boundary) 

29 self._fields: List[Any] = [] 

30 self._is_multipart = False 

31 self._is_processed = False 

32 self._quote_fields = quote_fields 

33 self._charset = charset 

34 

35 if isinstance(fields, dict): 

36 fields = list(fields.items()) 

37 elif not isinstance(fields, (list, tuple)): 

38 fields = (fields,) 

39 self.add_fields(*fields) 

40 

41 @property 

42 def is_multipart(self) -> bool: 

43 return self._is_multipart 

44 

45 def add_field( 

46 self, 

47 name: str, 

48 value: Any, 

49 *, 

50 content_type: Optional[str] = None, 

51 filename: Optional[str] = None, 

52 content_transfer_encoding: Optional[str] = None, 

53 ) -> None: 

54 if isinstance(value, io.IOBase): 

55 self._is_multipart = True 

56 elif isinstance(value, (bytes, bytearray, memoryview)): 

57 if filename is None and content_transfer_encoding is None: 

58 filename = name 

59 

60 type_options: MultiDict[str] = MultiDict({"name": name}) 

61 if filename is not None and not isinstance(filename, str): 

62 raise TypeError( 

63 "filename must be an instance of str. " "Got: %s" % filename 

64 ) 

65 if filename is None and isinstance(value, io.IOBase): 

66 filename = guess_filename(value, name) 

67 if filename is not None: 

68 type_options["filename"] = filename 

69 self._is_multipart = True 

70 

71 headers = {} 

72 if content_type is not None: 

73 if not isinstance(content_type, str): 

74 raise TypeError( 

75 "content_type must be an instance of str. " "Got: %s" % content_type 

76 ) 

77 headers[hdrs.CONTENT_TYPE] = content_type 

78 self._is_multipart = True 

79 if content_transfer_encoding is not None: 

80 if not isinstance(content_transfer_encoding, str): 

81 raise TypeError( 

82 "content_transfer_encoding must be an instance" 

83 " of str. Got: %s" % content_transfer_encoding 

84 ) 

85 headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding 

86 self._is_multipart = True 

87 

88 self._fields.append((type_options, headers, value)) 

89 

90 def add_fields(self, *fields: Any) -> None: 

91 to_add = list(fields) 

92 

93 while to_add: 

94 rec = to_add.pop(0) 

95 

96 if isinstance(rec, io.IOBase): 

97 k = guess_filename(rec, "unknown") 

98 self.add_field(k, rec) # type: ignore[arg-type] 

99 

100 elif isinstance(rec, (MultiDictProxy, MultiDict)): 

101 to_add.extend(rec.items()) 

102 

103 elif isinstance(rec, (list, tuple)) and len(rec) == 2: 

104 k, fp = rec 

105 self.add_field(k, fp) # type: ignore[arg-type] 

106 

107 else: 

108 raise TypeError( 

109 "Only io.IOBase, multidict and (name, file) " 

110 "pairs allowed, use .add_field() for passing " 

111 "more complex parameters, got {!r}".format(rec) 

112 ) 

113 

114 def _gen_form_urlencoded(self) -> payload.BytesPayload: 

115 # form data (x-www-form-urlencoded) 

116 data = [] 

117 for type_options, _, value in self._fields: 

118 data.append((type_options["name"], value)) 

119 

120 charset = self._charset if self._charset is not None else "utf-8" 

121 

122 if charset == "utf-8": 

123 content_type = "application/x-www-form-urlencoded" 

124 else: 

125 content_type = "application/x-www-form-urlencoded; " "charset=%s" % charset 

126 

127 return payload.BytesPayload( 

128 urlencode(data, doseq=True, encoding=charset).encode(), 

129 content_type=content_type, 

130 ) 

131 

132 def _gen_form_data(self) -> multipart.MultipartWriter: 

133 """Encode a list of fields using the multipart/form-data MIME format""" 

134 if self._is_processed: 

135 raise RuntimeError("Form data has been processed already") 

136 for dispparams, headers, value in self._fields: 

137 try: 

138 if hdrs.CONTENT_TYPE in headers: 

139 part = payload.get_payload( 

140 value, 

141 content_type=headers[hdrs.CONTENT_TYPE], 

142 headers=headers, 

143 encoding=self._charset, 

144 ) 

145 else: 

146 part = payload.get_payload( 

147 value, headers=headers, encoding=self._charset 

148 ) 

149 except Exception as exc: 

150 raise TypeError( 

151 "Can not serialize value type: %r\n " 

152 "headers: %r\n value: %r" % (type(value), headers, value) 

153 ) from exc 

154 

155 if dispparams: 

156 part.set_content_disposition( 

157 "form-data", quote_fields=self._quote_fields, **dispparams 

158 ) 

159 # FIXME cgi.FieldStorage doesn't likes body parts with 

160 # Content-Length which were sent via chunked transfer encoding 

161 assert part.headers is not None 

162 part.headers.popall(hdrs.CONTENT_LENGTH, None) 

163 

164 self._writer.append_payload(part) 

165 

166 self._is_processed = True 

167 return self._writer 

168 

169 def __call__(self) -> Payload: 

170 if self._is_multipart: 

171 return self._gen_form_data() 

172 else: 

173 return self._gen_form_urlencoded()