# aiohttp/streams.py

import asyncio
import collections
import warnings
from typing import (
    Awaitable,
    Callable,
    Deque,
    Final,
    Generic,
    List,
    Optional,
    Tuple,
    TypeVar,
)

from .base_protocol import BaseProtocol
from .helpers import (
    _EXC_SENTINEL,
    BaseTimerContext,
    TimerNoop,
    set_exception,
    set_result,
)
from .log import internal_logger

__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

_T = TypeVar("_T")


class EofStream(Exception):
    """eof stream indication."""


class AsyncStreamIterator(Generic[_T]):
    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            rv = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if rv == b"":
            raise StopAsyncIteration
        return rv
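
# Illustrative adapter usage (a sketch, not part of this module): any
# zero-argument coroutine returning bytes can be wrapped into an async
# iterator, e.g. a StreamReader's bound `readany` method:
#
#     async for data in AsyncStreamIterator(reader.readany):
#         process(data)  # `reader` and `process` are hypothetical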


class ChunkTupleAsyncStreamIterator:
    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        rv = await self._stream.readchunk()
        if rv == (b"", False):
            raise StopAsyncIteration
        return rv


class AsyncStreamReaderMixin:
    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples of (bytes, bool) as returned by the
        StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
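
# A minimal consumption sketch (assuming `resp` is an aiohttp ClientResponse,
# whose `content` attribute is a StreamReader carrying this mixin):
#
#     async def consume(resp):
#         # either fixed-size chunks...
#         async for chunk in resp.content.iter_chunked(1024):
#             handle(chunk)  # `handle` is a hypothetical callback
#         # ...or chunk tuples carrying HTTP chunk boundaries:
#         # async for data, end_of_http_chunk in resp.content.iter_chunks():
#         #     ...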


class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0
        self._cursor = 0
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """Rollback reading some data from stream, inserting it at the buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()
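
    # Backpressure sketch (illustrative numbers, not part of the module):
    # with limit = 2 ** 16 the high-water mark is 128 KiB and the low-water
    # mark 64 KiB, so:
    #
    #     reader = StreamReader(protocol, 2 ** 16, loop=loop)
    #     reader.feed_data(b"x" * (2 ** 17 + 1))  # crosses high water:
    #                                             # protocol.pause_reading()
    #     await reader.read(2 ** 17)              # drains below low water:
    #                                             # protocol.resume_reading()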

    def begin_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have unexpected behaviour. It would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk
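
    # Usage sketch (illustrative): readuntil() returns everything up to and
    # including the separator; at EOF it returns whatever is left without
    # raising. `reader` is a hypothetical, already-fed StreamReader:
    #
    #     line = await reader.readuntil(b"\r\n")  # one CRLF-terminated record
    #     line = await reader.readline()          # same, with b"\n"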

    async def read(self, n: int = -1) -> bytes:
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so the common way is to run payload.read()
        # inside an infinite loop. That can cause a real infinite loop with
        # StreamReader; let's keep this code for one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer encoding is used, end_of_http_chunk is a
        boolean indicating if the end of the data corresponds to the end
        of an HTTP chunk, otherwise it is always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
            # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b"", True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")
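
    # Illustrative feeding sequence (a sketch of the parser side): the HTTP
    # parser brackets each wire chunk with begin/end_http_chunk_receiving,
    # and readchunk() reports the boundary via the boolean flag:
    #
    #     reader.begin_http_chunk_receiving()
    #     reader.feed_data(b"part1")
    #     reader.end_http_chunk_receiving()
    #     data, end_of_chunk = await reader.readchunk()  # (b"part1", True)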

    async def readexactly(self, n: int) -> bytes:
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        # default was changed to be consistent with .read(-1)
        #
        # I believe most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""



class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    def __init__(self) -> None:
        self._read_eof_chunk = False

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        if not self._read_eof_chunk:
            self._read_eof_chunk = True
            return (b"", False)

        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
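
# EMPTY_PAYLOAD is a shared, module-level instance; aiohttp uses it wherever
# a message carries no body (context note; the call sites live outside this
# file).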


class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        self._eof = True
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        self._size += size
        self._buffer.append((data, size))

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            return data
        else:
            if self._exception is not None:
                raise self._exception
            else:
                raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
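
# Minimal usage sketch (illustrative, not part of the module):
#
#     async def demo() -> None:
#         loop = asyncio.get_running_loop()
#         queue: DataQueue[bytes] = DataQueue(loop)
#         queue.feed_data(b"frame", size=5)
#         queue.feed_eof()
#         print(await queue.read())  # b"frame"
#         await queue.read()         # raises EofStream once drained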


class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)

        self._protocol = protocol
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
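
# A self-contained sketch of the pause/resume contract (illustrative;
# `_DummyProtocol` is a hypothetical stand-in exposing only the protocol
# attributes used above):
#
#     class _DummyProtocol:
#         _reading_paused = False
#
#         def pause_reading(self) -> None:
#             self._reading_paused = True
#
#         def resume_reading(self) -> None:
#             self._reading_paused = False
#
#     async def demo() -> None:
#         loop = asyncio.get_running_loop()
#         q: FlowControlDataQueue[bytes] = FlowControlDataQueue(
#             _DummyProtocol(), 2 ** 16, loop=loop
#         )
#         q.feed_data(b"x" * (2 ** 17 + 1), 2 ** 17 + 1)  # above limit * 2: pause
#         await q.read()                                  # drained: resume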