Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_writer.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

221 statements  

1"""Http related parsers and protocol.""" 

2 

3import asyncio 

4import re 

5import sys 

6from typing import ( # noqa 

7 TYPE_CHECKING, 

8 Any, 

9 Awaitable, 

10 Callable, 

11 Iterable, 

12 List, 

13 NamedTuple, 

14 Optional, 

15 Union, 

16) 

17 

18from multidict import CIMultiDict 

19 

20from .abc import AbstractStreamWriter 

21from .base_protocol import BaseProtocol 

22from .client_exceptions import ClientConnectionResetError 

23from .compression_utils import ZLibCompressor 

24from .helpers import NO_EXTENSIONS 

25 

26__all__ = ("StreamWriter", "HttpVersion", "HttpVersion10", "HttpVersion11") 

27 

28 

29MIN_PAYLOAD_FOR_WRITELINES = 2048 

30IS_PY313_BEFORE_313_2 = (3, 13, 0) <= sys.version_info < (3, 13, 2) 

31IS_PY_BEFORE_312_9 = sys.version_info < (3, 12, 9) 

32SKIP_WRITELINES = IS_PY313_BEFORE_313_2 or IS_PY_BEFORE_312_9 

33# writelines is not safe for use 

34# on Python 3.12+ until 3.12.9 

35# on Python 3.13+ until 3.13.2 

36# and on older versions it not any faster than write 

37# CVE-2024-12254: https://github.com/python/cpython/pull/127656 

38 

39 

40class HttpVersion(NamedTuple): 

41 major: int 

42 minor: int 

43 

44 

45HttpVersion10 = HttpVersion(1, 0) 

46HttpVersion11 = HttpVersion(1, 1) 

47 

48 

49_T_OnChunkSent = Optional[ 

50 Callable[ 

51 [Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]], 

52 Awaitable[None], 

53 ] 

54] 

55_T_OnHeadersSent = Optional[Callable[["CIMultiDict[str]"], Awaitable[None]]] 

56 

57 

58class StreamWriter(AbstractStreamWriter): 

59 

60 length: int | None = None 

61 chunked: bool = False 

62 _eof: bool = False 

63 _compress: ZLibCompressor | None = None 

64 

65 def __init__( 

66 self, 

67 protocol: BaseProtocol, 

68 loop: asyncio.AbstractEventLoop, 

69 on_chunk_sent: _T_OnChunkSent = None, 

70 on_headers_sent: _T_OnHeadersSent = None, 

71 ) -> None: 

72 self._protocol = protocol 

73 self.loop = loop 

74 self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent 

75 self._on_headers_sent: _T_OnHeadersSent = on_headers_sent 

76 self._headers_buf: bytes | None = None 

77 self._headers_written: bool = False 

78 

79 @property 

80 def transport(self) -> asyncio.Transport | None: 

81 return self._protocol.transport 

82 

83 @property 

84 def protocol(self) -> BaseProtocol: 

85 return self._protocol 

86 

87 def enable_chunking(self) -> None: 

88 self.chunked = True 

89 

90 def enable_compression( 

91 self, encoding: str = "deflate", strategy: int | None = None 

92 ) -> None: 

93 self._compress = ZLibCompressor(encoding=encoding, strategy=strategy) 

94 

95 def _write( 

96 self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

97 ) -> None: 

98 size = len(chunk) 

99 self.buffer_size += size 

100 self.output_size += size 

101 transport = self._protocol.transport 

102 if transport is None or transport.is_closing(): 

103 raise ClientConnectionResetError("Cannot write to closing transport") 

104 transport.write(chunk) 

105 

106 def _writelines( 

107 self, 

108 chunks: Iterable[ 

109 Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

110 ], 

111 ) -> None: 

112 size = 0 

113 for chunk in chunks: 

114 size += len(chunk) 

115 self.buffer_size += size 

116 self.output_size += size 

117 transport = self._protocol.transport 

118 if transport is None or transport.is_closing(): 

119 raise ClientConnectionResetError("Cannot write to closing transport") 

120 if SKIP_WRITELINES or size < MIN_PAYLOAD_FOR_WRITELINES: 

121 transport.write(b"".join(chunks)) 

122 else: 

123 transport.writelines(chunks) 

124 

125 def _write_chunked_payload( 

126 self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

127 ) -> None: 

128 """Write a chunk with proper chunked encoding.""" 

129 chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii") 

