Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

446 statements  

1import asyncio 

2import collections 

3import sys 

4import warnings 

5from collections.abc import Awaitable, Callable 

6from typing import Final, Generic, TypeVar 

7 

8from .base_protocol import BaseProtocol 

9from .helpers import ( 

10 _EXC_SENTINEL, 

11 DEFAULT_CHUNK_SIZE, 

12 BaseTimerContext, 

13 TimerNoop, 

14 set_exception, 

15 set_result, 

16) 

17from .http_exceptions import LineTooLong 

18from .log import internal_logger 

19 

# Public names exported by a star-import of this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

# Item type carried by the generic AsyncStreamIterator / DataQueue classes.
_T = TypeVar("_T")

28 

29 

class EofStream(Exception):
    """Signal that a stream or queue has no more items to deliver."""

32 

33 

class AsyncStreamIterator(Generic[_T]):
    """Async iterator that repeatedly awaits a zero-argument read callable.

    Iteration finishes when the callable raises ``EofStream`` or returns a
    value that compares equal to ``b""``.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        # The object is its own iterator.
        return self

    async def __anext__(self) -> _T:
        """Return the next item, or raise ``StopAsyncIteration`` at the end."""
        try:
            item = await self.read_func()
        except EofStream:
            # Queue-style readers report exhaustion with EofStream.
            raise StopAsyncIteration
        if item == b"":
            # Byte-stream readers report exhaustion with an empty result.
            raise StopAsyncIteration
        return item

52 

53 

class ChunkTupleAsyncStreamIterator:
    """Async iterator over the ``(data, end_of_http_chunk)`` tuples of a stream.

    Each step awaits ``stream.readchunk()``; the tuple ``(b"", False)`` ends
    the iteration.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        # The object is its own iterator.
        return self

    async def __anext__(self) -> tuple[bytes, bool]:
        """Return the next chunk tuple, or raise ``StopAsyncIteration``."""
        chunk_info = await self._stream.readchunk()
        if chunk_info == (b"", False):
            # readchunk() uses this exact tuple to signal end of stream.
            raise StopAsyncIteration
        return chunk_info

69 

70 

class StreamReader:
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: BaseTimerContext | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Create an empty reader fed by ``protocol``.

        ``limit`` is used as the low-water mark and twice ``limit`` as the
        high-water mark. When ``loop`` is omitted ``asyncio.get_event_loop()``
        is used; when ``timer`` is omitted a ``TimerNoop`` is used.
        """
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        # Use max(4, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        # limit // 16 gets us a reasonable value of 16k with default 256KiB limit.
        self._high_water_chunks = max(4, limit // 16)
        self._low_water_chunks = self._high_water_chunks // 2
        self._loop = loop
        # Number of unread bytes currently buffered.
        self._size = 0
        # Count of bytes handed out so far (decremented by unread_data()).
        self._cursor = 0
        # End offsets of received HTTP chunks; None until chunked mode begins.
        self._http_chunk_splits: collections.deque[int] | None = None
        self._buffer: collections.deque[bytes] = collections.deque()
        # Read position inside self._buffer[0].
        self._buffer_offset = 0
        self._eof = False
        self._waiter: asyncio.Future[None] | None = None
        self._eof_waiter: asyncio.Future[None] | None = None
        self._exception: BaseException | None = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: list[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: int | None = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != DEFAULT_CHUNK_SIZE:
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate over the stream line by line (via ``readline``)."""
        return AsyncStreamIterator(self.readline)

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        self.set_read_chunk_size(n)
        return AsyncStreamIterator(lambda: self.read(n))

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)

    def get_read_buffer_limits(self) -> tuple[int, int]:
        """Return the ``(low_water, high_water)`` byte limits."""
        return (self._low_water, self._high_water)

    def set_read_chunk_size(self, n: int) -> None:
        """Raise buffer limits to match the consumer's chunk size."""
        # Limits are only ever raised here, never lowered.
        if n > self._low_water:
            self._low_water = n
            self._high_water = n * 2

    def exception(self) -> BaseException | None:
        """Return the exception stored by ``set_exception``, if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store ``exc``, drop pending EOF callbacks and fail both waiters."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Run ``callback`` at EOF: immediately if EOF was already fed.

        Exceptions derived from ``Exception`` raised by the callback are
        logged and not propagated.
        """
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream as finished, wake waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # At EOF the parser is done, there won't be unprocessed data.
        self._protocol.resume_reading(resume_parser=False)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Wait until ``feed_eof`` is called (returns at once if it already was)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """``total_compressed_bytes`` when it is set, else ``total_bytes``."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            # Drop the already-consumed prefix so the offset can be reset
            # before a new head buffer is inserted.
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> bool:
        """Append ``data`` to the buffer and wake a waiting reader.

        Pauses the protocol once the buffered size exceeds the high-water
        mark. Empty ``data`` is ignored. Always returns ``False``.
        """
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return False

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water:
            self._protocol.pause_reading()
        return False

    def begin_http_chunk_receiving(self) -> None:
        """Start tracking HTTP chunk boundaries.

        Raises ``RuntimeError`` if tracking is not yet enabled but data has
        already been fed.
        """
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake ``readchunk``.

        Raises ``RuntimeError`` if chunk tracking was never started.
        """
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if len(self._http_chunk_splits) > self._high_water_chunks:
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Block until the stream is woken by a feed, EOF or exception.

        ``func_name`` is only used in the error message. Raises
        ``RuntimeError`` if the protocol is not connected or another
        coroutine is already waiting.
        """
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        """Read up to and including the next ``b"\\n"`` (see ``readuntil``)."""
        return await self.readuntil(max_size=max_line_length)

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Read data up to and including ``separator``.

        If EOF arrives first, whatever was collected is returned. A falsy
        ``max_size`` falls back to the high-water mark.

        Raises ``ValueError`` for an empty separator, the stored stream
        exception if one is set, and ``LineTooLong`` once more than
        ``max_size`` bytes have been collected.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        # Collect the pieces in a list and join once: repeated ``bytes +=``
        # re-copies the accumulated data for every buffer consumed.
        parts: list[bytes] = []
        chunk_size = 0
        not_enough = True
        max_size = max_size or self._high_water

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # NOTE(review): only the head buffer is searched, so a
                # multi-byte separator split across two fed buffers would not
                # be detected here -- confirm whether callers rely on that.
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                parts.append(data)
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > max_size:
                    raise LineTooLong(b"".join(parts)[:100] + b"...", max_size)

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return b"".join(parts)

    async def read(self, n: int = -1) -> bytes:
        """Read up to ``n`` bytes; read until EOF when ``n`` is negative.

        Returns ``b""`` for ``n == 0`` and at EOF with an empty buffer.
        Raises the stored stream exception if one is set.
        """
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter += 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # Reading everything — remove decompression chunk limit.
            self.set_read_chunk_size(sys.maxsize)
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        self.set_read_chunk_size(n)
        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return everything currently buffered, waiting if nothing is.

        Returns ``b""`` at EOF with an empty buffer. Raises the stored stream
        exception if one is set.
        """
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly ``n`` bytes.

        Raises ``asyncio.IncompleteReadError`` (carrying the partial data) if
        the stream ends first, or the stored stream exception if one is set.
        """
        if self._exception is not None:
            raise self._exception

        blocks: list[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Read buffered data without waiting.

        Raises the stored stream exception if one is set, or ``RuntimeError``
        while a coroutine is waiting for incoming data.
        """
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Take up to ``n`` bytes (all of it if ``n == -1``) from the head buffer.

        The buffer must be non-empty. Updates the size/cursor bookkeeping and
        asks the protocol to resume reading once below the low-water marks.
        """
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial read: keep the buffer and just advance the offset.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Remainder of a partially consumed buffer.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            # Whole untouched buffer: hand it out without slicing.
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        if self._size < self._low_water and (
            self._http_chunk_splits is None
            or len(self._http_chunk_splits) < self._low_water_chunks
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        if n == -1:
            # Drain only chunks present now; _read_nowait_chunk() can
            # re-entrantly resume_reading() and refill the buffer.
            count = len(self._buffer)
            if count == 1:
                return self._read_nowait_chunk(-1)
            return b"".join([self._read_nowait_chunk(-1) for _ in range(count)])

        chunks: list[bytes] = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            n -= len(chunk)
            if n == 0:
                break

        return b"".join(chunks)

580 

581 

class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """StreamReader variant that is always empty and always at EOF.

    ``StreamReader.__init__`` is deliberately not called, so only the
    attributes assigned in ``__init__`` below exist on the instance. Any
    inherited method that reads other slots must be overridden here.
    """

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        self._read_eof_chunk = False
        self.total_bytes = 0
        # The inherited ``total_raw_bytes`` property reads this slot; leaving
        # it unset would make that property raise AttributeError.
        self.total_compressed_bytes = None

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> BaseException | None:
        """Always ``None``: this reader never stores an exception."""
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Ignore the exception; there is no state to fail."""
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Run ``callback`` immediately, logging any ``Exception`` it raises."""
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        """No-op."""
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> bool:
        """Discard ``data`` and return ``False``."""
        return False

    def set_read_chunk_size(self, n: int) -> None:
        """No-op: there are no buffer limits to adjust."""
        return

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        return b""

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Return ``b""``.

        Overridden because the inherited implementation reads slots that are
        never initialised on this class.
        """
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> tuple[bytes, bool]:
        """Return ``(b"", False)`` on the first call and ``(b"", True)`` after."""
        if not self._read_eof_chunk:
            self._read_eof_chunk = True
            return (b"", False)

        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        """Always raise ``asyncio.IncompleteReadError`` with empty partial data."""
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""

