Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import collections
3import warnings
4from typing import (
5 Awaitable,
6 Callable,
7 Deque,
8 Final,
9 Generic,
10 List,
11 Optional,
12 Tuple,
13 TypeVar,
14)
16from .base_protocol import BaseProtocol
17from .helpers import (
18 _EXC_SENTINEL,
19 BaseTimerContext,
20 TimerNoop,
21 set_exception,
22 set_result,
23)
24from .http_exceptions import LineTooLong
25from .log import internal_logger
# Names exported via ``from aiohttp.streams import *``.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

# Generic item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")
class EofStream(Exception):
    """Raised to signal that the end of the stream has been reached."""
class AsyncStreamIterator(Generic[_T]):
    """Adapt a zero-argument read coroutine into an async iterator.

    Iteration stops when the read callable raises ``EofStream`` or
    returns an empty bytes object.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            value = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if value == b"":
            raise StopAsyncIteration
        return value
class ChunkTupleAsyncStreamIterator:
    """Async iterator of (bytes, end_of_http_chunk) tuples from a stream.

    Iteration ends when readchunk() reports ``(b"", False)``, i.e. EOF.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk
class AsyncStreamReaderMixin:
    """Mixin providing the async-iteration API on top of read methods.

    The host class is expected to implement readline(), read(), readany()
    and readchunk() (hence the ``attr-defined`` ignores below).
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Line-by-line iteration: ``async for line in reader``.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        # Byte watermarks: reading pauses above _high_water and resumes
        # once the buffer drains below _low_water (see feed_data /
        # _read_nowait_chunk).
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        # Ensure high_water_chunks >= 3 so it's always > low_water_chunks.
        self._high_water_chunks = max(3, limit // 4)
        # Use max(2, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        self._low_water_chunks = max(2, self._high_water_chunks // 2)
        self._loop = loop
        self._size = 0  # bytes currently buffered and not yet consumed
        self._cursor = 0  # logical read position from the start of the body
        self._http_chunk_splits: Optional[Deque[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # already-consumed prefix of self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: Optional[int] = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the exception set on the stream, if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the stream: wake any pending read/EOF waiters with *exc*."""
        self._exception = exc
        # Callbacks registered for EOF will never fire once the stream failed.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* for EOF; runs immediately if already at EOF."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream ended; wake all waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No more data will arrive, so there is nothing left to throttle.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (no-op if already at EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """Bytes received on the wire: compressed size when known, else total_bytes."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Materialize the partially-consumed head chunk so the pushed-back
        # data can simply be prepended.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer, wake the pending reader, maybe pause."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Flow control: throttle the transport when the buffer outgrows
        # the high watermark.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Enable chunk-boundary tracking for chunked transfer encoding."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of an HTTP chunk and wake a pending readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if (
            len(self._http_chunk_splits) > self._high_water_chunks
            and not self._protocol._reading_paused
        ):
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Suspend *func_name* until feed_data()/feed_eof() wakes the waiter."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self, *, max_line_length: Optional[int] = None) -> bytes:
        """Read one line; delegates to readuntil() with the default separator."""
        return await self.readuntil(max_size=max_line_length)

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: Optional[int] = None
    ) -> bytes:
        """Read up to and including *separator* (or to EOF).

        Raises LineTooLong once the accumulated data exceeds *max_size*
        (defaulting to the high watermark).
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True
        max_size = max_size or self._high_water

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # ichar is 1 past the separator start, or 0 when not found.
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > max_size:
                    raise LineTooLong(chunk[:100] + b"...", max_size)

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; with n < 0, read everything until EOF."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever is buffered, waiting only while the buffer is empty."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    # Boundary exactly at the read position: empty data,
                    # end-of-chunk marker.
                    return (b"", True)
                if pos > self._cursor:
                    # Hand out data up to the chunk boundary.
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise IncompleteReadError at early EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all of them if n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes from the first buffered chunk only."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption: advance the offset instead of slicing
            # the deque.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Consume the remainder of a partially-read head chunk.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        # Resume the transport once both the byte buffer and the pending
        # chunk-split count have drained below their low watermarks.
        if (
            self._protocol._reading_paused
            and self._size < self._low_water
            and (
                self._http_chunk_splits is None
                or len(self._http_chunk_splits) < self._low_water_chunks
            )
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently at EOF and holds no data."""

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately does NOT call StreamReader.__init__:
        # no protocol, loop or buffer is needed for an empty body.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>"

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback fires immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self, *, max_line_length: Optional[int] = None) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports EOF data, subsequent calls report end-of-chunk.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""
# Process-wide singleton used where a message carries no body: an
# EmptyStreamReader that is permanently at EOF.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """A blocking FIFO of (data, size) pairs intended for a single reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        # At most one read() may be pending at any time.
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the queue and wake the pending reader with *exc*."""
        self._eof = True
        self._exception = exc
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake the pending reader, if any."""
        self._buffer.append((data, size))
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        """Mark the queue finished and wake the pending reader, if any."""
        self._eof = True
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        """Pop one item, blocking until data, EOF or an exception arrives."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # The feeder never woke us; drop the stale waiter.
                self._waiter = None
                raise
        if self._buffer:
            item, _ = self._buffer.popleft()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._size = 0
        self._protocol = protocol
        # Reading is paused once the buffered payload exceeds twice the limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue *data* and pause the transport when over the limit."""
        super().feed_data(data, size)
        self._size += size

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Pop one item, resuming the transport once the buffer drains."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # The feeder never woke us; drop the stale waiter.
                self._waiter = None
                raise
        if self._buffer:
            item, item_size = self._buffer.popleft()
            self._size -= item_size
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream