Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/streams.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import collections
3import sys
4import warnings
5from collections.abc import Awaitable, Callable
6from typing import Final, Generic, TypeVar
8from .base_protocol import BaseProtocol
9from .helpers import (
10 _EXC_SENTINEL,
11 DEFAULT_CHUNK_SIZE,
12 BaseTimerContext,
13 TimerNoop,
14 set_exception,
15 set_result,
16)
17from .http_exceptions import LineTooLong
18from .log import internal_logger
# Names exported by a star-import of this module.
__all__ = ("EMPTY_PAYLOAD", "EofStream", "StreamReader", "DataQueue")

# Item type carried by the generic iterator and queue classes in this module.
_T = TypeVar("_T")
class EofStream(Exception):
    """Signal that a stream has no more items to deliver."""
class AsyncStreamIterator(Generic[_T]):
    """Async iterator that repeatedly awaits a zero-argument read callable.

    Iteration stops when the callable raises ``EofStream`` or returns a
    value equal to ``b""``.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if item == b"":
            # An empty bytes result is treated the same as EofStream.
            raise StopAsyncIteration
        return item
class ChunkTupleAsyncStreamIterator:
    """Async iterator over the tuples produced by ``stream.readchunk()``.

    Every step awaits ``readchunk()`` on the wrapped stream; receiving the
    tuple ``(b"", False)`` ends the iteration.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> tuple[bytes, bool]:
        item = await self._stream.readchunk()
        if item == (b"", False):
            raise StopAsyncIteration
        return item
