Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import collections
3import sys
4import warnings
5from collections.abc import Awaitable, Callable
6from typing import Final, Generic, TypeVar
8from .base_protocol import BaseProtocol
9from .helpers import (
10 _EXC_SENTINEL,
11 DEFAULT_CHUNK_SIZE,
12 BaseTimerContext,
13 TimerNoop,
14 set_exception,
15 set_result,
16)
17from .http_exceptions import LineTooLong
18from .log import internal_logger
# Public names exported by ``from <this module> import *``.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

# Item type used by the generic iterator and queue classes in this module.
_T = TypeVar("_T")
class EofStream(Exception):
    """Signal that a stream has no more items to deliver."""
class AsyncStreamIterator(Generic[_T]):
    """Asynchronous iterator that repeatedly awaits a zero-argument read function.

    Iteration ends when the read function raises ``EofStream`` or returns a
    value equal to ``b""``.
    """

    __slots__ = ("read_func",)

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        # Awaited once per ``__anext__`` call.
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        """Return the iterator itself."""
        return self

    async def __anext__(self) -> _T:
        """Return the next item, or raise ``StopAsyncIteration`` at end of stream."""
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        else:
            # An empty bytes result is treated as end of data.
            if item == b"":
                raise StopAsyncIteration
            return item
class ChunkTupleAsyncStreamIterator:
    """Asynchronous iterator over the tuples returned by ``stream.readchunk()``.

    Iteration ends when ``readchunk()`` returns ``(b"", False)``.
    """

    __slots__ = ("_stream",)

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        """Return the iterator itself."""
        return self

    async def __anext__(self) -> tuple[bytes, bool]:
        """Return the next ``readchunk()`` result or raise ``StopAsyncIteration``."""
        chunk_info = await self._stream.readchunk()
        reached_end = chunk_info == (b"", False)
        if reached_end:
            raise StopAsyncIteration
        return chunk_info
