Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 34%
392 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1import asyncio
2import collections
3import warnings
4from typing import (
5 Awaitable,
6 Callable,
7 Deque,
8 Final,
9 Generic,
10 List,
11 Optional,
12 Tuple,
13 TypeVar,
14)
16from .base_protocol import BaseProtocol
17from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result
18from .log import internal_logger
# Names re-exported as the public API of this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)


# Item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")
class EofStream(Exception):
    """Raised internally to signal that the end of the stream was reached."""
class AsyncStreamIterator(Generic[_T]):
    """Async iterator adapter around a zero-argument read coroutine.

    Iteration ends when the read function raises EofStream or returns
    an empty bytes object.
    """

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        # Coroutine function invoked once per iteration step.
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            chunk = await self.read_func()
        except EofStream:
            # Underlying stream signalled EOF -> finish iteration.
            raise StopAsyncIteration
        if chunk == b"":
            # An empty read also marks stream exhaustion.
            raise StopAsyncIteration
        return chunk
class ChunkTupleAsyncStreamIterator:
    """Async iterator yielding (bytes, end_of_http_chunk) pairs from a stream.

    Wraps StreamReader.readchunk() and stops as soon as the reader
    returns the EOF sentinel ``(b"", False)``.
    """

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        pair = await self._stream.readchunk()
        if pair == (b"", False):
            # Sentinel produced by readchunk() at end of stream.
            raise StopAsyncIteration
        return pair
class AsyncStreamReaderMixin:
    """Mixin adding ``async for`` iteration helpers to stream readers."""

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration mode: line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Return an asynchronous iterator yielding chunks of at most n bytes."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        Each item is the (bytes, bool) tuple produced by
        StreamReader.readchunk().
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    # Total number of bytes ever fed into this reader; never decreases.
    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: pause reading above high water,
        # resume below low water.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0  # bytes currently buffered and not yet consumed
        self._cursor = 0  # logical read position from start of the body
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # consumed prefix length of self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Mark the stream broken; pending and future reads raise *exc*."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Signal end of stream: wake pending waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (returns at once if already EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Flatten any partially consumed head chunk before prepending.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer, wake a pending read, apply back-pressure."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Start tracking HTTP chunk boundaries (idempotent)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                # NOTE: the original message was missing a separating space
                # between the concatenated literals ("whensome").
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Suspend until feed_data()/feed_eof() wakes us; single waiter only."""
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, where "line" is a sequence of bytes ending with \\n."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read until *separator* (inclusive) or EOF; capped at high watermark."""
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; a negative *n* reads everything until EOF."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever is buffered, waiting for at least one byte or EOF."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk, otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
            # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise asyncio.IncompleteReadError at EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read buffered data; fails if a read is in flight."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes from the head chunk (-1 == whole chunk)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Head chunk is larger than requested: advance the offset only.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Consume the remainder of a partially read head chunk.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently at EOF and never holds data."""

    def __init__(self) -> None:
        # Deliberately skips StreamReader.__init__: an always-empty stream
        # needs no protocol, loop or buffer.
        self._read_eof_chunk = False

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback runs immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports "no data"; every later call reports chunk end.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""
# Shared singleton used wherever a message has no body.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        # Each entry is a (payload, size) pair; size drives the byte
        # accounting used by flow-control subclasses.
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Put the queue into an error state and wake a blocked reader."""
        self._eof = True
        self._exception = exc

        pending = self._waiter
        if pending is not None:
            self._waiter = None
            set_exception(pending, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake the reader if it is blocked."""
        self._size += size
        self._buffer.append((data, size))

        pending = self._waiter
        if pending is not None:
            self._waiter = None
            set_result(pending, None)

    def feed_eof(self) -> None:
        """Mark the end of the queue and wake the reader if it is blocked."""
        self._eof = True

        pending = self._waiter
        if pending is not None:
            self._waiter = None
            set_result(pending, None)

    async def read(self) -> _T:
        """Dequeue the next item; raise the stored error or EofStream when done."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if not self._buffer:
            # Drained and EOF reached: surface the error, if any.
            if self._exception is not None:
                raise self._exception
            raise EofStream

        data, size = self._buffer.popleft()
        self._size -= size
        return data

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)

        self._protocol = protocol
        # Pause/resume threshold in bytes (double the configured limit).
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue *data* and pause the transport once over the limit."""
        super().feed_data(data, size)

        over_limit = self._size > self._limit
        if over_limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Dequeue one item, resuming the transport once under the limit."""
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()