Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_content.py: 25%
128 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import inspect
2import warnings
3from json import dumps as json_dumps
4from typing import (
5 Any,
6 AsyncIterable,
7 AsyncIterator,
8 Dict,
9 Iterable,
10 Iterator,
11 Mapping,
12 Optional,
13 Tuple,
14 Union,
15)
16from urllib.parse import urlencode
18from ._exceptions import StreamClosed, StreamConsumed
19from ._multipart import MultipartStream
20from ._types import (
21 AsyncByteStream,
22 RequestContent,
23 RequestData,
24 RequestFiles,
25 ResponseContent,
26 SyncByteStream,
27)
28from ._utils import peek_filelike_length, primitive_value_to_str
class ByteStream(AsyncByteStream, SyncByteStream):
    """
    A byte stream backed by a single in-memory `bytes` object.

    Usable for both sync and async iteration. Each iteration yields the
    stored payload as exactly one chunk, and the stream may be iterated
    any number of times.
    """

    def __init__(self, stream: bytes) -> None:
        self._stream = stream

    def __iter__(self) -> Iterator[bytes]:
        yield self._stream

    async def __aiter__(self) -> AsyncIterator[bytes]:
        yield self._stream
class IteratorByteStream(SyncByteStream):
    """
    A sync byte stream wrapping either an iterable of `bytes` chunks or a
    file-like object exposing `read()`.

    Raises `StreamConsumed` if a wrapped generator is iterated a second
    time. Other iterables are not guarded against re-iteration.
    """

    # Number of bytes requested per `read()` call on file-like inputs.
    CHUNK_SIZE = 65_536

    def __init__(self, stream: Iterable[bytes]) -> None:
        self._stream = stream
        self._is_stream_consumed = False
        # Only generators are tracked, since a generator cannot be restarted
        # once it has been run.
        self._is_generator = inspect.isgenerator(stream)

    def __iter__(self) -> Iterator[bytes]:
        if self._is_stream_consumed and self._is_generator:
            raise StreamConsumed()

        self._is_stream_consumed = True
        if hasattr(self._stream, "read"):
            # File-like interfaces should use 'read' directly.
            chunk = self._stream.read(self.CHUNK_SIZE)  # type: ignore
            while chunk:
                yield chunk
                chunk = self._stream.read(self.CHUNK_SIZE)  # type: ignore
        else:
            # Otherwise iterate.
            for part in self._stream:
                yield part
class AsyncIteratorByteStream(AsyncByteStream):
    """
    An async byte stream wrapping either an async iterable of `bytes`
    chunks or an object exposing an awaitable `aread()`.

    Raises `StreamConsumed` if a wrapped async generator is iterated a
    second time. Other async iterables are not guarded against
    re-iteration.
    """

    # Number of bytes requested per `aread()` call on file-like inputs.
    CHUNK_SIZE = 65_536

    def __init__(self, stream: AsyncIterable[bytes]) -> None:
        self._stream = stream
        self._is_stream_consumed = False
        # Only async generators are tracked, since an async generator cannot
        # be restarted once it has been run.
        self._is_generator = inspect.isasyncgen(stream)

    async def __aiter__(self) -> AsyncIterator[bytes]:
        if self._is_stream_consumed and self._is_generator:
            raise StreamConsumed()

        self._is_stream_consumed = True
        if hasattr(self._stream, "aread"):
            # File-like interfaces should use 'aread' directly.
            chunk = await self._stream.aread(self.CHUNK_SIZE)  # type: ignore
            while chunk:
                yield chunk
                chunk = await self._stream.aread(self.CHUNK_SIZE)  # type: ignore
        else:
            # Otherwise iterate.
            async for part in self._stream:
                yield part
class UnattachedStream(AsyncByteStream, SyncByteStream):
    """
    If a request or response is serialized using pickle, then it is no longer
    attached to a stream for I/O purposes. Any stream operations should result
    in `httpx.StreamClosed`.
    """

    def __iter__(self) -> Iterator[bytes]:
        raise StreamClosed()

    async def __aiter__(self) -> AsyncIterator[bytes]:
        raise StreamClosed()
        # The unreachable `yield` turns this into an async generator function,
        # so the method returns an async iterator and the exception is raised
        # when that iterator is first advanced.
        yield b""  # pragma: no cover
def encode_content(
    content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]
) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
    """
    Encode raw content, returning a two-tuple of (<headers>, <stream>).

    * `str` / `bytes`: text is encoded as UTF-8. A `Content-Length` header
      is set only when the body is non-empty.
    * Sync iterables (other than `dict`): `Content-Length` is set when
      `peek_filelike_length` reports a length, otherwise
      `Transfer-Encoding: chunked` is used.
    * Async iterables: always `Transfer-Encoding: chunked`.

    Raises `TypeError` for any other type, including `dict`.
    """
    if isinstance(content, (bytes, str)):
        body = content.encode("utf-8") if isinstance(content, str) else content
        content_length = len(body)
        headers = {"Content-Length": str(content_length)} if body else {}
        return headers, ByteStream(body)

    elif isinstance(content, Iterable) and not isinstance(content, dict):
        # `not isinstance(content, dict)` is a bit oddly specific, but it
        # catches a case that's easy for users to make in error, and would
        # otherwise pass through here, like any other bytes-iterable,
        # because `dict` happens to be iterable. See issue #2491.
        content_length_or_none = peek_filelike_length(content)

        if content_length_or_none is None:
            headers = {"Transfer-Encoding": "chunked"}
        else:
            headers = {"Content-Length": str(content_length_or_none)}
        return headers, IteratorByteStream(content)  # type: ignore

    elif isinstance(content, AsyncIterable):
        headers = {"Transfer-Encoding": "chunked"}
        return headers, AsyncIteratorByteStream(content)

    raise TypeError(f"Unexpected type for 'content', {type(content)!r}")
