Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_streams.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

16 statements  

1from __future__ import annotations 

2 

3import math 

4from typing import Tuple, TypeVar 

5from warnings import warn 

6 

7from ..streams.memory import ( 

8 MemoryObjectReceiveStream, 

9 MemoryObjectSendStream, 

10 MemoryObjectStreamState, 

11) 

12 

13T_Item = TypeVar("T_Item") 

14 

15 

16class create_memory_object_stream( 

17 Tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]], 

18): 

19 """ 

20 Create a memory object stream. 

21 

22 The stream's item type can be annotated like 

23 :func:`create_memory_object_stream[T_Item]`. 

24 

25 :param max_buffer_size: number of items held in the buffer until ``send()`` starts 

26 blocking 

27 :param item_type: old way of marking the streams with the right generic type for 

28 static typing (does nothing on AnyIO 4) 

29 

30 .. deprecated:: 4.0 

31 Use ``create_memory_object_stream[YourItemType](...)`` instead. 

32 :return: a tuple of (send stream, receive stream) 

33 

34 """ 

35 

36 def __new__( # type: ignore[misc] 

37 cls, max_buffer_size: float = 0, item_type: object = None 

38 ) -> tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]]: 

39 if max_buffer_size != math.inf and not isinstance(max_buffer_size, int): 

40 raise ValueError("max_buffer_size must be either an integer or math.inf") 

41 if max_buffer_size < 0: 

42 raise ValueError("max_buffer_size cannot be negative") 

43 if item_type is not None: 

44 warn( 

45 "The item_type argument has been deprecated in AnyIO 4.0. " 

46 "Use create_memory_object_stream[YourItemType](...) instead.", 

47 DeprecationWarning, 

48 stacklevel=2, 

49 ) 

50 

51 state = MemoryObjectStreamState[T_Item](max_buffer_size) 

52 return (MemoryObjectSendStream(state), MemoryObjectReceiveStream(state))