Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

397 statements  

1import asyncio 

2import collections 

3import warnings 

4from collections.abc import Awaitable, Callable 

5from typing import Final, Generic, TypeVar 

6 

7from .base_protocol import BaseProtocol 

8from .helpers import ( 

9 _EXC_SENTINEL, 

10 BaseTimerContext, 

11 TimerNoop, 

12 set_exception, 

13 set_result, 

14) 

15from .http_exceptions import LineTooLong 

16from .log import internal_logger 

17 

# Names re-exported as the public API of this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

24 

_T = TypeVar("_T")


class EofStream(Exception):
    """Raised by a read callable to signal a hard end of stream."""


class AsyncStreamIterator(Generic[_T]):
    """Adapt a zero-argument awaitable read callable into an async iterator.

    Iteration stops when the callable raises EofStream or returns ``b""``.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            # The producer signalled an explicit end of stream.
            raise StopAsyncIteration
        if item == b"":
            # An empty read result likewise means the stream is exhausted.
            raise StopAsyncIteration
        return item

50 

51 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``(data, end_of_http_chunk)`` pairs of a stream.

    Iteration stops when the underlying stream's readchunk() returns
    ``(b"", False)``, the value StreamReader uses to signal EOF.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> tuple[bytes, bool]:
        pair = await self._stream.readchunk()
        if pair == (b"", False):
            raise StopAsyncIteration
        return pair

67 

68 

class AsyncStreamReaderMixin:
    """Add ``async for`` support to classes exposing read()/readline()/readany()/readchunk()."""

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate the stream line by line (delegates to readline)."""
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""

        async def read_chunk() -> bytes:
            return await self.read(n)  # type: ignore[attr-defined]

        return AsyncStreamIterator(read_chunk)

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

91 

92 

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: BaseTimerContext | None = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create a reader tied to *protocol*.

        *limit* is the flow-control low watermark in bytes: reading is
        paused once more than ``2 * limit`` bytes are buffered and resumed
        when the buffer drops below *limit*.
        """
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        # Ensure high_water_chunks >= 3 so it's always > low_water_chunks.
        self._high_water_chunks = max(3, limit // 4)
        # Use max(2, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        self._low_water_chunks = max(2, self._high_water_chunks // 2)
        self._loop = loop
        self._size = 0  # number of bytes currently buffered
        self._cursor = 0  # logical position: total bytes consumed so far
        self._http_chunk_splits: collections.deque[int] | None = None
        self._buffer: collections.deque[bytes] = collections.deque()
        self._buffer_offset = 0  # already-consumed prefix of self._buffer[0]
        self._eof = False
        self._waiter: asyncio.Future[None] | None = None
        self._eof_waiter: asyncio.Future[None] | None = None
        self._exception: type[BaseException] | BaseException | None = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: list[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: int | None = None

    def __repr__(self) -> str:
        """Summarize buffered size, EOF state, limits, waiter and exception."""
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> type[BaseException] | BaseException | None:
        """Return the exception set via set_exception(), or None."""
        return self._exception

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Record *exc*, drop pending EOF callbacks, and fail any waiters."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF; run it now if EOF already seen."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark EOF: wake all waiters, resume the transport, fire callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No more data will arrive, so there is nothing left to throttle.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (returns at once if already EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """Bytes received on the wire: compressed count if known, else total_bytes."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Normalize the head chunk first so appendleft() keeps offsets valid.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    def feed_data(self, data: bytes) -> None:
        """Append *data* to the buffer, wake the reader, and pause if over limit."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Flow control: stop the transport once the high watermark is crossed.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Switch on HTTP-chunk boundary tracking (must precede any feed_data)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the current position as the end of an HTTP chunk."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if (
            len(self._http_chunk_splits) > self._high_water_chunks
            and not self._protocol._reading_paused
        ):
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Park the (single) reader coroutine until new data or EOF arrives."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        """Read one line (up to and including b"\\n"), capped at *max_line_length*."""
        return await self.readuntil(max_size=max_line_length)

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Read until *separator* (inclusive) or EOF, raising LineTooLong past *max_size*."""
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True
        max_size = max_size or self._high_water

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # find() result is shifted by +1 so 0 means "not found".
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > max_size:
                    raise LineTooLong(chunk[:100] + b"...", max_size)

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes, or everything until EOF when n < 0."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever is buffered, waiting for at least one byte or EOF."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk, otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
            # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes, raising IncompleteReadError on early EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: list[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all of them when n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes (all, if n == -1) from the first buffer chunk."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption: advance the offset, keep the chunk queued.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Chunk fully consumed starting from a non-zero offset.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            # Whole chunk consumed in one go.
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        # Resume the transport once both byte and chunk-count pressure are low.
        if (
            self._protocol._reading_paused
            and self._size < self._low_water
            and (
                self._http_chunk_splits is None
                or len(self._http_chunk_splits) < self._low_water_chunks
            )
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

559 

560 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader stand-in that is permanently empty and at EOF."""

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately skip StreamReader.__init__: no protocol, loop or
        # buffers are needed for an always-empty stream.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>"

    def exception(self) -> BaseException | None:
        """An empty stream never carries an exception."""
        return None

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Ignored: there is no reader to fail."""

    def on_eof(self, callback: Callable[[], None]) -> None:
        # The stream is already at EOF, so run the callback immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        """No-op: the stream is born at EOF."""

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        """Return immediately: EOF has already happened."""

    def feed_data(self, data: bytes) -> None:
        """No-op: an empty stream accepts no data."""

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> tuple[bytes, bool]:
        # First call reports the EOF sentinel (b"", False); subsequent
        # calls report (b"", True), matching StreamReader semantics.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

626 

627 

# Shared module-level singleton used wherever an always-empty body is needed.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

629 

630 

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: asyncio.Future[None] | None = None
        self._exception: type[BaseException] | BaseException | None = None
        self._buffer: collections.deque[_T] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """True once feed_eof() or set_exception() has been called."""
        return self._eof

    def at_eof(self) -> bool:
        """True when EOF was signalled and the buffer is fully drained."""
        return self._eof and not self._buffer

    def exception(self) -> type[BaseException] | BaseException | None:
        """Return the stored failure, if any."""
        return self._exception

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the queue as failed and wake the pending reader with *exc*."""
        self._eof = True
        self._exception = exc
        pending, self._waiter = self._waiter, None
        if pending is not None:
            set_exception(pending, exc, exc_cause)

    def feed_data(self, data: _T) -> None:
        """Enqueue *data* and wake the reader if one is parked."""
        self._buffer.append(data)
        pending, self._waiter = self._waiter, None
        if pending is not None:
            set_result(pending, None)

    def feed_eof(self) -> None:
        """Signal that no more items will arrive."""
        self._eof = True
        pending, self._waiter = self._waiter, None
        if pending is not None:
            set_result(pending, None)

    async def read(self) -> _T:
        """Pop the next item, blocking until data, EOF, or a failure arrives.

        Raises the stored exception once the buffer is drained, or
        EofStream on a clean EOF.
        """
        if not self._buffer and not self._eof:
            # Only one reader may wait at a time.
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            return self._buffer.popleft()
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)