130 self._writelines((chunk_len_pre, chunk, b"\r\n")) 

131 

132 def _send_headers_with_payload( 

133 self, 

134 chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"], 

135 is_eof: bool, 

136 ) -> None: 

137 """Send buffered headers with payload, coalescing into single write.""" 

138 # Mark headers as written 

139 self._headers_written = True 

140 headers_buf = self._headers_buf 

141 self._headers_buf = None 

142 

143 if TYPE_CHECKING: 

144 # Safe because callers (write() and write_eof()) only invoke this method 

145 # after checking that self._headers_buf is truthy 

146 assert headers_buf is not None 

147 

148 if not self.chunked: 

149 # Non-chunked: coalesce headers with body 

150 if chunk: 

151 self._writelines((headers_buf, chunk)) 

152 else: 

153 self._write(headers_buf) 

154 return 

155 

156 # Coalesce headers with chunked data 

157 if chunk: 

158 chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii") 

159 if is_eof: 

160 self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n0\r\n\r\n")) 

161 else: 

162 self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n")) 

163 elif is_eof: 

164 self._writelines((headers_buf, b"0\r\n\r\n")) 

165 else: 

166 self._write(headers_buf) 

167 

168 async def write( 

169 self, 

170 chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"], 

171 *, 

172 drain: bool = True, 

173 LIMIT: int = 0x10000, 

174 ) -> None: 

175 """ 

176 Writes chunk of data to a stream. 

177 

178 write_eof() indicates end of stream. 

179 writer can't be used after write_eof() method being called. 

180 write() return drain future. 

181 """ 

182 if self._on_chunk_sent is not None: 

183 await self._on_chunk_sent(chunk) 

184 

185 if isinstance(chunk, memoryview): 

186 if chunk.nbytes != len(chunk): 

187 # just reshape it 

188 chunk = chunk.cast("c") 

189 

190 if self._compress is not None: 

191 chunk = await self._compress.compress(chunk) 

192 if not chunk: 

193 return 

194 

195 if self.length is not None: 

196 chunk_len = len(chunk) 

197 if self.length >= chunk_len: 

198 self.length = self.length - chunk_len 

199 else: 

200 chunk = chunk[: self.length] 

201 self.length = 0 

202 if not chunk: 

203 return 

204 

205 # Handle buffered headers for small payload optimization 

206 if self._headers_buf and not self._headers_written: 

207 self._send_headers_with_payload(chunk, False) 

208 if drain and self.buffer_size > LIMIT: 

209 self.buffer_size = 0 

210 await self.drain() 

211 return 

212 

213 if chunk: 

214 if self.chunked: 

215 self._write_chunked_payload(chunk) 

216 else: 

217 self._write(chunk) 

218 

219 if drain and self.buffer_size > LIMIT: 

220 self.buffer_size = 0 

221 await self.drain() 

222 

223 async def write_headers( 

224 self, status_line: str, headers: "CIMultiDict[str]" 

225 ) -> None: 

226 """Write headers to the stream.""" 

227 if self._on_headers_sent is not None: 

228 await self._on_headers_sent(headers) 

229 # status + headers 

230 buf = _serialize_headers(status_line, headers) 

231 self._headers_written = False 

232 self._headers_buf = buf 

233 

234 def send_headers(self) -> None: 

235 """Force sending buffered headers if not already sent.""" 

236 if not self._headers_buf or self._headers_written: 

237 return 

238 

239 self._headers_written = True 

240 headers_buf = self._headers_buf 

241 self._headers_buf = None 

242 

243 if TYPE_CHECKING: 

244 # Safe because we only enter this block when self._headers_buf is truthy 

245 assert headers_buf is not None 

246 

247 self._write(headers_buf) 

248 

249 def set_eof(self) -> None: 

250 """Indicate that the message is complete.""" 

251 if self._eof: 

252 return 

253 

254 # If headers haven't been sent yet, send them now 

255 # This handles the case where there's no body at all 

256 if self._headers_buf and not self._headers_written: 

257 self._headers_written = True 

258 headers_buf = self._headers_buf 

259 self._headers_buf = None 

260 

261 if TYPE_CHECKING: 

262 # Safe because we only enter this block when self._headers_buf is truthy 

263 assert headers_buf is not None 

264 

265 # Combine headers and chunked EOF marker in a single write 

266 if self.chunked: 

267 self._writelines((headers_buf, b"0\r\n\r\n")) 

