Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_writer.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

219 statements  

1"""Http related parsers and protocol.""" 

2 

3import asyncio 

4import sys 

5from typing import ( # noqa 

6 TYPE_CHECKING, 

7 Any, 

8 Awaitable, 

9 Callable, 

10 Iterable, 

11 List, 

12 NamedTuple, 

13 Optional, 

14 Union, 

15) 

16 

17from multidict import CIMultiDict 

18 

19from .abc import AbstractStreamWriter 

20from .base_protocol import BaseProtocol 

21from .client_exceptions import ClientConnectionResetError 

22from .compression_utils import ZLibCompressor 

23from .helpers import NO_EXTENSIONS 

24 

25__all__ = ("StreamWriter", "HttpVersion", "HttpVersion10", "HttpVersion11") 

26 

27 

# Minimum combined payload size before transport.writelines() is preferred
# over joining the buffers and issuing a single transport.write().
MIN_PAYLOAD_FOR_WRITELINES = 2048
IS_PY313_BEFORE_313_2 = (3, 13, 0) <= sys.version_info < (3, 13, 2)
IS_PY_BEFORE_312_9 = sys.version_info < (3, 12, 9)
SKIP_WRITELINES = IS_PY313_BEFORE_313_2 or IS_PY_BEFORE_312_9
# writelines is not safe for use
# on Python 3.12+ until 3.12.9
# on Python 3.13+ until 3.13.2
# and on older versions it is not any faster than write
# CVE-2024-12254: https://github.com/python/cpython/pull/127656

37 

38 

class HttpVersion(NamedTuple):
    """HTTP protocol version as a (major, minor) pair, e.g. HttpVersion(1, 1)."""

    major: int
    minor: int

42 

43 

# Commonly used protocol versions.
HttpVersion10 = HttpVersion(1, 0)
HttpVersion11 = HttpVersion(1, 1)


# Optional awaitable hook invoked with each body chunk before it is written.
_T_OnChunkSent = Optional[
    Callable[
        [Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]],
        Awaitable[None],
    ]
]
# Optional awaitable hook invoked with the headers before they are sent.
_T_OnHeadersSent = Optional[Callable[["CIMultiDict[str]"], Awaitable[None]]]

55 

56 

