Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_content.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

130 statements  

1from __future__ import annotations 

2 

3import inspect 

4import warnings 

5from json import dumps as json_dumps 

6from typing import ( 

7 Any, 

8 AsyncIterable, 

9 AsyncIterator, 

10 Iterable, 

11 Iterator, 

12 Mapping, 

13) 

14from urllib.parse import urlencode 

15 

16from ._exceptions import StreamClosed, StreamConsumed 

17from ._multipart import MultipartStream 

18from ._types import ( 

19 AsyncByteStream, 

20 RequestContent, 

21 RequestData, 

22 RequestFiles, 

23 ResponseContent, 

24 SyncByteStream, 

25) 

26from ._utils import peek_filelike_length, primitive_value_to_str 

27 

28__all__ = ["ByteStream"] 

29 

30 

class ByteStream(AsyncByteStream, SyncByteStream):
    """
    A byte stream backed by a single in-memory `bytes` value.

    Supports both sync and async iteration. Every iteration produces the
    complete payload as exactly one chunk, and because each call creates a
    fresh generator the stream can be iterated more than once.
    """

    def __init__(self, stream: bytes) -> None:
        # The whole body is kept as-is; no copying or chunking is performed.
        self._stream = stream

    def __iter__(self) -> Iterator[bytes]:
        """Yield the stored bytes as a single chunk."""
        yield self._stream

    async def __aiter__(self) -> AsyncIterator[bytes]:
        """Asynchronously yield the stored bytes as a single chunk."""
        yield self._stream

40 

41 

class IteratorByteStream(SyncByteStream):
    """
    Sync byte stream wrapping either an iterable of bytes or an object that
    exposes a `read` method.

    Objects with `read` are drained in `CHUNK_SIZE` pieces; anything else is
    iterated directly. Generators may only be iterated once: a second attempt
    raises `StreamConsumed`.
    """

    # Number of bytes requested per `read()` call.
    CHUNK_SIZE = 65_536

    def __init__(self, stream: Iterable[bytes]) -> None:
        self._stream = stream
        # Remember whether the source is a generator, since only generators
        # are guarded against being iterated a second time.
        self._is_generator = inspect.isgenerator(stream)
        self._is_stream_consumed = False

    def __iter__(self) -> Iterator[bytes]:
        """
        Yield the content of the wrapped stream chunk by chunk.

        Raises:
            StreamConsumed: if the wrapped stream is a generator that has
                already been iterated through this object.
        """
        if self._is_generator and self._is_stream_consumed:
            raise StreamConsumed()
        self._is_stream_consumed = True

        source = self._stream
        if not hasattr(source, "read"):
            # Not file-like: hand through whatever the iterable produces.
            for part in source:
                yield part
            return

        # File-like interfaces should use 'read' directly, stopping at the
        # first falsy chunk.
        while True:
            chunk = source.read(self.CHUNK_SIZE)
            if not chunk:
                break
            yield chunk

65 

66 

class AsyncIteratorByteStream(AsyncByteStream):
    """
    Async byte stream wrapping either an async iterable of bytes or an object
    that exposes an `aread` coroutine method.

    Objects with `aread` are drained in `CHUNK_SIZE` pieces; anything else is
    iterated with `async for`. Async generators may only be iterated once: a
    second attempt raises `StreamConsumed`.
    """

    # Number of bytes requested per `aread()` call.
    CHUNK_SIZE = 65_536

    def __init__(self, stream: AsyncIterable[bytes]) -> None:
        self._stream = stream
        # Remember whether the source is an async generator, since only those
        # are guarded against being iterated a second time.
        self._is_generator = inspect.isasyncgen(stream)
        self._is_stream_consumed = False

    async def __aiter__(self) -> AsyncIterator[bytes]:
        """
        Asynchronously yield the content of the wrapped stream chunk by chunk.

        Raises:
            StreamConsumed: if the wrapped stream is an async generator that
                has already been iterated through this object.
        """
        if self._is_generator and self._is_stream_consumed:
            raise StreamConsumed()
        self._is_stream_consumed = True

        source = self._stream
        if not hasattr(source, "aread"):
            # Not file-like: hand through whatever the async iterable produces.
            async for part in source:
                yield part
            return

        # File-like interfaces should use 'aread' directly, stopping at the
        # first falsy chunk.
        while True:
            chunk = await source.aread(self.CHUNK_SIZE)
            if not chunk:
                break
            yield chunk

