Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/streams.py: 34%
392 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import asyncio
2import collections
3import warnings
4from typing import Awaitable, Callable, Generic, List, Optional, Tuple, TypeVar
6from typing_extensions import Final
8from .base_protocol import BaseProtocol
9from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result
10from .log import internal_logger
12try: # pragma: no cover
13 from typing import Deque
14except ImportError:
15 from typing_extensions import Deque
# Public API of this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

# Generic item type carried by DataQueue and the async iterators below.
_T = TypeVar("_T")
class EofStream(Exception):
    """Raised internally to signal that the end of the stream was reached."""
class AsyncStreamIterator(Generic[_T]):
    """Async iterator that repeatedly awaits a read function.

    Iteration ends when the read function raises EofStream or
    returns an empty bytes object.
    """

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        # An empty read is the other end-of-stream signal.
        if item == b"":
            raise StopAsyncIteration
        return item
class ChunkTupleAsyncStreamIterator:
    """Async iterator yielding (bytes, end_of_http_chunk) tuples from a reader."""

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        pair = await self._stream.readchunk()
        # (b"", False) is the sentinel readchunk() returns at EOF.
        if pair == (b"", False):
            raise StopAsyncIteration
        return pair
class AsyncStreamReaderMixin:
    """Mixin adding ``async for`` iteration helpers to a stream reader."""

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration is line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        reader = AsyncStreamIterator(
            lambda: self.read(n)  # type: ignore[attr-defined,no-any-return]
        )
        return reader

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    # Total number of bytes ever fed into this reader.  Class-level default;
    # shadowed by an instance attribute as soon as feed_data() runs.
    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: reading is paused above the high water
        # mark and resumed once the buffer drains below the low water mark.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0  # bytes currently buffered
        self._cursor = 0  # bytes already consumed by the reader
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # read offset into self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the stored exception, or None if the stream is healthy."""
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Store *exc* and fail any coroutines waiting on this stream."""
        self._exception = exc
        # EOF callbacks will never run once the stream is broken.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream as ended, wake waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (returns immediately if it was)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Normalize the head chunk so the pushed-back data lands in front of
        # exactly the unread remainder.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        # NOTE(review): _eof_counter is never initialized in __init__ and is
        # not read anywhere in this module -- presumably consumed elsewhere;
        # verify before removing.
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer, waking a pending read if any."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Apply backpressure once the buffer overflows the high water mark.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Switch the reader into HTTP chunked mode (idempotent)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                # Fixed message: the two adjacent string literals used to
                # concatenate without a space ("...whensome data...").
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk, waking readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, where "line" is a sequence of bytes ending with \\n."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read data until *separator* is found (separator included) or EOF."""
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # find() result is shifted by +1 so 0 means "not found".
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; read until EOF when n is negative."""
        if self._exception is not None:
            raise self._exception

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever data is buffered, waiting only if the buffer is empty."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
            # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise asyncio.IncompleteReadError."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all when n == -1)."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Take up to *n* bytes from the head buffer chunk (all when n == -1)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption of the head chunk: advance the offset only.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        # Release backpressure once below the low water mark.
        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A stream reader that is permanently empty and at EOF.

    Backs the shared EMPTY_PAYLOAD singleton.  It deliberately skips
    StreamReader.__init__ because it keeps no state at all.
    """

    def __init__(self) -> None:
        pass

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Always at EOF, so the callback fires immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        # Resolves the old "TODO add async def readuntil": an empty stream
        # can never contain the separator, so the result is always empty.
        return b""

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


# Shared singleton used wherever an always-empty payload is needed.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def _wakeup_waiter(self) -> None:
        # Resolve the pending read() future, if there is one.
        fut = self._waiter
        if fut is not None:
            self._waiter = None
            set_result(fut, None)

    def set_exception(self, exc: BaseException) -> None:
        """Mark the queue broken: store *exc* and fail the pending reader."""
        self._eof = True
        self._exception = exc

        fut = self._waiter
        if fut is not None:
            self._waiter = None
            set_exception(fut, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item (with its accounted size) and wake the reader."""
        self._size += size
        self._buffer.append((data, size))
        self._wakeup_waiter()

    def feed_eof(self) -> None:
        """Mark end of stream and wake the reader."""
        self._eof = True
        self._wakeup_waiter()

    async def read(self) -> _T:
        """Dequeue the next item, waiting for data; raise EofStream at EOF."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if not self._buffer:
            # Drained: surface a stored error, otherwise signal EOF.
            if self._exception is not None:
                raise self._exception
            raise EofStream

        data, size = self._buffer.popleft()
        self._size -= size
        return data

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._protocol = protocol
        # Backpressure threshold: pause once buffered size exceeds 2 * limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)
        over_limit = self._size > self._limit
        if over_limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        try:
            return await super().read()
        finally:
            # Resume the transport once the queue has drained below the limit.
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()