Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

431 statements  

1import asyncio 

2import collections 

3import warnings 

4from typing import ( 

5 Awaitable, 

6 Callable, 

7 Deque, 

8 Final, 

9 Generic, 

10 List, 

11 Optional, 

12 Tuple, 

13 TypeVar, 

14) 

15 

16from .base_protocol import BaseProtocol 

17from .helpers import ( 

18 _EXC_SENTINEL, 

19 BaseTimerContext, 

20 TimerNoop, 

21 set_exception, 

22 set_result, 

23) 

24from .log import internal_logger 

25 

# Public API of this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

# Generic item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")

34 

35 

class EofStream(Exception):
    """Raised to signal that the end of the stream has been reached."""

38 

39 

class AsyncStreamIterator(Generic[_T]):
    """Async iterator that repeatedly awaits *read_func* until exhaustion.

    Iteration stops when *read_func* raises ``EofStream`` or returns ``b""``.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        # Zero-argument coroutine function supplying successive items.
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            # Underlying stream signalled end-of-data.
            raise StopAsyncIteration
        if item == b"":
            # An empty bytes chunk is the StreamReader EOF marker.
            raise StopAsyncIteration
        return item

58 

59 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``(data, end_of_http_chunk)`` tuples of a stream."""

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            # (b"", False) is the sentinel readchunk() yields at EOF.
            raise StopAsyncIteration
        return chunk

75 

76 

class AsyncStreamReaderMixin:
    """Mixin providing the async-iteration helpers used by StreamReader.

    Expects the host class to provide ``readline``, ``read``, ``readany``
    and ``readchunk`` (hence the ``type: ignore`` markers below).
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate over the stream line by line (delegates to ``readline``)."""
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

99 

100 

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        # Ensure high_water_chunks >= 3 so it's always > low_water_chunks.
        self._high_water_chunks = max(3, limit // 4)
        # Use max(2, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        self._low_water_chunks = max(2, self._high_water_chunks // 2)
        self._loop = loop
        self._size = 0
        self._cursor = 0
        self._http_chunk_splits: Optional[Deque[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: Optional[int] = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the flow-control (low, high) watermark byte limits."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the exception set on the stream, or None."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the stream as failed and wake pending waiters with *exc*."""
        self._exception = exc
        # Callbacks will never fire once the stream has failed.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF; runs immediately if already at EOF."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Signal end-of-stream: wake all waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No more data will arrive, so there is no reason to stay paused.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until 'feed_eof' is called (no-op if already at EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """Bytes received before decompression (total_bytes when no compression)."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Re-materialize any partially consumed first chunk before prepending.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data*, wake the pending reader, and pause reading above high water."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Start tracking HTTP chunk boundaries (chunked transfer encoding)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if (
            len(self._http_chunk_splits) > self._high_water_chunks
            and not self._protocol._reading_paused
        ):
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Suspend until feed_data()/feed_eof() resolves the shared waiter future."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line; shortcut for ``readuntil()`` with the default separator."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read until *separator* is found (separator included) or until EOF."""
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; read everything until EOF when *n* < 0."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever is buffered, waiting for at least one byte or EOF."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
            # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes; raise IncompleteReadError on premature EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (whole buffer if n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Pop up to *n* bytes from the first buffered chunk (all of it if n == -1)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        if (
            self._protocol._reading_paused
            and self._size < self._low_water
            and (
                self._http_chunk_splits is None
                or len(self._http_chunk_splits) < self._low_water_chunks
            )
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

581 

582 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently at EOF and never yields data."""

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately does NOT call StreamReader.__init__:
        # no protocol, loop, or buffer is ever needed here.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        # An empty stream can never fail.
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        # Ignored: there is no reader to notify.
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Always at EOF, so the callback fires immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        # Data is silently discarded.
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports a single empty non-final chunk,
        # every later call signals a completed (final) chunk.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        # Nothing can ever be read, so any positive request is incomplete.
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

648 

649 

# Module-wide singleton reader that is always at EOF (typed as StreamReader).
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

651 

652 

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """True once feed_eof() (or set_exception()) has been called."""
        return self._eof

    def at_eof(self) -> bool:
        """True when EOF was reached and every buffered item was consumed."""
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def _wakeup_reader(self) -> None:
        # Resolve the pending future, if any, so a blocked read() resumes.
        pending = self._waiter
        if pending is not None:
            self._waiter = None
            set_result(pending, None)

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the queue as failed and propagate *exc* to a blocked reader."""
        self._eof = True
        self._exception = exc
        pending = self._waiter
        if pending is not None:
            self._waiter = None
            set_exception(pending, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item (with its accounted *size*) and wake the reader."""
        self._buffer.append((data, size))
        self._wakeup_reader()

    def feed_eof(self) -> None:
        """Signal that no more items will arrive."""
        self._eof = True
        self._wakeup_reader()

    async def read(self) -> _T:
        """Dequeue the next item, blocking until one arrives or EOF/error."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            data, _ = self._buffer.popleft()
            return data
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)

716 

717 

class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._size = 0
        self._protocol = protocol
        # Backpressure threshold: twice the configured limit, matching
        # StreamReader's high-water behaviour.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue an item and pause the transport once over the limit."""
        super().feed_data(data, size)
        self._size += size

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Dequeue the next item, resuming the transport when under the limit."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            payload, nbytes = self._buffer.popleft()
            self._size -= nbytes
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return payload
        if self._exception is not None:
            raise self._exception
        raise EofStream