Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/streams.py: 21%

393 statements

coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

import asyncio
import collections
import warnings
from typing import Awaitable, Callable, Deque, Generic, List, Optional, Tuple, TypeVar

from .base_protocol import BaseProtocol
from .helpers import BaseTimerContext, set_exception, set_result
from .log import internal_logger
from .typedefs import Final

__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

_T = TypeVar("_T")


class EofStream(Exception):
    """eof stream indication."""


class AsyncStreamIterator(Generic[_T]):
    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            rv = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if rv == b"":
            raise StopAsyncIteration
        return rv


class ChunkTupleAsyncStreamIterator:
    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        rv = await self._stream.readchunk()
        if rv == (b"", False):
            raise StopAsyncIteration
        return rv
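

# Hedged usage sketch (editor's addition, not part of the original source):
# AsyncStreamIterator adapts any zero-argument read coroutine into an async
# iterator; iteration stops on EofStream or on an empty bytes object. The
# asyncio.Queue below is purely illustrative.
async def _demo_async_stream_iterator() -> None:
    queue: "asyncio.Queue[bytes]" = asyncio.Queue()
    for part in (b"alpha", b"beta", b""):  # b"" signals exhaustion
        queue.put_nowait(part)
    async for part in AsyncStreamIterator(queue.get):
        print(part)  # b'alpha' then b'beta'
# e.g. asyncio.run(_demo_async_stream_iterator())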


class AsyncStreamReaderMixin:
    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n.

        Available for Python 3.5 and higher only.
        """
        return AsyncStreamIterator(
            lambda: self.read(n)  # type: ignore[attr-defined,no-any-return]
        )

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received.

        Available for Python 3.5 and higher only.
        """
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received from the peer.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.

        Available for Python 3.5 and higher only.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]


class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0
        self._cursor = 0
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """Roll back reading some data from the stream, inserting it at the buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from the start
        # of the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes accessible to a user.
        # If no chunks containing logical data were received, the current
        # position is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here, so we check for that.
            # Note that when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for the gzip
            # FSM to yield any uncompressed data. That's why the current
            # position may not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when the end of an http chunk is received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            if self._timer:
                with self._timer:
                    await waiter
            else:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least a one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from the current offset to the found separator or to the end.
                data = self._read_nowait_chunk(ichar - offset if ichar else -1)
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        if self._exception is not None:
            raise self._exception

        # Migration problem: with DataQueue you have to catch the EofStream
        # exception, so the common pattern is to run payload.read() inside an
        # infinite loop. That can cause a real infinite loop with StreamReader;
        # let's keep this check for one more major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`,
        # because the waiter may be triggered on a chunk end
        # without any data being fed
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`,
        # because the waiter may be triggered on a chunk end
        # without any data being fed
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of an HTTP chunk; otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not an actual final return value.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        # default was changed to be consistent with .read(-1)
        #
        # I believe most users don't know about this method
        # and are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or the whole buffer if n == -1"""
        chunks = []

        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
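

# --- Hedged usage sketches (editor's additions, not part of the original
# source). _StubProtocol is a hypothetical stand-in exposing only the
# attributes StreamReader actually touches; a real reader is driven by
# aiohttp's BaseProtocol instead.
class _StubProtocol:
    _reading_paused = False

    def pause_reading(self) -> None:
        self._reading_paused = True

    def resume_reading(self) -> None:
        self._reading_paused = False


async def _demo_flow_control() -> None:
    # feed_data() pauses the transport above the high watermark (2 * limit);
    # draining reads resume it below the low watermark (limit).
    proto = _StubProtocol()
    reader = StreamReader(proto, 4, loop=asyncio.get_running_loop())
    reader.feed_data(b"0123456789")  # 10 bytes > high water (8): paused
    assert proto._reading_paused
    await reader.read(8)  # size drops to 2 < low water (4): resumed
    assert not proto._reading_paused


async def _demo_readuntil() -> None:
    # readuntil() waits until the separator appears (or EOF) and returns the
    # data up to and including it.
    reader = StreamReader(_StubProtocol(), 2**16, loop=asyncio.get_running_loop())
    reader.feed_data(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n")
    reader.feed_eof()
    print(await reader.readuntil(b"\n"))  # b'HTTP/1.1 200 OK\r\n'


async def _demo_readchunk() -> None:
    # With chunked transfer encoding the HTTP parser brackets each chunk with
    # begin/end_http_chunk_receiving(); readchunk() then reports
    # (data, end_of_http_chunk) at the recorded boundaries.
    reader = StreamReader(_StubProtocol(), 2**16, loop=asyncio.get_running_loop())
    reader.begin_http_chunk_receiving()
    reader.feed_data(b"part-1")
    reader.end_http_chunk_receiving()
    reader.feed_eof()
    print(await reader.readchunk())  # (b'part-1', True)
    print(await reader.readchunk())  # (b'', False): EOF
# e.g. asyncio.run(_demo_flow_control())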


class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    def __init__(self) -> None:
        pass

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO: add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()


class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        self._eof = True
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        self._size += size
        self._buffer.append((data, size))

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            return data
        else:
            if self._exception is not None:
                raise self._exception
            else:
                raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
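

# Hedged usage sketch (editor's addition, not part of the original source):
# DataQueue hands out already-parsed objects to a single reader and raises
# EofStream once feed_eof() has been called and the buffer is drained.
async def _demo_data_queue() -> None:
    queue: DataQueue[str] = DataQueue(asyncio.get_running_loop())
    queue.feed_data("first", size=5)
    queue.feed_data("second", size=6)
    queue.feed_eof()
    async for item in queue:  # stops when read() raises EofStream
        print(item)
# e.g. asyncio.run(_demo_data_queue())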


class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)

        self._protocol = protocol
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
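

# Hedged usage sketch (editor's addition, not part of the original source):
# FlowControlDataQueue behaves like DataQueue but back-pressures the wire:
# feed_data() pauses the protocol once the buffered size exceeds 2 * limit,
# and read() resumes it after draining. Reuses the hypothetical _StubProtocol
# defined above.
async def _demo_flow_control_queue() -> None:
    proto = _StubProtocol()
    queue: FlowControlDataQueue[bytes] = FlowControlDataQueue(
        proto, 4, loop=asyncio.get_running_loop()
    )
    queue.feed_data(b"0123456789", size=10)  # 10 > 2 * limit (8): paused
    assert proto._reading_paused
    await queue.read()  # buffer drained below the limit: resumed
    assert not proto._reading_paused
# e.g. asyncio.run(_demo_flow_control_queue())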