Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/http_writer.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

225 statements  

1"""Http related parsers and protocol.""" 

2 

3import asyncio 

4import re 

5import sys 

6from typing import ( # noqa 

7 TYPE_CHECKING, 

8 Any, 

9 Awaitable, 

10 Callable, 

11 Iterable, 

12 List, 

13 NamedTuple, 

14 Optional, 

15) 

16 

17from multidict import CIMultiDict 

18 

19from .abc import AbstractStreamWriter 

20from .base_protocol import BaseProtocol 

21from .client_exceptions import ClientConnectionResetError 

22from .compression_utils import ZLibCompressor 

23from .helpers import NO_EXTENSIONS 

24 

# Names exported by ``from aiohttp.http_writer import *``.
__all__ = (
    "StreamWriter",
    "HttpVersion",
    "HttpVersion10",
    "HttpVersion11",
)

26 

# ``collections.abc.Buffer`` only exists on Python 3.12+; older interpreters
# get an equivalent union of the built-in buffer types for annotations.
if sys.version_info < (3, 12):
    from typing import Union

    Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
else:
    from collections.abc import Buffer

33 

34 

# Combined payloads smaller than this many bytes are joined and sent with a
# single transport.write() call instead of transport.writelines().
MIN_PAYLOAD_FOR_WRITELINES = 2048

# transport.writelines() is not safe to use on Python 3.12 before 3.12.9 or
# on Python 3.13 before 3.13.2, and on older versions it is not any faster
# than write(), so it is skipped in all of those cases.
# CVE-2024-12254: https://github.com/python/cpython/pull/127656
IS_PY_BEFORE_312_9 = sys.version_info < (3, 12, 9)
IS_PY313_BEFORE_313_2 = (3, 13, 0) <= sys.version_info < (3, 13, 2)
SKIP_WRITELINES = IS_PY313_BEFORE_313_2 or IS_PY_BEFORE_312_9

44 

45 

class HttpVersion(NamedTuple):
    """HTTP protocol version as a ``(major, minor)`` pair."""

    major: int
    minor: int


# Ready-made instances for HTTP/1.0 and HTTP/1.1.
HttpVersion10 = HttpVersion(major=1, minor=0)
HttpVersion11 = HttpVersion(major=1, minor=1)

53 

54 

# Optional coroutine callbacks accepted by StreamWriter: the first is awaited
# with body chunks passed to write()/write_eof(), the second with the headers
# passed to write_headers().
_T_OnChunkSent = Callable[[Buffer], Awaitable[None]] | None
_T_OnHeadersSent = Callable[["CIMultiDict[str]"], Awaitable[None]] | None

57 

58 

