Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%
import asyncio
import collections
import warnings
from typing import (
    Awaitable,
    Callable,
    Deque,
    Final,
    Generic,
    List,
    Optional,
    Tuple,
    TypeVar,
)

from .base_protocol import BaseProtocol
from .helpers import (
    _EXC_SENTINEL,
    BaseTimerContext,
    TimerNoop,
    set_exception,
    set_result,
)
from .log import internal_logger

__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

_T = TypeVar("_T")


class EofStream(Exception):
    """eof stream indication."""


class AsyncStreamIterator(Generic[_T]):

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            rv = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if rv == b"":
            raise StopAsyncIteration
        return rv
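

# Illustrative sketch (not part of aiohttp): AsyncStreamIterator turns any
# zero-argument read coroutine into an async iterator. The "reads" deque and
# "_example_async_stream_iterator" helper are hypothetical, defined only to
# show the two stop conditions (an EofStream exception or an empty result).
async def _example_async_stream_iterator() -> None:
    reads = collections.deque([b"hello ", b"world", b""])

    async def read_once() -> bytes:
        return reads.popleft()

    async for chunk in AsyncStreamIterator(read_once):
        print(chunk)  # b"hello ", then b"world"; the b"" ends the iteration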


class ChunkTupleAsyncStreamIterator:

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        rv = await self._stream.readchunk()
        if rv == (b"", False):
            raise StopAsyncIteration
        return rv


class AsyncStreamReaderMixin:

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
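

# Illustrative note (not part of aiohttp): the mixin only layers iteration
# styles over readline()/read()/readany()/readchunk(), so any object exposing
# those coroutines gains "async for" support. Typical consumption of a
# response payload (hypothetical "resp" variable) looks like:
#
#     async for chunk in resp.content.iter_chunked(1024):
#         process(chunk)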


class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0
        self._cursor = 0
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: Optional[int] = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """Rollback reading some data from the stream, inserting it at the buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()
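
    # Illustrative note (not part of aiohttp): feed_data() and
    # _read_nowait_chunk() together implement watermark flow control. With
    # limit=L, reading from the transport is paused once the buffered size
    # exceeds the high watermark 2*L and resumed once reads drain it below
    # the low watermark L, e.g. (hypothetical values):
    #
    #     reader = StreamReader(protocol, limit=2**16)
    #     reader.feed_data(b"x" * (2**17 + 1))  # size > high water: pause
    #     data = await reader.read(2**17)       # size < low water: resume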

    def begin_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from the start
        # of the body transfer. Each offset marks the end of a chunk.
        # "Logical" means bytes accessible to the user.
        # If no chunks containing logical data were received, the current
        # position is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here, so we check for that.
            # Note: when chunked + gzip is used, we can receive a chunk of
            # compressed data, but that data may not be enough for the gzip
            # FSM to yield any uncompressed data. That's why the current
            # position may not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when the end of an http chunk is received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)
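
    # Illustrative note (not part of aiohttp): with chunked transfer encoding
    # the splits record cumulative logical offsets at each chunk end. Feeding
    # two 3-byte HTTP chunks leaves _http_chunk_splits == [3, 6]:
    #
    #     reader.begin_http_chunk_receiving()
    #     reader.feed_data(b"abc")
    #     reader.end_http_chunk_receiving()   # splits: [3]
    #     reader.begin_http_chunk_receiving()
    #     reader.feed_data(b"def")
    #     reader.end_http_chunk_receiving()   # splits: [3, 6]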

    async def _wait(self, func_name: str) -> None:
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk
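
    # Illustrative note (not part of aiohttp): the `ichar - offset + seplen - 1`
    # arithmetic converts find()'s result (stored shifted by +1 so that 0
    # means "not found") into the number of bytes up to and including the
    # separator. With buffer b"ab\ncd", offset 0 and separator b"\n": find()
    # returns 2, ichar == 3, and 3 - 0 + 1 - 1 == 3 bytes, i.e. b"ab\n".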

    async def read(self, n: int = -1) -> bytes:
        if self._exception is not None:
            raise self._exception

        # Migration problem: with DataQueue you have to catch the EofStream
        # exception, so the common pattern is to run payload.read() inside an
        # infinite loop. That pattern can cause a real infinite loop with
        # StreamReader; let's keep this code for one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because the waiter may be triggered on a chunk end
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because the waiter may be triggered on a chunk end
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of an HTTP chunk; otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b"", True) is not actually a final return value.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)
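
    # Illustrative note (not part of aiohttp): readexactly() loops because a
    # single read(n) may return fewer than n bytes. A fixed-size framing
    # protocol might use it like this (hypothetical 4-byte length prefix):
    #
    #     header = await reader.readexactly(4)
    #     body = await reader.readexactly(int.from_bytes(header, "big"))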

    def read_nowait(self, n: int = -1) -> bytes:
        # default was changed to be consistent with .read(-1)
        #
        # I believe most users don't know about this method,
        # so they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
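

# Illustrative sketch (not part of aiohttp): a minimal end-to-end use of
# StreamReader against a fresh BaseProtocol. The "_example_stream_reader"
# helper is hypothetical and exists only to show the feed_data()/feed_eof()
# producer side driving the read coroutines.
async def _example_stream_reader() -> None:
    loop = asyncio.get_running_loop()
    protocol = BaseProtocol(loop)
    reader = StreamReader(protocol, limit=2**16, loop=loop)
    reader.feed_data(b"first line\nsecond")
    reader.feed_eof()
    assert await reader.readline() == b"first line\n"
    assert await reader.read() == b"second"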


class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        if not self._read_eof_chunk:
            self._read_eof_chunk = True
            return (b"", False)

        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()


class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        self._eof = True
        self._exception = exc
        if (waiter := self._waiter) is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        self._buffer.append((data, size))
        if (waiter := self._waiter) is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        self._eof = True
        if (waiter := self._waiter) is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            data, _ = self._buffer.popleft()
            return data
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
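

# Illustrative sketch (not part of aiohttp): DataQueue pairs one producer
# calling feed_data()/feed_eof() with one reader; read() raises EofStream
# once the buffer is drained after EOF. The "_example_data_queue" helper is
# hypothetical.
async def _example_data_queue() -> None:
    queue: DataQueue[bytes] = DataQueue(asyncio.get_running_loop())
    queue.feed_data(b"payload", 7)
    queue.feed_eof()
    assert await queue.read() == b"payload"
    try:
        await queue.read()
    except EofStream:
        pass  # expected: buffer drained and feed_eof() was called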


class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._size = 0
        self._protocol = protocol
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)
        self._size += size

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return data
        if self._exception is not None:
            raise self._exception
        raise EofStream
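

# Illustrative note (not part of aiohttp): FlowControlDataQueue applies the
# same backpressure idea as StreamReader, but with a single threshold of
# 2 * limit on the sum of the `size` arguments passed to feed_data(). Reading
# from the transport is paused when the tracked size rises above that
# threshold and resumed when a read() drains it back below.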