Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/streams.py: 21%

import asyncio
import collections
import warnings
from typing import (
    Awaitable,
    Callable,
    Deque,
    Final,
    Generic,
    List,
    Optional,
    Tuple,
    TypeVar,
)

from .base_protocol import BaseProtocol
from .helpers import (
    _EXC_SENTINEL,
    BaseTimerContext,
    TimerNoop,
    set_exception,
    set_result,
)
from .log import internal_logger

__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

_T = TypeVar("_T")


class EofStream(Exception):
    """eof stream indication."""


class AsyncStreamIterator(Generic[_T]):
    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            rv = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if rv == b"":
            raise StopAsyncIteration
        return rv


class ChunkTupleAsyncStreamIterator:
    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        rv = await self._stream.readchunk()
        if rv == (b"", False):
            raise StopAsyncIteration
        return rv


class AsyncStreamReaderMixin:
    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
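

# Usage sketch (illustrative, not part of this module): in aiohttp,
# `resp.content` is a StreamReader, so these helpers are typically consumed
# as below; `session`, `url`, and `handle()` are assumed names.
#
#     async with session.get(url) as resp:
#         async for chunk in resp.content.iter_chunked(1024):
#             handle(chunk)
#         # or, chunk-boundary aware:
#         async for data, end_of_http_chunk in resp.content.iter_chunks():
#             handle(data)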


class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0
        self._cursor = 0
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
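
    # Flow-control sketch (hedged, assuming the default limit of 2**16):
    # feed_data() pauses the transport once the buffered size exceeds
    # _high_water (2 * limit); _read_nowait_chunk() resumes it once the
    # size drops below _low_water (limit).
    #
    #     reader = StreamReader(protocol, limit=2**16, loop=loop)
    #     reader.get_read_buffer_limits()  # -> (65536, 131072)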

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()
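
    # Ordering sketch (illustrative): a callback registered via on_eof()
    # before feed_eof() runs when EOF is fed; one registered after EOF
    # runs immediately.
    #
    #     reader.on_eof(lambda: print("runs at feed_eof()"))
    #     reader.feed_eof()
    #     reader.on_eof(lambda: print("runs immediately"))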

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """Roll back reading some data, inserting it to the buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from the start
        # of the body transfer. Each offset is the offset of the end of a
        # chunk. "Logical" means bytes accessible to the user.
        # If no chunks containing logical data were received, the current
        # position is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here, so we check for that.
            # Note: when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for the
            # gzip FSM to yield any uncompressed data. That's why the
            # current position may not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when the end of an http chunk is received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)
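
    # Call-sequence sketch (illustrative): the HTTP parser drives these
    # hooks once per chunked-encoding chunk, recording one split per
    # chunk end.
    #
    #     reader.begin_http_chunk_receiving()
    #     reader.feed_data(b"part1")         # total_bytes == 5
    #     reader.end_http_chunk_receiving()  # _http_chunk_splits == [5]
    #     reader.begin_http_chunk_receiving()
    #     reader.feed_data(b"part2")         # total_bytes == 10
    #     reader.end_http_chunk_receiving()  # _http_chunk_splits == [5, 10]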

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol's feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None
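
    # Single-reader sketch (illustrative): starting a second read while one
    # is pending raises RuntimeError instead of racing for the next data.
    #
    #     t1 = asyncio.ensure_future(reader.read(10))  # parks in _wait()
    #     t2 = asyncio.ensure_future(reader.read(10))  # awaiting t2 raises
    #     # RuntimeError: read() called while another coroutine is ...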

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk
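
    # Usage sketch (illustrative): readuntil() returns data up to and
    # including the separator, so line- or record-oriented protocols can
    # be parsed directly.
    #
    #     line = await reader.readuntil(b"\r\n")  # e.g. b"PING\r\n"
    #     rec = await reader.readuntil(b"\0")     # NUL-delimited record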

    async def read(self, n: int = -1) -> bytes:
        if self._exception is not None:
            raise self._exception

        # Migration problem: with DataQueue you have to catch the EofStream
        # exception, so the common pattern is to run payload.read() inside an
        # infinite loop. That pattern can cause a real infinite loop with
        # StreamReader; let's keep this check for one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because the waiter may be triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)
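
    # Semantics sketch (illustrative): read(n) may return fewer than n
    # bytes; only read(-1) drains to EOF, and only readexactly(n)
    # guarantees exactly n bytes.
    #
    #     head = await reader.read(4)   # up to 4 bytes, b"" at EOF
    #     rest = await reader.read(-1)  # everything until EOF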

    async def readany(self) -> bytes:
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because the waiter may be triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of an HTTP chunk, otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
            # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value.
                return (b"", False)

            await self._wait("readchunk")
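
    # Consumption sketch (illustrative): rebuild the original HTTP chunk
    # boundaries of a chunked response; `process` is an assumed name.
    #
    #     buf = b""
    #     while True:
    #         data, end_of_http_chunk = await reader.readchunk()
    #         if not data and not end_of_http_chunk:
    #             break                # (b"", False) signals EOF
    #         buf += data
    #         if end_of_http_chunk:
    #             process(buf)         # one complete HTTP chunk
    #             buf = b""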

    async def readexactly(self, n: int) -> bytes:
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)
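
    # Error-handling sketch (illustrative): mirrors asyncio's contract,
    # where IncompleteReadError.partial holds whatever arrived before EOF;
    # `salvage` is an assumed name.
    #
    #     try:
    #         header = await reader.readexactly(8)
    #     except asyncio.IncompleteReadError as exc:
    #         salvage(exc.partial)  # fewer than 8 bytes arrived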

    def read_nowait(self, n: int = -1) -> bytes:
        # default was changed to be consistent with .read(-1)
        #
        # Most users likely don't know about this method,
        # so they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
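

# End-to-end sketch (illustrative; `protocol` and `loop` are assumed to
# exist): the protocol side feeds bytes in, the application reads them out.
#
#     reader = StreamReader(protocol, limit=2**16, loop=loop)
#     reader.feed_data(b"hello\nworld")
#     reader.feed_eof()
#     # elsewhere: await reader.readline() -> b"hello\n"
#     #            await reader.read(-1)   -> b"world"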


class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    def __init__(self) -> None:
        self._read_eof_chunk = False

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        if not self._read_eof_chunk:
            self._read_eof_chunk = True
            return (b"", False)

        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()


class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        self._eof = True
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        self._size += size
        self._buffer.append((data, size))

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            return data
        else:
            if self._exception is not None:
                raise self._exception
            else:
                raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
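

# Usage sketch (illustrative): unlike StreamReader, a DataQueue reader must
# handle EofStream (iteration maps it to StopAsyncIteration).
#
#     queue: DataQueue[bytes] = DataQueue(loop)
#     queue.feed_data(b"frame", size=5)
#     queue.feed_eof()
#     frame = await queue.read()  # -> b"frame"
#     try:
#         await queue.read()      # raises EofStream
#     except EofStream:
#         pass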


class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)

        self._protocol = protocol
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
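

# Flow-control sketch (illustrative): with limit=1024 the queue pauses the
# protocol once more than 2048 bytes of parsed data are buffered, and
# resumes it as read() drains the buffer back below that threshold.
#
#     queue: FlowControlDataQueue[bytes] = FlowControlDataQueue(
#         protocol, 1024, loop=loop
#     )
#     queue.feed_data(b"x" * 4096, size=4096)  # pauses reading
#     await queue.read()                       # resumes reading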