Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/http_writer.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Http related parsers and protocol."""
3import asyncio
4import re
5import sys
6from typing import ( # noqa
7 TYPE_CHECKING,
8 Any,
9 Awaitable,
10 Callable,
11 Iterable,
12 List,
13 NamedTuple,
14 Optional,
15)
17from multidict import CIMultiDict
19from .abc import AbstractStreamWriter
20from .base_protocol import BaseProtocol
21from .client_exceptions import ClientConnectionResetError
22from .compression_utils import ZLibCompressor
23from .helpers import NO_EXTENSIONS
25__all__ = ("StreamWriter", "HttpVersion", "HttpVersion10", "HttpVersion11")
if sys.version_info < (3, 12):
    from typing import Union

    # Fallback alias for interpreters without collections.abc.Buffer.
    Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
else:
    from collections.abc import Buffer


# Batches whose total size is below this many bytes are joined and sent with
# a single transport.write() call instead of transport.writelines().
MIN_PAYLOAD_FOR_WRITELINES = 2048

# writelines is not safe for use
# on Python 3.12+ until 3.12.9
# on Python 3.13+ until 3.13.2
# and on older versions it is not any faster than write
# CVE-2024-12254: https://github.com/python/cpython/pull/127656
IS_PY_BEFORE_312_9 = sys.version_info < (3, 12, 9)
IS_PY313_BEFORE_313_2 = (3, 13, 0) <= sys.version_info < (3, 13, 2)
SKIP_WRITELINES = IS_PY_BEFORE_312_9 or IS_PY313_BEFORE_313_2
class HttpVersion(NamedTuple):
    """HTTP protocol version as a ``(major, minor)`` pair."""

    major: int
    minor: int


# Ready-made instances for the two HTTP/1.x versions.
HttpVersion10 = HttpVersion(major=1, minor=0)
HttpVersion11 = HttpVersion(major=1, minor=1)
# Optional async hook awaited with each body chunk handed to the writer.
_T_OnChunkSent = Callable[[Buffer], Awaitable[None]] | None
# Optional async hook awaited with the header mapping in write_headers().
_T_OnHeadersSent = Callable[["CIMultiDict[str]"], Awaitable[None]] | None
class StreamWriter(AbstractStreamWriter):
    """Write an HTTP message (headers and body) to a protocol's transport.

    ``write_headers()`` only serializes and buffers the header block.  The
    buffer is flushed together with the first body data (``write()`` /
    ``write_eof()``), or on its own by ``send_headers()`` / ``set_eof()``,
    so that headers and a small payload can go out in a single transport call.
    """

    # Remaining number of body bytes write() may emit; None means no limit.
    length: int | None = None
    # When true, body data is framed as "<hex size>\r\n<data>\r\n" chunks and
    # the message is terminated with "0\r\n\r\n".
    chunked: bool = False
    # Set once the message has been completed by set_eof() / write_eof().
    _eof: bool = False
    # Compressor installed by enable_compression(), if any.
    _compress: ZLibCompressor | None = None

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        on_chunk_sent: _T_OnChunkSent = None,
        on_headers_sent: _T_OnHeadersSent = None,
    ) -> None:
        """Create a writer bound to *protocol*.

        Args:
            protocol: Protocol whose ``transport`` receives the output.
            loop: Event loop, stored as ``self.loop``.
            on_chunk_sent: Optional coroutine function awaited with each body
                chunk before the chunk is processed.
            on_headers_sent: Optional coroutine function awaited with the
                header mapping in ``write_headers()``.
        """
        self._protocol = protocol
        self.loop = loop
        self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent
        self._on_headers_sent: _T_OnHeadersSent = on_headers_sent
        # Serialized headers waiting to be flushed, or None.
        self._headers_buf: bytes | None = None
        self._headers_written: bool = False

    @property
    def transport(self) -> asyncio.Transport | None:
        """The protocol's current transport (may be ``None``)."""
        return self._protocol.transport

    @property
    def protocol(self) -> BaseProtocol:
        """The protocol this writer was created with."""
        return self._protocol

    def enable_chunking(self) -> None:
        """Frame all subsequent body data with chunked transfer encoding."""
        self.chunked = True

    def enable_compression(
        self, encoding: str = "deflate", strategy: int | None = None
    ) -> None:
        """Install a ``ZLibCompressor`` that body data is passed through."""
        self._compress = ZLibCompressor(encoding=encoding, strategy=strategy)

    def _write(self, chunk: Buffer) -> None:
        """Account for *chunk* and hand it to the transport.

        Raises:
            ClientConnectionResetError: If there is no transport or it is
                closing.
        """
        size = len(chunk)
        # The counters are bumped before the transport check, so a failed
        # write is still reflected in buffer_size / output_size.
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        transport.write(chunk)

    def _writelines(self, chunks: Iterable[Buffer]) -> None:
        """Account for *chunks* and send them with one transport call.

        Small batches, and interpreters where ``SKIP_WRITELINES`` is set, are
        joined and sent with ``transport.write()``; everything else goes
        through ``transport.writelines()``.

        Raises:
            ClientConnectionResetError: If there is no transport or it is
                closing.
        """
        # The chunks are walked twice (sizing, then writing).  A one-shot
        # iterable would be exhausted by the sizing pass and nothing would be
        # written, so materialise anything that is not already a sequence.
        if not isinstance(chunks, (tuple, list)):
            chunks = tuple(chunks)
        size = sum(map(len, chunks))
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        if SKIP_WRITELINES or size < MIN_PAYLOAD_FOR_WRITELINES:
            transport.write(b"".join(chunks))
        else:
            transport.writelines(chunks)

    def _write_chunked_payload(self, chunk: Buffer) -> None:
        """Write a chunk with proper chunked encoding."""
        chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
        self._writelines((chunk_len_pre, chunk, b"\r\n"))

    def _take_buffered_headers(self) -> bytes:
        """Mark the headers as written and return the buffered header bytes.

        Callers must have checked that ``self._headers_buf`` is truthy.
        """
        headers_buf = self._headers_buf
        self._headers_written = True
        self._headers_buf = None

        if TYPE_CHECKING:
            # Safe because callers only invoke this method after checking
            # that self._headers_buf is truthy
            assert headers_buf is not None

        return headers_buf

    def _send_headers_with_payload(self, chunk: Buffer, is_eof: bool) -> None:
        """Send buffered headers with payload, coalescing into single write.

        Args:
            chunk: Body data to send right after the headers; may be empty.
            is_eof: Whether the message ends here.  Only affects chunked
                mode, where it appends the terminating ``0\\r\\n\\r\\n``.
        """
        headers_buf = self._take_buffered_headers()

        if not chunk:
            if self.chunked and is_eof:
                # Headers plus the chunked terminator, no body.
                self._writelines((headers_buf, b"0\r\n\r\n"))
            else:
                self._write(headers_buf)
            return

        if not self.chunked:
            # Non-chunked: coalesce headers with body
            self._writelines((headers_buf, chunk))
            return

        # Coalesce headers with chunked data
        chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
        chunk_end = b"\r\n0\r\n\r\n" if is_eof else b"\r\n"
        self._writelines((headers_buf, chunk_len_pre, chunk, chunk_end))

    async def write(
        self, chunk: Buffer, *, drain: bool = True, LIMIT: int = 0x10000
    ) -> None:
        """Write a chunk of body data to the stream.

        The chunk is reported to ``on_chunk_sent``, compressed if compression
        is enabled, truncated to the remaining ``length`` if one is set, and
        then written (together with any still-buffered headers).

        Args:
            chunk: Data to write.
            drain: When true, await ``drain()`` once ``buffer_size`` exceeds
                *LIMIT* (and reset ``buffer_size`` to 0).
            LIMIT: ``buffer_size`` threshold for the automatic drain.
        """
        if self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if isinstance(chunk, memoryview) and chunk.nbytes != len(chunk):
            # just reshape it
            chunk = chunk.cast("c")

        if self._compress is not None:
            chunk = await self._compress.compress(chunk)
            if not chunk:
                # The compressor produced no output for this input.
                return

        if self.length is not None:
            chunk_len = len(chunk)
            if self.length >= chunk_len:
                self.length = self.length - chunk_len
            else:
                # Never emit more than the remaining length.
                chunk = chunk[: self.length]
                self.length = 0
                if not chunk:
                    return

        if self._headers_buf and not self._headers_written:
            # Handle buffered headers for small payload optimization
            self._send_headers_with_payload(chunk, False)
        elif not chunk:
            return
        elif self.chunked:
            self._write_chunked_payload(chunk)
        else:
            self._write(chunk)

        if drain and self.buffer_size > LIMIT:
            self.buffer_size = 0
            await self.drain()

    async def write_headers(
        self, status_line: str, headers: "CIMultiDict[str]"
    ) -> None:
        """Serialize the status line and headers and buffer them.

        Nothing is sent to the transport here; the buffer is flushed by the
        next ``write()``, ``write_eof()``, ``send_headers()`` or ``set_eof()``.
        """
        if self._on_headers_sent is not None:
            await self._on_headers_sent(headers)
        # status + headers
        buf = _serialize_headers(status_line, headers)
        self._headers_written = False
        self._headers_buf = buf

    def send_headers(self) -> None:
        """Force sending buffered headers if not already sent."""
        if self._headers_buf and not self._headers_written:
            self._write(self._take_buffered_headers())

    def set_eof(self) -> None:
        """Indicate that the message is complete."""
        if self._eof:
            return

        if self._headers_buf and not self._headers_written:
            # If headers haven't been sent yet, send them now (combined with
            # the chunked EOF marker when chunking is enabled).
            # This handles the case where there's no body at all
            self._send_headers_with_payload(b"", True)
        elif self.chunked and self._headers_written:
            # Headers already sent, just send the final chunk marker
            self._write(b"0\r\n\r\n")

        self._eof = True

    async def write_eof(self, chunk: bytes = b"") -> None:
        """Write an optional final *chunk* and complete the message.

        Does nothing if the message is already complete.  Any still-buffered
        headers, the (possibly compressed) final data and, in chunked mode,
        the terminating ``0\\r\\n\\r\\n`` are coalesced into one write.  The
        writer is marked complete only after the write and drain succeeded.
        """
        if self._eof:
            return

        if chunk and self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        # Handle body/compression
        if self._compress:
            chunks: list[bytes] = []
            if chunk and (compressed_chunk := await self._compress.compress(chunk)):
                chunks.append(compressed_chunk)
            chunks.append(self._compress.flush())
            chunks_len = sum(map(len, chunks))
            # Flushing the compressor is expected to always yield some bytes.
            assert chunks_len

            if self.chunked:
                # Wrap all compressed data in one chunk followed by the
                # terminating zero-length chunk.
                chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
                chunks = [chunk_len_pre, *chunks, b"\r\n0\r\n\r\n"]

            if self._headers_buf and not self._headers_written:
                # Send buffered headers with compressed data if not yet sent
                headers_buf = self._take_buffered_headers()
                self._writelines((headers_buf, *chunks))
            elif len(chunks) > 1:
                # Headers already sent, just write compressed data
                self._writelines(chunks)
            else:
                self._write(chunks[0])
            await self.drain()
            self._eof = True
            return

        if self._headers_buf and not self._headers_written:
            # No compression - send buffered headers together with the payload
            self._send_headers_with_payload(chunk, True)
        elif self.chunked:
            if chunk:
                # Write final chunk with EOF marker
                self._writelines(
                    (f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n0\r\n\r\n")
                )
            else:
                self._write(b"0\r\n\r\n")
        elif chunk:
            self._write(chunk)
        else:
            # Nothing left to send, so there is nothing to drain either.
            self._eof = True
            return

        await self.drain()
        self._eof = True

    async def drain(self) -> None:
        """Flush the write buffer.

        The intended use is to write

          await w.write(data)
          await w.drain()

        Waits on the protocol's drain helper only when a transport is
        attached and the protocol is currently paused.
        """
        protocol = self._protocol
        if protocol.transport is not None and protocol._paused:
            await protocol._drain_helper()
# https://www.rfc-editor.org/info/rfc9110/#section-5.5-5
# https://www.rfc-editor.org/info/rfc9112/#section-4-3
# Control characters that must not appear in a status line, header name or
# header value.  Horizontal tab (\x09) is deliberately left out of the class.
_FORBIDDEN_HEADER_CHARS_RE = re.compile(r"[\x00-\x08\x0a-\x1f\x7f]")


def _safe_header(string: str) -> str:
    """Return *string* unchanged after checking it for control characters.

    Raises:
        ValueError: If *string* contains a character matched by
            ``_FORBIDDEN_HEADER_CHARS_RE`` (this includes CR and LF).
    """
    if _FORBIDDEN_HEADER_CHARS_RE.search(string) is None:
        return string
    raise ValueError(
        "Forbidden control character detected in headers. "
        "Potential header injection attack."
    )


def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes:
    """Serialize a status line and headers into the head of an HTTP message.

    The result is ``status_line CRLF *(name ": " value CRLF) CRLF`` encoded
    as UTF-8.  An empty *headers* mapping yields exactly
    ``status_line CRLF CRLF`` with no stray extra line.

    Raises:
        ValueError: If the status line or any header name or value contains
            a forbidden control character.
    """
    _safe_header(status_line)
    # Joining the status line together with the field lines keeps the output
    # well-formed when there are no headers at all; joining only the field
    # lines and gluing them after "status_line\r\n" produced a third CRLF.
    lines = [status_line]
    lines.extend(
        _safe_header(k) + ": " + _safe_header(v) for k, v in headers.items()
    )
    return ("\r\n".join(lines) + "\r\n\r\n").encode("utf-8")
# Default to the pure-Python serializer; it is swapped for the C
# implementation below when that extension imports and NO_EXTENSIONS is false.
_serialize_headers = _py_serialize_headers

try:
    import aiohttp._http_writer as _http_writer  # type: ignore[import-not-found]
except ImportError:
    # Extension module unavailable: keep the pure-Python serializer.
    pass
else:
    # Bound whenever the extension imports, even if it ends up unused.
    _c_serialize_headers = _http_writer._serialize_headers
    if not NO_EXTENSIONS:
        _serialize_headers = _c_serialize_headers