Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

433 statements  

1import asyncio 

2import collections 

3import warnings 

4from typing import ( 

5 Awaitable, 

6 Callable, 

7 Deque, 

8 Final, 

9 Generic, 

10 List, 

11 Optional, 

12 Tuple, 

13 TypeVar, 

14) 

15 

16from .base_protocol import BaseProtocol 

17from .helpers import ( 

18 _EXC_SENTINEL, 

19 BaseTimerContext, 

20 TimerNoop, 

21 set_exception, 

22 set_result, 

23) 

24from .http_exceptions import LineTooLong 

25from .log import internal_logger 

26 

# Public names exported via ``from aiohttp.streams import *``.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

# Item type produced by the generic stream iterators and data queues.
_T = TypeVar("_T")

35 

36 

class EofStream(Exception):
    """Raised by queue-style readers to signal the end of the stream."""

39 

40 

class AsyncStreamIterator(Generic[_T]):
    """Async iterator driven by a single read coroutine.

    Each ``__anext__`` awaits *read_func* once; iteration stops when the
    read function raises ``EofStream`` or produces an empty bytes object.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if item == b"":
            # An empty read is the stream's end-of-data marker.
            raise StopAsyncIteration
        return item

59 

60 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``(data, end_of_http_chunk)`` tuples.

    Delegates to ``StreamReader.readchunk`` and stops once the reader
    reports the sentinel ``(b"", False)`` EOF value.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk

76 

77 

class AsyncStreamReaderMixin:
    """Mixin adding async-iteration helpers to a stream reader.

    The host class is expected to provide ``readline``, ``read``,
    ``readany`` and ``readchunk`` coroutines.
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration protocol is line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""

        def read_next() -> Awaitable[bytes]:
            return self.read(n)  # type: ignore[attr-defined]

        return AsyncStreamIterator(read_next)

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

100 

101 

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    The protocol pushes bytes in via ``feed_data``/``feed_eof``; the
    ``read*`` coroutines pull them out.  A single future (``_waiter``)
    links the two sides, so only one read coroutine may be waiting at a
    time.  Flow control pauses the transport above the byte high
    watermark (or when too many HTTP chunk boundaries accumulate) and
    resumes it once consumption drops below the low watermarks.
    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        # Byte watermarks: pause reading above high, resume below low.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        # Ensure high_water_chunks >= 3 so it's always > low_water_chunks.
        self._high_water_chunks = max(3, limit // 4)
        # Use max(2, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        self._low_water_chunks = max(2, self._high_water_chunks // 2)
        self._loop = loop
        # Bytes currently buffered and not yet consumed.
        self._size = 0
        # Logical offset of consumed data from the start of the body.
        self._cursor = 0
        self._http_chunk_splits: Optional[Deque[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        # Consumed prefix length of self._buffer[0].
        self._buffer_offset = 0
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        # Debug aid: counts reads after EOF to detect busy loops.
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: Optional[int] = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the byte flow-control watermarks as ``(low, high)``."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the exception set on the stream, if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the stream: wake both waiters with *exc*, drop EOF callbacks."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already there)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the end of the stream, wake waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No more data will arrive, so a paused transport can be resumed
        # (e.g. to let the peer finish the connection teardown).
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until 'feed_eof' is called (returns at once if already EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """Bytes received on the wire (compressed size when known)."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Flatten the consumed prefix of the head chunk before prepending,
        # since _buffer_offset only ever applies to self._buffer[0].
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer, wake a pending reader and apply
        flow control (pause the transport above the high watermark)."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Enable HTTP chunk boundary tracking (must precede any data)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of an HTTP chunk at the current logical offset."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if (
            len(self._http_chunk_splits) > self._high_water_chunks
            and not self._protocol._reading_paused
        ):
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Park the calling read coroutine until the protocol feeds data/EOF."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self, *, max_line_length: Optional[int] = None) -> bytes:
        """Read one ``\\n``-terminated line (see ``readuntil``)."""
        return await self.readuntil(max_size=max_line_length)

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: Optional[int] = None
    ) -> bytes:
        """Read until *separator* (inclusive), EOF, or *max_size* bytes.

        Raises ``LineTooLong`` once more than *max_size* bytes accumulate
        without finding the separator.  NOTE(review): a caller-supplied
        ``max_size`` of 0 falls back to the high watermark here.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True
        max_size = max_size or self._high_water

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > max_size:
                    raise LineTooLong(chunk[:100] + b"...", max_size)

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; with ``n == -1`` read everything to EOF."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever is buffered, waiting for at least one byte or EOF."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes; raise IncompleteReadError on early EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously drain up to *n* buffered bytes (all with ``-1``)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes (all with ``-1``) from the head buffer entry,
        advancing the cursor and applying resume-side flow control."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consume: keep the entry, advance the offset.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Consume the rest of a partially-read entry.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            # Consume a whole untouched entry.
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        # Resume the transport only when both the byte count and the
        # pending chunk-boundary count are back under their low watermarks.
        if (
            self._protocol._reading_paused
            and self._size < self._low_water
            and (
                self._http_chunk_splits is None
                or len(self._http_chunk_splits) < self._low_water_chunks
            )
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

585 

586 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A stream reader that is permanently at EOF and carries no data.

    Backs the shared ``EMPTY_PAYLOAD`` singleton.
    """

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # StreamReader.__init__ is deliberately skipped: an always-empty
        # stream needs no protocol, loop, buffer or flow control.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        # An empty stream never carries a failure.
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        # Ignored: there is no reader state to poison.
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback fires immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    async def readline(self, *, max_line_length: Optional[int] = None) -> bytes:
        return b""

    # TODO add async def readuntil

    async def read(self, n: int = -1) -> bytes:
        return b""

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports an empty, non-final chunk; every later call
        # reports end-of-chunk, mirroring StreamReader's EOF protocol.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

652 

653 

# Shared singleton used wherever a message has no body.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

655 

656 

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader.

    Producers call ``feed_data``/``feed_eof``/``set_exception``; the single
    consumer awaits ``read``.  A one-shot future hands control from
    producer to consumer.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """True once the producer has signalled EOF (or a failure)."""
        return self._eof

    def at_eof(self) -> bool:
        """True when the queue is drained and no more items will arrive."""
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        """Return the exception set on the queue, if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the queue and wake a pending reader with *exc*."""
        self._eof = True
        self._exception = exc
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake a pending reader."""
        self._buffer.append((data, size))
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        """Mark the end of the queue and wake a pending reader."""
        self._eof = True
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        """Pop the next item, waiting when the queue is empty.

        After EOF, raises the stored exception if one was set, otherwise
        ``EofStream`` once the buffer is drained.
        """
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            item, _ = self._buffer.popleft()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)

720 

721 

class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._size = 0
        self._protocol = protocol
        # Transport is paused once the buffered payload exceeds twice the
        # configured limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue *data*, pausing the transport when over the limit."""
        super().feed_data(data, size)
        self._size += size

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Pop the next item, resuming the transport when under the limit."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            item, item_size = self._buffer.popleft()
            self._size -= item_size
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream