Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_content.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

129 statements  

1from __future__ import annotations 

2 

3import inspect 

4import warnings 

5from json import dumps as json_dumps 

6from typing import ( 

7 Any, 

8 AsyncIterable, 

9 AsyncIterator, 

10 Iterable, 

11 Iterator, 

12 Mapping, 

13) 

14from urllib.parse import urlencode 

15 

16from ._exceptions import StreamClosed, StreamConsumed 

17from ._multipart import MultipartStream 

18from ._types import ( 

19 AsyncByteStream, 

20 RequestContent, 

21 RequestData, 

22 RequestFiles, 

23 ResponseContent, 

24 SyncByteStream, 

25) 

26from ._utils import peek_filelike_length, primitive_value_to_str 

27 

28 

29class ByteStream(AsyncByteStream, SyncByteStream): 

30 def __init__(self, stream: bytes) -> None: 

31 self._stream = stream 

32 

33 def __iter__(self) -> Iterator[bytes]: 

34 yield self._stream 

35 

36 async def __aiter__(self) -> AsyncIterator[bytes]: 

37 yield self._stream 

38 

39 

40class IteratorByteStream(SyncByteStream): 

41 CHUNK_SIZE = 65_536 

42 

43 def __init__(self, stream: Iterable[bytes]) -> None: 

44 self._stream = stream 

45 self._is_stream_consumed = False 

46 self._is_generator = inspect.isgenerator(stream) 

47 

48 def __iter__(self) -> Iterator[bytes]: 

49 if self._is_stream_consumed and self._is_generator: 

50 raise StreamConsumed() 

51 

52 self._is_stream_consumed = True 

53 if hasattr(self._stream, "read"): 

54 # File-like interfaces should use 'read' directly. 

55 chunk = self._stream.read(self.CHUNK_SIZE) 

56 while chunk: 

57 yield chunk 

58 chunk = self._stream.read(self.CHUNK_SIZE) 

59 else: 

60 # Otherwise iterate. 

61 for part in self._stream: 

62 yield part 

63 

64 

65class AsyncIteratorByteStream(AsyncByteStream): 

66 CHUNK_SIZE = 65_536 

67 

68 def __init__(self, stream: AsyncIterable[bytes]) -> None: 

69 self._stream = stream 

70 self._is_stream_consumed = False 

71 self._is_generator = inspect.isasyncgen(stream) 

72 

73 async def __aiter__(self) -> AsyncIterator[bytes]: 

74 if self._is_stream_consumed and self._is_generator: 

75 raise StreamConsumed() 

76 

77 self._is_stream_consumed = True 

78 if hasattr(self._stream, "aread"): 

79 # File-like interfaces should use 'aread' directly. 

80 chunk = await self._stream.aread(self.CHUNK_SIZE) 

81 while chunk: 

82 yield chunk 

83 chunk = await self._stream.aread(self.CHUNK_SIZE) 

84 else: 

85 # Otherwise iterate. 

86 async for part in self._stream: 

87 yield part 

88 

89 

90class UnattachedStream(AsyncByteStream, SyncByteStream): 

91 """ 

92 If a request or response is serialized using pickle, then it is no longer 

93 attached to a stream for I/O purposes. Any stream operations should result 

94 in `httpx.StreamClosed`. 

95 """ 

96 

97 def __iter__(self) -> Iterator[bytes]: 

98 raise StreamClosed() 

99 

100 async def __aiter__(self) -> AsyncIterator[bytes]: 

101 raise StreamClosed() 

102 yield b"" # pragma: no cover 

103 

104 

105def encode_content( 

106 content: str | bytes | Iterable[bytes] | AsyncIterable[bytes], 

107) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]: 

108 if isinstance(content, (bytes, str)): 

109 body = content.encode("utf-8") if isinstance(content, str) else content 

110 content_length = len(body) 

111 headers = {"Content-Length": str(content_length)} if body else {} 

112 return headers, ByteStream(body) 

113 

114 elif isinstance(content, Iterable) and not isinstance(content, dict): 

115 # `not isinstance(content, dict)` is a bit oddly specific, but it 

116 # catches a case that's easy for users to make in error, and would 

117 # otherwise pass through here, like any other bytes-iterable, 

118 # because `dict` happens to be iterable. See issue #2491. 

119 content_length_or_none = peek_filelike_length(content) 

120 

121 if content_length_or_none is None: 

122 headers = {"Transfer-Encoding": "chunked"} 

