Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 34%

392 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-26 06:16 +0000

1import asyncio 

2import collections 

3import warnings 

4from typing import ( 

5 Awaitable, 

6 Callable, 

7 Deque, 

8 Final, 

9 Generic, 

10 List, 

11 Optional, 

12 Tuple, 

13 TypeVar, 

14) 

15 

16from .base_protocol import BaseProtocol 

17from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result 

18from .log import internal_logger 

19 

# Public names exported by this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

# Item type held by DataQueue and yielded by AsyncStreamIterator.
_T = TypeVar("_T")

29 

30 

class EofStream(Exception):
    """Raised to signal that the end of the stream has been reached."""

33 

34 

class AsyncStreamIterator(Generic[_T]):
    """Async-iterator adapter around a zero-argument read coroutine.

    Iteration ends when the read function raises EofStream or
    returns an empty bytes object.
    """

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if item == b"":
            # An empty read is the EOF sentinel for byte streams.
            raise StopAsyncIteration
        return item

50 

51 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``(data, end_of_http_chunk)`` tuples.

    Wraps a StreamReader and stops when the reader yields the EOF
    sentinel ``(b"", False)``.
    """

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk

64 

65 

class AsyncStreamReaderMixin:
    """Mixin adding ``async for`` support to stream readers.

    The concrete class is expected to provide the ``readline``, ``read``,
    ``readany`` and ``readchunk`` coroutines.
    """

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration is line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

85 

86 

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    # Total number of bytes ever fed into this reader; never decreases.
    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: pause the transport when the buffered
        # size exceeds the high watermark, resume below the low one.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0  # bytes currently buffered, not yet consumed
        self._cursor = 0  # logical offset of consumed bytes from stream start
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # consumed prefix length inside self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Fail pending and future reads with *exc*."""
        self._exception = exc
        # A broken stream never reaches a clean EOF, so drop the callbacks.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream as ended, wake all waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until 'feed_eof' is called (returns at once if already at EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            # Normalize the head chunk so the raw bytes can be pushed in front.
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer and wake a pending reader, if any."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Apply backpressure once the buffer exceeds the high watermark.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Start tracking HTTP-chunk boundaries (idempotent)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                # NOTE(review): added the missing space between the adjacent
                # literals (previously rendered "...whensome data...").
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk at the current position."""
        if self._http_chunk_splits is None:
            # NOTE(review): message now names the actual public methods
            # (was "end_chunk_receiving"/"begin_chunk_receiving").
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, where "line" is a sequence of bytes ending with \\n."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read data until *separator* is found (inclusive) or EOF.

        Raises ValueError if the separator is empty or the accumulated
        chunk exceeds the high watermark.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; with n < 0 read everything until EOF."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever is buffered, waiting for data only if the buffer is empty."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise asyncio.IncompleteReadError."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all buffered if n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes from the head buffer entry (all of it if n == -1)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption: advance the offset instead of copying the deque.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        # Release backpressure once the buffer drains below the low watermark.
        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

490 

491 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently at EOF and never holds data.

    Deliberately skips StreamReader.__init__: there is no protocol,
    loop or buffer; every read returns an empty result immediately.
    """

    def __init__(self) -> None:
        # Whether the single (b"", False) EOF chunk was already handed out.
        self._read_eof_chunk = False

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        # An empty stream can never fail.
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so run the callback right away.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports an empty data chunk, every later call
        # reports end-of-chunk.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

549 

550 

# Shared singleton used wherever a message has no body to read.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

552 

553 

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0  # sum of the sizes reported via feed_data()
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def _wakeup(self) -> None:
        # Resolve the pending reader's future, if one is waiting.
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def set_exception(self, exc: BaseException) -> None:
        """Mark the queue broken; the reader will see *exc*."""
        self._eof = True
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake the reader."""
        self._size += size
        self._buffer.append((data, size))
        self._wakeup()

    def feed_eof(self) -> None:
        """Signal that no more items will arrive."""
        self._eof = True
        self._wakeup()

    async def read(self) -> _T:
        """Dequeue one item; raise EofStream (or the stored exception) at the end."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if not self._buffer:
            # EOF with nothing queued: report failure or end of stream.
            if self._exception is not None:
                raise self._exception
            raise EofStream

        data, size = self._buffer.popleft()
        self._size -= size
        return data

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)

625 

626 

class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._protocol = protocol
        # Allow buffering up to twice the nominal limit before pausing.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue *data*, pausing the transport if the buffer grew too large."""
        super().feed_data(data, size)
        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Dequeue one item, resuming the transport once the buffer drains."""
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()