Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 34%

392 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import asyncio 

2import collections 

3import warnings 

4from typing import Awaitable, Callable, Generic, List, Optional, Tuple, TypeVar 

5 

6from typing_extensions import Final 

7 

8from .base_protocol import BaseProtocol 

9from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result 

10from .log import internal_logger 

11 

12try: # pragma: no cover 

13 from typing import Deque 

14except ImportError: 

15 from typing_extensions import Deque 

16 

# Names exported via ``from aiohttp.streams import *``.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

# Generic item type carried by DataQueue and the async iterators.
_T = TypeVar("_T")

class EofStream(Exception):
    """Raised internally to signal that the end of a stream was reached."""

class AsyncStreamIterator(Generic[_T]):
    """Async iterator adapter around a zero-argument read coroutine.

    Iteration ends when the read function raises EofStream or yields
    an empty bytes object.
    """

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if item == b"":
            # An empty read is the reader's way of saying "no more data".
            raise StopAsyncIteration
        return item

class ChunkTupleAsyncStreamIterator:
    """Async iterator over (data, end_of_http_chunk) tuples of a StreamReader."""

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            # (b"", False) is the sentinel readchunk() emits at EOF.
            raise StopAsyncIteration
        return chunk

class AsyncStreamReaderMixin:
    """Mixin providing the asynchronous-iteration helpers of a stream reader.

    The host class is expected to implement ``readline``, ``read``,
    ``readany`` and ``readchunk`` (hence the attr-defined ignores).
    """

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate over the stream line by line."""
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Return an asynchronous iterator yielding chunks of size *n*."""

        def read_chunk() -> Awaitable[bytes]:
            return self.read(n)  # type: ignore[attr-defined,no-any-return]

        return AsyncStreamIterator(read_chunk)

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        Each yielded object is a (bytes, bool) tuple as returned by
        the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    # Total number of bytes ever fed into this reader; unlike _size it
    # is never decremented when data is consumed.
    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: pause the transport when buffered size
        # exceeds _high_water, resume once it drops below _low_water.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0
        # Logical read position: total bytes handed out to callers so far.
        self._cursor = 0
        # Offsets of HTTP chunk boundaries (None until chunked mode starts).
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        # Read offset into self._buffer[0] (partial first-chunk consumption).
        self._buffer_offset = 0
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Mark the stream as broken; pending readers are woken with *exc*."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (runs immediately if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Signal end-of-stream: wake any waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until 'feed_eof' (or 'set_exception') is called."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            # Materialize the partially-consumed first chunk before prepending.
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer and wake up a pending reader."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Flow control: too much unread data buffered -> pause the transport.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Switch the reader into chunked-transfer tracking mode."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                # BUGFIX: the two adjacent string literals previously rendered
                # as "...receiving whensome data..." (missing space).
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of one HTTP chunk at the current byte position."""
        if self._http_chunk_splits is None:
            # BUGFIX: the message previously named nonexistent methods
            # ("end_chunk_receiving" / "begin_chunk_receiving").
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Suspend *func_name* until feed_data/feed_eof wakes us up."""
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, where "line" is a sequence of bytes ending with \\n."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read data until *separator* is found (separator included in result)."""
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; n == -1 reads until EOF."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever data is available, waiting only if the buffer is empty."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes; raise IncompleteReadError on premature EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (whole buffer if -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes from the first buffered chunk only."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption: advance the offset, keep the chunk queued.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        # Flow control: buffer drained below the low watermark -> resume.
        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """Stream reader that is permanently empty and at EOF.

    Deliberately skips StreamReader.__init__: none of the buffering or
    flow-control machinery is needed because every read-style method
    short-circuits with an empty result.
    """

    def __init__(self) -> None:
        pass

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        # An empty stream can never carry an error.
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # EOF has conceptually already happened, so run the callback now.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        # Requesting any bytes from an empty stream is always incomplete.
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


# Shared singleton used wherever a message has no payload.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """True once feed_eof() has been called."""
        return self._eof

    def at_eof(self) -> bool:
        """True once EOF was fed and every queued item has been consumed."""
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Mark the queue broken; a pending reader is woken with *exc*."""
        self._eof = True
        self._exception = exc

        waiter, self._waiter = self._waiter, None
        if waiter is not None:
            set_exception(waiter, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake a pending reader, if any."""
        self._size += size
        self._buffer.append((data, size))

        waiter, self._waiter = self._waiter, None
        if waiter is not None:
            set_result(waiter, None)

    def feed_eof(self) -> None:
        """Signal that no more items will arrive."""
        self._eof = True

        waiter, self._waiter = self._waiter, None
        if waiter is not None:
            set_result(waiter, None)

    async def read(self) -> _T:
        """Dequeue the next item, waiting for one if the queue is empty.

        Raises the stored exception (or EofStream) once drained after EOF.
        """
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if not self._buffer:
            if self._exception is not None:
                raise self._exception
            raise EofStream

        item, size = self._buffer.popleft()
        self._size -= size
        return item

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)

class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._protocol = protocol
        # Pause threshold: twice the caller-supplied limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue parsed data, pausing the transport when over the limit."""
        super().feed_data(data, size)
        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Dequeue one item, resuming the transport once below the limit."""
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()