Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/asgi.py: 23%

70 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import typing 

2 

3import sniffio 

4 

5from .._models import Request, Response 

6from .._types import AsyncByteStream 

7from .base import AsyncBaseTransport 

8 

9if typing.TYPE_CHECKING: # pragma: no cover 

10 import asyncio 

11 

12 import trio 

13 

14 Event = typing.Union[asyncio.Event, trio.Event] 

15 

16 

# Type aliases describing the ASGI callables used by this module.
# An ASGI message is a plain string-keyed dictionary.
_Message = typing.Dict[str, typing.Any]
# `receive` takes no arguments and resolves to the next inbound message.
_Receive = typing.Callable[[], typing.Awaitable[_Message]]
# `send` accepts one outbound message and resolves to nothing.
_Send = typing.Callable[[_Message], typing.Coroutine[None, None, None]]
# An ASGI application is called with (scope, receive, send).
_ASGIApp = typing.Callable[
    [typing.Dict[str, typing.Any], _Receive, _Send],
    typing.Coroutine[None, None, None],
]

25 

26 

def create_event() -> "Event":
    """Return a new event object for the async library that is currently running.

    `sniffio.current_async_library()` names the active library. A
    `trio.Event` is returned when it reports "trio"; an `asyncio.Event` is
    returned in every other case. Each library is imported inside the branch
    that uses it.
    """
    backend = sniffio.current_async_library()

    if backend != "trio":
        import asyncio

        return asyncio.Event()

    import trio

    return trio.Event()

36 

37 

class ASGIResponseStream(AsyncByteStream):
    """Async byte stream over a list of already-collected response chunks."""

    def __init__(self, body: typing.List[bytes]) -> None:
        # Only a reference to the list is stored; joining happens on iteration.
        self._body = body

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        # The chunks are concatenated and emitted as one single item.
        payload = b"".join(self._body)
        yield payload

44 

45 

class ASGITransport(AsyncBaseTransport):
    """
    A custom AsyncTransport that handles sending requests directly to an ASGI app.
    The simplest way to use this functionality is to use the `app` argument.

    ```
    client = httpx.AsyncClient(app=app)
    ```

    Alternatively, you can setup the transport instance explicitly.
    This allows you to include any additional configuration arguments specific
    to the ASGITransport class:

    ```
    transport = httpx.ASGITransport(
        app=app,
        root_path="/submount",
        client=("1.2.3.4", 123)
    )
    client = httpx.AsyncClient(transport=transport)
    ```

    Arguments:

    * `app` - The ASGI application.
    * `raise_app_exceptions` - Boolean indicating if exceptions in the application
       should be raised. Defaults to `True`. Can be set to `False` for use cases
       such as testing the content of a 500 response.
    * `root_path` - The root path on which the ASGI application should be mounted.
    * `client` - A two-tuple indicating the client IP and port of incoming requests.
    """

    def __init__(
        self,
        app: _ASGIApp,
        raise_app_exceptions: bool = True,
        root_path: str = "",
        client: typing.Tuple[str, int] = ("127.0.0.1", 123),
    ) -> None:
        self.app = app
        self.raise_app_exceptions = raise_app_exceptions
        self.root_path = root_path
        self.client = client

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Call the ASGI app with `request` and return the buffered response.

        The app is awaited to completion; every `http.response.body` chunk it
        sends is collected and returned as a single `ASGIResponseStream`.

        If the app raises and `raise_app_exceptions` is true, the exception
        propagates. If it is false, the exception is swallowed and whatever
        response was produced so far is returned, with the status code
        defaulting to 500 and the headers to empty when the app never sent
        `http.response.start`.
        """
        assert isinstance(request.stream, AsyncByteStream)

        # ASGI scope.
        scope = {
            "type": "http",
            "asgi": {"version": "3.0"},
            "http_version": "1.1",
            "method": request.method,
            "headers": [(k.lower(), v) for (k, v) in request.headers.raw],
            "scheme": request.url.scheme,
            "path": request.url.path,
            # ASGI's `raw_path` excludes the query string, whereas the URL's
            # `raw_path` carries it after a "?"; keep only the path part.
            "raw_path": request.url.raw_path.split(b"?")[0],
            "query_string": request.url.query,
            "server": (request.url.host, request.url.port),
            "client": self.client,
            "root_path": self.root_path,
        }

        # Request.
        request_body_chunks = request.stream.__aiter__()
        request_complete = False

        # Response.
        status_code = None
        response_headers = None
        body_parts = []
        response_started = False
        response_complete = create_event()

        # ASGI callables.

        async def receive() -> typing.Dict[str, typing.Any]:
            nonlocal request_complete

            if request_complete:
                # The request body is exhausted: only report a disconnect
                # once the response has been fully sent.
                await response_complete.wait()
                return {"type": "http.disconnect"}

            try:
                body = await request_body_chunks.__anext__()
            except StopAsyncIteration:
                request_complete = True
                return {"type": "http.request", "body": b"", "more_body": False}
            return {"type": "http.request", "body": body, "more_body": True}

        async def send(message: typing.Dict[str, typing.Any]) -> None:
            nonlocal status_code, response_headers, response_started

            if message["type"] == "http.response.start":
                assert not response_started

                status_code = message["status"]
                response_headers = message.get("headers", [])
                response_started = True

            elif message["type"] == "http.response.body":
                assert not response_complete.is_set()
                body = message.get("body", b"")
                more_body = message.get("more_body", False)

                # Body bytes sent in reply to a HEAD request are discarded.
                if body and request.method != "HEAD":
                    body_parts.append(body)

                if not more_body:
                    response_complete.set()

        try:
            await self.app(scope, receive, send)
        except Exception:  # noqa: PIE-786
            if self.raise_app_exceptions:
                raise

            # The caller asked not to see app exceptions: finish the response
            # ourselves so a (possibly partial) result can still be returned.
            response_complete.set()
            if status_code is None:
                status_code = 500
            if response_headers is None:
                response_headers = {}

        assert response_complete.is_set()
        assert status_code is not None
        assert response_headers is not None

        stream = ASGIResponseStream(body_parts)

        return Response(status_code, headers=response_headers, stream=stream)

172 

173 return Response(status_code, headers=response_headers, stream=stream)