Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/formdata.py: 18%

93 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import io 

2from typing import Any, Iterable, List, Optional 

3from urllib.parse import urlencode 

4 

5from multidict import MultiDict, MultiDictProxy 

6 

7from . import hdrs, multipart, payload 

8from .helpers import guess_filename 

9from .payload import Payload 

10 

11__all__ = ("FormData",) 

12 

13 

class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected through :meth:`add_field` / :meth:`add_fields`
    and the body is produced by calling the instance.  The multipart
    encoding is chosen as soon as one accepted field needs it (file-like
    value, a filename, an explicit content type or an explicit content
    transfer encoding); otherwise the url-encoded form is produced.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: Optional[str] = None,
    ) -> None:
        """Create the form and register the initial *fields*.

        :param fields: initial records.  A ``dict`` is expanded to its
            ``(key, value)`` items, a ``list``/``tuple`` is taken as a
            sequence of records, and any other object is treated as one
            single record.  The records are handed to :meth:`add_fields`.
        :param quote_fields: forwarded as ``quote_fields`` when the content
            disposition of each multipart part is set.
        :param charset: encoding used for the url-encoded body (``None``
            means ``"utf-8"``) and passed as ``encoding`` when multipart
            parts are built.
        :raises TypeError: propagated from :meth:`add_fields` for records
            it does not accept.
        """
        self._writer = multipart.MultipartWriter("form-data")
        # Each entry is a (disposition params, headers, value) triple.
        self._fields: List[Any] = []
        self._is_multipart = False
        self._is_processed = False
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            # Anything that is not a list/tuple is one single record.
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether calling the form will produce a multipart body."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        content_transfer_encoding: Optional[str] = None,
    ) -> None:
        """Append one field to the form.

        :param name: field name, stored as the ``name`` disposition
            parameter.
        :param value: field value.  An ``io.IOBase`` instance always makes
            the form multipart.  A ``bytes``/``bytearray``/``memoryview``
            value given without *filename* and without
            *content_transfer_encoding* uses *name* as its filename.
        :param content_type: optional ``Content-Type`` header of the part.
        :param filename: optional filename; when omitted for an
            ``io.IOBase`` value it is obtained from ``guess_filename``.
        :param content_transfer_encoding: optional
            ``Content-Transfer-Encoding`` header of the part.
        :raises TypeError: if *filename*, *content_type* or
            *content_transfer_encoding* is given and is not a ``str``.
            The form is left unchanged in that case.
        """
        is_file = isinstance(value, io.IOBase)
        if (
            not is_file
            and isinstance(value, (bytes, bytearray, memoryview))
            and filename is None
            and content_transfer_encoding is None
        ):
            filename = name

        # Validate every argument before touching ``self`` so that a rejected
        # call cannot leave the form flagged as multipart.  The operands are
        # wrapped in a 1-tuple because ``"%s" % value`` unpacks ``value`` when
        # it is itself a tuple, which would garble or break the message.
        if filename is not None and not isinstance(filename, str):
            raise TypeError(
                "filename must be an instance of str. Got: %s" % (filename,)
            )
        if content_type is not None and not isinstance(content_type, str):
            raise TypeError(
                "content_type must be an instance of str. Got: %s" % (content_type,)
            )
        if content_transfer_encoding is not None and not isinstance(
            content_transfer_encoding, str
        ):
            raise TypeError(
                "content_transfer_encoding must be an instance"
                " of str. Got: %s" % (content_transfer_encoding,)
            )

        if is_file and filename is None:
            filename = guess_filename(value, name)

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is not None:
            type_options["filename"] = filename

        headers = {}
        if content_type is not None:
            headers[hdrs.CONTENT_TYPE] = content_type
        if content_transfer_encoding is not None:
            headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding

        # ``headers`` is non-empty exactly when a content type or a content
        # transfer encoding was supplied.
        if is_file or filename is not None or headers:
            self._is_multipart = True

        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Append several fields given in one of the accepted record shapes.

        Accepted records are ``io.IOBase`` objects (the field name comes
        from ``guess_filename`` with ``"unknown"`` as the default),
        ``MultiDict``/``MultiDictProxy`` instances (their items are queued
        behind the records still pending), and two-element lists/tuples of
        ``(name, value)``.

        :raises TypeError: for any other record.
        """
        pending = list(fields)
        # Walk the queue with a cursor instead of ``pop(0)``: the processing
        # order is the same, without shifting the list on every step.
        cursor = 0
        while cursor < len(pending):
            rec = pending[cursor]
            cursor += 1

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                pending.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)  # type: ignore[arg-type]

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    "more complex parameters, got {!r}".format(rec)
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Build the application/x-www-form-urlencoded payload.

        Only the field names and values are used; per-field headers are
        ignored.  The ``charset`` parameter is added to the content type
        only when the charset is not ``"utf-8"``.
        """
        # form data (x-www-form-urlencoded)
        data = [(type_options["name"], value) for type_options, _, value in self._fields]

        charset = self._charset if self._charset is not None else "utf-8"

        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format.

        Every stored field is turned into a part via ``payload.get_payload``
        and appended to the writer created in ``__init__``.

        :raises RuntimeError: if the form has already been processed; the
            writer is filled only once.
        :raises TypeError: if building the part for a value fails for any
            reason; the original exception is chained as the cause.
        """
        if self._is_processed:
            raise RuntimeError("Form data has been processed already")
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't like body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._is_processed = True
        return self._writer

    def __call__(self) -> Payload:
        """Return the multipart writer or the url-encoded payload."""
        if self._is_multipart:
            return self._gen_form_data()
        else:
            return self._gen_form_urlencoded()