Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_streams.py: 53%

17 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import math 

2from typing import Any, Optional, Tuple, Type, TypeVar, overload 

3 

4from ..streams.memory import ( 

5 MemoryObjectReceiveStream, 

6 MemoryObjectSendStream, 

7 MemoryObjectStreamState, 

8) 

9 

10T_Item = TypeVar("T_Item") 

11 

12 

# Typed overload: passing ``item_type`` parametrizes both returned streams
# with that item type for static type checkers.
@overload
def create_memory_object_stream(
    max_buffer_size: float,
    item_type: Type[T_Item],
) -> Tuple[
    MemoryObjectSendStream[T_Item],
    MemoryObjectReceiveStream[T_Item],
]:
    ...

18 

19 

# Untyped overload: without ``item_type`` both returned streams are
# annotated as carrying ``Any`` items.
@overload
def create_memory_object_stream(
    max_buffer_size: float = 0,
) -> Tuple[
    MemoryObjectSendStream[Any],
    MemoryObjectReceiveStream[Any],
]:
    ...

25 

26 

def create_memory_object_stream(
    max_buffer_size: float = 0, item_type: Optional[Type[T_Item]] = None
) -> Tuple[MemoryObjectSendStream[Any], MemoryObjectReceiveStream[Any]]:
    """
    Build a send/receive pair of memory object streams sharing one state.

    :param max_buffer_size: number of items held in the buffer until ``send()`` starts
        blocking; must be an integer or ``math.inf``
    :param item_type: type of item, used only to mark the streams with the right generic
        type for static typing (ignored at run time)
    :return: a tuple of (send stream, receive stream)
    :raises ValueError: if ``max_buffer_size`` is neither an integer nor ``math.inf``, or
        if it is negative

    """
    # math.inf is the only non-integer value that is accepted.
    if max_buffer_size != math.inf:
        if not isinstance(max_buffer_size, int):
            raise ValueError("max_buffer_size must be either an integer or math.inf")

    if max_buffer_size < 0:
        raise ValueError("max_buffer_size cannot be negative")

    # Both ends are constructed from the same state object.
    shared_state: MemoryObjectStreamState = MemoryObjectStreamState(max_buffer_size)
    send_stream = MemoryObjectSendStream(shared_state)
    receive_stream = MemoryObjectReceiveStream(shared_state)
    return send_stream, receive_stream