class StreamReader:
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: BaseTimerContext | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Create an empty reader bound to *protocol*.

        *limit* becomes the low-water mark and ``limit * 2`` the high-water
        mark used to pause/resume the protocol.  A ``TimerNoop`` is used when
        *timer* is omitted and ``asyncio.get_event_loop()`` when *loop* is.
        """
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        # Use max(4, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        # limit // 16 gets us a reasonable value of 16k with default 256KiB limit.
        self._high_water_chunks = max(4, limit // 16)
        self._low_water_chunks = self._high_water_chunks // 2
        self._loop = loop
        # Number of buffered bytes not yet handed to the consumer.
        self._size = 0
        # Number of bytes already handed to the consumer.
        self._cursor = 0
        self._http_chunk_splits: collections.deque[int] | None = None
        self._buffer: collections.deque[bytes] = collections.deque()
        # Read position inside the first block of ``_buffer``.
        self._buffer_offset = 0
        self._eof = False
        self._waiter: asyncio.Future[None] | None = None
        self._eof_waiter: asyncio.Future[None] | None = None
        self._exception: BaseException | None = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: list[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: int | None = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != DEFAULT_CHUNK_SIZE:
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate over the stream line by line (via ``readline``)."""
        return AsyncStreamIterator(self.readline)

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        self.set_read_chunk_size(n)
        return AsyncStreamIterator(lambda: self.read(n))

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)

    def get_read_buffer_limits(self) -> tuple[int, int]:
        """Return the current ``(low_water, high_water)`` byte limits."""
        return (self._low_water, self._high_water)

    def set_read_chunk_size(self, n: int) -> None:
        """Raise buffer limits to match the consumer's chunk size."""
        if n > self._low_water:
            self._low_water = n
            self._high_water = n * 2

    def exception(self) -> BaseException | None:
        """Return the exception stored by ``set_exception``, if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store *exc*, drop EOF callbacks and fail any pending waiters."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Run *callback* at EOF; immediately if EOF was already fed.

        Exceptions raised by an immediately-run callback are logged, not
        propagated.
        """
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream finished, wake waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # At EOF the parser is done, there won't be unprocessed data.
        self._protocol.resume_reading(resume_parser=False)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Wait until ``feed_eof`` is called (returns at once if it was)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """``total_compressed_bytes`` when set, otherwise ``total_bytes``."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            # Drop the consumed prefix so the offset can be reset to 0.
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> bool:
        """Append *data* to the buffer and wake a pending reader.

        Pauses the protocol once more than ``_high_water`` bytes are
        buffered.  Always returns False.
        """
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return False

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water:
            self._protocol.pause_reading()
        return False

    def begin_http_chunk_receiving(self) -> None:
        """Start tracking HTTP chunk boundaries.

        Raises RuntimeError on the first call if data was already fed.
        """
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of an HTTP chunk at the current ``total_bytes``.

        Raises RuntimeError if ``begin_http_chunk_receiving`` was not called.
        """
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if len(self._http_chunk_splits) > self._high_water_chunks:
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Suspend until data, a chunk end, EOF or an exception arrives.

        Raises RuntimeError if the protocol is not connected or another
        coroutine is already waiting; *func_name* is used in that message.
        """
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        """Read one ``b"\\n"``-terminated line (see ``readuntil``)."""
        return await self.readuntil(max_size=max_line_length)

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Read data up to and including *separator*, or up to EOF.

        Raises ValueError for an empty separator, the stored stream
        exception if one is set, and LineTooLong once more than *max_size*
        bytes (default: the high-water mark) have been collected.

        NOTE: the separator is searched for inside one buffered block at a
        time, so a multi-byte separator split across two blocks is not
        detected.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        # Collect pieces and join once instead of repeated bytes concatenation.
        pieces: list[bytes] = []
        total = 0
        not_enough = True
        max_size = max_size or self._high_water

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                pieces.append(data)
                total += len(data)
                if ichar:
                    not_enough = False

                if total > max_size:
                    raise LineTooLong(b"".join(pieces)[:100] + b"...", max_size)

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return b"".join(pieces)

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; a negative *n* reads until EOF.

        Returns ``b""`` for ``n == 0`` and at EOF.  Raises the stored stream
        exception if one is set.
        """
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # Reading everything — remove decompression chunk limit.
            self.set_read_chunk_size(sys.maxsize)
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        self.set_read_chunk_size(n)
        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Wait for data (or EOF) and return everything currently buffered."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes.

        Raises asyncio.IncompleteReadError (carrying the partial data) if
        the stream ends first.
        """
        if self._exception is not None:
            raise self._exception

        blocks: list[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Return up to *n* buffered bytes without waiting.

        A negative *n* returns the whole buffer.  Raises the stored stream
        exception, or RuntimeError if a coroutine is waiting for data.
        """
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Take up to *n* bytes from the first buffered block.

        ``n == -1`` takes the rest of that block.  Updates ``_size`` and
        ``_cursor``, discards chunk splits already passed, and resumes the
        protocol when the buffer has dropped below the low-water marks.
        The buffer must be non-empty.
        """
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial read: keep the block and advance the offset.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        if self._size < self._low_water and (
            self._http_chunk_splits is None
            or len(self._http_chunk_splits) < self._low_water_chunks
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n is negative."""
        self._timer.assert_timeout()

        # Any negative n means "everything", matching read().  Passing a
        # value such as -2 down to _read_nowait_chunk() would make the
        # buffer offset negative and loop forever on empty slices.
        if n < 0:
            # Drain only chunks present now; _read_nowait_chunk() can
            # re-entrantly resume_reading() and refill the buffer.
            count = len(self._buffer)
            if count == 1:
                return self._read_nowait_chunk(-1)
            return b"".join([self._read_nowait_chunk(-1) for _ in range(count)])

        chunks: list[bytes] = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            n -= len(chunk)
            if n == 0:
                break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """Stream reader that holds no data and always reports EOF.

    ``StreamReader.__init__`` is intentionally not called; only the
    attributes assigned in ``__init__`` below exist on an instance, and the
    methods overridden here do not touch the parent's buffer state.
    """

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        self._read_eof_chunk = False
        self.total_bytes = 0
        # The inherited ``total_raw_bytes`` property reads this slot; leaving
        # it unassigned would make that property raise AttributeError.
        self.total_compressed_bytes = None

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> BaseException | None:
        """Always None: this reader never stores an exception."""
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Ignore the exception."""
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Run *callback* immediately; exceptions are logged, not raised."""
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        """Do nothing."""
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> bool:
        """Discard *data*; always returns False."""
        return False

    def set_read_chunk_size(self, n: int) -> None:
        return

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        return b""

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Return ``b""``; raise ValueError for an empty *separator*.

        Overridden because the inherited implementation reads attributes
        that are never assigned on this class.
        """
        if not separator:
            raise ValueError("Separator should be at least one-byte string")
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> tuple[bytes, bool]:
        """Return ``(b"", False)`` on the first call, ``(b"", True)`` after."""
        if not self._read_eof_chunk:
            self._read_eof_chunk = True
            return (b"", False)

        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        """Always raise asyncio.IncompleteReadError with empty partial data."""
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


# Module-level EmptyStreamReader instance.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: asyncio.Future[None] | None = None
        self._exception: BaseException | None = None
        # Each entry is an (item, size) pair as passed to feed_data().
        self._buffer: collections.deque[tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        """Return the number of queued items."""
        return len(self._buffer)

    def is_eof(self) -> bool:
        """Return True once EOF was fed or an exception was set."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True when EOF was reached and no items remain queued."""
        return self._eof and not self._buffer

    def exception(self) -> BaseException | None:
        """Return the exception stored by ``set_exception``, if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store *exc*, mark the queue as finished and fail a pending reader."""
        self._eof = True
        self._exception = exc
        pending = self._waiter
        if pending is not None:
            self._waiter = None
            set_exception(pending, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Queue *data* together with *size* and wake a pending reader."""
        self._buffer.append((data, size))
        pending = self._waiter
        if pending is not None:
            self._waiter = None
            set_result(pending, None)

    def feed_eof(self) -> None:
        """Mark the queue as finished and wake a pending reader."""
        self._eof = True
        pending = self._waiter
        if pending is not None:
            self._waiter = None
            set_result(pending, None)

    async def read(self) -> _T:
        """Return the next queued item, waiting for one if necessary.

        With nothing left to return, raises the stored exception if one was
        set, otherwise EofStream.
        """
        if not (self._buffer or self._eof):
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Clear the abandoned waiter so a later read() can wait again.
                self._waiter = None
                raise
        if self._buffer:
            item, _size = self._buffer.popleft()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.

    This class is deprecated and will be removed in version 4.0.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        # Sum of the ``size`` values of the items currently queued.
        self._size = 0
        self._protocol = protocol
        # Pause/resume threshold: twice the requested limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Queue *data* and pause the protocol once the threshold is exceeded."""
        super().feed_data(data, size)
        self._size += size

        over_limit = self._size > self._limit
        if over_limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Return the next item, resuming the protocol when below threshold.

        With nothing left to return, raises the stored exception if one was
        set, otherwise EofStream.
        """
        if not (self._buffer or self._eof):
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Clear the abandoned waiter so a later read() can wait again.
                self._waiter = None
                raise
        if self._buffer:
            item, item_size = self._buffer.popleft()
            self._size -= item_size
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
            return item
        if self._exception is not None:
            raise self._exception
        raise EofStream