Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/formdata.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

88 statements  

import io
from collections import deque
from typing import Any, Iterable, List, Optional
from urllib.parse import urlencode

from multidict import MultiDict, MultiDictProxy

from . import hdrs, multipart, payload
from .helpers import guess_filename
from .payload import Payload

10 

11__all__ = ("FormData",) 

12 

13 

class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected through the constructor, :meth:`add_field` or
    :meth:`add_fields`.  Calling the instance returns the encoded payload:
    a multipart writer when the form is flagged as multipart, otherwise a
    URL-encoded bytes payload.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: Optional[str] = None,
        boundary: Optional[str] = None,
        *,
        default_to_multipart: bool = False,
    ) -> None:
        """Create the form and register the initial *fields*.

        Args:
            fields: Initial fields.  A ``dict`` is expanded into its
                ``(key, value)`` items, a ``list``/``tuple`` is treated as
                a sequence of records, and any other object is treated as
                one single record.  Records are handed to
                :meth:`add_fields`.
            quote_fields: Forwarded as ``quote_fields`` when the
                Content-Disposition of each multipart part is set.
            charset: Encoding used for both output formats; ``None``
                means "utf-8" for the URL-encoded format.
            boundary: Boundary passed to the multipart writer.
            default_to_multipart: Initial value of the multipart flag;
                the flag may still be switched on later by
                :meth:`add_field`.
        """
        self._boundary = boundary
        self._writer = multipart.MultipartWriter("form-data", boundary=self._boundary)
        # Each entry is a (disposition params, headers, value) triple.
        self._fields: List[Any] = []
        self._is_multipart = default_to_multipart
        self._is_processed = False
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            # A lone record (file object, multidict, ...) is wrapped so it
            # can be unpacked as a single positional argument below.
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether calling the form will produce a multipart body."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
    ) -> None:
        """Queue one field for later encoding.

        The form is switched to multipart when *value* is a file object
        or a bytes-like object, when a filename is known (given or
        guessed), or when *content_type* is given.

        Args:
            name: Field name, stored as the ``name`` disposition parameter.
            value: Field value; it is not validated here.
            content_type: Optional Content-Type header for this field.
            filename: Optional file name for this field.

        Raises:
            TypeError: If *filename* or *content_type* is given but is
                not a ``str``.
        """
        if isinstance(value, (io.IOBase, bytes, bytearray, memoryview)):
            self._is_multipart = True

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is not None and not isinstance(filename, str):
            # An f-string is used instead of ``"%s" % filename`` because the
            # ``%`` operator unpacks a tuple operand as the argument list,
            # which would replace this message with a formatting error.
            raise TypeError(f"filename must be an instance of str. Got: {filename}")
        if filename is None and isinstance(value, io.IOBase):
            # No explicit name for a file object: ask guess_filename,
            # passing the field name as its second argument.
            filename = guess_filename(value, name)
        if filename is not None:
            type_options["filename"] = filename
            self._is_multipart = True

        headers = {}
        if content_type is not None:
            if not isinstance(content_type, str):
                # f-string for the same tuple-safety reason as above.
                raise TypeError(
                    f"content_type must be an instance of str. Got: {content_type}"
                )
            headers[hdrs.CONTENT_TYPE] = content_type
            self._is_multipart = True

        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Queue several fields given in shorthand form.

        Each record may be a file object (its field name comes from
        ``guess_filename(rec, "unknown")``), a ``MultiDict`` or
        ``MultiDictProxy`` (its items are processed after the records
        already pending), or a two-element ``list``/``tuple`` of
        ``(name, value)``.

        Raises:
            TypeError: If a record matches none of the accepted shapes.
        """
        # FIFO work queue; a deque gives O(1) removal from the front where
        # ``list.pop(0)`` would shift every remaining element.
        pending = deque(fields)

        while pending:
            rec = pending.popleft()

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                # Items go to the back of the queue and are handled as
                # ordinary (name, value) pairs on a later iteration.
                pending.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)  # type: ignore[arg-type]

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    "more complex parameters, got {!r}".format(rec)
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Encode the fields as application/x-www-form-urlencoded.

        Raises:
            TypeError: If any field value is not a ``str``.
        """
        # form data (x-www-form-urlencoded)
        data = []
        for type_options, _, value in self._fields:
            if not isinstance(value, str):
                raise TypeError(f"expected str, got {value!r}")
            data.append((type_options["name"], value))

        charset = self._charset if self._charset is not None else "utf-8"

        # The charset parameter is only spelled out when it is not "utf-8".
        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format.

        Every queued field is converted to a payload and appended to the
        writer created in ``__init__``; that writer is returned.

        Raises:
            RuntimeError: If this method already completed once on this
                instance.
            TypeError: If a field value cannot be converted to a payload;
                the original exception is chained as the cause.
        """
        if self._is_processed:
            raise RuntimeError("Form data has been processed already")
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't like body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._is_processed = True
        return self._writer

    def __call__(self) -> Payload:
        """Return the encoded body in the format selected by the multipart flag."""
        if self._is_multipart:
            return self._gen_form_data()
        else:
            return self._gen_form_urlencoded()

162 else: 

163 return self._gen_form_urlencoded()