Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_streams.py: 53%
17 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import math
2from typing import Any, Optional, Tuple, Type, TypeVar, overload
4from ..streams.memory import (
5 MemoryObjectReceiveStream,
6 MemoryObjectSendStream,
7 MemoryObjectStreamState,
8)
# Type variable standing in for the item type carried by a memory object stream pair.
T_Item = TypeVar("T_Item")
# Typed overload: passing ``item_type`` parametrizes both returned streams with it.
@overload
def create_memory_object_stream(
    max_buffer_size: float,
    item_type: Type[T_Item],
) -> Tuple[
    MemoryObjectSendStream[T_Item],
    MemoryObjectReceiveStream[T_Item],
]:
    ...
# Untyped overload: without ``item_type`` both streams are typed as carrying ``Any``.
@overload
def create_memory_object_stream(
    max_buffer_size: float = 0,
) -> Tuple[
    MemoryObjectSendStream[Any],
    MemoryObjectReceiveStream[Any],
]:
    ...
def create_memory_object_stream(
    max_buffer_size: float = 0,
    item_type: Optional[Type[T_Item]] = None,
) -> Tuple[MemoryObjectSendStream[Any], MemoryObjectReceiveStream[Any]]:
    """
    Build a send/receive pair of memory object streams backed by one shared state.

    :param max_buffer_size: how many items the buffer may hold before ``send()`` starts
        blocking; must be an integer or ``math.inf``, and must not be negative
    :param item_type: type of item, used only to mark the streams with the right generic
        type for static typing (not used at run time)
    :return: a tuple of (send stream, receive stream)
    :raises ValueError: if ``max_buffer_size`` is neither an integer nor ``math.inf``,
        or if it is negative

    """
    # math.inf is the only non-integer value accepted (it means "unbounded").
    if max_buffer_size != math.inf:
        if not isinstance(max_buffer_size, int):
            raise ValueError("max_buffer_size must be either an integer or math.inf")

    if max_buffer_size < 0:
        raise ValueError("max_buffer_size cannot be negative")

    # Both ends are constructed around the same state object.
    shared_state: MemoryObjectStreamState = MemoryObjectStreamState(max_buffer_size)
    send_stream = MemoryObjectSendStream(shared_state)
    receive_stream = MemoryObjectReceiveStream(shared_state)
    return send_stream, receive_stream