Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_writer.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Http related parsers and protocol."""
3import asyncio
4import sys
5from typing import ( # noqa
6 TYPE_CHECKING,
7 Any,
8 Awaitable,
9 Callable,
10 Iterable,
11 List,
12 NamedTuple,
13 Optional,
14 Union,
15)
17from multidict import CIMultiDict
19from .abc import AbstractStreamWriter
20from .base_protocol import BaseProtocol
21from .client_exceptions import ClientConnectionResetError
22from .compression_utils import ZLibCompressor
23from .helpers import NO_EXTENSIONS
__all__ = ("StreamWriter", "HttpVersion", "HttpVersion10", "HttpVersion11")


# Payloads smaller than this are joined and sent via a single
# transport.write() call instead of transport.writelines().
MIN_PAYLOAD_FOR_WRITELINES = 2048
IS_PY313_BEFORE_313_2 = (3, 13, 0) <= sys.version_info < (3, 13, 2)
IS_PY_BEFORE_312_9 = sys.version_info < (3, 12, 9)
SKIP_WRITELINES = IS_PY313_BEFORE_313_2 or IS_PY_BEFORE_312_9
# writelines is not safe for use
# on Python 3.12+ until 3.12.9
# on Python 3.13+ until 3.13.2
# and on older versions it is not any faster than write
# CVE-2024-12254: https://github.com/python/cpython/pull/127656
class HttpVersion(NamedTuple):
    """HTTP protocol version as a (major, minor) pair, e.g. 1.1 for HTTP/1.1."""

    major: int
    minor: int
# Commonly used protocol version singletons.
HttpVersion10 = HttpVersion(1, 0)
HttpVersion11 = HttpVersion(1, 1)
# Awaitable tracing callback invoked with each payload chunk before it is sent.
_T_OnChunkSent = Optional[
    Callable[
        [Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]],
        Awaitable[None],
    ]
]
# Awaitable tracing callback invoked with the headers before they are sent.
_T_OnHeadersSent = Optional[Callable[["CIMultiDict[str]"], Awaitable[None]]]
class StreamWriter(AbstractStreamWriter):
    """Write an HTTP message (headers and payload) to a transport.

    Handles chunked transfer-encoding, optional payload compression via
    ZLibCompressor, and Content-Length accounting.  Headers passed to
    write_headers() are buffered so that a small first body chunk can be
    coalesced with them into a single transport write.
    """

    # Remaining payload byte budget; None means no limit is tracked.
    length: int | None = None
    # True once chunked transfer-encoding is enabled (see enable_chunking()).
    chunked: bool = False
    # Set by write_eof()/set_eof(); the writer is finished afterwards.
    _eof: bool = False
    # Compressor created by enable_compression(), or None when disabled.
    _compress: ZLibCompressor | None = None

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        on_chunk_sent: _T_OnChunkSent = None,
        on_headers_sent: _T_OnHeadersSent = None,
    ) -> None:
        self._protocol = protocol
        self.loop = loop
        # Optional tracing hooks awaited before chunks / headers go out.
        self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent
        self._on_headers_sent: _T_OnHeadersSent = on_headers_sent
        # Serialized headers waiting to be coalesced with the first body chunk.
        self._headers_buf: bytes | None = None
        self._headers_written: bool = False

    @property
    def transport(self) -> asyncio.Transport | None:
        """The underlying transport, or None once the connection is gone."""
        return self._protocol.transport

    @property
    def protocol(self) -> BaseProtocol:
        """The protocol this writer sends through."""
        return self._protocol

    def enable_chunking(self) -> None:
        """Switch the payload to chunked transfer-encoding."""
        self.chunked = True

    def enable_compression(
        self, encoding: str = "deflate", strategy: int | None = None
    ) -> None:
        """Enable payload compression with the given encoding/zlib strategy."""
        self._compress = ZLibCompressor(encoding=encoding, strategy=strategy)

    def _write(
        self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write *chunk* to the transport, updating the byte counters.

        Raises ClientConnectionResetError if the transport is gone or closing.
        """
        size = len(chunk)
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        transport.write(chunk)

    def _writelines(
        self,
        chunks: Iterable[
            Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
        ],
    ) -> None:
        """Write several chunks, preferring a single transport call.

        NOTE: *chunks* is iterated twice (size pass, then send), so callers
        must pass a reusable iterable such as a tuple, not a generator.
        """
        size = 0
        for chunk in chunks:
            size += len(chunk)
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        # writelines is unsafe on some CPython versions (CVE-2024-12254, see
        # module constants) and not worthwhile for small payloads; join and
        # use a plain write in those cases.
        if SKIP_WRITELINES or size < MIN_PAYLOAD_FOR_WRITELINES:
            transport.write(b"".join(chunks))
        else:
            transport.writelines(chunks)

    def _write_chunked_payload(
        self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write a chunk with proper chunked encoding."""
        # Chunked framing: hex length, CRLF, data, CRLF.
        chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
        self._writelines((chunk_len_pre, chunk, b"\r\n"))

    def _send_headers_with_payload(
        self,
        chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"],
        is_eof: bool,
    ) -> None:
        """Send buffered headers with payload, coalescing into single write."""
        # Mark headers as written
        self._headers_written = True
        headers_buf = self._headers_buf
        self._headers_buf = None

        if TYPE_CHECKING:
            # Safe because callers (write() and write_eof()) only invoke this method
            # after checking that self._headers_buf is truthy
            assert headers_buf is not None

        if not self.chunked:
            # Non-chunked: coalesce headers with body
            if chunk:
                self._writelines((headers_buf, chunk))
            else:
                self._write(headers_buf)
            return

        # Coalesce headers with chunked data
        if chunk:
            chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
            if is_eof:
                # Final chunk: append the chunked-encoding terminator too.
                self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n0\r\n\r\n"))
            else:
                self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n"))
        elif is_eof:
            # Empty body: headers plus the chunked terminator only.
            self._writelines((headers_buf, b"0\r\n\r\n"))
        else:
            self._write(headers_buf)

    async def write(
        self,
        chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"],
        *,
        drain: bool = True,
        LIMIT: int = 0x10000,
    ) -> None:
        """
        Write a chunk of data to the stream.

        write_eof() indicates the end of the stream; the writer can't be
        used after write_eof() has been called.  When *drain* is true and
        more than *LIMIT* bytes are buffered, the transport is drained.
        """
        if self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if isinstance(chunk, memoryview):
            if chunk.nbytes != len(chunk):
                # just reshape it
                chunk = chunk.cast("c")

        if self._compress is not None:
            chunk = await self._compress.compress(chunk)
            # Compressor may buffer everything and emit nothing yet.
            if not chunk:
                return

        if self.length is not None:
            chunk_len = len(chunk)
            if self.length >= chunk_len:
                self.length = self.length - chunk_len
            else:
                # Truncate to the remaining Content-Length budget.
                chunk = chunk[: self.length]
                self.length = 0
                if not chunk:
                    return

        # Handle buffered headers for small payload optimization
        if self._headers_buf and not self._headers_written:
            self._send_headers_with_payload(chunk, False)
            if drain and self.buffer_size > LIMIT:
                self.buffer_size = 0
                await self.drain()
            return

        if chunk:
            if self.chunked:
                self._write_chunked_payload(chunk)
            else:
                self._write(chunk)

            if drain and self.buffer_size > LIMIT:
                self.buffer_size = 0
                await self.drain()

    async def write_headers(
        self, status_line: str, headers: "CIMultiDict[str]"
    ) -> None:
        """Write headers to the stream."""
        if self._on_headers_sent is not None:
            await self._on_headers_sent(headers)
        # status + headers
        buf = _serialize_headers(status_line, headers)
        self._headers_written = False
        # Buffer rather than send now, so the first body chunk can be
        # coalesced with the headers into a single write.
        self._headers_buf = buf

    def send_headers(self) -> None:
        """Force sending buffered headers if not already sent."""
        if not self._headers_buf or self._headers_written:
            return

        self._headers_written = True
        headers_buf = self._headers_buf
        self._headers_buf = None

        if TYPE_CHECKING:
            # Safe because we only enter this block when self._headers_buf is truthy
            assert headers_buf is not None

        self._write(headers_buf)

    def set_eof(self) -> None:
        """Indicate that the message is complete (synchronous, no drain)."""
        if self._eof:
            return

        # If headers haven't been sent yet, send them now
        # This handles the case where there's no body at all
        if self._headers_buf and not self._headers_written:
            self._headers_written = True
            headers_buf = self._headers_buf
            self._headers_buf = None

            if TYPE_CHECKING:
                # Safe because we only enter this block when self._headers_buf is truthy
                assert headers_buf is not None

            # Combine headers and chunked EOF marker in a single write
            if self.chunked:
                self._writelines((headers_buf, b"0\r\n\r\n"))
            else:
                self._write(headers_buf)
        elif self.chunked and self._headers_written:
            # Headers already sent, just send the final chunk marker
            self._write(b"0\r\n\r\n")

        self._eof = True

    async def write_eof(self, chunk: bytes = b"") -> None:
        """Finish the message, optionally writing a final *chunk*.

        Flushes the compressor (if any), emits the chunked terminator when
        chunked encoding is active, and drains the transport.  Idempotent.
        """
        if self._eof:
            return

        if chunk and self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        # Handle body/compression
        if self._compress:
            chunks: list[bytes] = []
            chunks_len = 0
            if chunk and (compressed_chunk := await self._compress.compress(chunk)):
                chunks_len = len(compressed_chunk)
                chunks.append(compressed_chunk)

            flush_chunk = self._compress.flush()
            chunks_len += len(flush_chunk)
            chunks.append(flush_chunk)
            assert chunks_len

            # Send buffered headers with compressed data if not yet sent
            if self._headers_buf and not self._headers_written:
                self._headers_written = True
                headers_buf = self._headers_buf
                self._headers_buf = None

                if self.chunked:
                    # Coalesce headers with compressed chunked data
                    chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
                    self._writelines(
                        (headers_buf, chunk_len_pre, *chunks, b"\r\n0\r\n\r\n")
                    )
                else:
                    # Coalesce headers with compressed data
                    self._writelines((headers_buf, *chunks))
                await self.drain()
                self._eof = True
                return

            # Headers already sent, just write compressed data
            if self.chunked:
                chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
                self._writelines((chunk_len_pre, *chunks, b"\r\n0\r\n\r\n"))
            elif len(chunks) > 1:
                self._writelines(chunks)
            else:
                self._write(chunks[0])
            await self.drain()
            self._eof = True
            return

        # No compression - send buffered headers if not yet sent
        if self._headers_buf and not self._headers_written:
            # Use helper to send headers with payload
            self._send_headers_with_payload(chunk, True)
            await self.drain()
            self._eof = True
            return

        # Handle remaining body
        if self.chunked:
            if chunk:
                # Write final chunk with EOF marker
                self._writelines(
                    (f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n0\r\n\r\n")
                )
            else:
                self._write(b"0\r\n\r\n")
            await self.drain()
            self._eof = True
            return

        if chunk:
            self._write(chunk)
            await self.drain()

        self._eof = True

    async def drain(self) -> None:
        """Flush the write buffer.

        The intended use is to write

          await w.write(data)
          await w.drain()
        """
        protocol = self._protocol
        # Only wait when flow control has actually paused the protocol.
        if protocol.transport is not None and protocol._paused:
            await protocol._drain_helper()
366def _safe_header(string: str) -> str:
367 if "\r" in string or "\n" in string or "\x00" in string:
368 raise ValueError(
369 "Newline, carriage return, or null byte detected in headers. "
370 "Potential header injection attack."
371 )
372 return string
def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes:
    """Serialize the status line and headers into a UTF-8 wire blob.

    Each component is passed through _safe_header() first, so a CR/LF/NUL
    anywhere in a name or value raises ValueError instead of corrupting
    the stream.
    """
    _safe_header(status_line)
    header_lines = "\r\n".join(
        _safe_header(name) + ": " + _safe_header(value)
        for name, value in headers.items()
    )
    blob = status_line + "\r\n" + header_lines + "\r\n\r\n"
    return blob.encode("utf-8")
# Default to the pure-Python serializer; replaced below by the C extension
# when it is available and extensions are not disabled.
_serialize_headers = _py_serialize_headers


try:
    import aiohttp._http_writer as _http_writer  # type: ignore[import-not-found]

    _c_serialize_headers = _http_writer._serialize_headers
    if not NO_EXTENSIONS:
        _serialize_headers = _c_serialize_headers
except ImportError:
    # C extension not built/installed; keep the pure-Python implementation.
    pass