Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_streams.py: 56%
18 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1from __future__ import annotations
3import math
4from typing import Any, TypeVar, overload
6from ..streams.memory import (
7 MemoryObjectReceiveStream,
8 MemoryObjectSendStream,
9 MemoryObjectStreamState,
10)
12T_Item = TypeVar("T_Item")
@overload
def create_memory_object_stream(
    max_buffer_size: float = ...,
) -> tuple[MemoryObjectSendStream[Any], MemoryObjectReceiveStream[Any]]:
    """Typing overload: with no ``item_type`` given, both streams carry ``Any``."""
    ...
@overload
def create_memory_object_stream(
    max_buffer_size: float = ...,
    item_type: type[T_Item] = ...,
) -> tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]]:
    """Typing overload: with ``item_type`` given, both streams carry that item type."""
    ...
def create_memory_object_stream(
    max_buffer_size: float = 0,
    item_type: type[T_Item] | None = None,
) -> tuple[MemoryObjectSendStream[Any], MemoryObjectReceiveStream[Any]]:
    """
    Build a connected pair of in-memory object streams.

    Both returned streams are constructed around the same
    ``MemoryObjectStreamState`` instance.

    :param max_buffer_size: number of items held in the buffer until ``send()`` starts
        blocking; must be a non-negative integer or ``math.inf``
    :param item_type: type of item, for marking the streams with the right generic type
        for static typing (not used at run time)
    :return: a tuple of (send stream, receive stream)
    :raises ValueError: if ``max_buffer_size`` is neither an integer nor ``math.inf``,
        or if it is negative

    """
    # Anything that is not an int is only acceptable when it equals math.inf.
    if not isinstance(max_buffer_size, int) and max_buffer_size != math.inf:
        raise ValueError("max_buffer_size must be either an integer or math.inf")
    if max_buffer_size < 0:
        raise ValueError("max_buffer_size cannot be negative")

    shared: MemoryObjectStreamState = MemoryObjectStreamState(max_buffer_size)
    sender = MemoryObjectSendStream(shared)
    receiver = MemoryObjectReceiveStream(shared)
    return sender, receiver