Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/asgi.py: 23%
70 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import typing
3import sniffio
5from .._models import Request, Response
6from .._types import AsyncByteStream
7from .base import AsyncBaseTransport
9if typing.TYPE_CHECKING: # pragma: no cover
10 import asyncio
12 import trio
14 Event = typing.Union[asyncio.Event, trio.Event]
17_Message = typing.Dict[str, typing.Any]
18_Receive = typing.Callable[[], typing.Awaitable[_Message]]
19_Send = typing.Callable[
20 [typing.Dict[str, typing.Any]], typing.Coroutine[None, None, None]
21]
22_ASGIApp = typing.Callable[
23 [typing.Dict[str, typing.Any], _Receive, _Send], typing.Coroutine[None, None, None]
24]
27def create_event() -> "Event":
28 if sniffio.current_async_library() == "trio":
29 import trio
31 return trio.Event()
32 else:
33 import asyncio
35 return asyncio.Event()
38class ASGIResponseStream(AsyncByteStream):
39 def __init__(self, body: typing.List[bytes]) -> None:
40 self._body = body
42 async def __aiter__(self) -> typing.AsyncIterator[bytes]:
43 yield b"".join(self._body)
46class ASGITransport(AsyncBaseTransport):
47 """
48 A custom AsyncTransport that handles sending requests directly to an ASGI app.
49 The simplest way to use this functionality is to use the `app` argument.
51 ```
52 client = httpx.AsyncClient(app=app)
53 ```
55 Alternatively, you can setup the transport instance explicitly.
56 This allows you to include any additional configuration arguments specific
57 to the ASGITransport class:
59 ```
60 transport = httpx.ASGITransport(
61 app=app,
62 root_path="/submount",
63 client=("1.2.3.4", 123)
64 )
65 client = httpx.AsyncClient(transport=transport)
66 ```
68 Arguments:
70 * `app` - The ASGI application.
71 * `raise_app_exceptions` - Boolean indicating if exceptions in the application
72 should be raised. Default to `True`. Can be set to `False` for use cases
73 such as testing the content of a client 500 response.
74 * `root_path` - The root path on which the ASGI application should be mounted.
75 * `client` - A two-tuple indicating the client IP and port of incoming requests.
76 ```
77 """
79 def __init__(
80 self,
81 app: _ASGIApp,
82 raise_app_exceptions: bool = True,
83 root_path: str = "",
84 client: typing.Tuple[str, int] = ("127.0.0.1", 123),
85 ) -> None:
86 self.app = app
87 self.raise_app_exceptions = raise_app_exceptions
88 self.root_path = root_path
89 self.client = client
91 async def handle_async_request(
92 self,
93 request: Request,
94 ) -> Response:
95 assert isinstance(request.stream, AsyncByteStream)
97 # ASGI scope.
98 scope = {
99 "type": "http",
100 "asgi": {"version": "3.0"},
101 "http_version": "1.1",
102 "method": request.method,
103 "headers": [(k.lower(), v) for (k, v) in request.headers.raw],
104 "scheme": request.url.scheme,
105 "path": request.url.path,
106 "raw_path": request.url.raw_path,
107 "query_string": request.url.query,
108 "server": (request.url.host, request.url.port),
109 "client": self.client,
110 "root_path": self.root_path,
111 }
113 # Request.
114 request_body_chunks = request.stream.__aiter__()
115 request_complete = False
117 # Response.
118 status_code = None
119 response_headers = None
120 body_parts = []
121 response_started = False
122 response_complete = create_event()
124 # ASGI callables.
126 async def receive() -> typing.Dict[str, typing.Any]:
127 nonlocal request_complete
129 if request_complete:
130 await response_complete.wait()
131 return {"type": "http.disconnect"}
133 try:
134 body = await request_body_chunks.__anext__()
135 except StopAsyncIteration:
136 request_complete = True
137 return {"type": "http.request", "body": b"", "more_body": False}
138 return {"type": "http.request", "body": body, "more_body": True}
140 async def send(message: typing.Dict[str, typing.Any]) -> None:
141 nonlocal status_code, response_headers, response_started
143 if message["type"] == "http.response.start":
144 assert not response_started
146 status_code = message["status"]
147 response_headers = message.get("headers", [])
148 response_started = True
150 elif message["type"] == "http.response.body":
151 assert not response_complete.is_set()
152 body = message.get("body", b"")
153 more_body = message.get("more_body", False)
155 if body and request.method != "HEAD":
156 body_parts.append(body)
158 if not more_body:
159 response_complete.set()
161 try:
162 await self.app(scope, receive, send)
163 except Exception: # noqa: PIE-786
164 if self.raise_app_exceptions or not response_complete.is_set():
165 raise
167 assert response_complete.is_set()
168 assert status_code is not None
169 assert response_headers is not None
171 stream = ASGIResponseStream(body_parts)
173 return Response(status_code, headers=response_headers, stream=stream)