class StreamReader:
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    Data is pushed in with ``feed_data()`` / ``feed_eof()`` and stored as a
    deque of byte blocks; the ``read*`` coroutines consume from the head of
    that deque. The protocol is asked to pause reading when the buffered size
    or the number of pending HTTP chunk boundaries grows past the high-water
    marks, and to resume once both fall below the low-water marks.
    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: BaseTimerContext | None = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create an empty reader.

        ``limit`` is the low-water mark in bytes; the high-water mark is twice
        that. ``timer`` wraps every wait for data; a ``TimerNoop`` is used when
        none is given. ``loop`` is used to create the waiter futures.
        """
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        # Use max(4, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        # limit // 16 gets us a reasonable value of 16k with default 256KiB limit.
        self._high_water_chunks = max(4, limit // 16)
        self._low_water_chunks = self._high_water_chunks // 2
        self._loop = loop
        # Number of buffered, not yet consumed bytes.
        self._size = 0
        # Number of bytes consumed so far (comparable with the chunk splits).
        self._cursor = 0
        # End offsets of received HTTP chunks; None until chunked mode begins.
        self._http_chunk_splits: collections.deque[int] | None = None
        self._buffer: collections.deque[bytes] = collections.deque()
        # Bytes of self._buffer[0] that were already consumed.
        self._buffer_offset = 0
        self._eof = False
        self._waiter: asyncio.Future[None] | None = None
        self._eof_waiter: asyncio.Future[None] | None = None
        self._exception: type[BaseException] | BaseException | None = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: list[Callable[[], None]] = []
        self._eof_counter = 0
        # Total number of bytes passed to feed_data().
        self.total_bytes = 0
        self.total_compressed_bytes: int | None = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != DEFAULT_CHUNK_SIZE:
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        """Iterate over the stream line by line (via ``readline``)."""
        return AsyncStreamIterator(self.readline)

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        self.set_read_chunk_size(n)
        return AsyncStreamIterator(lambda: self.read(n))

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)

    def get_read_buffer_limits(self) -> tuple[int, int]:
        """Return the ``(low_water, high_water)`` buffer limits in bytes."""
        return (self._low_water, self._high_water)

    def set_read_chunk_size(self, n: int) -> None:
        """Raise buffer limits to match the consumer's chunk size."""
        # Limits are only ever raised here, never lowered.
        if n > self._low_water:
            self._low_water = n
            self._high_water = n * 2

    def exception(self) -> type[BaseException] | BaseException | None:
        """Return the exception stored by ``set_exception``, if any."""
        return self._exception

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store ``exc`` and propagate it to the pending data and EOF waiters.

        Registered EOF callbacks are dropped without being called.
        """
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register ``callback`` to run at EOF, or run it now if EOF was fed.

        An ``Exception`` raised by an immediately-run callback is logged and
        not propagated.
        """
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream as finished, wake waiters and run the EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # At EOF the parser is done, there won't be unprocessed data.
        self._protocol.resume_reading(resume_parser=False)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Wait until ``feed_eof`` is called; return at once if it already was."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        """``total_compressed_bytes`` when it is set, otherwise ``total_bytes``."""
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Drop the already consumed prefix of the head block so that the new
        # head block can start at offset zero.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    def feed_data(self, data: bytes) -> bool:
        """Append ``data`` to the buffer and wake a pending reader.

        Empty ``data`` is ignored. The protocol is asked to pause reading when
        the buffered size exceeds the high-water mark. Always returns False.
        """
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return False

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water:
            self._protocol.pause_reading()
        return False

    def begin_http_chunk_receiving(self) -> None:
        """Switch to chunk tracking mode on the first call.

        Raises RuntimeError if data was already fed before the first call.
        """
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of an HTTP chunk at the current ``total_bytes`` offset.

        Raises RuntimeError if ``begin_http_chunk_receiving`` was never called.
        """
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if len(self._http_chunk_splits) > self._high_water_chunks:
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Suspend until data, a chunk end, EOF or an exception is fed.

        ``func_name`` is only used in the error message. Raises RuntimeError
        if the protocol is not connected or another coroutine is already
        waiting.
        """
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        """Read up to and including the next ``b"\\n"`` (see ``readuntil``)."""
        return await self.readuntil(max_size=max_line_length)

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Read data up to and including ``separator``.

        If EOF is reached first, whatever was read is returned. ``max_size``
        defaults to the high-water mark when it is None or 0.

        NOTE(review): the separator is searched for inside one buffered block
        at a time, so a multi-byte separator split across two ``feed_data``
        calls is not detected.

        Raises:
            ValueError: if ``separator`` is empty.
            LineTooLong: if more than ``max_size`` bytes were read.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        # Collect the pieces and join once: repeated ``bytes +=`` copies the
        # accumulated result on every iteration, which is quadratic when the
        # line arrives as many small blocks.
        pieces: list[bytes] = []
        chunk_size = 0
        not_enough = True
        max_size = max_size or self._high_water

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                pieces.append(data)
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > max_size:
                    raise LineTooLong(b"".join(pieces)[:100] + b"...", max_size)

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return b"".join(pieces)

    async def read(self, n: int = -1) -> bytes:
        """Read up to ``n`` bytes, or everything until EOF when ``n`` is negative.

        Returns ``b""`` when ``n`` is 0 or when EOF is reached with an empty
        buffer. Raises the stored exception if one was set.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # Reading everything — remove decompression chunk limit.
            self.set_read_chunk_size(sys.maxsize)
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        self.set_read_chunk_size(n)
        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return all buffered data, waiting first if the buffer is empty.

        Returns ``b""`` at EOF. Raises the stored exception if one was set.
        """
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk, otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly ``n`` bytes.

        Raises ``asyncio.IncompleteReadError`` (carrying the partial data) if
        EOF is reached first, or the stored exception if one was set.
        """
        if self._exception is not None:
            raise self._exception

        blocks: list[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Return up to ``n`` buffered bytes (all of them if ``n == -1``) without waiting.

        Raises the stored exception if one was set, or RuntimeError if a
        coroutine is currently waiting for data.
        """
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Take up to ``n`` bytes (or the rest, if ``n == -1``) from the head block.

        The buffer must not be empty. Updates the size and cursor counters,
        discards chunk splits that were passed, and asks the protocol to
        resume reading once below the low-water marks.
        """
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Only part of the head block is needed; keep the block in place.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        if self._size < self._low_water and (
            self._http_chunk_splits is None
            or len(self._http_chunk_splits) < self._low_water_chunks
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A ``StreamReader`` that is always at EOF and never holds any data.

    The parent initializer is deliberately not called, so only the slots
    assigned in ``__init__`` below exist; every parent method that would touch
    other state is overridden.
    """

    __slots__ = ("_read_eof_chunk",)

    def __init__(self) -> None:
        self._read_eof_chunk = False
        self.total_bytes = 0
        # Needed by the inherited ``total_raw_bytes`` property, which would
        # otherwise fail with AttributeError on the unassigned slot.
        self.total_compressed_bytes = None

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> BaseException | None:
        """Always return None; no exception is ever stored."""
        return None

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Ignore the exception."""
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Call ``callback`` immediately; an ``Exception`` it raises is logged."""
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        """Do nothing."""
        pass

    def is_eof(self) -> bool:
        """Always True."""
        return True

    def at_eof(self) -> bool:
        """Always True."""
        return True

    async def wait_eof(self) -> None:
        """Return immediately."""
        return

    def feed_data(self, data: bytes) -> bool:
        """Discard ``data`` and return False."""
        return False

    def set_read_chunk_size(self, n: int) -> None:
        """Do nothing; there are no buffer limits to adjust."""
        return

    async def readline(self, *, max_line_length: int | None = None) -> bytes:
        """Return ``b""``."""
        return b""

    async def readuntil(
        self, separator: bytes = b"\n", *, max_size: int | None = None
    ) -> bytes:
        """Return ``b""``.

        Overridden because the inherited implementation reads instance state
        that this class never initializes. An empty ``separator`` raises
        ValueError, as in the parent class.
        """
        if len(separator) == 0:
            raise ValueError("Separator should be at least one-byte string")
        return b""

    async def read(self, n: int = -1) -> bytes:
        """Return ``b""``."""
        return b""

    async def readany(self) -> bytes:
        """Return ``b""``."""
        return b""

    async def readchunk(self) -> tuple[bytes, bool]:
        """Return ``(b"", False)`` on the first call and ``(b"", True)`` afterwards."""
        if not self._read_eof_chunk:
            self._read_eof_chunk = True
            return (b"", False)

        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        """Always raise ``asyncio.IncompleteReadError`` with empty partial data."""
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        """Return ``b""``."""
        return b""
# Module-level EmptyStreamReader instance, exported through ``__all__``.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """General-purpose queue of items intended for a single awaiting reader.

    Producers call ``feed_data`` / ``feed_eof`` / ``set_exception``; the
    consumer awaits ``read`` or iterates with ``async for``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        # Future the reader is currently parked on, if any.
        self._waiter: asyncio.Future[None] | None = None
        self._exception: type[BaseException] | BaseException | None = None
        self._buffer: collections.deque[_T] = collections.deque()

    def __len__(self) -> int:
        """Number of items waiting to be read."""
        return len(self._buffer)

    def is_eof(self) -> bool:
        """Return True once ``feed_eof`` or ``set_exception`` was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True when EOF was signalled and no items remain buffered."""
        return self._eof and not self._buffer

    def exception(self) -> type[BaseException] | BaseException | None:
        """Return the exception stored by ``set_exception``, if any."""
        return self._exception

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Mark the queue as finished with ``exc`` and pass it to a parked reader."""
        self._eof = True
        self._exception = exc
        parked, self._waiter = self._waiter, None
        if parked is not None:
            set_exception(parked, exc, exc_cause)

    def feed_data(self, data: _T) -> None:
        """Enqueue one item and wake a parked reader."""
        self._buffer.append(data)
        parked, self._waiter = self._waiter, None
        if parked is not None:
            set_result(parked, None)

    def feed_eof(self) -> None:
        """Mark the queue as finished and wake a parked reader."""
        self._eof = True
        parked, self._waiter = self._waiter, None
        if parked is not None:
            set_result(parked, None)

    async def read(self) -> _T:
        """Return the next item, waiting if the queue is empty and not finished.

        Once the buffer is drained after EOF, raises the stored exception if
        there is one and ``EofStream`` otherwise.
        """
        if not (self._buffer or self._eof):
            assert not self._waiter
            parked = self._loop.create_future()
            self._waiter = parked
            try:
                await parked
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # Forget the abandoned future so a later read can park again.
                self._waiter = None
                raise
        if not self._buffer:
            if self._exception is None:
                raise EofStream
            raise self._exception
        return self._buffer.popleft()

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        """Iterate over items by repeatedly awaiting ``read``."""
        return AsyncStreamIterator(self.read)