Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_backends/auto.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

16 statements  

1import typing 

2from typing import Optional 

3 

4from .._synchronization import current_async_library 

5from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream 

6 

7 

class AutoBackend(AsyncNetworkBackend):
    """Async network backend that forwards every call to a delegate.

    The delegate is created lazily, the first time one of the public
    coroutines runs, and is then cached on the instance as ``_backend``.
    """

    async def _init_backend(self) -> None:
        """Create and cache the delegate backend if it does not exist yet.

        ``current_async_library()`` is consulted once: a result of
        ``"trio"`` selects ``TrioBackend``; any other result selects
        ``AnyIOBackend``.
        """
        # Guard clause: a delegate was already chosen on an earlier call.
        if hasattr(self, "_backend"):
            return

        library_name = current_async_library()
        if library_name != "trio":
            # Imported here rather than at module level, so the backend
            # module is only loaded when this branch actually runs.
            from .anyio import AnyIOBackend

            self._backend: AsyncNetworkBackend = AnyIOBackend()
        else:
            from .trio import TrioBackend

            self._backend = TrioBackend()

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: Optional[float] = None,
        local_address: Optional[str] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> AsyncNetworkStream:
        """Open a TCP connection through the delegate backend.

        All arguments are passed through unchanged; ``host`` and ``port``
        positionally, the rest by keyword. Returns whatever stream the
        delegate's ``connect_tcp`` returns.
        """
        await self._init_backend()
        delegate = self._backend
        stream = await delegate.connect_tcp(
            host,
            port,
            timeout=timeout,
            local_address=local_address,
            socket_options=socket_options,
        )
        return stream

    async def connect_unix_socket(
        self,
        path: str,
        timeout: Optional[float] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> AsyncNetworkStream:  # pragma: nocover
        """Open a Unix domain socket connection through the delegate backend.

        ``path`` is passed positionally, ``timeout`` and ``socket_options``
        by keyword. Returns whatever stream the delegate's
        ``connect_unix_socket`` returns.
        """
        await self._init_backend()
        delegate = self._backend
        stream = await delegate.connect_unix_socket(
            path,
            timeout=timeout,
            socket_options=socket_options,
        )
        return stream

    async def sleep(self, seconds: float) -> None:  # pragma: nocover
        """Await the delegate backend's ``sleep`` with ``seconds``."""
        await self._init_backend()
        delegate = self._backend
        return await delegate.sleep(seconds)