Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

384 statements  

1import asyncio 

2import collections 

3import warnings 

4from typing import ( 

5 Awaitable, 

6 Callable, 

7 Deque, 

8 Final, 

9 Generic, 

10 List, 

11 Optional, 

12 Tuple, 

13 Type, 

14 TypeVar, 

15 Union, 

16) 

17 

18from .base_protocol import BaseProtocol 

19from .helpers import ( 

20 _EXC_SENTINEL, 

21 BaseTimerContext, 

22 TimerNoop, 

23 set_exception, 

24 set_result, 

25) 

26from .log import internal_logger 

27 

# Public API of this module.
__all__ = ("EMPTY_PAYLOAD", "EofStream", "StreamReader", "DataQueue")

# Item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")

36 

37 

class EofStream(Exception):
    """Raised to signal that the end of the stream was reached."""

40 

41 

class AsyncStreamIterator(Generic[_T]):
    """Async iterator driven by a zero-argument read coroutine.

    Iteration stops when the read function raises EofStream or
    returns an empty bytes object.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            result = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if result == b"":
            raise StopAsyncIteration
        return result

60 

61 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over the (data, end_of_http_chunk) tuples of a stream.

    Exhausted once the underlying stream reports (b"", False).
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk

77 

78 

class AsyncStreamReaderMixin:
    """Mixin that adds ``async for`` iteration helpers to a stream reader."""

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate line by line (as produced by ``readline``)."""
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Iterate over the data in chunks of at most ``n`` bytes."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Iterate over data blocks as soon as each one is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Iterate over (bytes, bool) pairs as returned by readchunk().

        The bool flags whether the data ends an HTTP chunk.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

101 

102 

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: reading is paused once the buffered size
        # exceeds _high_water and resumed when it drops below _low_water.
        self._low_water = limit
        self._high_water = limit * 2
        self._loop = loop
        self._size = 0  # bytes currently buffered
        self._cursor = 0  # logical read position from the start of the body
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # already-consumed prefix of self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[Union[Type[BaseException], BaseException]] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0  # total bytes ever fed, including already-read data

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[Union[Type[BaseException], BaseException]]:
        return self._exception

    def set_exception(
        self,
        exc: Union[Type[BaseException], BaseException],
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store *exc* and propagate it to any coroutine waiting on the stream."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* for EOF; run it at once if EOF already happened."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream ended, wake all waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (return at once if it already was)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Normalize the head chunk so the data can simply be prepended.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    def feed_data(self, data: bytes) -> None:
        """Append *data*, wake the pending reader, and pause the transport
        when the buffer grows past the high watermark."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Switch on HTTP-chunk boundary tracking (idempotent)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake readchunk()."""
        if self._http_chunk_splits is None:
            # Error message fixed to name the real methods of this class.
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Suspend *func_name* until feed_data()/feed_eof() wakes it up."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, where "line" is a sequence of bytes ending with \\n."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read until *separator* (inclusive) or EOF.

        Raises ValueError on an empty separator or when the accumulated
        data exceeds the high watermark without a separator.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes (everything until EOF when n < 0)."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever data is buffered, waiting for at least one byte."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer encoding is used, end_of_http_chunk is a
        boolean indicating if the end of the data corresponds to the end
        of an HTTP chunk, otherwise it is always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise asyncio.IncompleteReadError."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all when n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes (all when n == -1) from the head buffer entry."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption of the head chunk: just advance the offset.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

535 

536 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A stream that is permanently empty and already at EOF."""

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately skips StreamReader.__init__: no protocol, loop or
        # buffer is needed for a stream that never carries data.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return f"<{type(self).__name__}>"

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: Union[Type[BaseException], BaseException],
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """No-op: an empty stream never stores an exception."""

    def on_eof(self, callback: Callable[[], None]) -> None:
        # EOF has conceptually already happened, so fire immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        """No-op: the stream starts out at EOF."""

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes) -> None:
        """No-op: incoming data is silently discarded."""

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports an empty non-final chunk; every later call
        # reports (b"", True), mirroring StreamReader's EOF protocol.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

602 

603 

604EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader() 

605 

606 

class DataQueue(Generic[_T]):
    """A general-purpose blocking FIFO of typed items with a single reader.

    Producers call feed_data(); the stream is terminated by feed_eof()
    or set_exception().  read() raises EofStream once drained.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Union[Type[BaseException], BaseException, None] = None
        self._buffer: Deque[_T] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """True once feed_eof() or set_exception() has been called."""
        return self._eof

    def at_eof(self) -> bool:
        """True when the stream ended and every queued item was consumed."""
        return self._eof and not self._buffer

    def exception(self) -> Optional[Union[Type[BaseException], BaseException]]:
        return self._exception

    def set_exception(
        self,
        exc: Union[Type[BaseException], BaseException],
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Terminate the queue with *exc* and wake the pending reader."""
        self._eof = True
        self._exception = exc
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T) -> None:
        """Enqueue one item and wake the pending reader, if any."""
        self._buffer.append(data)
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        """Mark the end of the stream and wake the pending reader, if any."""
        self._eof = True
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        """Return the next item; raise the stored exception or EofStream at the end."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            return self._buffer.popleft()
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)