650 

651 

# Module-level EmptyStreamReader instance, annotated as a plain StreamReader
# and exported through __all__. Its readchunk() flips a flag on first use, so
# that state is shared by everything that uses this object.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()

653 

654 

class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        # Future the single reader is parked on, if any.
        self._waiter: asyncio.Future[None] | None = None
        self._exception: BaseException | None = None
        # Each entry is an ``(item, size)`` pair.
        self._buffer: collections.deque[tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        """Number of queued items."""
        return len(self._buffer)

    def is_eof(self) -> bool:
        """Return True once EOF was fed or an exception was set."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True when EOF was reached and no items remain queued."""
        return self._eof and not self._buffer

    def exception(self) -> BaseException | None:
        """Return the exception stored by ``set_exception``, if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store ``exc``, mark the queue as ended and fail a parked reader."""
        self._eof = True
        self._exception = exc
        pending = self._waiter
        if pending is None:
            return
        self._waiter = None
        set_exception(pending, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Queue ``data`` with its ``size`` and wake a parked reader."""
        self._buffer.append((data, size))
        pending = self._waiter
        if pending is None:
            return
        self._waiter = None
        set_result(pending, None)

    def feed_eof(self) -> None:
        """Mark the queue as ended and wake a parked reader."""
        self._eof = True
        pending = self._waiter
        if pending is None:
            return
        self._waiter = None
        set_result(pending, None)

    async def read(self) -> _T:
        """Return the next queued item, waiting for one if necessary.

        With nothing left to return, raises the stored exception if one was
        set, otherwise ``EofStream``.
        """
        if not (self._buffer or self._eof):
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Un-park so that a later read() can wait again.
                self._waiter = None
                raise

        if not self._buffer:
            if self._exception is not None:
                raise self._exception
            raise EofStream

        item, _ = self._buffer.popleft()
        return item

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        """Iterate over items until ``read`` raises ``EofStream``."""
        return AsyncStreamIterator(self.read)

718 

719 

class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        # Sum of the ``size`` values of the items currently queued.
        self._size = 0
        self._protocol = protocol
        # Flow-control threshold: twice the requested limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Queue ``data`` and pause the protocol once over the threshold."""
        super().feed_data(data, size)
        self._size += size

        over_limit = self._size > self._limit
        if over_limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Return the next item, resuming a paused protocol when below threshold.

        With nothing left to return, raises the stored exception if one was
        set, otherwise ``EofStream``.
        """
        if not (self._buffer or self._eof):
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Un-park so that a later read() can wait again.
                self._waiter = None
                raise

        if not self._buffer:
            if self._exception is not None:
                raise self._exception
            raise EofStream

        item, item_size = self._buffer.popleft()
        self._size -= item_size
        if self._size < self._limit and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return item