Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import collections
3import warnings
4from collections.abc import Awaitable, Callable
5from typing import Final, Generic, TypeVar
7from .base_protocol import BaseProtocol
8from .helpers import (
9 _EXC_SENTINEL,
10 BaseTimerContext,
11 TimerNoop,
12 set_exception,
13 set_result,
14)
15from .http_exceptions import LineTooLong
16from .log import internal_logger
# Public API of this module. EmptyStreamReader and the iterator helper
# classes are defined below but intentionally not advertised here.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)
25_T = TypeVar("_T")
class EofStream(Exception):
    """Internal signal that the end of the stream has been reached."""
class AsyncStreamIterator(Generic[_T]):
    """Async iterator adapter over a zero-argument awaitable read function.

    Iteration stops when the read function raises ``EofStream`` or returns
    an empty bytes object.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if item == b"":
            # An empty read is the stream's way of signalling EOF.
            raise StopAsyncIteration
        return item
class ChunkTupleAsyncStreamIterator:
    """Async iterator yielding ``(data, end_of_http_chunk)`` tuples.

    Wraps ``StreamReader.readchunk()``; iteration ends when the stream
    produces the ``(b"", False)`` EOF marker.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> tuple[bytes, bool]:
        pair = await self._stream.readchunk()
        if pair == (b"", False):
            # EOF marker emitted by StreamReader.readchunk().
            raise StopAsyncIteration
        return pair
class AsyncStreamReaderMixin:
    """Mixin adding async-iteration helpers to stream reader classes.

    The host class must provide ``readline``, ``read``, ``readany`` and
    ``readchunk`` coroutine methods.
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Line-by-line iteration is the default.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""

        async def read_chunk() -> bytes:
            return await self.read(n)  # type: ignore[attr-defined]

        return AsyncStreamIterator(read_chunk)

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: BaseTimerContext | None = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._protocol = protocol
        # Byte flow-control watermarks: reading pauses above _high_water
        # (see feed_data) and resumes below _low_water (see
        # _read_nowait_chunk).
        self._low_water = limit
        self._high_water = limit * 2
        # Ensure high_water_chunks >= 3 so it's always > low_water_chunks.
        self._high_water_chunks = max(3, limit // 4)
        # Use max(2, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        self._low_water_chunks = max(2, self._high_water_chunks // 2)
        self._loop = loop
        # Number of bytes currently buffered (not yet consumed).
        self._size = 0
        # Logical offset of all data handed out to readers so far.
        self._cursor = 0
        # Logical offsets of HTTP chunk boundaries; None unless chunked
        # transfer encoding is in use (see begin_http_chunk_receiving).
        self._http_chunk_splits: collections.deque[int] | None = None
        self._buffer: collections.deque[bytes] = collections.deque()
        # Consumed prefix length of self._buffer[0].
        self._buffer_offset = 0
        self._eof = False
        # Future linking feed_data()/feed_eof() to a single pending read.
        self._waiter: asyncio.Future[None] | None = None
        self._eof_waiter: asyncio.Future[None] | None = None
        self._exception: type[BaseException] | BaseException | None = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: list[Callable[[], None]] = []
        self._eof_counter = 0
        # Total logical (decoded) bytes fed into this reader.
        self.total_bytes = 0
        # Compressed wire size, when the payload was compressed; else None.
        self.total_compressed_bytes: int | None = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> type[BaseException] | BaseException | None:
        """Return the exception installed via set_exception(), if any."""
        return self._exception

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the stream: store *exc* and propagate it to pending waiters."""
        self._exception = exc
        # EOF callbacks must not fire once the stream has failed.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Schedule *callback* for EOF; run it immediately if already at EOF."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream finished, wake all waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No more data will arrive, so flow-control pausing is moot.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called; return at once if already at EOF."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """Bytes received on the wire: compressed size if known, else total_bytes."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Materialize the partially consumed head chunk before prepending,
        # so _buffer_offset can be reset to 0.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    def feed_data(self, data: bytes) -> None:
        """Append *data* to the buffer, wake a pending reader, apply flow control."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Pause the transport once the buffer overshoots the high watermark.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Switch the reader into chunked mode (tracks HTTP chunk boundaries)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if (
            len(self._http_chunk_splits) > self._high_water_chunks
            and not self._protocol._reading_paused
        ):
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Park the calling coroutine until feed_data()/feed_eof() wakes it.

        Raises RuntimeError if the connection is closed or another read
        coroutine is already waiting (*func_name* is used in the message).
        """
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        """Read one line (up to and including b"\\n"), capped at *max_line_length*."""
        return await self.readuntil(max_size=max_line_length)

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Read data until *separator* is found (separator included) or EOF.

        Raises LineTooLong when more than *max_size* bytes (default: the
        high watermark) accumulate without finding the separator.

        NOTE(review): the search below only looks inside the current head
        buffer entry, so a multi-byte separator that straddles two buffer
        entries appears not to be detected — confirm against upstream.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True
        max_size = max_size or self._high_water

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > max_size:
                    raise LineTooLong(chunk[:100] + b"...", max_size)

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; n == -1 reads everything until EOF."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever is buffered, waiting for at least one byte or EOF."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise asyncio.IncompleteReadError at EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: list[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all if n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes from the head buffer entry (all if n == -1)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption: advance the offset, keep the entry.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Consume the remainder of a partially consumed entry.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            # Consume a whole untouched entry.
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        # Resume the transport once both the byte count and the number of
        # pending chunk splits are back under their low watermarks.
        if (
            self._protocol._reading_paused
            and self._size < self._low_water
            and (
                self._http_chunk_splits is None
                or len(self._http_chunk_splits) < self._low_water_chunks
            )
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A stream reader that is permanently at EOF and never yields data."""

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately skips StreamReader.__init__: this reader has no
        # protocol, buffer, timer or event loop.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>"

    def exception(self) -> BaseException | None:
        return None

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """No-op: an empty stream never stores an exception."""

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback fires immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        """No-op: the stream is born at EOF."""

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes) -> None:
        """No-op: fed data is discarded."""

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> tuple[bytes, bool]:
        # First call reports EOF data, subsequent calls report chunk end.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""
628EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        # Future used to park the single reader until data/EOF arrives.
        self._waiter: asyncio.Future[None] | None = None
        self._exception: type[BaseException] | BaseException | None = None
        self._buffer: collections.deque[_T] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """Return True once feed_eof() or set_exception() was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True when EOF was signalled and the buffer is drained."""
        return self._eof and not self._buffer

    def exception(self) -> type[BaseException] | BaseException | None:
        """Return the exception installed via set_exception(), if any."""
        return self._exception

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the queue: store *exc*, mark EOF and wake the reader."""
        self._eof = True
        self._exception = exc
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T) -> None:
        """Append *data* and wake the pending reader, if any."""
        self._buffer.append(data)
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        """Mark the queue finished and wake the pending reader, if any."""
        self._eof = True
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        """Return the next item; raise the stored exception or EofStream at EOF."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Drop the waiter so a later read() can park again.
                self._waiter = None
                raise
        # Buffered items are drained before any EOF/exception is reported.
        if self._buffer:
            return self._buffer.popleft()
        if self._exception is None:
            raise EofStream
        raise self._exception

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)