Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/wsgi.py: 29%

59 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import io 

2import itertools 

3import sys 

4import typing 

5 

6from .._models import Request, Response 

7from .._types import SyncByteStream 

8from .base import BaseTransport 

9 

10if typing.TYPE_CHECKING: 

11 from _typeshed import OptExcInfo # pragma: no cover 

12 from _typeshed.wsgi import WSGIApplication # pragma: no cover 

13 

14_T = typing.TypeVar("_T") 

15 

16 

17def _skip_leading_empty_chunks(body: typing.Iterable[_T]) -> typing.Iterable[_T]: 

18 body = iter(body) 

19 for chunk in body: 

20 if chunk: 

21 return itertools.chain([chunk], body) 

22 return [] 

23 

24 

class WSGIByteStream(SyncByteStream):
    """
    Byte stream over the iterable returned by a WSGI application.

    Leading empty chunks are skipped when the stream is created, which means
    the iterable is already advanced to its first non-empty chunk by the time
    `__init__` returns.
    """

    def __init__(self, result: typing.Iterable[bytes]) -> None:
        # Look up `close` on the original iterable before wrapping it: the
        # wrapped iterable stored in `_result` does not expose it.
        self._close = getattr(result, "close", None)
        self._result = _skip_leading_empty_chunks(result)

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield the remaining body chunks in order."""
        yield from self._result

    def close(self) -> None:
        """Call the `close()` method of the wrapped iterable, if it has one."""
        close_result = self._close
        if close_result is None:
            return
        close_result()

37 

38 

class WSGITransport(BaseTransport):
    """
    A custom transport that handles sending requests directly to a WSGI app.
    The simplest way to use this functionality is to use the `app` argument.

    ```
    client = httpx.Client(app=app)
    ```

    Alternatively, you can set up the transport instance explicitly.
    This allows you to include any additional configuration arguments specific
    to the WSGITransport class:

    ```
    transport = httpx.WSGITransport(
        app=app,
        script_name="/submount",
        remote_addr="1.2.3.4"
    )
    client = httpx.Client(transport=transport)
    ```

    Arguments:

    * `app` - The WSGI application.
    * `raise_app_exceptions` - Boolean indicating if exceptions that the
       application reports through the `exc_info` argument of `start_response`
       should be raised. Default to `True`. Can be set to `False` for use cases
       such as testing the content of a 500 response.
    * `script_name` - The root path on which the WSGI application should be mounted.
    * `remote_addr` - A string indicating the client IP of incoming requests.
    * `wsgi_errors` - Text stream passed to the application as `wsgi.errors`.
       `sys.stderr` is used when this is not given.
    """

    def __init__(
        self,
        app: "WSGIApplication",
        raise_app_exceptions: bool = True,
        script_name: str = "",
        remote_addr: str = "127.0.0.1",
        wsgi_errors: typing.Optional[typing.TextIO] = None,
    ) -> None:
        self.app = app
        self.raise_app_exceptions = raise_app_exceptions
        self.script_name = script_name
        self.remote_addr = remote_addr
        self.wsgi_errors = wsgi_errors

    def handle_request(self, request: Request) -> Response:
        """
        Call the WSGI application for `request` and wrap its output in a
        `Response`.

        Raises the exception that the application passed to `start_response`
        as `exc_info`, when `raise_app_exceptions` is true. On that path, and
        on any other failure after the application has returned its iterable,
        the iterable's `close()` is called before the error propagates.
        """
        # Load the request body up front; the application receives it as an
        # in-memory `wsgi.input` stream.
        request.read()
        wsgi_input = io.BytesIO(request.content)

        # Fall back to the scheme's default port when the URL has none.
        port = request.url.port or {"http": 80, "https": 443}[request.url.scheme]
        environ = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": request.url.scheme,
            "wsgi.input": wsgi_input,
            "wsgi.errors": self.wsgi_errors or sys.stderr,
            "wsgi.multithread": True,
            "wsgi.multiprocess": False,
            "wsgi.run_once": False,
            "REQUEST_METHOD": request.method,
            "SCRIPT_NAME": self.script_name,
            "PATH_INFO": request.url.path,
            "QUERY_STRING": request.url.query.decode("ascii"),
            "SERVER_NAME": request.url.host,
            "SERVER_PORT": str(port),
            "SERVER_PROTOCOL": "HTTP/1.1",
            "REMOTE_ADDR": self.remote_addr,
        }
        for header_key, header_value in request.headers.raw:
            key = header_key.decode("ascii").upper().replace("-", "_")
            # Content-Type and Content-Length keep their bare names; every
            # other header is exposed with an `HTTP_` prefix.
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = "HTTP_" + key
            environ[key] = header_value.decode("ascii")

        seen_status = None
        seen_response_headers = None
        seen_exc_info = None

        def start_response(
            status: str,
            response_headers: typing.List[typing.Tuple[str, str]],
            exc_info: typing.Optional["OptExcInfo"] = None,
        ) -> typing.Callable[[bytes], typing.Any]:
            # Record what the application reported. The returned `write`
            # callable discards whatever is passed to it.
            nonlocal seen_status, seen_response_headers, seen_exc_info
            seen_status = status
            seen_response_headers = response_headers
            seen_exc_info = exc_info
            return lambda _: None

        result = self.app(environ, start_response)

        # Creating the stream advances `result` to its first non-empty chunk,
        # which gives generator-based applications the chance to call
        # `start_response` before the checks below.
        stream = WSGIByteStream(result)

        try:
            assert seen_status is not None
            assert seen_response_headers is not None
            if seen_exc_info and seen_exc_info[0] and self.raise_app_exceptions:
                raise seen_exc_info[1]

            status_code = int(seen_status.split()[0])
            headers = [
                (key.encode("ascii"), value.encode("ascii"))
                for key, value in seen_response_headers
            ]
        except BaseException:
            # No Response takes ownership of the stream on this path, so the
            # application's iterable has to be released here. PEP 3333
            # requires `close()` to be called when a request ends, including
            # when it ends with an error.
            stream.close()
            raise

        return Response(status_code, headers=headers, stream=stream)