class StreamWriter(AbstractStreamWriter):
    """Write an HTTP message (headers and body) to the transport.

    Headers are buffered by write_headers() and flushed lazily so that,
    for small payloads, the headers and the first body chunk can be
    coalesced into a single transport write.  Supports chunked transfer
    encoding, zlib-based compression, and Content-Length accounting.
    """

    # Remaining body-byte budget (Content-Length); None means unlimited.
    length: int | None = None
    # True once chunked transfer encoding has been enabled.
    chunked: bool = False
    # True after the message is complete; the writer must not be reused.
    _eof: bool = False
    # Active compressor, or None when compression is disabled.
    _compress: ZLibCompressor | None = None

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        on_chunk_sent: _T_OnChunkSent = None,
        on_headers_sent: _T_OnHeadersSent = None,
    ) -> None:
        self._protocol = protocol
        self.loop = loop
        # Optional awaitable hooks fired before chunks/headers are written.
        self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent
        self._on_headers_sent: _T_OnHeadersSent = on_headers_sent
        # Serialized headers waiting to be sent (possibly coalesced with
        # the first body write); None once flushed.
        self._headers_buf: bytes | None = None
        self._headers_written: bool = False

    @property
    def transport(self) -> asyncio.Transport | None:
        """The underlying transport, or None if the connection is gone."""
        return self._protocol.transport

    @property
    def protocol(self) -> BaseProtocol:
        """The protocol that owns the transport."""
        return self._protocol

    def enable_chunking(self) -> None:
        """Switch the body to chunked transfer encoding."""
        self.chunked = True

    def enable_compression(
        self, encoding: str = "deflate", strategy: int | None = None
    ) -> None:
        """Enable body compression using *encoding* (e.g. "deflate")."""
        self._compress = ZLibCompressor(encoding=encoding, strategy=strategy)

    def _write(
        self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        # Low-level single-buffer write with size accounting.
        size = len(chunk)
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        transport.write(chunk)

    def _writelines(
        self,
        chunks: Iterable[
            Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
        ],
    ) -> None:
        # Low-level multi-buffer write.  Falls back to one joined write()
        # when transport.writelines() is unsafe (see SKIP_WRITELINES /
        # CVE-2024-12254 above) or the payload is small enough that joining
        # is cheaper.
        # NOTE(review): chunks is iterated twice (size pass + write); callers
        # in this class pass tuples/lists, so re-iteration is safe here.
        size = 0
        for chunk in chunks:
            size += len(chunk)
        self.buffer_size += size
        self.output_size += size
        transport = self._protocol.transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        if SKIP_WRITELINES or size < MIN_PAYLOAD_FOR_WRITELINES:
            transport.write(b"".join(chunks))
        else:
            transport.writelines(chunks)

    def _write_chunked_payload(
        self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write a chunk with proper chunked encoding."""
        # Chunked framing: "<hex length>\r\n<data>\r\n".
        chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
        self._writelines((chunk_len_pre, chunk, b"\r\n"))

    def _send_headers_with_payload(
        self,
        chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"],
        is_eof: bool,
    ) -> None:
        """Send buffered headers with payload, coalescing into single write."""
        # Mark headers as written
        self._headers_written = True
        headers_buf = self._headers_buf
        self._headers_buf = None

        if TYPE_CHECKING:
            # Safe because callers (write() and write_eof()) only invoke this method
            # after checking that self._headers_buf is truthy
            assert headers_buf is not None

        if not self.chunked:
            # Non-chunked: coalesce headers with body
            if chunk:
                self._writelines((headers_buf, chunk))
            else:
                self._write(headers_buf)
            return

        # Coalesce headers with chunked data
        if chunk:
            chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
            if is_eof:
                # Final chunk: also append the terminating zero-length chunk.
                self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n0\r\n\r\n"))
            else:
                self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n"))
        elif is_eof:
            # No body at all: headers plus the chunked EOF marker.
            self._writelines((headers_buf, b"0\r\n\r\n"))
        else:
            self._write(headers_buf)

    async def write(
        self,
        chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"],
        *,
        drain: bool = True,
        LIMIT: int = 0x10000,
    ) -> None:
        """
        Writes chunk of data to a stream.

        write_eof() indicates end of stream.
        writer can't be used after write_eof() method being called.
        write() return drain future.
        """
        if self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        if isinstance(chunk, memoryview):
            if chunk.nbytes != len(chunk):
                # just reshape it: a view with itemsize > 1 reports len() in
                # elements, not bytes; recast to a flat 1-byte view so len()
                # and slicing operate on bytes.
                chunk = chunk.cast("c")

        if self._compress is not None:
            chunk = await self._compress.compress(chunk)
            if not chunk:
                # Compressor buffered everything; nothing to send yet.
                return

        if self.length is not None:
            # Enforce the remaining Content-Length budget, truncating the
            # chunk if it would overrun.
            chunk_len = len(chunk)
            if self.length >= chunk_len:
                self.length = self.length - chunk_len
            else:
                chunk = chunk[: self.length]
                self.length = 0
                if not chunk:
                    return

        # Handle buffered headers for small payload optimization
        if self._headers_buf and not self._headers_written:
            self._send_headers_with_payload(chunk, False)
            if drain and self.buffer_size > LIMIT:
                self.buffer_size = 0
                await self.drain()
            return

        if chunk:
            if self.chunked:
                self._write_chunked_payload(chunk)
            else:
                self._write(chunk)

        if drain and self.buffer_size > LIMIT:
            self.buffer_size = 0
            await self.drain()

    async def write_headers(
        self, status_line: str, headers: "CIMultiDict[str]"
    ) -> None:
        """Write headers to the stream."""
        if self._on_headers_sent is not None:
            await self._on_headers_sent(headers)
        # status + headers
        buf = _serialize_headers(status_line, headers)
        # Buffer instead of writing immediately so the first body write can
        # coalesce headers and payload into one transport call.
        self._headers_written = False
        self._headers_buf = buf

    def send_headers(self) -> None:
        """Force sending buffered headers if not already sent."""
        if not self._headers_buf or self._headers_written:
            return

        self._headers_written = True
        headers_buf = self._headers_buf
        self._headers_buf = None

        if TYPE_CHECKING:
            # Safe because we only enter this block when self._headers_buf is truthy
            assert headers_buf is not None

        self._write(headers_buf)

    def set_eof(self) -> None:
        """Indicate that the message is complete."""
        if self._eof:
            return

        # If headers haven't been sent yet, send them now
        # This handles the case where there's no body at all
        if self._headers_buf and not self._headers_written:
            self._headers_written = True
            headers_buf = self._headers_buf
            self._headers_buf = None

            if TYPE_CHECKING:
                # Safe because we only enter this block when self._headers_buf is truthy
                assert headers_buf is not None

            # Combine headers and chunked EOF marker in a single write
            if self.chunked:
                self._writelines((headers_buf, b"0\r\n\r\n"))
            else:
                self._write(headers_buf)
        elif self.chunked and self._headers_written:
            # Headers already sent, just send the final chunk marker
            self._write(b"0\r\n\r\n")

        self._eof = True

    async def write_eof(self, chunk: bytes = b"") -> None:
        """Finish the message, optionally writing a final *chunk* first."""
        if self._eof:
            return

        if chunk and self._on_chunk_sent is not None:
            await self._on_chunk_sent(chunk)

        # Handle body/compression
        if self._compress:
            chunks: list[bytes] = []
            chunks_len = 0
            if chunk and (compressed_chunk := await self._compress.compress(chunk)):
                chunks_len = len(compressed_chunk)
                chunks.append(compressed_chunk)

            flush_chunk = self._compress.flush()
            chunks_len += len(flush_chunk)
            chunks.append(flush_chunk)
            # flush() is expected to always yield data (the compressed-stream
            # trailer), so there is something to send at this point.
            assert chunks_len

            # Send buffered headers with compressed data if not yet sent
            if self._headers_buf and not self._headers_written:
                self._headers_written = True
                headers_buf = self._headers_buf
                self._headers_buf = None

                if self.chunked:
                    # Coalesce headers with compressed chunked data
                    chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
                    self._writelines(
                        (headers_buf, chunk_len_pre, *chunks, b"\r\n0\r\n\r\n")
                    )
                else:
                    # Coalesce headers with compressed data
                    self._writelines((headers_buf, *chunks))
                await self.drain()
                self._eof = True
                return

            # Headers already sent, just write compressed data
            if self.chunked:
                chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
                self._writelines((chunk_len_pre, *chunks, b"\r\n0\r\n\r\n"))
            elif len(chunks) > 1:
                self._writelines(chunks)
            else:
                self._write(chunks[0])
            await self.drain()
            self._eof = True
            return

        # No compression - send buffered headers if not yet sent
        if self._headers_buf and not self._headers_written:
            # Use helper to send headers with payload
            self._send_headers_with_payload(chunk, True)
            await self.drain()
            self._eof = True
            return

        # Handle remaining body
        if self.chunked:
            if chunk:
                # Write final chunk with EOF marker
                self._writelines(
                    (f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n0\r\n\r\n")
                )
            else:
                self._write(b"0\r\n\r\n")
            await self.drain()
            self._eof = True
            return

        if chunk:
            self._write(chunk)
        await self.drain()

        self._eof = True

    async def drain(self) -> None:
        """Flush the write buffer.

        The intended use is to write

        await w.write(data)
        await w.drain()
        """
        protocol = self._protocol
        # Only wait when the transport has applied write-backpressure.
        if protocol.transport is not None and protocol._paused:
            await protocol._drain_helper()

364 

365 

366def _safe_header(string: str) -> str: 

367 if "\r" in string or "\n" in string or "\x00" in string: 

368 raise ValueError( 

369 "Newline, carriage return, or null byte detected in headers. " 

370 "Potential header injection attack." 

371 ) 

372 return string 

373 

374 

375def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes: 

376 _safe_header(status_line) 

377 headers_gen = (_safe_header(k) + ": " + _safe_header(v) for k, v in headers.items()) 

378 line = status_line + "\r\n" + "\r\n".join(headers_gen) + "\r\n\r\n" 

379 return line.encode("utf-8") 

380 

381 

# Default to the pure-Python serializer; replaced below by the C-accelerated
# version when the extension module is available and extensions are enabled.
_serialize_headers = _py_serialize_headers

try:
    import aiohttp._http_writer as _http_writer  # type: ignore[import-not-found]

    _c_serialize_headers = _http_writer._serialize_headers
    if not NO_EXTENSIONS:
        _serialize_headers = _c_serialize_headers
except ImportError:
    # Extension not built/installed; keep the pure-Python implementation.
    pass