# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/streams.py: 21%
# 393 statements; coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

import asyncio
import collections
import warnings
from typing import Awaitable, Callable, Deque, Generic, List, Optional, Tuple, TypeVar

from .base_protocol import BaseProtocol
from .helpers import BaseTimerContext, set_exception, set_result
from .log import internal_logger
from .typedefs import Final

__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

_T = TypeVar("_T")


class EofStream(Exception):
    """EOF stream indication."""


class AsyncStreamIterator(Generic[_T]):
    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            rv = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if rv == b"":
            raise StopAsyncIteration
        return rv


class ChunkTupleAsyncStreamIterator:
    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        rv = await self._stream.readchunk()
        if rv == (b"", False):
            raise StopAsyncIteration
        return rv


class AsyncStreamReaderMixin:
    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n.

        Available for Python 3.5+ only.
        """
        return AsyncStreamIterator(
            lambda: self.read(n)  # type: ignore[attr-defined,no-any-return]
        )

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received.

        Available for Python 3.5+ only.
        """
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples of (bytes, bool) as returned by the
        StreamReader.readchunk method.

        Available for Python 3.5+ only.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
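

# Illustrative usage sketch, not part of aiohttp: how the iteration helpers
# above behave on a hand-fed reader.  `StreamReader` is defined just below;
# the name is resolved lazily when the coroutine actually runs.  The mock
# protocol and the helper name `_demo_iteration` are assumptions for this
# example only; run it with asyncio.run(_demo_iteration()).
async def _demo_iteration() -> None:
    from unittest import mock

    reader = StreamReader(
        mock.Mock(_reading_paused=False),  # stands in for a BaseProtocol
        limit=2**16,
        loop=asyncio.get_running_loop(),
    )
    reader.feed_data(b"first line\nsecond line\n")
    reader.feed_eof()
    async for line in reader:  # __aiter__ above wraps readline()
        print(line)  # b"first line\n", then b"second line\n"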


class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0
        self._cursor = 0
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """Roll back already-read data, inserting it at the head of the buffer."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Flow control: pause the transport once the buffered size exceeds
        # the high-water mark (see the usage sketches after this class).
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from the start
        # of the body transfer.  Each offset marks the end of a chunk.
        # "Logical" means bytes accessible to the user.
        # If no chunks containing logical data were received, the current
        # position is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here, so we check for that.
            # Note that when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for the gzip
            # FSM to yield any uncompressed data.  That's why the current
            # position may not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # Wake up readchunk when the end of an HTTP chunk is received.
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol's feed_data() method
        # to a read coroutine.  Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            if self._timer:
                with self._timer:
                    await waiter
            else:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator must be at least a one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(ichar - offset if ichar else -1)
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        if self._exception is not None:
            raise self._exception

        # Migration problem: with DataQueue you have to catch the EofStream
        # exception, so the common pattern is to run payload.read() inside an
        # infinite loop.  That pattern can cause a real infinite loop with
        # StreamReader, so let's keep this check for one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`,
        # because the waiter may be triggered on a chunk end
        # without any data having been fed
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`,
        # because the waiter may be triggered on a chunk end
        # without any data having been fed
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer encoding is used, end_of_http_chunk is a boolean
        indicating whether the end of the data corresponds to the end of an
        HTTP chunk; otherwise it is always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF: (b'', True) is not usable
                # here, since it is a legitimate mid-stream return value.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        # The default was changed to be consistent with .read(-1).
        #
        # Most users likely don't know about this method,
        # so they should not be affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        chunks = []

        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
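

# ---------------------------------------------------------------------------
# Illustrative usage sketches, not part of aiohttp.  They exercise behaviours
# described in the comments above: flow control in feed_data(), the HTTP
# chunk bookkeeping, the single-waiter guard in _wait(), and readuntil().
# The stub protocol and all `_demo_*` names are assumptions made for these
# examples only; each coroutine can be run with asyncio.run(...).
# ---------------------------------------------------------------------------


class _StubProtocol:
    """Minimal stand-in for the flow-control surface of BaseProtocol."""

    _reading_paused = False

    def pause_reading(self) -> None:
        self._reading_paused = True

    def resume_reading(self) -> None:
        self._reading_paused = False


async def _demo_flow_control() -> None:
    # limit=4 gives low_water=4 and high_water=8: feeding 10 bytes pauses
    # the protocol, and draining the buffer below 4 bytes resumes it.
    protocol = _StubProtocol()
    reader = StreamReader(protocol, limit=4, loop=asyncio.get_running_loop())  # type: ignore[arg-type]
    reader.feed_data(b"0123456789")
    assert protocol._reading_paused
    await reader.read(8)  # 2 bytes remain, below the low water mark
    assert not protocol._reading_paused


async def _demo_chunk_boundaries() -> None:
    # Driving the chunk hooks by hand, as the HTTP parser would: each
    # begin/end pair records a boundary that readchunk() reports back.
    loop = asyncio.get_running_loop()
    reader = StreamReader(_StubProtocol(), limit=2**16, loop=loop)  # type: ignore[arg-type]
    reader.begin_http_chunk_receiving()
    reader.feed_data(b"part one")
    reader.end_http_chunk_receiving()
    data, end_of_http_chunk = await reader.readchunk()
    assert (data, end_of_http_chunk) == (b"part one", True)


async def _demo_single_reader() -> None:
    # Two coroutines awaiting the same reader trip the guard in _wait().
    loop = asyncio.get_running_loop()
    reader = StreamReader(_StubProtocol(), limit=2**16, loop=loop)  # type: ignore[arg-type]
    first = asyncio.ensure_future(reader.read(1))
    await asyncio.sleep(0)  # let the first read() start waiting
    try:
        await reader.read(1)
    except RuntimeError:
        pass  # "read() called while another coroutine is already waiting ..."
    reader.feed_data(b"x")
    assert await first == b"x"


async def _demo_readuntil() -> None:
    # readuntil() returns data up to and including the separator.
    loop = asyncio.get_running_loop()
    reader = StreamReader(_StubProtocol(), limit=2**16, loop=loop)  # type: ignore[arg-type]
    reader.feed_data(b"key=value;rest")
    reader.feed_eof()
    assert await reader.readuntil(b";") == b"key=value;"
    assert await reader.read() == b"rest"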


class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    def __init__(self) -> None:
        pass

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()


class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        self._eof = True
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        self._size += size
        self._buffer.append((data, size))

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            return data
        else:
            if self._exception is not None:
                raise self._exception
            else:
                raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
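

# Illustrative usage sketch, not part of aiohttp: DataQueue as a single-reader
# handoff between a feeder and a consumer.  The helper name `_demo_data_queue`
# is an assumption for this example only; run it with
# asyncio.run(_demo_data_queue()).
async def _demo_data_queue() -> None:
    queue: DataQueue[bytes] = DataQueue(asyncio.get_running_loop())
    queue.feed_data(b"frame-1", 7)
    queue.feed_eof()
    assert await queue.read() == b"frame-1"
    try:
        await queue.read()  # buffer drained and EOF reached
    except EofStream:
        pass  # this is how the end of the queue is signalled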


class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)

        self._protocol = protocol
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()
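

# Illustrative usage sketch, not part of aiohttp: FlowControlDataQueue pausing
# and resuming its protocol around the `limit * 2` threshold.  It reuses the
# `_StubProtocol` stand-in defined after StreamReader above; the name
# `_demo_flow_control_queue` is an assumption for this example only.
async def _demo_flow_control_queue() -> None:
    protocol = _StubProtocol()
    queue: FlowControlDataQueue[bytes] = FlowControlDataQueue(
        protocol, limit=2, loop=asyncio.get_running_loop()  # type: ignore[arg-type]
    )
    queue.feed_data(b"12345", 5)  # size 5 > limit * 2 == 4: pauses reading
    assert protocol._reading_paused
    await queue.read()  # size drops back to 0: resumes reading
    assert not protocol._reading_paused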