Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import collections
3import warnings
4from typing import (
5 Awaitable,
6 Callable,
7 Deque,
8 Final,
9 Generic,
10 List,
11 Optional,
12 Tuple,
13 Type,
14 TypeVar,
15 Union,
16)
18from .base_protocol import BaseProtocol
19from .helpers import (
20 _EXC_SENTINEL,
21 BaseTimerContext,
22 TimerNoop,
23 set_exception,
24 set_result,
25)
26from .log import internal_logger
# Public API of this module; DataQueue and the iterator helpers above are
# re-exported for use by the client/server request-body machinery.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)
_T = TypeVar("_T")


class EofStream(Exception):
    """eof stream indication."""


class AsyncStreamIterator(Generic[_T]):
    """Async iterator adapter over a zero-argument read coroutine.

    Iteration stops when the read function raises ``EofStream`` or
    yields an empty bytes object.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            value = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if value == b"":
            raise StopAsyncIteration
        return value
class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``(bytes, end_of_http_chunk)`` tuples.

    Wraps a StreamReader and repeatedly awaits ``readchunk()``;
    iteration ends on the reader's EOF marker ``(b"", False)``.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk
class AsyncStreamReaderMixin:
    """Mixin providing the async-iteration helpers for stream readers.

    The host class must supply ``readline``, ``read``, ``readany`` and
    ``readchunk`` coroutines (see StreamReader).
    """

    __slots__ = ()

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration goes line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""

        def read_chunk() -> Awaitable[bytes]:
            return self.read(n)  # type: ignore[attr-defined]

        return AsyncStreamIterator(read_chunk)

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: reading is paused once more than
        # 2*limit bytes are buffered and resumed when the buffered size
        # drops below limit (see feed_data / _read_nowait_chunk).
        self._low_water = limit
        self._high_water = limit * 2
        self._loop = loop
        # Bytes currently buffered and not yet consumed by a read.
        self._size = 0
        # Logical offset of everything consumed so far, used to match
        # reads against the HTTP chunk boundaries in _http_chunk_splits.
        self._cursor = 0
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        # Read offset inside self._buffer[0] (avoids re-slicing on
        # every partial read of the first buffered chunk).
        self._buffer_offset = 0
        self._eof = False
        # Future used to park exactly one reader coroutine until data
        # or EOF arrives; resolved by feed_data/feed_eof/set_exception.
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[Union[Type[BaseException], BaseException]] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0
        # Total bytes ever fed into the reader, consumed or not.
        self.total_bytes = 0

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[Union[Type[BaseException], BaseException]]:
        return self._exception

    def set_exception(
        self,
        exc: Union[Type[BaseException], BaseException],
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the stream: future reads raise *exc*; wake both waiters."""
        self._exception = exc
        # EOF callbacks must not fire on a failed stream.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark end of stream, wake waiters and run the EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No more data will arrive, so the transport need not stay paused.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (returns at once if already EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Materialize the partially-consumed first chunk before
        # prepending, so _buffer_offset can be reset to zero.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        # The cursor moves backwards: these bytes count as un-consumed again.
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    def feed_data(self, data: bytes) -> None:
        """Append *data* to the buffer, wake the reader, apply flow control."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Backpressure: stop the transport once the high watermark is crossed.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Switch on HTTP chunk-boundary tracking (must precede any data)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Park the calling coroutine until feed_data/feed_eof resolves it."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            # The timer context may raise asyncio.TimeoutError here.
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read up to and including *separator*, or to EOF if never found.

        Raises ValueError if the accumulated data exceeds the high
        watermark before the separator is seen ("Chunk too big").
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        # NOTE(review): the separator is searched only within the current
        # first buffer chunk, so a multi-byte separator that straddles two
        # buffered chunks looks like it is not found — confirm against the
        # upstream readuntil behavior before relying on multi-byte separators.
        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # find() result is shifted by +1 so 0 means "not found".
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; n == -1 reads everything until EOF."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever is buffered, waiting for at least one byte or EOF."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    # Exactly at a chunk boundary: empty data, boundary flag set.
                    return (b"", True)
                if pos > self._cursor:
                    # Return data only up to the next recorded boundary.
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF: (b"", False) — note that
                # (b"", True) above marks a chunk boundary, not the end.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes; raise IncompleteReadError on early EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously drain up to *n* buffered bytes (no waiting)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes (or all, if n == -1) from the first chunk."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption of the first chunk: just advance the offset.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Consume the remainder of a partially-read first chunk.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            # Whole untouched chunk consumed at once.
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        # Flow control: resume the transport once under the low watermark.
        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        # Raises asyncio.TimeoutError if the surrounding timer has expired.
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A stream reader that is permanently at EOF and holds no data."""

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        # Deliberately does NOT call StreamReader.__init__: an empty
        # reader needs no protocol, loop, or buffer machinery.
        self._read_eof_chunk = False
        self.total_bytes = 0

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: Union[Type[BaseException], BaseException],
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        # Exceptions are ignored: nothing can ever be read here anyway.
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback fires immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports EOF as (b"", False); every later call
        # reports (b"", True), mirroring StreamReader.readchunk.
        first_call = not self._read_eof_chunk
        self._read_eof_chunk = True
        return (b"", not first_call)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""
# Shared read-only singleton used wherever a message has no body to read.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        # Future parking the single reader until data or EOF arrives.
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Union[Type[BaseException], BaseException, None] = None
        self._buffer: Deque[_T] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[Union[Type[BaseException], BaseException]]:
        return self._exception

    def _wakeup_reader(self) -> None:
        # Resolve the pending read() future, if a reader is parked.
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def set_exception(
        self,
        exc: Union[Type[BaseException], BaseException],
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the queue: mark EOF and propagate *exc* to the reader."""
        self._eof = True
        self._exception = exc
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T) -> None:
        """Enqueue one item and wake the reader."""
        self._buffer.append(data)
        self._wakeup_reader()

    def feed_eof(self) -> None:
        """Mark end of stream and wake the reader."""
        self._eof = True
        self._wakeup_reader()

    async def read(self) -> _T:
        """Return the next item; raise EofStream (or the set exception) at end."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise
        if self._buffer:
            return self._buffer.popleft()
        if self._exception is not None:
            raise self._exception
        raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)