Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/stapled.py: 53%

62 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1from dataclasses import dataclass 

2from typing import Any, Callable, Generic, List, Mapping, Optional, Sequence, TypeVar 

3 

4from ..abc import ( 

5 ByteReceiveStream, 

6 ByteSendStream, 

7 ByteStream, 

8 Listener, 

9 ObjectReceiveStream, 

10 ObjectSendStream, 

11 ObjectStream, 

12 TaskGroup, 

13) 

14 

15T_Item = TypeVar("T_Item") 

16T_Stream = TypeVar("T_Stream") 

17 

18 

19@dataclass(eq=False) 

20class StapledByteStream(ByteStream): 

21 """ 

22 Combines two byte streams into a single, bidirectional byte stream. 

23 

24 Extra attributes will be provided from both streams, with the receive stream providing the 

25 values in case of a conflict. 

26 

27 :param ByteSendStream send_stream: the sending byte stream 

28 :param ByteReceiveStream receive_stream: the receiving byte stream 

29 """ 

30 

31 send_stream: ByteSendStream 

32 receive_stream: ByteReceiveStream 

33 

34 async def receive(self, max_bytes: int = 65536) -> bytes: 

35 return await self.receive_stream.receive(max_bytes) 

36 

37 async def send(self, item: bytes) -> None: 

38 await self.send_stream.send(item) 

39 

40 async def send_eof(self) -> None: 

41 await self.send_stream.aclose() 

42 

43 async def aclose(self) -> None: 

44 await self.send_stream.aclose() 

45 await self.receive_stream.aclose() 

46 

47 @property 

48 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: 

49 return { 

50 **self.send_stream.extra_attributes, 

51 **self.receive_stream.extra_attributes, 

52 } 

53 

54 

55@dataclass(eq=False) 

56class StapledObjectStream(Generic[T_Item], ObjectStream[T_Item]): 

57 """ 

58 Combines two object streams into a single, bidirectional object stream. 

59 

60 Extra attributes will be provided from both streams, with the receive stream providing the 

61 values in case of a conflict. 

62 

63 :param ObjectSendStream send_stream: the sending object stream 

64 :param ObjectReceiveStream receive_stream: the receiving object stream 

65 """ 

66 

67 send_stream: ObjectSendStream[T_Item] 

68 receive_stream: ObjectReceiveStream[T_Item] 

69 

70 async def receive(self) -> T_Item: 

71 return await self.receive_stream.receive() 

72 

73 async def send(self, item: T_Item) -> None: 

74 await self.send_stream.send(item) 

75 

76 async def send_eof(self) -> None: 

77 await self.send_stream.aclose() 

78 

79 async def aclose(self) -> None: 

80 await self.send_stream.aclose() 

81 await self.receive_stream.aclose() 

82 

83 @property 

84 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: 

85 return { 

86 **self.send_stream.extra_attributes, 

87 **self.receive_stream.extra_attributes, 

88 } 

89 

90 

91@dataclass(eq=False) 

92class MultiListener(Generic[T_Stream], Listener[T_Stream]): 

93 """ 

94 Combines multiple listeners into one, serving connections from all of them at once. 

95 

96 Any MultiListeners in the given collection of listeners will have their listeners moved into 

97 this one. 

98 

99 Extra attributes are provided from each listener, with each successive listener overriding any 

100 conflicting attributes from the previous one. 

101 

102 :param listeners: listeners to serve 

103 :type listeners: Sequence[Listener[T_Stream]] 

104 """ 

105 

106 listeners: Sequence[Listener[T_Stream]] 

107 

108 def __post_init__(self) -> None: 

109 listeners: List[Listener[T_Stream]] = [] 

110 for listener in self.listeners: 

111 if isinstance(listener, MultiListener): 

112 listeners.extend(listener.listeners) 

113 del listener.listeners[:] # type: ignore[attr-defined] 

114 else: 

115 listeners.append(listener) 

116 

117 self.listeners = listeners 

118 

119 async def serve( 

120 self, handler: Callable[[T_Stream], Any], task_group: Optional[TaskGroup] = None 

121 ) -> None: 

122 from .. import create_task_group 

123 

124 async with create_task_group() as tg: 

125 for listener in self.listeners: 

126 tg.start_soon(listener.serve, handler, task_group) 

127 

128 async def aclose(self) -> None: 

129 for listener in self.listeners: 

130 await listener.aclose() 

131 

132 @property 

133 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: 

134 attributes: dict = {} 

135 for listener in self.listeners: 

136 attributes.update(listener.extra_attributes) 

137 

138 return attributes