def encode_urlencoded_data(
    data: RequestData,
) -> Tuple[Dict[str, str], ByteStream]:
    """
    Encode form data as `application/x-www-form-urlencoded`.

    List and tuple values are expanded into one `key=value` pair per item.
    Every value is converted with `primitive_value_to_str` before encoding.
    Returns a two-tuple of (<headers>, <stream>), where the headers carry
    `Content-Length` and `Content-Type`.
    """
    plain_data = []
    for key, value in data.items():
        if isinstance(value, (list, tuple)):
            plain_data.extend([(key, primitive_value_to_str(item)) for item in value])
        else:
            plain_data.append((key, primitive_value_to_str(value)))
    body = urlencode(plain_data, doseq=True).encode("utf-8")
    content_length = str(len(body))
    content_type = "application/x-www-form-urlencoded"
    headers = {"Content-Length": content_length, "Content-Type": content_type}
    return headers, ByteStream(body)
def encode_multipart_data(
    data: RequestData, files: RequestFiles, boundary: Optional[bytes]
) -> Tuple[Dict[str, str], MultipartStream]:
    """
    Build a `MultipartStream` from the given form fields and files.

    Returns a two-tuple of (<headers>, <stream>), where the headers are
    those reported by the stream's `get_headers()`.
    """
    multipart = MultipartStream(data=data, files=files, boundary=boundary)
    headers = multipart.get_headers()
    return headers, multipart
def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]:
    """
    Encode `text` as a UTF-8 `text/plain` body.

    Returns a two-tuple of (<headers>, <stream>), where the headers carry
    `Content-Length` and `Content-Type`.
    """
    body = text.encode("utf-8")
    content_length = str(len(body))
    content_type = "text/plain; charset=utf-8"
    headers = {"Content-Length": content_length, "Content-Type": content_type}
    return headers, ByteStream(body)
def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]:
    """
    Encode `html` as a UTF-8 `text/html` body.

    Returns a two-tuple of (<headers>, <stream>), where the headers carry
    `Content-Length` and `Content-Type`.
    """
    body = html.encode("utf-8")
    content_length = str(len(body))
    content_type = "text/html; charset=utf-8"
    headers = {"Content-Length": content_length, "Content-Type": content_type}
    return headers, ByteStream(body)
def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]:
    """
    Serialize `json` with `json.dumps` and encode the result as UTF-8.

    Returns a two-tuple of (<headers>, <stream>), where the headers carry
    `Content-Length` and a `Content-Type` of `application/json`.
    """
    body = json_dumps(json).encode("utf-8")
    content_length = str(len(body))
    content_type = "application/json"
    headers = {"Content-Length": content_length, "Content-Type": content_type}
    return headers, ByteStream(body)
def encode_request(
    content: Optional[RequestContent] = None,
    data: Optional[RequestData] = None,
    files: Optional[RequestFiles] = None,
    json: Optional[Any] = None,
    boundary: Optional[bytes] = None,
) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
    """
    Handles encoding the given `content`, `data`, `files`, and `json`,
    returning a two-tuple of (<headers>, <stream>).

    Arguments are checked in this order and the first match wins:

    1. `data` that is not a `Mapping`: treated as raw content, with a
       `DeprecationWarning`.
    2. `content` that is not `None`: raw content.
    3. Truthy `files`: multipart form data, including `data` if supplied.
    4. Truthy `data`: URL-encoded form data.
    5. `json` that is not `None`: JSON body.

    If nothing matches, returns empty headers and an empty byte stream.
    """
    if data is not None and not isinstance(data, Mapping):
        # We prefer to separate `content=<bytes|str|byte iterator|bytes aiterator>`
        # for raw request content, and `data=<form data>` for url encoded or
        # multipart form content.
        #
        # However for compat with requests, we *do* still support
        # `data=<bytes...>` usages. We deal with that case here, treating it
        # as if `content=<...>` had been supplied instead.
        message = "Use 'content=<...>' to upload raw bytes/text content."
        warnings.warn(message, DeprecationWarning)
        return encode_content(data)

    if content is not None:
        return encode_content(content)
    elif files:
        return encode_multipart_data(data or {}, files, boundary)
    elif data:
        return encode_urlencoded_data(data)
    elif json is not None:
        return encode_json(json)

    return {}, ByteStream(b"")
def encode_response(
    content: Optional[ResponseContent] = None,
    text: Optional[str] = None,
    html: Optional[str] = None,
    json: Optional[Any] = None,
) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
    """
    Handles encoding the given `content`, returning a two-tuple of
    (<headers>, <stream>).

    The first argument that is not `None` is used, checked in the order
    `content`, `text`, `html`, `json`. If all are `None`, returns empty
    headers and an empty byte stream.
    """
    if content is not None:
        return encode_content(content)
    elif text is not None:
        return encode_text(text)
    elif html is not None:
        return encode_html(html)
    elif json is not None:
        return encode_json(json)

    return {}, ByteStream(b"")