Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/streams.py: 21%

396 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import asyncio 

2import collections 

3import warnings 

4from typing import ( 

5 Awaitable, 

6 Callable, 

7 Deque, 

8 Final, 

9 Generic, 

10 List, 

11 Optional, 

12 Tuple, 

13 TypeVar, 

14) 

15 

16from .base_protocol import BaseProtocol 

17from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result 

18from .log import internal_logger 

19 

# Public names re-exported by ``aiohttp`` from this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

# Item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")

29 

30 

class EofStream(Exception):
    """eof stream indication.

    Raised by ``DataQueue.read()`` when the queue reached EOF with no
    buffered data left (and no stored exception).
    """

33 

34 

class AsyncStreamIterator(Generic[_T]):
    """Async-iterator adapter around a zero-argument read coroutine.

    Iteration stops when the read function raises ``EofStream`` or
    returns an empty ``bytes`` object.
    """

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            value = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        else:
            if value == b"":
                raise StopAsyncIteration
            return value

50 

51 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``StreamReader.readchunk()`` results.

    Yields ``(data, end_of_http_chunk)`` tuples and stops once the
    reader returns the terminal ``(b"", False)`` pair.
    """

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk

64 

65 

class AsyncStreamReaderMixin:
    """Mixin providing ``async for`` helpers on top of the read methods.

    The concrete class is expected to supply ``readline``, ``read``,
    ``readany`` and ``readchunk`` coroutines.
    """

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration is line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""

        def read_chunk() -> Awaitable[bytes]:
            return self.read(n)  # type: ignore[attr-defined]

        return AsyncStreamIterator(read_chunk)

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]

85 

86 

class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    # Running total of bytes ever fed via feed_data(); never decremented.
    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: resume the transport below `limit`,
        # pause it above `limit * 2`.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0  # bytes currently buffered and not yet consumed
        self._cursor = 0  # logical offset of consumed bytes from stream start
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # consumed prefix length of self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the flow-control watermarks as a (low, high) tuple."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the stored exception, if the stream was broken."""
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Mark the stream as broken: wake waiters with *exc*.

        EOF callbacks are dropped; pending and future reads raise *exc*.
        """
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Signal end of stream: wake waiters and fire EOF callbacks once."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (returns at once if already EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Normalize the head chunk so the pushed-back data lands cleanly
        # in front of it.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer, waking a pending read.

        Pauses the protocol's transport once the buffered size exceeds
        the high-water mark.
        """
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Start tracking HTTP chunk boundaries (idempotent)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                # NOTE: message previously read "whensome" due to a missing
                # space between implicitly concatenated string literals.
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of an HTTP chunk and wake a pending readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            # The timer context enforces any configured read timeout.
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, where "line" is data ending with b'\\n'."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read data until *separator* is found (or EOF).

        The returned data includes the separator. Raises ValueError if
        the separator is empty or the accumulated data exceeds the
        high-water mark.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; read everything until EOF when n < 0."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever data is buffered, waiting for some if empty."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes; raise IncompleteReadError on early EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all when n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes from the head buffer chunk (-1 = all)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption: advance the offset, keep the chunk queued.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""

504 

505 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A stream reader that is permanently at EOF and yields no data.

    ``StreamReader.__init__`` is deliberately not invoked: this reader
    keeps no buffer, waiters or protocol reference.
    """

    def __init__(self) -> None:
        # True once readchunk() has handed out its single (b"", False) pair.
        self._read_eof_chunk = False

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        # An empty stream can never be broken.
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Always at EOF, so the callback runs immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports the empty EOF chunk; later calls the terminator.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

563 

564 

# Module-level singleton: an always-at-EOF reader (exported via __all__).
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

566 

567 

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def _release_waiter(self, exc: Optional[BaseException] = None) -> None:
        # Wake the (single) pending read(), successfully or with *exc*.
        waiter = self._waiter
        if waiter is None:
            return
        self._waiter = None
        if exc is None:
            set_result(waiter, None)
        else:
            set_exception(waiter, exc)

    def set_exception(self, exc: BaseException) -> None:
        self._eof = True
        self._exception = exc
        self._release_waiter(exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        self._size += size
        self._buffer.append((data, size))
        self._release_waiter()

    def feed_eof(self) -> None:
        self._eof = True
        self._release_waiter()

    async def read(self) -> _T:
        """Pop the next item, blocking until data, EOF or an error arrives."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if not self._buffer:
            # Drained: surface the stored error, otherwise signal EOF.
            if self._exception is not None:
                raise self._exception
            raise EofStream
        data, size = self._buffer.popleft()
        self._size -= size
        return data

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)

639 

640 

class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._protocol = protocol
        # High-water mark: pause the transport above twice the given limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)
        if self._protocol._reading_paused:
            return
        if self._size > self._limit:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        try:
            data = await super().read()
        finally:
            # Resume the transport once the buffered size drops below the limit.
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
        return data