Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

405 statements  

1import asyncio 

2import collections 

3import sys 

4import warnings 

5from collections.abc import Awaitable, Callable 

6from typing import Final, Generic, TypeVar 

7 

8from .base_protocol import BaseProtocol 

9from .helpers import ( 

10 _EXC_SENTINEL, 

11 DEFAULT_CHUNK_SIZE, 

12 BaseTimerContext, 

13 TimerNoop, 

14 set_exception, 

15 set_result, 

16) 

17from .http_exceptions import LineTooLong 

18from .log import internal_logger 

19 

# Public API of this module.
__all__ = ("EMPTY_PAYLOAD", "EofStream", "StreamReader", "DataQueue")

# Item type carried by the generic iterator/queue classes below.
_T = TypeVar("_T")

28 

29 

class EofStream(Exception):
    """Signal that a stream has no more data to deliver."""

32 

33 

class AsyncStreamIterator(Generic[_T]):
    """Turn a zero-argument async read function into an async iterator.

    Iteration ends when the read function raises ``EofStream`` or returns
    ``b""``.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        # An empty bytes result is the end-of-stream marker for byte readers.
        if item == b"":
            raise StopAsyncIteration
        return item

52 

53 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over the ``(data, end_of_http_chunk)`` tuples of a stream.

    Each step awaits ``stream.readchunk()``; the ``(b"", False)`` tuple is
    treated as end of stream and stops iteration.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> tuple[bytes, bool]:
        pair = await self._stream.readchunk()
        if pair != (b"", False):
            return pair
        raise StopAsyncIteration

69 

70 

class StreamReader:
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    Flow control: the protocol is asked to pause reading once more than
    ``_high_water`` bytes are buffered (or more than ``_high_water_chunks``
    HTTP chunk boundaries are pending), and to resume once reads bring the
    buffer below ``_low_water`` bytes and the pending boundaries below
    ``_low_water_chunks``.
    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: BaseTimerContext | None = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._protocol = protocol
        # Byte-based flow control thresholds.
        self._low_water = limit
        self._high_water = limit * 2
        # Use max(4, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        # limit // 16 gets us a reasonable value of 16k with default 256KiB limit.
        self._high_water_chunks = max(4, limit // 16)
        self._low_water_chunks = self._high_water_chunks // 2
        self._loop = loop
        self._size = 0  # bytes buffered and not yet consumed
        self._cursor = 0  # bytes consumed by readers so far
        # Logical end offsets of received HTTP chunks; stays None until
        # begin_http_chunk_receiving() is called.
        self._http_chunk_splits: collections.deque[int] | None = None
        self._buffer: collections.deque[bytes] = collections.deque()
        self._buffer_offset = 0  # bytes already consumed from self._buffer[0]
        self._eof = False
        self._waiter: asyncio.Future[None] | None = None
        self._eof_waiter: asyncio.Future[None] | None = None
        self._exception: type[BaseException] | BaseException | None = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: list[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: int | None = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != DEFAULT_CHUNK_SIZE:
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate the stream line by line (via readline)."""
        return AsyncStreamIterator(self.readline)

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        self.set_read_chunk_size(n)
        return AsyncStreamIterator(lambda: self.read(n))

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)

    def get_read_buffer_limits(self) -> tuple[int, int]:
        """Return the current ``(low_water, high_water)`` byte limits."""
        return (self._low_water, self._high_water)

    def set_read_chunk_size(self, n: int) -> None:
        """Raise buffer limits to match the consumer's chunk size.

        Limits are only ever raised here, never lowered.
        """
        if n > self._low_water:
            self._low_water = n
            self._high_water = n * 2

    def exception(self) -> type[BaseException] | BaseException | None:
        """Return the exception set by set_exception(), if any."""
        return self._exception

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store *exc* and fail any pending read and EOF waiters with it.

        Registered EOF callbacks are discarded without being called.
        """
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Run *callback* at EOF, immediately if EOF was already fed.

        Exceptions raised by the callback are logged, not propagated.
        """
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark end of input, wake waiters and run the EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # At EOF the parser is done, there won't be unprocessed data.
        self._protocol.resume_reading(resume_parser=False)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Wait until feed_eof() is called (or set_exception() fails the wait)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """Bytes received on the wire: compressed count if known, else total."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head.

        Deprecated. Note that ``total_bytes`` is not adjusted.
        """
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Materialise the partially consumed head so *data* can go before it.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    def feed_data(self, data: bytes) -> bool:
        """Append *data* to the buffer, wake a reader, apply back-pressure.

        Always returns False.
        """
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return False

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water:
            self._protocol.pause_reading()
        return False

    def begin_http_chunk_receiving(self) -> None:
        """Enable HTTP chunk boundary tracking (must precede any data)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of an HTTP chunk at the current logical offset."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if len(self._http_chunk_splits) > self._high_water_chunks:
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Block until the producer side wakes the single read waiter.

        Raises:
            RuntimeError: if the protocol is disconnected, or another
                coroutine is already waiting on this reader.
        """
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        """Read one line terminated by ``b"\\n"`` (see readuntil)."""
        return await self.readuntil(max_size=max_line_length)

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Read data up to and including *separator*.

        If EOF is reached first, whatever was read (possibly b"") is returned
        without the separator.

        Raises:
            ValueError: if *separator* is empty.
            LineTooLong: if the accumulated data exceeds *max_size* bytes
                (defaults to the current high water mark).
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True
        max_size = max_size or self._high_water

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                head = self._buffer[0]
                # Bytes to take from the head buffer so the read ends right
                # after the separator; -1 means "not found, take it all".
                nbytes = -1
                if seplen > 1 and chunk:
                    # A multi-byte separator may straddle already-read data
                    # and the head buffer, so neither contains it in full.
                    # Search the joined edges (each shorter than seplen).
                    tail = chunk[1 - seplen :]
                    edge = tail + head[offset : offset + seplen - 1]
                    idx = edge.find(separator)
                    if idx != -1:
                        nbytes = idx + seplen - len(tail)
                if nbytes == -1:
                    ichar = head.find(separator, offset) + 1
                    if ichar:
                        # Read from current offset through the separator.
                        nbytes = ichar - offset + seplen - 1
                data = self._read_nowait_chunk(nbytes)
                chunk += data
                chunk_size += len(data)
                if nbytes != -1:
                    not_enough = False

                if chunk_size > max_size:
                    raise LineTooLong(chunk[:100] + b"...", max_size)

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes, or everything until EOF if *n* is negative."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # Reading everything — remove decompression chunk limit.
            self.set_read_chunk_size(sys.maxsize)
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        self.set_read_chunk_size(n)
        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return all currently buffered data, waiting if the buffer is empty."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk, otherwise it is
        always False. ``(b"", False)`` signals end of stream.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes.

        Raises:
            asyncio.IncompleteReadError: if EOF arrives before *n* bytes.
        """
        if self._exception is not None:
            raise self._exception

        blocks: list[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Read up to *n* buffered bytes without waiting (all if n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes (all if n == -1) from the first buffer only.

        Updates size/cursor accounting, drops chunk splits now behind the
        cursor, and resumes the protocol when both backlogs are low enough.
        """
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial read: keep the buffer, advance the offset into it.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Take the unread remainder of a partially consumed buffer.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        if self._size < self._low_water and (
            chunk_splits is None or len(chunk_splits) < self._low_water_chunks
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks)

