Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

421 statements  

1import asyncio 

2import collections 

3import warnings 

4from typing import ( 

5 Awaitable, 

6 Callable, 

7 Deque, 

8 Final, 

9 Generic, 

10 List, 

11 Optional, 

12 Tuple, 

13 TypeVar, 

14) 

15 

16from .base_protocol import BaseProtocol 

17from .helpers import ( 

18 _EXC_SENTINEL, 

19 BaseTimerContext, 

20 TimerNoop, 

21 set_exception, 

22 set_result, 

23) 

24from .log import internal_logger 

25 

# Names exported by ``from aiohttp.streams import *``.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

# Item type for the generic containers below (AsyncStreamIterator, DataQueue).
_T = TypeVar("_T")

34 

35 

class EofStream(Exception):
    """Raised to signal that the end of a stream has been reached."""

38 

39 

class AsyncStreamIterator(Generic[_T]):
    """Async iterator that repeatedly awaits *read_func* until exhaustion.

    Iteration stops when *read_func* raises :exc:`EofStream` or returns
    an empty bytes object.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        # An empty bytes chunk is the EOF marker for byte readers.
        if item == b"":
            raise StopAsyncIteration
        return item

58 

59 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``(data, end_of_http_chunk)`` tuples.

    Wraps the stream's ``readchunk()``; iteration ends on the
    ``(b"", False)`` sentinel, which signals EOF.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk

75 

76 

class AsyncStreamReaderMixin:
    """Adds async-iteration helpers on top of a reader implementation.

    The concrete class is expected to provide ``readline``, ``read``,
    ``readany`` and ``readchunk``.
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate over the stream line by line."""
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

99 

100 

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: the protocol is paused when the buffered
        # size exceeds the high water mark and resumed below the low one.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        # Bytes currently buffered (decreases as data is consumed).
        self._size = 0
        # Logical read position from the start of the body transfer.
        self._cursor = 0
        # Offsets of HTTP chunk boundaries; None until chunked mode starts.
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        # How much of self._buffer[0] has already been consumed.
        self._buffer_offset = 0
        self._eof = False
        # Future used to park a single read coroutine until data/EOF arrives.
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0
        # Total bytes ever fed into this reader (never decreases).
        self.total_bytes = 0

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the flow-control watermarks as (low, high)."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the exception set via set_exception(), if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the stream broken: wake both waiters with *exc* and drop
        any pending EOF callbacks."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already there)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Signal end of stream: wake waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (returns at once if it was)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Normalize the head chunk first so the pushed-back data really
        # lands in front of anything not yet consumed.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer, waking a pending reader and pausing
        the protocol once the high water mark is exceeded."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Switch to HTTP chunked mode; must happen before any data is fed."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk at the current position."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Park the calling coroutine until feed_data()/feed_eof() wakes it."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read until (and including) the next b"\\n", or to EOF."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read until *separator* (inclusive), or to EOF if never found.

        Raises ValueError on an empty separator, or when the accumulated
        data exceeds the high water mark without the separator appearing.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes (all remaining data until EOF if n < 0)."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever data is buffered, waiting for at least one byte
        (returns b"" only at EOF)."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer encoding is used, end_of_http_chunk is a
        boolean indicating if the end of the data corresponds to the end
        of a HTTP chunk, otherwise it is always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes, raising IncompleteReadError on early EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all if n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes from the first buffered chunk only
        (the whole remaining chunk if n == -1)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption: advance the offset, keep the chunk queued.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

550 

551 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently at EOF and holds no data.

    Every read method returns empty bytes; feeding data, EOF or
    exceptions is a no-op.
    """

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately skips StreamReader.__init__: no protocol, buffer
        # or waiters are needed for an always-empty stream.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so run the callback immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports a plain empty chunk, every later call the
        # end-of-chunk marker.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

618 

# Shared singleton reader that is always at EOF (see EmptyStreamReader).
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

620 

621 

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        # Future used to park the single reader until data/EOF arrives.
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """Return True once feed_eof() has been called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True when EOF was fed and the buffer is drained."""
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the queue broken; a pending or future read raises *exc*."""
        self._eof = True
        self._exception = exc
        if (waiter := self._waiter) is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake the reader if it is blocked."""
        self._buffer.append((data, size))
        if (waiter := self._waiter) is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        """Signal end of stream and wake the reader if it is blocked."""
        self._eof = True
        if (waiter := self._waiter) is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        """Return the next item, blocking until one is available.

        Raises the stored exception if the queue was broken, or
        EofStream once the queue is exhausted after feed_eof().
        """
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        # Buffered data wins over a stored exception/EOF state.
        if self._buffer:
            item, _ = self._buffer.popleft()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)

685 

686 

class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        # Sum of the `size` values of currently buffered items.
        self._size = 0
        self._protocol = protocol
        # Reading is paused once the buffered size exceeds twice the limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue *data*, pausing the protocol when over the limit."""
        super().feed_data(data, size)
        self._size += size

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Dequeue one item, resuming the protocol when under the limit.

        Raises the stored exception if one was set, or EofStream once
        the queue is exhausted after EOF.
        """
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            # Resume reading before handing data back so the transport can
            # refill the queue while the caller processes this item.
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return data
        if self._exception is not None:
            raise self._exception
        raise EofStream