Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/stapled.py: 54%
63 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1from __future__ import annotations
3from dataclasses import dataclass
4from typing import Any, Callable, Generic, Mapping, Sequence, TypeVar
6from ..abc import (
7 ByteReceiveStream,
8 ByteSendStream,
9 ByteStream,
10 Listener,
11 ObjectReceiveStream,
12 ObjectSendStream,
13 ObjectStream,
14 TaskGroup,
15)
17T_Item = TypeVar("T_Item")
18T_Stream = TypeVar("T_Stream")
21@dataclass(eq=False)
22class StapledByteStream(ByteStream):
23 """
24 Combines two byte streams into a single, bidirectional byte stream.
26 Extra attributes will be provided from both streams, with the receive stream providing the
27 values in case of a conflict.
29 :param ByteSendStream send_stream: the sending byte stream
30 :param ByteReceiveStream receive_stream: the receiving byte stream
31 """
33 send_stream: ByteSendStream
34 receive_stream: ByteReceiveStream
36 async def receive(self, max_bytes: int = 65536) -> bytes:
37 return await self.receive_stream.receive(max_bytes)
39 async def send(self, item: bytes) -> None:
40 await self.send_stream.send(item)
42 async def send_eof(self) -> None:
43 await self.send_stream.aclose()
45 async def aclose(self) -> None:
46 await self.send_stream.aclose()
47 await self.receive_stream.aclose()
49 @property
50 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
51 return {
52 **self.send_stream.extra_attributes,
53 **self.receive_stream.extra_attributes,
54 }
57@dataclass(eq=False)
58class StapledObjectStream(Generic[T_Item], ObjectStream[T_Item]):
59 """
60 Combines two object streams into a single, bidirectional object stream.
62 Extra attributes will be provided from both streams, with the receive stream providing the
63 values in case of a conflict.
65 :param ObjectSendStream send_stream: the sending object stream
66 :param ObjectReceiveStream receive_stream: the receiving object stream
67 """
69 send_stream: ObjectSendStream[T_Item]
70 receive_stream: ObjectReceiveStream[T_Item]
72 async def receive(self) -> T_Item:
73 return await self.receive_stream.receive()
75 async def send(self, item: T_Item) -> None:
76 await self.send_stream.send(item)
78 async def send_eof(self) -> None:
79 await self.send_stream.aclose()
81 async def aclose(self) -> None:
82 await self.send_stream.aclose()
83 await self.receive_stream.aclose()
85 @property
86 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
87 return {
88 **self.send_stream.extra_attributes,
89 **self.receive_stream.extra_attributes,
90 }
93@dataclass(eq=False)
94class MultiListener(Generic[T_Stream], Listener[T_Stream]):
95 """
96 Combines multiple listeners into one, serving connections from all of them at once.
98 Any MultiListeners in the given collection of listeners will have their listeners moved into
99 this one.
101 Extra attributes are provided from each listener, with each successive listener overriding any
102 conflicting attributes from the previous one.
104 :param listeners: listeners to serve
105 :type listeners: Sequence[Listener[T_Stream]]
106 """
108 listeners: Sequence[Listener[T_Stream]]
110 def __post_init__(self) -> None:
111 listeners: list[Listener[T_Stream]] = []
112 for listener in self.listeners:
113 if isinstance(listener, MultiListener):
114 listeners.extend(listener.listeners)
115 del listener.listeners[:] # type: ignore[attr-defined]
116 else:
117 listeners.append(listener)
119 self.listeners = listeners
121 async def serve(
122 self, handler: Callable[[T_Stream], Any], task_group: TaskGroup | None = None
123 ) -> None:
124 from .. import create_task_group
126 async with create_task_group() as tg:
127 for listener in self.listeners:
128 tg.start_soon(listener.serve, handler, task_group)
130 async def aclose(self) -> None:
131 for listener in self.listeners:
132 await listener.aclose()
134 @property
135 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
136 attributes: dict = {}
137 for listener in self.listeners:
138 attributes.update(listener.extra_attributes)
140 return attributes