Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import collections
3import warnings
4from typing import (
5 Awaitable,
6 Callable,
7 Deque,
8 Final,
9 Generic,
10 List,
11 Optional,
12 Tuple,
13 TypeVar,
14)
16from .base_protocol import BaseProtocol
17from .helpers import (
18 _EXC_SENTINEL,
19 BaseTimerContext,
20 TimerNoop,
21 set_exception,
22 set_result,
23)
24from .log import internal_logger
# Public names exported by this module; the remaining classes are
# implementation details used elsewhere inside aiohttp.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

# Item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")
class EofStream(Exception):
    """Raised to signal that the end of the stream has been reached."""
class AsyncStreamIterator(Generic[_T]):
    """Adapt a parameterless read coroutine into an async iterator.

    Iteration terminates when the wrapped read function either raises
    EofStream or produces an empty bytes object.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if item == b"":
            raise StopAsyncIteration
        return item
class ChunkTupleAsyncStreamIterator:
    """Async iterator over (data, end_of_http_chunk) tuples from a stream.

    Iteration finishes when the stream yields (b"", False), which is the
    EOF marker produced by StreamReader.readchunk().
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk
class AsyncStreamReaderMixin:
    """Mixin that equips a reader with the async-iteration entry points."""

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Plain ``async for`` iterates line by line via readline().
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Return an asynchronous iterator yielding chunks of at most n bytes."""

        async def read_chunk() -> bytes:
            return await self.read(n)  # type: ignore[attr-defined]

        return AsyncStreamIterator(read_chunk)

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield whatever data is available as soon as it arrives."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield (bytes, bool) tuples as produced by StreamReader.readchunk().

        The boolean flags whether the data ends on an HTTP chunk boundary.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: the transport is paused when more than
        # 2*limit bytes are buffered and resumed once below limit.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0  # number of bytes currently buffered
        self._cursor = 0  # logical position: bytes handed out to readers so far
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # consumed prefix length of self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0  # total bytes ever fed via feed_data()

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the flow-control watermarks as (low, high)."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the stream as broken and wake any pending waiters with *exc*."""
        self._exception = exc
        # A broken stream will never reach EOF; drop pending callbacks.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (run immediately if already EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Signal end-of-stream: wake read/EOF waiters, run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No more data will arrive, so backpressure is pointless now.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called; return at once if already EOF."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            # Materialize the partially-consumed first chunk before
            # prepending, so offsets stay consistent.
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer and wake a pending reader, if any."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Backpressure: stop the transport once past the high watermark.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Switch on chunk-boundary tracking for HTTP chunked transfer."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk at the current position."""
        if self._http_chunk_splits is None:
            # NOTE(review): the message says end/begin_chunk_receiving but the
            # actual method names are end/begin_http_chunk_receiving; the text
            # is kept as-is since callers may match on it.
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Suspend until feed_data()/feed_eof() resolves the waiter future."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, where "line" is a sequence of bytes ending with \\n."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read data until *separator* is found; the separator is included.

        Raises ValueError if the separator is empty or if more than
        the high watermark of data accumulates without a separator.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # find() result is biased by +1 so that 0 means "not found".
                # NOTE(review): a multi-byte separator split across two
                # buffered chunks is not detected here — confirm upstream
                # guarantees before relying on multi-byte separators.
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; n == -1 reads until EOF, n == 0 returns b""."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever data is buffered, waiting only if nothing is ready."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    # Reader is exactly at a chunk boundary.
                    return (b"", True)
                if pos > self._cursor:
                    # Hand out data up to the boundary.
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
            # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise IncompleteReadError at EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes without waiting."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Pop up to *n* bytes (all if n == -1) from the first buffered chunk."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Chunk is larger than requested: just advance the offset.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Consume the remainder of a partially-read chunk.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            # Whole untouched chunk.
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        # Release backpressure once below the low watermark.
        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently empty and already at EOF.

    Deliberately skips StreamReader.__init__; backs the shared
    EMPTY_PAYLOAD singleton.
    """

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return "<%s>" % type(self).__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback runs immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports the EOF sentinel (b"", False); later calls
        # report an (empty) chunk boundary.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""
# Shared singleton used wherever a message has no body to read.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def _wakeup_waiter(self) -> None:
        # Resolve the pending read() future, if any, exactly once.
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the queue as broken; the reader will receive *exc*."""
        self._eof = True
        self._exception = exc
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake the reader."""
        self._buffer.append((data, size))
        self._wakeup_waiter()

    def feed_eof(self) -> None:
        """Signal that no more items will be fed."""
        self._eof = True
        self._wakeup_waiter()

    async def read(self) -> _T:
        """Dequeue one item, waiting if necessary; raise EofStream at end."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            waiter = self._loop.create_future()
            self._waiter = waiter
            try:
                await waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            item, _size = self._buffer.popleft()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._size = 0
        self._protocol = protocol
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)
        self._size += size

        # Apply backpressure once the buffered payload exceeds the limit.
        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            waiter = self._loop.create_future()
            self._waiter = waiter
            try:
                await waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            item, item_size = self._buffer.popleft()
            self._size -= item_size
            # Release backpressure once we drop back under the limit.
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream