1from __future__ import annotations
2
3import math
4from typing import Tuple, TypeVar
5from warnings import warn
6
7from ..streams.memory import (
8 MemoryObjectReceiveStream,
9 MemoryObjectSendStream,
10 MemoryObjectStreamState,
11)
12
13T_Item = TypeVar("T_Item")
14
15
16class create_memory_object_stream(
17 Tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]],
18):
19 """
20 Create a memory object stream.
21
22 The stream's item type can be annotated like
23 :func:`create_memory_object_stream[T_Item]`.
24
25 :param max_buffer_size: number of items held in the buffer until ``send()`` starts
26 blocking
27 :param item_type: old way of marking the streams with the right generic type for
28 static typing (does nothing on AnyIO 4)
29
30 .. deprecated:: 4.0
31 Use ``create_memory_object_stream[YourItemType](...)`` instead.
32 :return: a tuple of (send stream, receive stream)
33
34 """
35
36 def __new__( # type: ignore[misc]
37 cls, max_buffer_size: float = 0, item_type: object = None
38 ) -> tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]]:
39 if max_buffer_size != math.inf and not isinstance(max_buffer_size, int):
40 raise ValueError("max_buffer_size must be either an integer or math.inf")
41 if max_buffer_size < 0:
42 raise ValueError("max_buffer_size cannot be negative")
43 if item_type is not None:
44 warn(
45 "The item_type argument has been deprecated in AnyIO 4.0. "
46 "Use create_memory_object_stream[YourItemType](...) instead.",
47 DeprecationWarning,
48 stacklevel=2,
49 )
50
51 state = MemoryObjectStreamState[T_Item](max_buffer_size)
52 return (MemoryObjectSendStream(state), MemoryObjectReceiveStream(state))