Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_writer.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Http related parsers and protocol."""
3import asyncio
4import re
5import sys
6from typing import ( # noqa
7 TYPE_CHECKING,
8 Any,
9 Awaitable,
10 Callable,
11 Iterable,
12 List,
13 NamedTuple,
14 Optional,
15 Union,
16)
18from multidict import CIMultiDict
20from .abc import AbstractStreamWriter
21from .base_protocol import BaseProtocol
22from .client_exceptions import ClientConnectionResetError
23from .compression_utils import ZLibCompressor
24from .helpers import NO_EXTENSIONS
# Names exported by a star-import of this module.
__all__ = (
    "StreamWriter",
    "HttpVersion",
    "HttpVersion10",
    "HttpVersion11",
)
# Below this total payload size, StreamWriter._writelines() joins its chunks
# into a single transport.write() call instead of using writelines().
MIN_PAYLOAD_FOR_WRITELINES = 2048
IS_PY313_BEFORE_313_2 = (3, 13, 0) <= sys.version_info < (3, 13, 2)
IS_PY_BEFORE_312_9 = sys.version_info < (3, 12, 9)
# transport.writelines() is not safe to use on Python 3.12+ until 3.12.9 or
# on Python 3.13+ until 3.13.2, and on older versions it is not any faster
# than write(), so it is skipped on all of them.
# CVE-2024-12254: https://github.com/python/cpython/pull/127656
SKIP_WRITELINES = IS_PY313_BEFORE_313_2 or IS_PY_BEFORE_312_9
class HttpVersion(NamedTuple):
    """HTTP protocol version as a ``(major, minor)`` pair."""

    major: int
    minor: int


# Ready-made instances for HTTP/1.0 and HTTP/1.1.
HttpVersion10 = HttpVersion(major=1, minor=0)
HttpVersion11 = HttpVersion(major=1, minor=1)
# Optional async callback awaited with the headers mapping passed to
# StreamWriter.write_headers(), before the headers are serialized.
_T_OnHeadersSent = Optional[Callable[["CIMultiDict[str]"], Awaitable[None]]]

# Optional async callback awaited with each body chunk handed to
# StreamWriter.write() / write_eof(), before compression or framing.
_T_OnChunkSent = Optional[
    Callable[
        [Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]],
        Awaitable[None],
    ]
]
class StreamWriter(AbstractStreamWriter):
    """Writer for HTTP/1.x messages on top of a :class:`BaseProtocol`.

    The serialized head produced by :meth:`write_headers` is buffered rather
    than written immediately, so it can be coalesced with the first body chunk
    or with the end-of-message marker into a single transport write. Body data
    may additionally be compressed (:meth:`enable_compression`) and/or framed
    with chunked transfer coding (:meth:`enable_chunking`).
    """

    # Number of body bytes write() may still send; ``None`` means no limit.
    length: int | None = None
    # Frame body data with chunked transfer coding.
    chunked: bool = False
    # Set once the end of the message has been written.
    _eof: bool = False
    # Compressor applied to body data, if compression is enabled.
    _compress: ZLibCompressor | None = None

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        on_chunk_sent: _T_OnChunkSent = None,
        on_headers_sent: _T_OnHeadersSent = None,
    ) -> None:
        self._protocol = protocol
        self.loop = loop
        self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent
        self._on_headers_sent: _T_OnHeadersSent = on_headers_sent
        # Serialized status line + headers waiting to be sent, if any.
        self._headers_buf: bytes | None = None
        self._headers_written: bool = False

    @property
    def transport(self) -> asyncio.Transport | None:
        """The protocol's transport, or ``None`` when it has none."""
        return self._protocol.transport

    @property
    def protocol(self) -> BaseProtocol:
        """The protocol this writer sends data through."""
        return self._protocol

    def enable_chunking(self) -> None:
        """Frame subsequent body data with chunked transfer coding."""
        self.chunked = True

    def enable_compression(
        self, encoding: str = "deflate", strategy: int | None = None
    ) -> None:
        """Compress subsequent body data with a ``ZLibCompressor``.

        :param encoding: content coding passed to the compressor.
        :param strategy: optional strategy passed to the compressor.
        """
        self._compress = ZLibCompressor(encoding=encoding, strategy=strategy)

    @staticmethod
    def _chunk_size_line(size: int) -> bytes:
        """Return the chunked-coding size line (hex length + CRLF)."""
        return f"{size:x}\r\n".encode("ascii")

    def _pop_headers_buf(self) -> bytes | None:
        """Detach the pending serialized headers and mark them as written.

        Returns ``None`` without changing any state when no headers are
        pending (nothing was buffered, or it has already been sent).
        """
        headers_buf = self._headers_buf
        if not headers_buf or self._headers_written:
            return None
        self._headers_written = True
        self._headers_buf = None
        return headers_buf

    def _write(
        self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write *chunk* to the transport and update the size counters.

        ``buffer_size`` / ``output_size`` are presumably initialized by
        ``AbstractStreamWriter`` (not visible here) — TODO confirm.

        :raises ClientConnectionResetError: if there is no transport or it is
            closing.
        """
        size = len(chunk)
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        transport.write(chunk)

    def _writelines(
        self,
        chunks: Iterable[
            Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
        ],
    ) -> None:
        """Write several chunks to the transport as one logical write.

        Payloads smaller than ``MIN_PAYLOAD_FOR_WRITELINES``, and all payloads
        on interpreters where ``SKIP_WRITELINES`` is set, are joined into a
        single ``transport.write()`` call.

        :raises ClientConnectionResetError: if there is no transport or it is
            closing.
        """
        # The chunks are traversed twice (sizing, then writing); materialize
        # them so a one-shot iterator cannot silently produce an empty write.
        # tuple() of an exact tuple returns it unchanged, so this is free for
        # the usual tuple arguments.
        chunks = tuple(chunks)
        size = sum(len(chunk) for chunk in chunks)
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        if SKIP_WRITELINES or size < MIN_PAYLOAD_FOR_WRITELINES:
            transport.write(b"".join(chunks))
        else:
            transport.writelines(chunks)

    def _write_chunked_payload(
        self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write *chunk* framed as a single chunk of chunked coding."""
        self._writelines((self._chunk_size_line(len(chunk)), chunk, b"\r\n"))

    def _send_headers_with_payload(
        self,
        chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"],
        is_eof: bool,
    ) -> None:
        """Send the buffered headers together with *chunk* in one write.

        With chunking enabled, *chunk* is framed as one chunk and, when
        *is_eof* is true, the terminating zero-length chunk is appended.
        Callers must only invoke this while headers are pending.
        """
        headers_buf = self._pop_headers_buf()
        if TYPE_CHECKING:
            # Safe because callers (write() and write_eof()) only invoke this
            # method after checking that headers are pending.
            assert headers_buf is not None

        if not self.chunked:
            # Non-chunked: the body follows the head directly.
            if chunk:
                self._writelines((headers_buf, chunk))
            else:
                self._write(headers_buf)
        elif chunk:
            tail = b"\r\n0\r\n\r\n" if is_eof else b"\r\n"
            self._writelines(
                (headers_buf, self._chunk_size_line(len(chunk)), chunk, tail)
            )
        elif is_eof:
            # No data: the head plus the terminating zero-length chunk.
            self._writelines((headers_buf, b"0\r\n\r\n"))
        else:
            self._write(headers_buf)

    async def write(
        self,
        chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"],
        *,
        drain: bool = True,
        LIMIT: int = 0x10000,
    ) -> None:
        """Write a chunk of body data to the stream.

        The chunk is first passed to the ``on_chunk_sent`` callback, then
        compressed (if enabled), truncated to the remaining ``length`` (if
        set) and framed with chunked coding (if enabled). Pending headers are
        sent together with it.

        When *drain* is true and ``buffer_size`` exceeds *LIMIT*, the counter
        is reset and this coroutine awaits :meth:`drain`.

        :meth:`write_eof` indicates the end of the stream; the writer must not
        be used after it has been called.
        """
        if self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if isinstance(chunk, memoryview) and chunk.nbytes != len(chunk):
            # For multi-byte item formats len() counts items, not bytes;
            # recast to a flat byte view so the size arithmetic is in bytes.
            chunk = chunk.cast("c")

        if self._compress is not None:
            chunk = await self._compress.compress(chunk)
            if not chunk:
                # Nothing to send yet (presumably held inside the compressor).
                return

        if self.length is not None:
            chunk_len = len(chunk)
            if self.length >= chunk_len:
                self.length -= chunk_len
            else:
                # Truncate to the remaining allowed length.
                chunk = chunk[: self.length]
                self.length = 0
                if not chunk:
                    return

        if self._headers_buf and not self._headers_written:
            # Coalesce the pending headers with this chunk.
            self._send_headers_with_payload(chunk, False)
        elif chunk:
            if self.chunked:
                self._write_chunked_payload(chunk)
            else:
                self._write(chunk)

        if drain and self.buffer_size > LIMIT:
            self.buffer_size = 0
            await self.drain()

    async def write_headers(
        self, status_line: str, headers: "CIMultiDict[str]"
    ) -> None:
        """Serialize *status_line* and *headers* and buffer them for sending.

        The ``on_headers_sent`` callback is awaited first. Nothing is written
        here: the buffered bytes go out with the first body write, via
        :meth:`send_headers`, or at the end of the message. Errors raised by
        the serializer (e.g. ``ValueError`` for forbidden control characters
        in the pure-Python implementation) propagate.
        """
        if self._on_headers_sent is not None:
            await self._on_headers_sent(headers)
        # status + headers
        buf = _serialize_headers(status_line, headers)
        self._headers_written = False
        self._headers_buf = buf

    def send_headers(self) -> None:
        """Force sending buffered headers if not already sent."""
        headers_buf = self._pop_headers_buf()
        if headers_buf is not None:
            self._write(headers_buf)

    def set_eof(self) -> None:
        """Indicate that the message is complete without awaiting a drain.

        Sends any still-pending headers (for a message with no body) and,
        when chunking is enabled, the terminating zero-length chunk.
        """
        if self._eof:
            return

        headers_buf = self._pop_headers_buf()
        if headers_buf is not None:
            # Combine headers and chunked EOF marker in a single write.
            if self.chunked:
                self._writelines((headers_buf, b"0\r\n\r\n"))
            else:
                self._write(headers_buf)
        elif self.chunked and self._headers_written:
            # Headers already sent, just send the final chunk marker.
            self._write(b"0\r\n\r\n")

        self._eof = True

    async def write_eof(self, chunk: bytes = b"") -> None:
        """Write the final body *chunk* (if any) and end the message.

        Flushes the compressor (if enabled) and any pending headers, appends
        the terminating zero-length chunk when chunking is enabled, then
        awaits :meth:`drain`. Calling it again after EOF does nothing.
        """
        if self._eof:
            return

        if chunk and self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if self._compress:
            chunks: list[bytes] = []
            if chunk and (compressed_chunk := await self._compress.compress(chunk)):
                chunks.append(compressed_chunk)
            chunks.append(self._compress.flush())
            chunks_len = sum(len(part) for part in chunks)
            # The final flush is expected to always produce output.
            assert chunks_len

            # Coalesce pending headers (if any) with the compressed data.
            headers_buf = self._pop_headers_buf()
            prefix: tuple[bytes, ...] = () if headers_buf is None else (headers_buf,)
            if self.chunked:
                self._writelines(
                    (
                        *prefix,
                        self._chunk_size_line(chunks_len),
                        *chunks,
                        b"\r\n0\r\n\r\n",
                    )
                )
            elif prefix or len(chunks) > 1:
                self._writelines((*prefix, *chunks))
            else:
                self._write(chunks[0])
        elif self._headers_buf and not self._headers_written:
            # Coalesce the pending headers with the final payload.
            self._send_headers_with_payload(chunk, True)
        elif self.chunked:
            if chunk:
                # Final data chunk immediately followed by the EOF marker.
                self._writelines(
                    (self._chunk_size_line(len(chunk)), chunk, b"\r\n0\r\n\r\n")
                )
            else:
                self._write(b"0\r\n\r\n")
        elif chunk:
            self._write(chunk)

        await self.drain()
        self._eof = True

    async def drain(self) -> None:
        """Flush the write buffer.

        Waits only while the protocol has a transport and reports itself
        paused (``_paused``). The intended use is::

            await w.write(data)
            await w.drain()
        """
        protocol = self._protocol
        if protocol.transport is not None and protocol._paused:
            await protocol._drain_helper()
# Control characters that must not appear in a serialized header line: all of
# 0x00-0x1F except HTAB (0x09), plus DEL (0x7F). Rejecting CR and LF prevents
# header injection via attacker-controlled names or values.
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
# https://www.rfc-editor.org/rfc/rfc9112#section-4-3
_FORBIDDEN_HEADER_CHARS_RE = re.compile(r"[\x00-\x08\x0a-\x1f\x7f]")


def _safe_header(string: str) -> str:
    """Return *string* unchanged if it is safe to put in a header line.

    :raises ValueError: if *string* contains a character matched by
        ``_FORBIDDEN_HEADER_CHARS_RE``.
    """
    if _FORBIDDEN_HEADER_CHARS_RE.search(string) is not None:
        raise ValueError(
            "Forbidden control character detected in headers. "
            "Potential header injection attack."
        )
    return string


def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes:
    """Serialize a start line and headers into an HTTP/1.x message head.

    Pure-Python serializer used when the C extension is unavailable or
    disabled. The result is ``status_line CRLF (name ": " value CRLF)* CRLF``
    encoded as UTF-8.

    :raises ValueError: if the start line, a header name or a header value
        contains a forbidden control character.
    """
    _safe_header(status_line)
    # Give every header line its own CRLF and close the head with a single
    # blank line. Joining with CRLF instead would emit an extra CRLF when
    # there are no headers ("status\r\n\r\n\r\n"), leaking a stray blank line
    # past the end of the head.
    header_lines = "".join(
        _safe_header(name) + ": " + _safe_header(value) + "\r\n"
        for name, value in headers.items()
    )
    return (status_line + "\r\n" + header_lines + "\r\n").encode("utf-8")
# Default to the pure-Python serializer; the compiled accelerator replaces it
# when it imports successfully and extensions are not disabled.
_serialize_headers = _py_serialize_headers

try:
    import aiohttp._http_writer as _http_writer  # type: ignore[import-not-found]
except ImportError:
    pass
else:
    _c_serialize_headers = _http_writer._serialize_headers
    if not NO_EXTENSIONS:
        _serialize_headers = _c_serialize_headers