Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/stapled.py: 53%
62 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1from dataclasses import dataclass
2from typing import Any, Callable, Generic, List, Mapping, Optional, Sequence, TypeVar
4from ..abc import (
5 ByteReceiveStream,
6 ByteSendStream,
7 ByteStream,
8 Listener,
9 ObjectReceiveStream,
10 ObjectSendStream,
11 ObjectStream,
12 TaskGroup,
13)
15T_Item = TypeVar("T_Item")
16T_Stream = TypeVar("T_Stream")
19@dataclass(eq=False)
20class StapledByteStream(ByteStream):
21 """
22 Combines two byte streams into a single, bidirectional byte stream.
24 Extra attributes will be provided from both streams, with the receive stream providing the
25 values in case of a conflict.
27 :param ByteSendStream send_stream: the sending byte stream
28 :param ByteReceiveStream receive_stream: the receiving byte stream
29 """
31 send_stream: ByteSendStream
32 receive_stream: ByteReceiveStream
34 async def receive(self, max_bytes: int = 65536) -> bytes:
35 return await self.receive_stream.receive(max_bytes)
37 async def send(self, item: bytes) -> None:
38 await self.send_stream.send(item)
40 async def send_eof(self) -> None:
41 await self.send_stream.aclose()
43 async def aclose(self) -> None:
44 await self.send_stream.aclose()
45 await self.receive_stream.aclose()
47 @property
48 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
49 return {
50 **self.send_stream.extra_attributes,
51 **self.receive_stream.extra_attributes,
52 }
55@dataclass(eq=False)
56class StapledObjectStream(Generic[T_Item], ObjectStream[T_Item]):
57 """
58 Combines two object streams into a single, bidirectional object stream.
60 Extra attributes will be provided from both streams, with the receive stream providing the
61 values in case of a conflict.
63 :param ObjectSendStream send_stream: the sending object stream
64 :param ObjectReceiveStream receive_stream: the receiving object stream
65 """
67 send_stream: ObjectSendStream[T_Item]
68 receive_stream: ObjectReceiveStream[T_Item]
70 async def receive(self) -> T_Item:
71 return await self.receive_stream.receive()
73 async def send(self, item: T_Item) -> None:
74 await self.send_stream.send(item)
76 async def send_eof(self) -> None:
77 await self.send_stream.aclose()
79 async def aclose(self) -> None:
80 await self.send_stream.aclose()
81 await self.receive_stream.aclose()
83 @property
84 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
85 return {
86 **self.send_stream.extra_attributes,
87 **self.receive_stream.extra_attributes,
88 }
91@dataclass(eq=False)
92class MultiListener(Generic[T_Stream], Listener[T_Stream]):
93 """
94 Combines multiple listeners into one, serving connections from all of them at once.
96 Any MultiListeners in the given collection of listeners will have their listeners moved into
97 this one.
99 Extra attributes are provided from each listener, with each successive listener overriding any
100 conflicting attributes from the previous one.
102 :param listeners: listeners to serve
103 :type listeners: Sequence[Listener[T_Stream]]
104 """
106 listeners: Sequence[Listener[T_Stream]]
108 def __post_init__(self) -> None:
109 listeners: List[Listener[T_Stream]] = []
110 for listener in self.listeners:
111 if isinstance(listener, MultiListener):
112 listeners.extend(listener.listeners)
113 del listener.listeners[:] # type: ignore[attr-defined]
114 else:
115 listeners.append(listener)
117 self.listeners = listeners
119 async def serve(
120 self, handler: Callable[[T_Stream], Any], task_group: Optional[TaskGroup] = None
121 ) -> None:
122 from .. import create_task_group
124 async with create_task_group() as tg:
125 for listener in self.listeners:
126 tg.start_soon(listener.serve, handler, task_group)
128 async def aclose(self) -> None:
129 for listener in self.listeners:
130 await listener.aclose()
132 @property
133 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
134 attributes: dict = {}
135 for listener in self.listeners:
136 attributes.update(listener.extra_attributes)
138 return attributes