90 

91 

class UnattachedStream(AsyncByteStream, SyncByteStream):
    """
    Stand-in stream for a request or response that has been serialized with
    pickle and is therefore no longer attached to a stream for I/O.

    Any attempt to iterate it, sync or async, results in
    `httpx.StreamClosed`.
    """

    def __iter__(self) -> Iterator[bytes]:
        # A plain method (not a generator), so the error is raised as soon
        # as iteration is requested.
        raise StreamClosed()

    async def __aiter__(self) -> AsyncIterator[bytes]:
        raise StreamClosed()
        # Never reached: the `yield` is only present so that this method is
        # compiled as an async generator.
        yield b""  # pragma: no cover

105 

106 

def encode_content(
    content: str | bytes | Iterable[bytes] | AsyncIterable[bytes],
) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
    """
    Encode raw content, returning a two-tuple of (<headers>, <stream>).

    * `str` / `bytes`: text is UTF-8 encoded; a `Content-Length` header is
      set unless the body is empty.
    * Sync iterables (other than `dict`): `Content-Length` when
      `peek_filelike_length` reports a length, chunked transfer otherwise.
    * Async iterables: always chunked transfer.

    Raises:
        TypeError: if `content` matches none of the supported kinds.
    """
    if isinstance(content, (bytes, str)):
        if isinstance(content, str):
            payload = content.encode("utf-8")
        else:
            payload = content
        # An empty body is sent without any Content-Length header.
        if not payload:
            return {}, ByteStream(payload)
        return {"Content-Length": str(len(payload))}, ByteStream(payload)

    # `not isinstance(content, dict)` is a bit oddly specific, but it
    # catches a case that's easy for users to make in error, and would
    # otherwise pass through here, like any other bytes-iterable,
    # because `dict` happens to be iterable. See issue #2491.
    if isinstance(content, Iterable) and not isinstance(content, dict):
        length = peek_filelike_length(content)
        headers = (
            {"Transfer-Encoding": "chunked"}
            if length is None
            else {"Content-Length": str(length)}
        )
        return headers, IteratorByteStream(content)  # type: ignore

    if isinstance(content, AsyncIterable):
        return {"Transfer-Encoding": "chunked"}, AsyncIteratorByteStream(content)

    raise TypeError(f"Unexpected type for 'content', {type(content)!r}")

134 

135 