class StreamWriter(AbstractStreamWriter):
    """Write an HTTP message (status line, headers, body) to a transport.

    Headers passed to :meth:`write_headers` are serialized and buffered rather
    than written immediately.  They are coalesced with the first body chunk,
    or flushed by :meth:`send_headers`, :meth:`set_eof` or :meth:`write_eof`,
    so that small messages leave in a single transport write.  Body data can
    be framed with chunked transfer-encoding (:meth:`enable_chunking`) and
    compressed (:meth:`enable_compression`).
    """

    # Number of body bytes write() may still send; write() truncates chunks
    # to it and decrements it.  None means no limit is enforced.
    length: int | None = None
    # When True, body data is framed as "<hex size>\r\n<data>\r\n" chunks and
    # the message is terminated with "0\r\n\r\n".
    chunked: bool = False
    # Set once the message has been terminated; later EOF calls are no-ops.
    _eof: bool = False
    # Compressor applied to body data; None means no compression.
    _compress: ZLibCompressor | None = None

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        on_chunk_sent: _T_OnChunkSent = None,
        on_headers_sent: _T_OnHeadersSent = None,
    ) -> None:
        """Create a writer bound to *protocol*.

        :param protocol: protocol whose transport receives the output.
        :param loop: event loop, stored as ``self.loop``.
        :param on_chunk_sent: optional coroutine callback awaited with each
            body chunk before it is compressed and written.
        :param on_headers_sent: optional coroutine callback awaited with the
            headers passed to :meth:`write_headers`.
        """
        self._protocol = protocol
        self.loop = loop
        self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent
        self._on_headers_sent: _T_OnHeadersSent = on_headers_sent
        # Serialized headers waiting to be sent together with the first payload.
        self._headers_buf: bytes | None = None
        self._headers_written: bool = False

    @property
    def transport(self) -> asyncio.Transport | None:
        """Transport of the underlying protocol; may be ``None``."""
        return self._protocol.transport

    @property
    def protocol(self) -> BaseProtocol:
        """Protocol this writer was created for."""
        return self._protocol

    def enable_chunking(self) -> None:
        """Frame subsequent body data with chunked transfer-encoding."""
        self.chunked = True

    def enable_compression(
        self, encoding: str = "deflate", strategy: int | None = None
    ) -> None:
        """Compress subsequent body data with a :class:`ZLibCompressor`.

        :param encoding: content coding forwarded to the compressor.
        :param strategy: optional compression strategy forwarded to the
            compressor.
        """
        self._compress = ZLibCompressor(encoding=encoding, strategy=strategy)

    def _write(self, chunk: Buffer) -> None:
        """Write one buffer to the transport and update the size counters.

        :raises ClientConnectionResetError: if the transport is missing or
            closing.
        """
        size = len(chunk)
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        transport.write(chunk)

    def _writelines(self, chunks: Iterable[Buffer]) -> None:
        """Write several buffers as one logical write.

        Small combined payloads, or interpreters where ``writelines`` must be
        avoided (``SKIP_WRITELINES``), are joined and sent with a single
        ``transport.write``; otherwise ``transport.writelines`` is used.

        :raises ClientConnectionResetError: if the transport is missing or
            closing.
        """
        # ``chunks`` is traversed twice (size accounting, then the actual
        # write).  A one-shot iterator would be exhausted by the first pass
        # and nothing would be sent, so materialize anything that is not
        # already a re-iterable sequence.
        if not isinstance(chunks, (tuple, list)):
            chunks = tuple(chunks)
        size = 0
        for chunk in chunks:
            size += len(chunk)
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        if SKIP_WRITELINES or size < MIN_PAYLOAD_FOR_WRITELINES:
            transport.write(b"".join(chunks))
        else:
            transport.writelines(chunks)

    def _write_chunked_payload(self, chunk: Buffer) -> None:
        """Write a chunk with proper chunked encoding."""
        chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
        self._writelines((chunk_len_pre, chunk, b"\r\n"))

    def _send_headers_with_payload(self, chunk: Buffer, is_eof: bool) -> None:
        """Send buffered headers with payload, coalescing into single write.

        In chunked mode *chunk* (if non-empty) is framed as one chunk, and when
        *is_eof* is true the terminating zero-length chunk is appended too.
        Callers must only invoke this while ``self._headers_buf`` is truthy
        and the headers have not been written yet.
        """
        # Mark headers as written
        self._headers_written = True
        headers_buf = self._headers_buf
        self._headers_buf = None

        if TYPE_CHECKING:
            # Safe because callers (write() and write_eof()) only invoke this method
            # after checking that self._headers_buf is truthy
            assert headers_buf is not None

        if not self.chunked:
            # Non-chunked: coalesce headers with body
            if chunk:
                self._writelines((headers_buf, chunk))
            else:
                self._write(headers_buf)
            return

        # Coalesce headers with chunked data
        if chunk:
            chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
            if is_eof:
                self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n0\r\n\r\n"))
            else:
                self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n"))
        elif is_eof:
            self._writelines((headers_buf, b"0\r\n\r\n"))
        else:
            self._write(headers_buf)

    async def write(
        self, chunk: Buffer, *, drain: bool = True, LIMIT: int = 0x10000
    ) -> None:
        """Write a chunk of body data to the stream.

        The chunk is passed to ``on_chunk_sent`` (if set), compressed when
        compression is enabled, truncated to the remaining ``length`` and then
        written, with chunked framing when enabled.  Headers still buffered
        from :meth:`write_headers` are sent in the same transport write.

        When *drain* is true and ``buffer_size`` exceeds *LIMIT*, the counter
        is reset and the transport is drained before returning.

        Call :meth:`write_eof` to finish the message; the writer must not be
        used after that.
        """
        if self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if isinstance(chunk, memoryview):
            if chunk.nbytes != len(chunk):
                # len() counts items, not bytes, for non-byte formats; recast
                # to single bytes so the size accounting below is in bytes.
                chunk = chunk.cast("c")

        if self._compress is not None:
            chunk = await self._compress.compress(chunk)
            if not chunk:
                # The compressor produced no output for this input yet.
                return

        if self.length is not None:
            chunk_len = len(chunk)
            if self.length >= chunk_len:
                self.length = self.length - chunk_len
            else:
                # Never send more than the remaining allowance.
                chunk = chunk[: self.length]
                self.length = 0
                if not chunk:
                    return

        # Handle buffered headers for small payload optimization
        if self._headers_buf and not self._headers_written:
            self._send_headers_with_payload(chunk, False)
            if drain and self.buffer_size > LIMIT:
                self.buffer_size = 0
                await self.drain()
            return

        # An empty chunk is skipped: in chunked mode it would otherwise be
        # framed as the zero-length terminator.
        if chunk:
            if self.chunked:
                self._write_chunked_payload(chunk)
            else:
                self._write(chunk)

        if drain and self.buffer_size > LIMIT:
            self.buffer_size = 0
            await self.drain()

    async def write_headers(
        self, status_line: str, headers: "CIMultiDict[str]"
    ) -> None:
        """Serialize and buffer the status line and headers.

        Nothing is written to the transport here: the buffered headers are
        sent with the first body write, or by :meth:`send_headers`,
        :meth:`set_eof` or :meth:`write_eof`.  ``on_headers_sent`` (if set) is
        awaited with *headers* first.

        :raises ValueError: if the status line or a header contains a
            forbidden control character (raised by the serializer).
        """
        if self._on_headers_sent is not None:
            await self._on_headers_sent(headers)
        # status + headers
        buf = _serialize_headers(status_line, headers)
        self._headers_written = False
        self._headers_buf = buf

    def send_headers(self) -> None:
        """Force sending buffered headers if not already sent."""
        if not self._headers_buf or self._headers_written:
            return

        self._headers_written = True
        headers_buf = self._headers_buf
        self._headers_buf = None

        if TYPE_CHECKING:
            # Safe because we only enter this block when self._headers_buf is truthy
            assert headers_buf is not None

        self._write(headers_buf)

    def set_eof(self) -> None:
        """Indicate that the message is complete.

        Sends any still-buffered headers and, in chunked mode, the terminating
        zero-length chunk.  Compressed data is not flushed here.
        """
        if self._eof:
            return

        # If headers haven't been sent yet, send them now
        # This handles the case where there's no body at all
        if self._headers_buf and not self._headers_written:
            self._headers_written = True
            headers_buf = self._headers_buf
            self._headers_buf = None

            if TYPE_CHECKING:
                # Safe because we only enter this block when self._headers_buf is truthy
                assert headers_buf is not None

            # Combine headers and chunked EOF marker in a single write
            if self.chunked:
                self._writelines((headers_buf, b"0\r\n\r\n"))
            else:
                self._write(headers_buf)
        elif self.chunked and self._headers_written:
            # Headers already sent, just send the final chunk marker
            self._write(b"0\r\n\r\n")

        self._eof = True

    async def write_eof(self, chunk: bytes = b"") -> None:
        """Write the final *chunk* (if any) and terminate the message.

        Flushes the compressor when compression is enabled, sends any
        still-buffered headers in the same write, appends the terminating
        zero-length chunk in chunked mode, and drains the transport.  Calling
        it again after the message was terminated is a no-op.
        """
        if self._eof:
            return

        if chunk and self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        # Handle body/compression
        if self._compress:
            chunks: list[bytes] = []
            chunks_len = 0
            if chunk and (compressed_chunk := await self._compress.compress(chunk)):
                chunks_len = len(compressed_chunk)
                chunks.append(compressed_chunk)

            flush_chunk = self._compress.flush()
            chunks_len += len(flush_chunk)
            chunks.append(flush_chunk)
            # Invariant: the compressed tail sent below is never empty.
            assert chunks_len

            # Send buffered headers with compressed data if not yet sent
            if self._headers_buf and not self._headers_written:
                self._headers_written = True
                headers_buf = self._headers_buf
                self._headers_buf = None

                if TYPE_CHECKING:
                    # Safe because we only enter this block when self._headers_buf is truthy
                    assert headers_buf is not None

                if self.chunked:
                    # Coalesce headers with compressed chunked data
                    chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
                    self._writelines(
                        (headers_buf, chunk_len_pre, *chunks, b"\r\n0\r\n\r\n")
                    )
                else:
                    # Coalesce headers with compressed data
                    self._writelines((headers_buf, *chunks))
                await self.drain()
                self._eof = True
                return

            # Headers already sent, just write compressed data
            if self.chunked:
                chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
                self._writelines((chunk_len_pre, *chunks, b"\r\n0\r\n\r\n"))
            elif len(chunks) > 1:
                self._writelines(chunks)
            else:
                self._write(chunks[0])
            await self.drain()
            self._eof = True
            return

        # No compression - send buffered headers if not yet sent
        if self._headers_buf and not self._headers_written:
            # Use helper to send headers with payload
            self._send_headers_with_payload(chunk, True)
            await self.drain()
            self._eof = True
            return

        # Handle remaining body
        if self.chunked:
            if chunk:
                # Write final chunk with EOF marker
                self._writelines(
                    (f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n0\r\n\r\n")
                )
            else:
                self._write(b"0\r\n\r\n")
            await self.drain()
            self._eof = True
            return

        if chunk:
            self._write(chunk)
            await self.drain()

        self._eof = True

    async def drain(self) -> None:
        """Flush the write buffer.

        Waits on the protocol's drain helper only while a transport exists and
        the protocol reports it is paused; otherwise returns immediately.
        The intended use is to write

          await w.write(data)
          await w.drain()
        """
        protocol = self._protocol
        if protocol.transport is not None and protocol._paused:
            await protocol._drain_helper()

349 

350 

351# https://www.rfc-editor.org/info/rfc9110/#section-5.5-5 

352# https://www.rfc-editor.org/info/rfc9112/#section-4-3 

353_FORBIDDEN_HEADER_CHARS_RE = re.compile(r"[\x00-\x08\x0a-\x1f\x7f]") 

354 

355 

356def _safe_header(string: str) -> str: 

357 if _FORBIDDEN_HEADER_CHARS_RE.search(string) is not None: 

358 raise ValueError( 

359 "Forbidden control character detected in headers. " 

360 "Potential header injection attack." 

361 ) 

362 return string 

363 

364 

365def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes: 

366 _safe_header(status_line) 

367 headers_gen = (_safe_header(k) + ": " + _safe_header(v) for k, v in headers.items()) 

368 line = status_line + "\r\n" + "\r\n".join(headers_gen) + "\r\n\r\n" 

369 return line.encode("utf-8") 

370 

371 

# Default to the pure-Python serializer; switch to the compiled one when the
# extension module imports and extensions are not disabled via NO_EXTENSIONS.
_serialize_headers = _py_serialize_headers

try:
    import aiohttp._http_writer as _http_writer  # type: ignore[import-not-found]
except ImportError:
    pass
else:
    _c_serialize_headers = _http_writer._serialize_headers
    if not NO_EXTENSIONS:
        _serialize_headers = _c_serialize_headers