Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

427 statements  

1import asyncio 

2import collections 

3import warnings 

4from typing import ( 

5 Awaitable, 

6 Callable, 

7 Deque, 

8 Final, 

9 Generic, 

10 List, 

11 Optional, 

12 Tuple, 

13 TypeVar, 

14) 

15 

16from .base_protocol import BaseProtocol 

17from .helpers import ( 

18 _EXC_SENTINEL, 

19 BaseTimerContext, 

20 TimerNoop, 

21 set_exception, 

22 set_result, 

23) 

24from .log import internal_logger 

25 

# Public names exported by ``from aiohttp.streams import *``; the
# iterator helpers and flow-control subclass stay module-internal.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

32 

# Item type produced by read callables / queue payloads.
_T = TypeVar("_T")


class AsyncStreamIterator(Generic[_T]):
    """Async-iterator adapter over a zero-argument read coroutine.

    Iteration ends when the callable raises EofStream or produces an
    empty bytes object.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if item == b"":
            # Empty bytes is the stream's end-of-data marker.
            raise StopAsyncIteration
        return item

58 

59 

class ChunkTupleAsyncStreamIterator:
    """Async iterator yielding ``(data, end_of_http_chunk)`` pairs.

    Wraps ``StreamReader.readchunk()``; iteration stops on the reader's
    exhaustion sentinel ``(b"", False)``.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        pair = await self._stream.readchunk()
        if pair == (b"", False):
            # (b"", False) marks the end of the underlying stream.
            raise StopAsyncIteration
        return pair

75 

76 

class AsyncStreamReaderMixin:
    """Mixin adding ``async for`` iteration helpers to a stream reader.

    The concrete class is expected to provide ``readline``, ``read``,
    ``readany`` and ``readchunk`` coroutines (see ``StreamReader``).
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration is line-by-line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

99 

100 

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Wrap *protocol* with a buffered, flow-controlled reader.

        *limit* is the low-water mark; the pause (high-water) threshold
        is twice that.
        """
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0  # number of bytes currently buffered
        self._cursor = 0  # total bytes already handed out to readers
        # Logical offsets of HTTP chunk ends; None when the transfer is
        # not chunked (see begin_http_chunk_receiving()).
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # consumed prefix of self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0  # debug counter for repeated reads at EOF
        self.total_bytes = 0
        # Not assigned anywhere in this class — presumably set by the
        # decompressing layer when the payload is compressed; TODO confirm.
        self.total_compressed_bytes: Optional[int] = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the exception set with set_exception(), if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the stream broken: fail pending waiters, drop EOF callbacks."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (run immediately if already EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Signal end of stream: wake all waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called."""
        if self._eof:
            return

        # Only a single coroutine may wait for EOF at a time.
        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        # Wire-level byte count: the compressed total when one was
        # recorded, otherwise the plain total.
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            # Materialize the partially-consumed head chunk so the
            # prepended data lines up with a zero offset.
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Buffer *data*, wake a pending reader and apply backpressure."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Enable chunk-boundary tracking for chunked transfer encoding."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Park the calling read coroutine until data or EOF arrives."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line; delegates to readuntil() with the default b"\\n"."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read until *separator* is found (separator included in result).

        Stops at EOF even without a separator; raises ValueError when the
        accumulated data exceeds the high-water mark.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True  # separator not seen yet

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # NOTE(review): the search only looks inside the first
                # buffered chunk — a multi-byte separator split across two
                # chunks appears to be missed here; confirm upstream.
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; n == -1 drains the stream until EOF."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever is buffered, waiting only while the buffer is empty."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
            # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise asyncio.IncompleteReadError."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read buffered data without awaiting."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Pop up to *n* bytes (-1 = all) from the first buffered chunk."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial take: advance the offset, keep the chunk buffered.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Take the unconsumed tail of the head chunk.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

558 

559 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A stream reader that is permanently at EOF and never yields data."""

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately does not call StreamReader.__init__: this reader
        # needs neither a protocol nor an event loop.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        # An empty stream can never carry an error.
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        # Exceptions are silently discarded; there is nothing to poison.
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback runs right away.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # The very first call reports (b"", False) once, mirroring
        # StreamReader's EOF signalling; later calls report (b"", True).
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

625 

626 

# Shared module-level singleton: a reader that is always at EOF.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

628 

629 

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """True once feed_eof() has been called."""
        return self._eof

    def at_eof(self) -> bool:
        """True when EOF was signalled and no buffered items remain."""
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        """Return the exception set with set_exception(), if any."""
        return self._exception

    def _release_waiter(self) -> None:
        # At most one read() can be parked on a future; hand it control.
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the queue broken: reads raise *exc* once drained."""
        self._eof = True
        self._exception = exc
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Append one item (with its accounted size) and wake the reader."""
        self._buffer.append((data, size))
        self._release_waiter()

    def feed_eof(self) -> None:
        """Signal that no further items will arrive."""
        self._eof = True
        self._release_waiter()

    async def read(self) -> _T:
        """Return the next item, waiting for one if necessary.

        Raises the stored exception (if set) or EofStream when the queue
        is exhausted after feed_eof().
        """
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            item, _ = self._buffer.popleft()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream

693 

694 

class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._size = 0  # total accounted size of buffered items
        self._protocol = protocol
        # Pause reading once the buffered size exceeds twice the limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Queue one item and pause the transport when over the limit."""
        super().feed_data(data, size)
        self._size += size

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Pop the next item, resuming the transport when under the limit.

        Mirrors DataQueue.read() but adjusts the size accounting and
        flow control on each dequeue.
        """
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return data
        if self._exception is not None:
            raise self._exception
        raise EofStream