123 else: 

124 headers = {"Content-Length": str(content_length_or_none)} 

125 return headers, IteratorByteStream(content) # type: ignore 

126 

127 elif isinstance(content, AsyncIterable): 

128 headers = {"Transfer-Encoding": "chunked"} 

129 return headers, AsyncIteratorByteStream(content) 

130 

131 raise TypeError(f"Unexpected type for 'content', {type(content)!r}") 

132 

133 

134def encode_urlencoded_data( 

135 data: RequestData, 

136) -> tuple[dict[str, str], ByteStream]: 

137 plain_data = [] 

138 for key, value in data.items(): 

139 if isinstance(value, (list, tuple)): 

140 plain_data.extend([(key, primitive_value_to_str(item)) for item in value]) 

141 else: 

142 plain_data.append((key, primitive_value_to_str(value))) 

143 body = urlencode(plain_data, doseq=True).encode("utf-8") 

144 content_length = str(len(body)) 

145 content_type = "application/x-www-form-urlencoded" 

146 headers = {"Content-Length": content_length, "Content-Type": content_type} 

147 return headers, ByteStream(body) 

148 

149 

150def encode_multipart_data( 

151 data: RequestData, files: RequestFiles, boundary: bytes | None 

152) -> tuple[dict[str, str], MultipartStream]: 

153 multipart = MultipartStream(data=data, files=files, boundary=boundary) 

154 headers = multipart.get_headers() 

155 return headers, multipart 

156 

157 

158def encode_text(text: str) -> tuple[dict[str, str], ByteStream]: 

159 body = text.encode("utf-8") 

160 content_length = str(len(body)) 

161 content_type = "text/plain; charset=utf-8" 

162 headers = {"Content-Length": content_length, "Content-Type": content_type} 

163 return headers, ByteStream(body) 

164 

165 

166def encode_html(html: str) -> tuple[dict[str, str], ByteStream]: 

167 body = html.encode("utf-8") 

168 content_length = str(len(body)) 

169 content_type = "text/html; charset=utf-8" 

170 headers = {"Content-Length": content_length, "Content-Type": content_type} 

171 return headers, ByteStream(body) 

172 

173 

174def encode_json(json: Any) -> tuple[dict[str, str], ByteStream]: 

175 body = json_dumps(json).encode("utf-8") 

176 content_length = str(len(body)) 

177 content_type = "application/json" 

178 headers = {"Content-Length": content_length, "Content-Type": content_type} 

179 return headers, ByteStream(body) 

180 

181 

182def encode_request( 

183 content: RequestContent | None = None, 

184 data: RequestData | None = None, 

185 files: RequestFiles | None = None, 

186 json: Any | None = None, 

187 boundary: bytes | None = None, 

188) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]: 

189 """ 

190 Handles encoding the given `content`, `data`, `files`, and `json`, 

191 returning a two-tuple of (<headers>, <stream>). 

192 """ 

193 if data is not None and not isinstance(data, Mapping): 

194 # We prefer to separate `content=<bytes|str|byte iterator|bytes aiterator>` 

195 # for raw request content, and `data=<form data>` for url encoded or 

196 # multipart form content. 

197 # 

198 # However for compat with requests, we *do* still support 

199 # `data=<bytes...>` usages. We deal with that case here, treating it 

200 # as if `content=<...>` had been supplied instead. 

201 message = "Use 'content=<...>' to upload raw bytes/text content." 

202 warnings.warn(message, DeprecationWarning) 

203 return encode_content(data) 

204 

205 if content is not None: 

206 return encode_content(content) 

207 elif files: 

208 return encode_multipart_data(data or {}, files, boundary) 

209 elif data: 

210 return encode_urlencoded_data(data) 

211 elif json is not None: 

212 return encode_json(json) 

213 

214 return {}, ByteStream(b"") 

215 

216 

217def encode_response( 

218 content: ResponseContent | None = None, 

219 text: str | None = None, 

220 html: str | None = None, 

221 json: Any | None = None, 

222) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]: 

223 """ 

224 Handles encoding the given `content`, returning a two-tuple of 

225 (<headers>, <stream>). 

226 """ 

227 if content is not None: 

228 return encode_content(content) 

229 elif text is not None: 

230 return encode_text(text) 

231 elif html is not None: 

232 return encode_html(html) 

233 elif json is not None: 

234 return encode_json(json) 

235 

236 return {}, ByteStream(b"")