Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/streams.py: 21%
396 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import asyncio
2import collections
3import warnings
4from typing import (
5 Awaitable,
6 Callable,
7 Deque,
8 Final,
9 Generic,
10 List,
11 Optional,
12 Tuple,
13 TypeVar,
14)
16from .base_protocol import BaseProtocol
17from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result
18from .log import internal_logger
# Public API of this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

# Item type carried by DataQueue / FlowControlDataQueue.
_T = TypeVar("_T")
class EofStream(Exception):
    """Raised to signal that the end of the stream was reached."""
class AsyncStreamIterator(Generic[_T]):
    """Adapt a zero-argument read coroutine into an async iterator.

    Iteration stops when the read function raises EofStream or returns
    an empty bytes object.
    """

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        if item == b"":
            # Empty read is the non-exceptional EOF signal.
            raise StopAsyncIteration
        return item
class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``(bytes, end_of_http_chunk)`` tuples.

    Wraps a stream's ``readchunk()``; iteration ends when the stream
    reports the EOF sentinel ``(b"", False)``.
    """

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        chunk = await self._stream.readchunk()
        if chunk == (b"", False):
            raise StopAsyncIteration
        return chunk
class AsyncStreamReaderMixin:
    """Mixin providing the async-iteration entry points.

    The concrete class must implement readline/read/readany/readchunk.
    """

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration is line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""

        def read_chunk() -> Awaitable[bytes]:
            return self.read(n)  # type: ignore[attr-defined]

        return AsyncStreamIterator(read_chunk)

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    # Total number of bytes ever fed through feed_data().
    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        # Flow-control watermarks: pause reading above high, resume below low.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0  # bytes currently buffered
        self._cursor = 0  # logical offset of bytes consumed so far
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # read offset within self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Mark the stream broken and wake any pending waiters with *exc*."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* for EOF; runs immediately if already at EOF."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Signal end of stream: wake waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (no-op if already at EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        if self._buffer_offset:
            # Materialize the partially-consumed head chunk before prepending.
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer and wake a pending reader."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Back-pressure: pause the transport once the high watermark is passed.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Start tracking HTTP chunk boundaries (idempotent)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                # BUGFIX: the two adjacent string literals previously
                # concatenated without a separating space ("whensome").
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the end of the current HTTP chunk and wake readchunk()."""
        if self._http_chunk_splits is None:
            # BUGFIX: the message referred to nonexistent method names
            # ("end_chunk_receiving"/"begin_chunk_receiving").
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read and return data up to and including *separator*.

        Raises ValueError on an empty separator or when the accumulated
        chunk exceeds the high watermark without a match.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes (all remaining data until EOF if n < 0)."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever data is buffered, waiting for at least one byte."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes; raise IncompleteReadError on early EOF."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes (whole head chunk if n == -1)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Partial consumption of the head chunk: advance the offset only.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        # Release back-pressure once we drop below the low watermark.
        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A permanently-empty stream that is always at EOF.

    Every read operation returns an empty result immediately; feeding
    data or exceptions is ignored.
    """

    def __init__(self) -> None:
        # Deliberately skips StreamReader.__init__: an empty stream needs
        # no buffer, protocol, or event loop.
        self._read_eof_chunk = False

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>"

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so the callback runs right away.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports the not-end-of-chunk sentinel once, every
        # subsequent call reports end-of-http-chunk.
        if self._read_eof_chunk:
            return (b"", True)
        self._read_eof_chunk = True
        return (b"", False)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""
# Shared singleton used wherever a message has no body.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        """True once feed_eof() has been called."""
        return self._eof

    def at_eof(self) -> bool:
        """True once EOF was fed and every buffered item was consumed."""
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Mark the queue broken; wake a pending reader with *exc*."""
        self._eof = True
        self._exception = exc

        waiter, self._waiter = self._waiter, None
        if waiter is not None:
            set_exception(waiter, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake the reader if it is waiting."""
        self._size += size
        self._buffer.append((data, size))

        waiter, self._waiter = self._waiter, None
        if waiter is not None:
            set_result(waiter, None)

    def feed_eof(self) -> None:
        """Signal that no more items will arrive."""
        self._eof = True

        waiter, self._waiter = self._waiter, None
        if waiter is not None:
            set_result(waiter, None)

    async def read(self) -> _T:
        """Dequeue the next item, blocking until one is available.

        Raises the stored exception if set_exception() was called,
        or EofStream once the queue is exhausted.
        """
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if not self._buffer:
            if self._exception is not None:
                raise self._exception
            raise EofStream

        data, size = self._buffer.popleft()
        self._size -= size
        return data

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)
        self._protocol = protocol
        # Pause/resume threshold is twice the requested limit.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue an item, pausing the transport past the limit."""
        super().feed_data(data, size)

        if not self._protocol._reading_paused and self._size > self._limit:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Dequeue an item, resuming the transport when back under the limit."""
        try:
            return await super().read()
        finally:
            if self._protocol._reading_paused and self._size < self._limit:
                self._protocol.resume_reading()