Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/backends/auto.py: 40%
15 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1from typing import Optional
3import sniffio
5from .base import AsyncNetworkBackend, AsyncNetworkStream
8class AutoBackend(AsyncNetworkBackend):
9 async def _init_backend(self) -> None:
10 if not (hasattr(self, "_backend")):
11 backend = sniffio.current_async_library()
12 if backend == "trio":
13 from .trio import TrioBackend
15 self._backend: AsyncNetworkBackend = TrioBackend()
16 else:
17 from .asyncio import AsyncIOBackend
19 self._backend = AsyncIOBackend()
21 async def connect_tcp(
22 self,
23 host: str,
24 port: int,
25 timeout: Optional[float] = None,
26 local_address: Optional[str] = None,
27 ) -> AsyncNetworkStream:
28 await self._init_backend()
29 return await self._backend.connect_tcp(
30 host, port, timeout=timeout, local_address=local_address
31 )
33 async def connect_unix_socket(
34 self, path: str, timeout: Optional[float] = None
35 ) -> AsyncNetworkStream: # pragma: nocover
36 await self._init_backend()
37 return await self._backend.connect_unix_socket(path, timeout=timeout)
39 async def sleep(self, seconds: float) -> None: # pragma: nocover
40 await self._init_backend()
41 return await self._backend.sleep(seconds)