1from __future__ import annotations
2
3import html
4import inspect
5import sys
6import traceback
7import typing
8
9from starlette._utils import is_async_callable
10from starlette.concurrency import run_in_threadpool
11from starlette.requests import Request
12from starlette.responses import HTMLResponse, PlainTextResponse, Response
13from starlette.types import ASGIApp, Message, Receive, Scope, Send
14
# CSS for the HTML debug page. ``generate_html`` passes this string as the
# ``styles`` value when formatting ``TEMPLATE``. The class names defined here
# (``frame-title``, ``collapse-btn``, ``collapsed``, ``source-code``,
# ``center-line``, ...) are the ones used by the HTML templates below;
# ``.collapsed`` hides an element via ``display: none``.
STYLES = """
p {
 color: #211c1c;
}
.traceback-container {
 border: 1px solid #038BB8;
}
.traceback-title {
 background-color: #038BB8;
 color: lemonchiffon;
 padding: 12px;
 font-size: 20px;
 margin-top: 0px;
}
.frame-line {
 padding-left: 10px;
 font-family: monospace;
}
.frame-filename {
 font-family: monospace;
}
.center-line {
 background-color: #038BB8;
 color: #f9f6e1;
 padding: 5px 0px 5px 5px;
}
.lineno {
 margin-right: 5px;
}
.frame-title {
 font-weight: unset;
 padding: 10px 10px 10px 10px;
 background-color: #E4F4FD;
 margin-right: 10px;
 color: #191f21;
 font-size: 17px;
 border: 1px solid #c7dce8;
}
.collapse-btn {
 float: right;
 padding: 0px 5px 1px 5px;
 border: solid 1px #96aebb;
 cursor: pointer;
}
.collapsed {
 display: none;
}
.source-code {
 font-family: courier;
 font-size: small;
 padding-bottom: 10px;
}
"""
68
# Inline script for the HTML debug page; ``generate_html`` passes it as the
# ``js`` value when formatting ``TEMPLATE``. ``collapse(element)`` is invoked
# from each frame's collapse button and toggles the ``collapsed`` class on that
# frame's source listing, swapping the button label between "+" and "‒".
#
# The listing is located relative to the clicked button (the element that
# follows the button's ``frame-title`` paragraph). The ``data-frame-id`` lookup
# is kept only as a fallback: FRAME_TEMPLATE builds ids from filename and line
# number, so the same id appears several times when one line occurs more than
# once in a traceback (e.g. recursion), and ``getElementById`` would then always
# resolve to the first of them.
JS = """
<script type="text/javascript">
    function collapse(element){
        const title = element.closest(".frame-title");
        let frame = title ? title.nextElementSibling : null;

        if (!frame || !frame.classList.contains("source-code")){
            frame = document.getElementById(element.getAttribute("data-frame-id"));
        }
        if (!frame){
            return;
        }

        if (frame.classList.contains("collapsed")){
            element.innerHTML = "‒";
            frame.classList.remove("collapsed");
        } else {
            element.innerHTML = "+";
            frame.classList.add("collapsed");
        }
    }
</script>
"""
85
# Outer HTML document of the debug page, filled in with ``str.format`` by
# ``generate_html``. Placeholders:
#   {styles}   - the STYLES stylesheet
#   {error}    - "<exception type>: <message>", HTML-escaped by the caller
#   {exc_html} - the concatenated per-frame markup
#   {js}       - the JS script block
# Values are inserted verbatim, so anything untrusted must be escaped before
# it is passed in.
TEMPLATE = """
<html>
 <head>
 <style type='text/css'>
 {styles}
 </style>
 <title>Starlette Debugger</title>
 </head>
 <body>
 <h1>500 Server Error</h1>
 <h2>{error}</h2>
 <div class="traceback-container">
 <p class="traceback-title">Traceback</p>
 <div>{exc_html}</div>
 </div>
 {js}
 </body>
</html>
"""
105
# Markup for a single traceback frame, filled in with ``str.format`` by
# ``generate_frame_html``. Placeholders:
#   {frame_filename}  - file the frame belongs to (escaped by the caller)
#   {frame_lineno}    - line number of the frame
#   {frame_name}      - function name (escaped by the caller)
#   {collapse_button} - label shown on the collapse button
#   {collapsed}       - "collapsed" to hide the listing initially, else ""
#   {code_context}    - pre-rendered source lines for the frame
# NOTE: the element id (and the button's data-frame-id) is
# "{frame_filename}-{frame_lineno}", which is not unique when two frames share
# a file and line number.
FRAME_TEMPLATE = """
<div>
 <p class="frame-title">File <span class="frame-filename">{frame_filename}</span>,
 line <i>{frame_lineno}</i>,
 in <b>{frame_name}</b>
 <span class="collapse-btn" data-frame-id="{frame_filename}-{frame_lineno}" onclick="collapse(this)">{collapse_button}</span>
 </p>
 <div id="{frame_filename}-{frame_lineno}" class="source-code {collapsed}">{code_context}</div>
</div>
""" # noqa: E501
116
# Markup for one ordinary source line inside a frame listing. ``format_line``
# uses it for every line whose position differs from the frame's ``index``.
# Placeholders: {lineno} (line number) and {line} (already-escaped source text).
LINE = """
<p><span class="frame-line">
<span class="lineno">{lineno}.</span> {line}</span></p>
"""
121
# Markup for the highlighted source line of a frame listing. ``format_line``
# uses it for the line whose position equals the frame's ``index``; it carries
# the ``center-line`` class styled in STYLES.
# Placeholders: {lineno} (line number) and {line} (already-escaped source text).
CENTER_LINE = """
<p class="center-line"><span class="frame-line center-line">
<span class="lineno">{lineno}.</span> {line}</span></p>
"""
126
127
class ServerErrorMiddleware:
    """
    Handles returning 500 responses when a server error occurs.

    If 'debug' is set, then traceback responses will be returned,
    otherwise the designated 'handler' will be called.

    This middleware class should generally be used to wrap *everything*
    else up, so that unhandled exceptions anywhere in the stack
    always result in an appropriate 500 response.
    """

    def __init__(
        self,
        app: ASGIApp,
        handler: typing.Callable[[Request, Exception], typing.Any] | None = None,
        debug: bool = False,
    ) -> None:
        """
        Store the wrapped application and the error-handling configuration.

        `handler`, when given, is called with `(request, exc)` and must produce
        the response to send; it may be a plain or an async callable.
        `debug` switches on traceback responses and takes precedence over
        `handler`.
        """
        self.app = app
        self.handler = handler
        self.debug = debug

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Run the wrapped app and turn an unhandled exception into a 500 response.

        Non-HTTP scopes are passed straight through. For HTTP scopes the
        exception is always re-raised after the error response has been
        handled, so callers further out still see it.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Set once the wrapped app has emitted "http.response.start"; after
        # that point a second response must not be sent.
        response_started = False

        async def _send(message: Message) -> None:
            nonlocal response_started

            if message["type"] == "http.response.start":
                response_started = True
            await send(message)

        try:
            await self.app(scope, receive, _send)
        except Exception as exc:
            request = Request(scope)
            if self.debug:
                # In debug mode, return traceback responses.
                response = self.debug_response(request, exc)
            elif self.handler is None:
                # Use our default 500 error handler.
                response = self.error_response(request, exc)
            else:
                # Use an installed 500 error handler.
                if is_async_callable(self.handler):
                    response = await self.handler(request, exc)
                else:
                    response = await run_in_threadpool(self.handler, request, exc)

            if not response_started:
                await response(scope, receive, send)

            # We always continue to raise the exception.
            # This allows servers to log the error, or allows test clients
            # to optionally raise the error within the test case.
            raise exc

    def format_line(self, index: int, line: str, frame_lineno: int, frame_index: int) -> str:
        """
        Render one source line of a frame listing as HTML.

        `index` is the line's position within the frame's code context and
        `frame_index` the position of the frame's own line in that context;
        the line at `frame_index` is rendered with CENTER_LINE, all others
        with LINE. The displayed line number is derived from `frame_lineno`.
        """
        values = {
            # HTML escape - line could contain < or >
            # Spaces are then turned into explicit `&nbsp;` entities so the
            # source indentation survives HTML whitespace collapsing. This has
            # to happen after escaping, otherwise the entity's "&" would be
            # escaped as well.
            "line": html.escape(line).replace(" ", "&nbsp;"),
            "lineno": (frame_lineno - frame_index) + index,
        }

        template = CENTER_LINE if index == frame_index else LINE
        return template.format(**values)

    def generate_frame_html(self, frame: inspect.FrameInfo, is_collapsed: bool) -> str:
        """
        Render a single traceback frame (title bar plus source listing).

        `is_collapsed` selects both the initial CSS state of the listing and
        the label shown on its collapse button.
        """
        # `code_context` may be None, in which case the listing is left empty
        # and `frame.index` is never used.
        code_context = "".join(
            self.format_line(
                index,
                line,
                frame.lineno,
                frame.index,  # type: ignore[arg-type]
            )
            for index, line in enumerate(frame.code_context or [])
        )

        values = {
            # HTML escape - filename could contain < or >, especially if it's a virtual
            # file e.g. <stdin> in the REPL
            "frame_filename": html.escape(frame.filename),
            "frame_lineno": frame.lineno,
            # HTML escape - if you try very hard it's possible to name a function with <
            # or >
            "frame_name": html.escape(frame.function),
            "code_context": code_context,
            "collapsed": "collapsed" if is_collapsed else "",
            "collapse_button": "+" if is_collapsed else "‒",
        }
        return FRAME_TEMPLATE.format(**values)

    def generate_html(self, exc: Exception, limit: int = 7) -> str:
        """
        Build the full HTML debug page for `exc`.

        `limit` is forwarded to `inspect.getinnerframes` as the number of
        context lines to collect per frame. Frames are listed innermost first;
        only the first one starts expanded.
        """
        # Only the exception's type name and message are read from this
        # object, so locals are not captured: capturing would repr() every
        # local of every frame for output that is never rendered.
        traceback_obj = traceback.TracebackException.from_exception(exc)

        exc_traceback = exc.__traceback__
        frames = inspect.getinnerframes(exc_traceback, limit) if exc_traceback is not None else []
        exc_html = "".join(
            self.generate_frame_html(frame, position > 0)
            for position, frame in enumerate(reversed(frames))
        )

        if sys.version_info >= (3, 13):  # pragma: no cover
            exc_type_str = traceback_obj.exc_type_str
        else:  # pragma: no cover
            exc_type_str = traceback_obj.exc_type.__name__

        # escape error class and text
        error = f"{html.escape(exc_type_str)}: {html.escape(str(traceback_obj))}"

        return TEMPLATE.format(styles=STYLES, js=JS, error=error, exc_html=exc_html)

    def generate_plain_text(self, exc: Exception) -> str:
        """Return the standard formatted traceback of `exc` as a single string."""
        return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))

    def debug_response(self, request: Request, exc: Exception) -> Response:
        """
        Build the debug-mode 500 response for `exc`.

        An HTML page is returned when the request's Accept header mentions
        "text/html"; otherwise the traceback is returned as plain text.
        """
        accept = request.headers.get("accept", "")

        if "text/html" in accept:
            content = self.generate_html(exc)
            return HTMLResponse(content, status_code=500)
        content = self.generate_plain_text(exc)
        return PlainTextResponse(content, status_code=500)

    def error_response(self, request: Request, exc: Exception) -> Response:
        """Return the default, detail-free plain-text 500 response."""
        return PlainTextResponse("Internal Server Error", status_code=500)