Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import collections
3import warnings
4from typing import (
5 Awaitable,
6 Callable,
7 Deque,
8 Final,
9 Generic,
10 List,
11 Optional,
12 Tuple,
13 TypeVar,
14)
16from .base_protocol import BaseProtocol
17from .helpers import (
18 _EXC_SENTINEL,
19 BaseTimerContext,
20 TimerNoop,
21 set_exception,
22 set_result,
23)
24from .log import internal_logger
# Public API of this module; helper iterator classes stay module-private.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)


# Item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")
class EofStream(Exception):
    """Raised to signal that the end of the stream has been reached."""
class AsyncStreamIterator(Generic[_T]):
    """Async iterator that repeatedly awaits a zero-argument read callable.

    Iteration stops when the callable raises ``EofStream`` or produces an
    empty bytes object (the StreamReader EOF convention).
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        # Coroutine function that yields the next item on each call.
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            # Underlying source signalled end-of-data explicitly.
            raise StopAsyncIteration
        if item == b"":
            # Empty chunk is the implicit EOF marker for stream readers.
            raise StopAsyncIteration
        return item
class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``(bytes, bool)`` pairs from ``readchunk()``.

    Iteration terminates on the ``(b"", False)`` sentinel that
    StreamReader.readchunk() returns at EOF.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        pair = await self._stream.readchunk()
        if pair == (b"", False):
            # EOF sentinel — stop iterating.
            raise StopAsyncIteration
        return pair
class AsyncStreamReaderMixin:
    """Mixin that adds async-iteration helpers on top of a stream reader.

    The host class is expected to provide ``readline``, ``read``,
    ``readany`` and ``readchunk`` (hence the attr-defined ignores).
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration walks the stream line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Create a reader bound to *protocol*.

        *limit* is the low byte watermark; the high watermark is twice
        that.  Reading on the protocol is paused/resumed around these
        watermarks (see feed_data / _read_nowait_chunk).
        """
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        # Ensure high_water_chunks >= 3 so it's always > low_water_chunks.
        self._high_water_chunks = max(3, limit // 4)
        # Use max(2, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        self._low_water_chunks = max(2, self._high_water_chunks // 2)
        self._loop = loop
        # Number of buffered, not-yet-consumed bytes.
        self._size = 0
        # Count of bytes already handed out to readers.
        self._cursor = 0
        # Logical end-of-chunk offsets; None until begin_http_chunk_receiving().
        self._http_chunk_splits: Optional[Deque[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        # Read position inside self._buffer[0] (partial chunk consumption).
        self._buffer_offset = 0
        self._eof = False
        # Future awaited by the single active read coroutine, if any.
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        # Counts repeated reads at EOF to warn about likely infinite loops.
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: Optional[int] = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) byte watermarks used for flow control."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the exception set on this stream, or None."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the stream: pending and future reads will raise *exc*."""
        self._exception = exc
        # EOF callbacks must not run once the stream has failed.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (runs now if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream ended, wake all waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No further data will arrive, so there is no reason to stay paused.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until 'feed_eof' is called (return at once if it was)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        # Prefer the compressed byte count when one was recorded.
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Materialize the partially consumed head chunk before prepending.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        # These bytes count as "unconsumed" again, so the cursor moves back.
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer and wake the pending reader, if any."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Apply backpressure once the buffer exceeds the high watermark.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Enable chunk-boundary tracking; must precede any fed data."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if (
            len(self._http_chunk_splits) > self._high_water_chunks
            and not self._protocol._reading_paused
        ):
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # Park the calling coroutine on a fresh future; it is resolved by
        # feed_data()/feed_eof()/end_http_chunk_receiving().  *func_name*
        # is used only to produce a helpful error message.
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, delimited by the default newline separator."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read up to and including the first occurrence of *separator*.

        Raises ValueError for an empty separator, or when more than the
        high watermark of bytes accumulates without finding it.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # find() result shifted by +1 so that 0 means "not found".
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

            if chunk_size > self._high_water:
                raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes (everything until EOF when n < 0)."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever buffered data is available, waiting if empty."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    # Consumption stopped exactly on a chunk boundary.
                    return (b"", True)
                if pos > self._cursor:
                    # Hand out the remainder of the current HTTP chunk.
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes; raise IncompleteReadError on early EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all when n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        # Consume up to *n* bytes from the FIRST buffered chunk only
        # (the whole first chunk when n == -1).
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption: advance the offset, keep the chunk queued.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Consume the remaining tail of a partially read chunk.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            # Consume a whole untouched chunk.
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        # Resume the transport once both the byte count and the number of
        # pending chunk boundaries drop below their low watermarks.
        if (
            self._protocol._reading_paused
            and self._size < self._low_water
            and (
                self._http_chunk_splits is None
                or len(self._http_chunk_splits) < self._low_water_chunks
            )
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently at EOF and never yields data."""

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately skip StreamReader.__init__: no protocol, loop or
        # buffers are needed for an always-empty stream.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        # No exception can ever be stored on an empty stream.
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        # Ignored: there are no waiters or pending data to fail.
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback runs immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call yields the (b"", False) EOF sentinel; later calls
        # report a chunk boundary instead.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""
# Shared singleton used wherever a message has no body.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader.

    Items are fed in as ``(data, size)`` pairs; a single consumer awaits
    read() until data, EOF or an exception arrives.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        # Future awaited by the single pending read() call, if any.
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        # True once feed_eof() or set_exception() has been called.
        return self._eof

    def at_eof(self) -> bool:
        # EOF reached AND every buffered item was consumed.
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the queue: the pending and future reads raise *exc*."""
        self._eof = True
        self._exception = exc
        if (pending := self._waiter) is not None:
            self._waiter = None
            set_exception(pending, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake the pending reader, if any."""
        self._buffer.append((data, size))
        if (pending := self._waiter) is not None:
            self._waiter = None
            set_result(pending, None)

    def feed_eof(self) -> None:
        """Mark end-of-stream and wake the pending reader, if any."""
        self._eof = True
        if (pending := self._waiter) is not None:
            self._waiter = None
            set_result(pending, None)

    async def read(self) -> _T:
        """Return the next item; raise EofStream once the queue is drained."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Drop the waiter so a later read() can park again.
                self._waiter = None
                raise
        if self._buffer:
            data, _ = self._buffer.popleft()
            return data
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        # Total size of currently buffered items.
        self._size = 0
        self._protocol = protocol
        # Pause reading once buffered size exceeds twice the given limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue an item and apply backpressure above the limit."""
        super().feed_data(data, size)
        self._size += size

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Return the next item, resuming the transport below the limit."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Clear the waiter so a later read() can park again.
                self._waiter = None
                raise
        if self._buffer:
            item, item_size = self._buffer.popleft()
            self._size -= item_size
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream