Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/stapled.py: 54%

63 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3from dataclasses import dataclass 

4from typing import Any, Callable, Generic, Mapping, Sequence, TypeVar 

5 

6from ..abc import ( 

7 ByteReceiveStream, 

8 ByteSendStream, 

9 ByteStream, 

10 Listener, 

11 ObjectReceiveStream, 

12 ObjectSendStream, 

13 ObjectStream, 

14 TaskGroup, 

15) 

16 

17T_Item = TypeVar("T_Item") 

18T_Stream = TypeVar("T_Stream") 

19 

20 

@dataclass(eq=False)
class StapledByteStream(ByteStream):
    """
    Combines two byte streams into a single, bidirectional byte stream.

    Extra attributes will be provided from both streams, with the receive stream providing the
    values in case of a conflict.

    :param ByteSendStream send_stream: the sending byte stream
    :param ByteReceiveStream receive_stream: the receiving byte stream
    """

    send_stream: ByteSendStream
    receive_stream: ByteReceiveStream

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive data by delegating to the receive stream.

        :param max_bytes: passed through unchanged to ``receive_stream.receive()``
        :return: whatever the receive stream returned
        """
        return await self.receive_stream.receive(max_bytes)

    async def send(self, item: bytes) -> None:
        """
        Send data by delegating to the send stream.

        :param item: the bytes to hand to ``send_stream.send()``
        """
        await self.send_stream.send(item)

    async def send_eof(self) -> None:
        """
        Signal the end of the outgoing data by closing the send stream.

        The receive stream is not touched by this call.
        """
        await self.send_stream.aclose()

    async def aclose(self) -> None:
        """
        Close both underlying streams, send stream first.

        The receive stream is closed even if closing the send stream raises (including when
        the call is cancelled), so a failure on one half cannot leak the other half. Any
        exception from closing the send stream still propagates, unless closing the receive
        stream raises too, in which case that later exception propagates instead.
        """
        try:
            await self.send_stream.aclose()
        finally:
            # Always attempt the second close; skipping it would leave the stream open.
            await self.receive_stream.aclose()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Return the merged extra attributes of both streams.

        The receive stream's entries are unpacked last, so they win on conflicting keys.
        """
        return {
            **self.send_stream.extra_attributes,
            **self.receive_stream.extra_attributes,
        }

55 

56 

@dataclass(eq=False)
class StapledObjectStream(Generic[T_Item], ObjectStream[T_Item]):
    """
    Combines two object streams into a single, bidirectional object stream.

    Extra attributes will be provided from both streams, with the receive stream providing the
    values in case of a conflict.

    :param ObjectSendStream send_stream: the sending object stream
    :param ObjectReceiveStream receive_stream: the receiving object stream
    """

    send_stream: ObjectSendStream[T_Item]
    receive_stream: ObjectReceiveStream[T_Item]

    async def receive(self) -> T_Item:
        """
        Receive the next item by delegating to the receive stream.

        :return: whatever the receive stream returned
        """
        return await self.receive_stream.receive()

    async def send(self, item: T_Item) -> None:
        """
        Send an item by delegating to the send stream.

        :param item: the object to hand to ``send_stream.send()``
        """
        await self.send_stream.send(item)

    async def send_eof(self) -> None:
        """
        Signal the end of the outgoing items by closing the send stream.

        The receive stream is not touched by this call.
        """
        await self.send_stream.aclose()

    async def aclose(self) -> None:
        """
        Close both underlying streams, send stream first.

        The receive stream is closed even if closing the send stream raises (including when
        the call is cancelled), so a failure on one half cannot leak the other half. Any
        exception from closing the send stream still propagates, unless closing the receive
        stream raises too, in which case that later exception propagates instead.
        """
        try:
            await self.send_stream.aclose()
        finally:
            # Always attempt the second close; skipping it would leave the stream open.
            await self.receive_stream.aclose()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Return the merged extra attributes of both streams.

        The receive stream's entries are unpacked last, so they win on conflicting keys.
        """
        return {
            **self.send_stream.extra_attributes,
            **self.receive_stream.extra_attributes,
        }

91 

92 

@dataclass(eq=False)
class MultiListener(Generic[T_Stream], Listener[T_Stream]):
    """
    Combines multiple listeners into one, serving connections from all of them at once.

    Any MultiListeners in the given collection of listeners will have their listeners moved into
    this one.

    Extra attributes are provided from each listener, with each successive listener overriding any
    conflicting attributes from the previous one.

    :param listeners: listeners to serve
    :type listeners: Sequence[Listener[T_Stream]]
    """

    listeners: Sequence[Listener[T_Stream]]

    def __post_init__(self) -> None:
        """
        Flatten nested MultiListeners into a single list of listeners.

        The listeners of every nested ``MultiListener`` are appended to this instance's list and
        removed from the nested instance, which is left empty.
        """
        listeners: list[Listener[T_Stream]] = []
        for listener in self.listeners:
            if isinstance(listener, MultiListener):
                listeners.extend(listener.listeners)
                # Empty the nested listener in place so its listeners now belong only to us.
                del listener.listeners[:]  # type: ignore[attr-defined]
            else:
                listeners.append(listener)

        self.listeners = listeners

    async def serve(
        self, handler: Callable[[T_Stream], Any], task_group: TaskGroup | None = None
    ) -> None:
        """
        Serve all contained listeners concurrently.

        Each listener's ``serve()`` is started in an internal task group; this call returns
        when that task group exits.

        :param handler: passed through unchanged to every listener's ``serve()``
        :param task_group: passed through unchanged to every listener's ``serve()``
        """
        # Imported here rather than at module level, as in the original code.
        from .. import create_task_group

        async with create_task_group() as tg:
            for listener in self.listeners:
                tg.start_soon(listener.serve, handler, task_group)

    async def aclose(self) -> None:
        """
        Close every contained listener, in the order they are stored.

        All listeners are closed even if closing one of them raises, so a single failing
        listener cannot leave the remaining ones open. If any close call raises, the exception
        from the last one that failed propagates after all listeners have been attempted.
        """
        from contextlib import AsyncExitStack

        async with AsyncExitStack() as stack:
            # The exit stack unwinds in LIFO order, so register in reverse to keep the
            # original first-to-last closing order.
            for listener in reversed(self.listeners):
                stack.push_async_callback(listener.aclose)

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Return the merged extra attributes of all contained listeners.

        Listeners are merged in order, so later listeners override earlier ones on conflicts.
        """
        attributes: dict = {}
        for listener in self.listeners:
            attributes.update(listener.extra_attributes)

        return attributes