def encode_urlencoded_data(
    data: RequestData,
) -> tuple[dict[str, str], ByteStream]:
    """
    Encode form data as `application/x-www-form-urlencoded`.

    List and tuple values are expanded into one key/value pair per item;
    every value is converted with `primitive_value_to_str` before encoding.
    Returns a two-tuple of (<headers>, <stream>) where the headers carry
    `Content-Length` and `Content-Type`.
    """
    pairs = []
    for name, raw in data.items():
        # Treat a scalar as a one-item sequence so both cases share a loop.
        values = raw if isinstance(raw, (list, tuple)) else [raw]
        for item in values:
            pairs.append((name, primitive_value_to_str(item)))

    encoded = urlencode(pairs, doseq=True).encode("utf-8")
    headers = {
        "Content-Length": str(len(encoded)),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    return headers, ByteStream(encoded)

150 

151 

def encode_multipart_data(
    data: RequestData, files: RequestFiles, boundary: bytes | None
) -> tuple[dict[str, str], MultipartStream]:
    """
    Build a `MultipartStream` from `data`, `files` and `boundary`, returning
    a two-tuple of (<headers>, <stream>) with the headers taken from the
    stream's own `get_headers()`.
    """
    stream = MultipartStream(data=data, files=files, boundary=boundary)
    return stream.get_headers(), stream

158 

159 

def encode_text(text: str) -> tuple[dict[str, str], ByteStream]:
    """
    Encode `text` as a UTF-8 `text/plain` body, returning a two-tuple of
    (<headers>, <stream>).
    """
    payload = text.encode("utf-8")
    headers = {
        "Content-Length": str(len(payload)),
        "Content-Type": "text/plain; charset=utf-8",
    }
    return headers, ByteStream(payload)

166 

167 

def encode_html(html: str) -> tuple[dict[str, str], ByteStream]:
    """
    Encode `html` as a UTF-8 `text/html` body, returning a two-tuple of
    (<headers>, <stream>).
    """
    payload = html.encode("utf-8")
    headers = {
        "Content-Length": str(len(payload)),
        "Content-Type": "text/html; charset=utf-8",
    }
    return headers, ByteStream(payload)

174 

175 

def encode_json(json: Any) -> tuple[dict[str, str], ByteStream]:
    """
    Serialize `json` with `json.dumps` and encode the result as a UTF-8
    `application/json` body, returning a two-tuple of (<headers>, <stream>).
    """
    payload = json_dumps(json).encode("utf-8")
    headers = {
        "Content-Length": str(len(payload)),
        "Content-Type": "application/json",
    }
    return headers, ByteStream(payload)

182 

183 

def encode_request(
    content: RequestContent | None = None,
    data: RequestData | None = None,
    files: RequestFiles | None = None,
    json: Any | None = None,
    boundary: bytes | None = None,
) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
    """
    Handles encoding the given `content`, `data`, `files`, and `json`,
    returning a two-tuple of (<headers>, <stream>).

    Only one encoding is applied, chosen in this order:

    1. Non-mapping `data` (deprecated): encoded as raw content, emitting a
       `DeprecationWarning`. This takes precedence over every other argument.
    2. `content`, when not None: encoded as raw content.
    3. `files`, when truthy: multipart encoding, together with `data` (or an
       empty mapping) and `boundary`.
    4. `data`, when truthy: URL-encoded form.
    5. `json`, when not None: JSON body.

    If none apply, empty headers and an empty `ByteStream` are returned.
    """
    if data is not None and not isinstance(data, Mapping):
        # We prefer to separate `content=<bytes|str|byte iterator|bytes aiterator>`
        # for raw request content, and `data=<form data>` for url encoded or
        # multipart form content.
        #
        # However for compat with requests, we *do* still support
        # `data=<bytes...>` usages. We deal with that case here, treating it
        # as if `content=<...>` had been supplied instead.
        message = "Use 'content=<...>' to upload raw bytes/text content."
        # `stacklevel=2` attributes the warning to the caller of this
        # function rather than to this line.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return encode_content(data)

    if content is not None:
        return encode_content(content)
    elif files:
        return encode_multipart_data(data or {}, files, boundary)
    elif data:
        return encode_urlencoded_data(data)
    elif json is not None:
        return encode_json(json)

    return {}, ByteStream(b"")

217 

218 

def encode_response(
    content: ResponseContent | None = None,
    text: str | None = None,
    html: str | None = None,
    json: Any | None = None,
) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
    """
    Encode a response body, returning a two-tuple of (<headers>, <stream>).

    The first argument that is not None wins, checked in the order
    `content`, `text`, `html`, `json`. When all of them are None, empty
    headers and an empty `ByteStream` are returned.
    """
    if content is not None:
        return encode_content(content)
    if text is not None:
        return encode_text(text)
    if html is not None:
        return encode_html(html)
    if json is not None:
        return encode_json(json)

    # Nothing supplied: an empty body with no headers.
    return {}, ByteStream(b"")