556 

557 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A reader that is permanently at EOF and never holds data.

    ``StreamReader.__init__`` is intentionally not called, so most of the
    parent's buffer state is never created. The read methods are therefore
    overridden to return empty results without touching that state.
    """

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        self._read_eof_chunk = False
        self.total_bytes = 0
        # Needed by the inherited ``total_raw_bytes`` property, which would
        # otherwise raise AttributeError on this unset slot.
        self.total_compressed_bytes = None

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> BaseException | None:
        return None

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Already at EOF: run *callback* immediately, logging its errors."""
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes) -> bool:
        return False

    def set_read_chunk_size(self, n: int) -> None:
        return

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        # The inherited implementation reads parent state that this class
        # never initialises; an empty stream has nothing to return anyway.
        return b""

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> tuple[bytes, bool]:
        """Return ``(b"", False)`` on the first call, ``(b"", True)`` afterwards."""
        if not self._read_eof_chunk:
            self._read_eof_chunk = True
            return (b"", False)

        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        """Raise IncompleteReadError, except for n <= 0 (matches StreamReader)."""
        if n <= 0:
            # StreamReader.readexactly returns b"" without reading for n <= 0.
            return b""
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

620 

621 async def readexactly(self, n: int) -> bytes: 

622 raise asyncio.IncompleteReadError(b"", n) 

623 

624 def read_nowait(self, n: int = -1) -> bytes: 

625 return b"" 

626 

627 

# Module-level reader instance that is always at EOF and yields no data.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

629 

630 

class DataQueue(Generic[_T]):
    """A general-purpose blocking FIFO with a single consumer.

    Producers call feed_data(), feed_eof() or set_exception(); one consumer
    awaits read() or iterates with ``async for``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._buffer: collections.deque[_T] = collections.deque()
        self._waiter: asyncio.Future[None] | None = None
        self._exception: type[BaseException] | BaseException | None = None
        self._eof = False

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return not self._buffer and self._eof

    def exception(self) -> type[BaseException] | BaseException | None:
        return self._exception

    def _take_waiter(self) -> "asyncio.Future[None] | None":
        # Detach the pending reader future (if any) so it is resolved only once.
        pending, self._waiter = self._waiter, None
        return pending

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark EOF with an error and fail a pending read() with *exc*."""
        self._eof = True
        self._exception = exc
        pending = self._take_waiter()
        if pending is not None:
            set_exception(pending, exc, exc_cause)

    def feed_data(self, data: _T) -> None:
        """Enqueue *data* and wake the reader, if one is waiting."""
        self._buffer.append(data)
        pending = self._take_waiter()
        if pending is not None:
            set_result(pending, None)

    def feed_eof(self) -> None:
        """Mark the end of the queue and wake the reader, if one is waiting."""
        self._eof = True
        pending = self._take_waiter()
        if pending is not None:
            set_result(pending, None)

    async def read(self) -> _T:
        """Return the next item, waiting for one if the queue is empty.

        Raises the stored exception, or EofStream, once the queue is drained
        after EOF.
        """
        if not (self._buffer or self._eof):
            assert not self._waiter
            fut = self._waiter = self._loop.create_future()
            try:
                await fut
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Only reset on abandonment; on normal wake-up the producer
                # has already cleared it, and a newer waiter must survive.
                self._waiter = None
                raise
        if self._buffer:
            return self._buffer.popleft()
        if self._exception is None:
            raise EofStream
        raise self._exception

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)