Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_streams.py: 56%

18 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3import math 

4from typing import Any, TypeVar, overload 

5 

6from ..streams.memory import ( 

7 MemoryObjectReceiveStream, 

8 MemoryObjectSendStream, 

9 MemoryObjectStreamState, 

10) 

11 

# Type variable standing for the item type carried by a memory object stream;
# the typed overload of create_memory_object_stream() binds it via ``item_type``.
T_Item = TypeVar("T_Item")

13 

14 

@overload
def create_memory_object_stream(
    max_buffer_size: float = ...,
) -> tuple[MemoryObjectSendStream[Any], MemoryObjectReceiveStream[Any]]:
    ...


@overload
def create_memory_object_stream(
    max_buffer_size: float = ..., item_type: type[T_Item] = ...
) -> tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]]:
    ...


def create_memory_object_stream(
    max_buffer_size: float = 0, item_type: type[T_Item] | None = None
) -> tuple[MemoryObjectSendStream[Any], MemoryObjectReceiveStream[Any]]:
    """
    Create a memory object stream.

    :param max_buffer_size: number of items held in the buffer until ``send()`` starts
        blocking; must be a non-negative integer or ``math.inf``
    :param item_type: type of item, for marking the streams with the right generic type
        for static typing (not used at run time)
    :return: a tuple of (send stream, receive stream)
    :raises ValueError: if ``max_buffer_size`` is neither an integer nor ``math.inf``,
        or if it is negative

    """
    # Only whole numbers and positive infinity are accepted as a buffer size;
    # any other value (fractional floats, NaN, -inf, non-numbers) is rejected here.
    if max_buffer_size != math.inf and not isinstance(max_buffer_size, int):
        raise ValueError("max_buffer_size must be either an integer or math.inf")
    if max_buffer_size < 0:
        raise ValueError("max_buffer_size cannot be negative")

    # Both ends are built around the same state object.
    state: MemoryObjectStreamState = MemoryObjectStreamState(max_buffer_size)
    return MemoryObjectSendStream(state), MemoryObjectReceiveStream(state)