268 else: 

269 self._write(headers_buf) 

270 elif self.chunked and self._headers_written: 

271 # Headers already sent, just send the final chunk marker 

272 self._write(b"0\r\n\r\n") 

273 

274 self._eof = True 

275 

276 async def write_eof(self, chunk: bytes = b"") -> None: 

277 if self._eof: 

278 return 

279 

280 if chunk and self._on_chunk_sent is not None: 

281 await self._on_chunk_sent(chunk) 

282 

283 # Handle body/compression 

284 if self._compress: 

285 chunks: list[bytes] = [] 

286 chunks_len = 0 

287 if chunk and (compressed_chunk := await self._compress.compress(chunk)): 

288 chunks_len = len(compressed_chunk) 

289 chunks.append(compressed_chunk) 

290 

291 flush_chunk = self._compress.flush() 

292 chunks_len += len(flush_chunk) 

293 chunks.append(flush_chunk) 

294 assert chunks_len 

295 

296 # Send buffered headers with compressed data if not yet sent 

297 if self._headers_buf and not self._headers_written: 

298 self._headers_written = True 

299 headers_buf = self._headers_buf 

300 self._headers_buf = None 

301 

302 if self.chunked: 

303 # Coalesce headers with compressed chunked data 

304 chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii") 

305 self._writelines( 

306 (headers_buf, chunk_len_pre, *chunks, b"\r\n0\r\n\r\n") 

307 ) 

308 else: 

309 # Coalesce headers with compressed data 

310 self._writelines((headers_buf, *chunks)) 

311 await self.drain() 

312 self._eof = True 

313 return 

314 

315 # Headers already sent, just write compressed data 

316 if self.chunked: 

317 chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii") 

318 self._writelines((chunk_len_pre, *chunks, b"\r\n0\r\n\r\n")) 

319 elif len(chunks) > 1: 

320 self._writelines(chunks) 

321 else: 

322 self._write(chunks[0]) 

323 await self.drain() 

324 self._eof = True 

325 return 

326 

327 # No compression - send buffered headers if not yet sent 

328 if self._headers_buf and not self._headers_written: 

329 # Use helper to send headers with payload 

330 self._send_headers_with_payload(chunk, True) 

331 await self.drain() 

332 self._eof = True 

333 return 

334 

335 # Handle remaining body 

336 if self.chunked: 

337 if chunk: 

338 # Write final chunk with EOF marker 

339 self._writelines( 

340 (f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n0\r\n\r\n") 

341 ) 

342 else: 

343 self._write(b"0\r\n\r\n") 

344 await self.drain() 

345 self._eof = True 

346 return 

347 

348 if chunk: 

349 self._write(chunk) 

350 await self.drain() 

351 

352 self._eof = True 

353 

354 async def drain(self) -> None: 

355 """Flush the write buffer. 

356 

357 The intended use is to write 

358 

359 await w.write(data) 

360 await w.drain() 

361 """ 

362 protocol = self._protocol 

363 if protocol.transport is not None and protocol._paused: 

364 await protocol._drain_helper() 

365 

366 

367# https://www.rfc-editor.org/info/rfc9110/#section-5.5-5 

368# https://www.rfc-editor.org/info/rfc9112/#section-4-3 

369_FORBIDDEN_HEADER_CHARS_RE = re.compile(r"[\x00-\x08\x0a-\x1f\x7f]") 

370 

371 

372def _safe_header(string: str) -> str: 

373 if _FORBIDDEN_HEADER_CHARS_RE.search(string) is not None: 

374 raise ValueError( 

375 "Forbidden control character detected in headers. " 

376 "Potential header injection attack." 

377 ) 

378 return string 

379 

380 

381def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes: 

382 _safe_header(status_line) 

383 headers_gen = (_safe_header(k) + ": " + _safe_header(v) for k, v in headers.items()) 

384 line = status_line + "\r\n" + "\r\n".join(headers_gen) + "\r\n\r\n" 

385 return line.encode("utf-8") 

386 

387 

388_serialize_headers = _py_serialize_headers 

389 

390try: 

391 import aiohttp._http_writer as _http_writer # type: ignore[import-not-found] 

392 

393 _c_serialize_headers = _http_writer._serialize_headers 

394 if not NO_EXTENSIONS: 

395 _serialize_headers = _c_serialize_headers 

396except ImportError: 

397 pass