Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_protocol.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import asyncio.streams
3import sys
4import traceback
5from collections import deque
6from collections.abc import Awaitable, Callable, Sequence
7from contextlib import suppress
8from html import escape as html_escape
9from http import HTTPStatus
10from logging import Logger
11from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, Union, cast
13import yarl
14from propcache import under_cached_property
16from .abc import AbstractAccessLogger, AbstractAsyncAccessLogger, AbstractStreamWriter
17from .base_protocol import PAUSE_RESUME_READING_ERRORS, BaseProtocol
18from .helpers import DEFAULT_CHUNK_SIZE, ceil_timeout, frozen_dataclass_decorator
19from .http import (
20 HttpProcessingError,
21 HttpRequestParser,
22 HttpVersion10,
23 RawRequestMessage,
24 StreamWriter,
25 WebSocketReader,
26)
27from .http_exceptions import BadHttpMethod
28from .log import access_logger, server_logger
29from .streams import EMPTY_PAYLOAD, StreamReader
30from .tcp_helpers import tcp_keepalive
31from .web_exceptions import HTTPException, HTTPInternalServerError
32from .web_log import AccessLogger
33from .web_request import BaseRequest
34from .web_response import Response, StreamResponse
36__all__ = ("RequestHandler", "RequestPayloadError", "PayloadAccessError")
38# Max parsed-but-unhandled pipelined requests buffered per connection before
39# reading is paused. Bounds memory a client can pin by keeping one handler busy
40# and pipelining behind it; reading resumes as the queue drains.
41MAX_MSG_QUEUE_SIZE = 32
43if TYPE_CHECKING:
44 import ssl
46 from .web_server import Server
49_Request = TypeVar("_Request", bound=BaseRequest)
50_RequestFactory = Callable[
51 [
52 RawRequestMessage,
53 StreamReader,
54 "RequestHandler[_Request]",
55 AbstractStreamWriter,
56 "asyncio.Task[None]",
57 ],
58 _Request,
59]
61_RequestHandler = Callable[[_Request], Awaitable[StreamResponse]]
62_AnyAbstractAccessLogger = Union[
63 type[AbstractAsyncAccessLogger],
64 type[AbstractAccessLogger],
65]
67ERROR = RawRequestMessage(
68 "UNKNOWN",
69 "/",
70 HttpVersion10,
71 {}, # type: ignore[arg-type]
72 {}, # type: ignore[arg-type]
73 True,
74 None,
75 False,
76 False,
77 yarl.URL("/"),
78)
class RequestPayloadError(Exception):
    """Payload parsing error.

    Passed to ``HttpRequestParser`` as its ``payload_exception`` by
    ``RequestHandler.__init__``.
    """
class PayloadAccessError(Exception):
    """Payload was accessed after response was sent.

    A single module-level instance is set as the exception on a request
    payload by ``RequestHandler.start()`` once the request has been handled.
    """
89_PAYLOAD_ACCESS_ERROR = PayloadAccessError()
class AccessLoggerWrapper(AbstractAsyncAccessLogger):
    """Adapter exposing a synchronous AbstractAccessLogger through the async logger API."""

    __slots__ = ("access_logger", "_loop")

    def __init__(
        self, access_logger: AbstractAccessLogger, loop: asyncio.AbstractEventLoop
    ) -> None:
        """Store the wrapped logger and the loop used as the time source."""
        self.access_logger = access_logger
        self._loop = loop
        super().__init__()

    async def log(
        self, request: BaseRequest, response: StreamResponse, request_start: float
    ) -> None:
        """Forward to the wrapped logger, passing elapsed loop time instead of the start time."""
        wrapped = self.access_logger
        elapsed = self._loop.time() - request_start
        wrapped.log(request, response, elapsed)

    @property
    def enabled(self) -> bool:
        """Report whether the wrapped logger is enabled."""
        wrapped = self.access_logger
        return wrapped.enabled
@frozen_dataclass_decorator
class _ErrInfo:
    """Description of a request that failed parsing, queued in place of a parsed message."""

    # HTTP status code to answer with (data_received() uses 400).
    status: int
    # The exception that was raised while parsing.
    exc: BaseException
    # Message text taken from the exception.
    message: str
122_MsgType = tuple[RawRequestMessage | _ErrInfo, StreamReader]
125class RequestHandler(BaseProtocol, Generic[_Request]):
126 """HTTP protocol implementation.
128 RequestHandler handles incoming HTTP request. It reads request line,
129 request headers and request payload and calls handle_request() method.
130 By default it always returns with 404 response.
132 RequestHandler handles errors in incoming request, like bad
133 status line, bad headers or incomplete payload. If any error occurs,
134 connection gets closed.
136 keepalive_timeout -- number of seconds before closing
137 keep-alive connection
139 tcp_keepalive -- TCP keep-alive is on, default is on
141 logger -- custom logger object
143 access_log_class -- custom class for access_logger
145 access_log -- custom logging object
147 access_log_format -- access log format string
149 loop -- Optional event loop
151 max_line_size -- Optional maximum header line size
153 max_field_size -- Optional maximum header field size
155 timeout_ceil_threshold -- Optional value to specify
156 threshold to ceil() timeout
157 values
159 """
161 __slots__ = (
162 "max_field_size",
163 "max_headers",
164 "max_line_size",
165 "_request_count",
166 "_keepalive",
167 "_manager",
168 "_request_handler",
169 "_request_factory",
170 "_tcp_keepalive",
171 "_next_keepalive_close_time",
172 "_keepalive_handle",
173 "_keepalive_timeout",
174 "_lingering_time",
175 "_messages",
176 "_max_msg_queue_size",
177 "_msg_queue_resume_size",
178 "_msg_queue_paused",
179 "_message_tail",
180 "_handler_waiter",
181 "_waiter",
182 "_task_handler",
183 "_payload_parser",
184 "_data_received_cb",
185 "logger",
186 "access_log",
187 "access_logger",
188 "_close",
189 "_force_close",
190 "_current_request",
191 "_timeout_ceil_threshold",
192 "_request_in_progress",
193 "_logging_enabled",
194 "_cache",
195 )
def __init__(
    self,
    manager: "Server[_Request]",
    *,
    loop: asyncio.AbstractEventLoop,
    # Default should be high enough that it's likely longer than a reverse proxy.
    keepalive_timeout: float = 3630,
    tcp_keepalive: bool = True,
    logger: Logger = server_logger,
    access_log_class: _AnyAbstractAccessLogger = AccessLogger,
    access_log: Logger | None = access_logger,
    access_log_format: str = AccessLogger.LOG_FORMAT,
    max_line_size: int = 8190,
    max_headers: int = 128,
    max_field_size: int = 8190,
    lingering_time: float = 10.0,
    read_bufsize: int = DEFAULT_CHUNK_SIZE,
    auto_decompress: bool = True,
    timeout_ceil_threshold: float = 5,
):
    """Create the request parser and initialise per-connection state.

    :param manager: server object; supplies ``request_handler`` and
        ``request_factory`` and is notified of connection events.
    :param loop: event loop passed to the parser and to ``BaseProtocol``.
    :param keepalive_timeout: seconds added to the loop time to compute the
        keep-alive close deadline.
    :param tcp_keepalive: when true, ``connection_made`` calls
        ``tcp_keepalive()`` on the transport.
    :param logger: logger used by ``log_debug`` / ``log_exception``.
    :param access_log_class: access logger class; async classes are
        instantiated without arguments, others are wrapped in
        ``AccessLoggerWrapper``.
    :param access_log: logger for access records; a falsy value disables
        access logging.
    :param access_log_format: format passed to a non-async access logger.
    :param max_line_size: forwarded to the parser.
    :param max_headers: forwarded to the parser.
    :param max_field_size: forwarded to the parser.
    :param lingering_time: seconds ``start()`` keeps reading an unfinished
        request payload before closing.
    :param read_bufsize: forwarded to the parser.
    :param auto_decompress: forwarded to the parser.
    :param timeout_ceil_threshold: converted to ``float``; falls back to 5
        when the conversion fails.
    """
    self._max_msg_queue_size = MAX_MSG_QUEUE_SIZE
    # Low-water mark: resume reading once the queue drains to half the limit
    # so we refill in batches instead of churning pause/resume per request.
    self._msg_queue_resume_size = MAX_MSG_QUEUE_SIZE // 2
    # Set before super().__init__ so _reading_paused_for_msg_queue() is safe
    # if BaseProtocol ever triggers a resume during init.
    self._msg_queue_paused = False
    parser = HttpRequestParser(
        self,
        loop,
        read_bufsize,
        max_line_size=max_line_size,
        max_field_size=max_field_size,
        max_headers=max_headers,
        payload_exception=RequestPayloadError,
        auto_decompress=auto_decompress,
        max_msg_queue_size=MAX_MSG_QUEUE_SIZE,
    )
    super().__init__(loop, parser)

    # _request_count is the number of requests processed with the same connection.
    self._request_count = 0
    self._keepalive = False
    self._current_request: _Request | None = None
    self._manager: Server[_Request] | None = manager
    self._request_handler: _RequestHandler[_Request] | None = (
        manager.request_handler
    )
    self._request_factory: _RequestFactory[_Request] | None = (
        manager.request_factory
    )

    self.max_line_size = max_line_size
    self.max_headers = max_headers
    self.max_field_size = max_field_size

    self._tcp_keepalive = tcp_keepalive
    # placeholder to be replaced on keepalive timeout setup
    self._next_keepalive_close_time = 0.0
    self._keepalive_handle: asyncio.Handle | None = None
    self._keepalive_timeout = keepalive_timeout
    self._lingering_time = float(lingering_time)

    # Parsed (or failed) requests waiting to be handled by start().
    self._messages: deque[_MsgType] = deque()
    # Bytes received after an upgrade, held until set_parser() is called.
    self._message_tail = b""
    self._data_received_cb: Callable[[], None] | None = None

    self._waiter: asyncio.Future[None] | None = None
    self._handler_waiter: asyncio.Future[None] | None = None
    self._task_handler: asyncio.Task[None] | None = None
    self._payload_parser: Any = None

    # Keep the default of 5 if the supplied value cannot be converted.
    self._timeout_ceil_threshold: float = 5
    try:
        self._timeout_ceil_threshold = float(timeout_ceil_threshold)
    except (TypeError, ValueError):
        pass

    self.logger = logger
    self.access_log = access_log
    if access_log:
        if issubclass(access_log_class, AbstractAsyncAccessLogger):
            self.access_logger: AbstractAsyncAccessLogger | None = (
                access_log_class()
            )
        else:
            # Synchronous logger: wrap it so log_access() can always await.
            access_logger = access_log_class(access_log, access_log_format)
            self.access_logger = AccessLoggerWrapper(
                access_logger,
                self._loop,
            )
        self._logging_enabled = self.access_logger.enabled
    else:
        self.access_logger = None
        self._logging_enabled = False

    self._close = False
    self._force_close = False
    self._request_in_progress = False
    self._cache: dict[str, Any] = {}
def __repr__(self) -> str:
    """Return ``<ClassName connected>`` or ``<ClassName disconnected>``."""
    cls_name = self.__class__.__name__
    state = "disconnected" if self.transport is None else "connected"
    return f"<{cls_name} {state}>"
@under_cached_property
def ssl_context(self) -> Optional["ssl.SSLContext"]:
    """Return the transport's ``sslcontext`` extra info, or None without a transport."""
    transport = self.transport
    if transport is None:
        return None
    return transport.get_extra_info("sslcontext")
@under_cached_property
def peername(
    self,
) -> str | tuple[str, int, int, int] | tuple[str, int] | None:
    """Return the transport's ``peername`` extra info, or None without a transport."""
    transport = self.transport
    if transport is None:
        return None
    return transport.get_extra_info("peername")
@under_cached_property
def sockname(
    self,
) -> str | tuple[str, int, int, int] | tuple[str, int] | None:
    """Return the transport's ``sockname`` extra info, or None without a transport."""
    transport = self.transport
    if transport is None:
        return None
    return transport.get_extra_info("sockname")
@property
def keepalive_timeout(self) -> float:
    """Return the keep-alive timeout given to the constructor."""
    return self._keepalive_timeout
async def shutdown(self, timeout: float | None = 15.0) -> None:
    """Do worker process exit preparations.

    We need to clean up everything and stop accepting requests.
    It is especially important for keep-alive connections.

    The sequence is: mark the connection as force-closing, cancel the
    keep-alive timer, wait up to ``timeout`` for an in-progress request
    handler, then cancel the current request and wait up to ``timeout``
    again for the connection task, and finally cancel that task and
    force-close the transport.

    :param timeout: limit passed to each ``ceil_timeout`` wait.
    """
    self._force_close = True

    if self._keepalive_handle is not None:
        self._keepalive_handle.cancel()

    # Wait for graceful handler completion
    if self._request_in_progress:
        # The future is only created when we are shutting
        # down while the handler is still processing a request
        # to avoid creating a future for every request.
        self._handler_waiter = self._loop.create_future()
        try:
            async with ceil_timeout(timeout):
                await self._handler_waiter
        except (asyncio.CancelledError, asyncio.TimeoutError):
            self._handler_waiter = None
            # Re-raise only when this task itself is being cancelled
            # (Task.cancelling() is checked on Python 3.11+ only).
            if (
                sys.version_info >= (3, 11)
                and (task := asyncio.current_task())
                and task.cancelling()
            ):
                raise
    # Then cancel handler and wait
    try:
        async with ceil_timeout(timeout):
            if self._current_request is not None:
                self._current_request._cancel(asyncio.CancelledError())

            # shield() so that a timeout here does not cancel the handler
            # task itself; it is cancelled explicitly below.
            if self._task_handler is not None and not self._task_handler.done():
                await asyncio.shield(self._task_handler)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        if (
            sys.version_info >= (3, 11)
            and (task := asyncio.current_task())
            and task.cancelling()
        ):
            raise

    # force-close non-idle handler
    if self._task_handler is not None:
        self._task_handler.cancel()

    self.force_close()
def connection_made(self, transport: asyncio.BaseTransport) -> None:
    """Register the new connection and launch the ``start()`` task for it."""
    super().connection_made(transport)

    sock_transport = cast(asyncio.Transport, transport)
    if self._tcp_keepalive:
        tcp_keepalive(sock_transport)

    manager = self._manager
    assert manager is not None
    manager.connection_made(self, sock_transport)

    event_loop = self._loop
    if sys.version_info >= (3, 12):
        # Eager start runs start() immediately, up to its first suspension.
        handler_task = asyncio.Task(self.start(), loop=event_loop, eager_start=True)
    else:
        handler_task = event_loop.create_task(self.start())
    self._task_handler = handler_task
def connection_lost(self, exc: BaseException | None) -> None:
    """Tear down all per-connection state after the transport is gone.

    Does nothing when ``_manager`` is already None. Otherwise it notifies
    the manager, force-closes, drops references to the manager, factory,
    handler and parser, cancels the keep-alive timer, cancels the current
    request with ``exc`` (or a ``ConnectionResetError``), optionally
    cancels the handler task, and feeds EOF to any payload parser.
    """
    if self._manager is None:
        return
    self._manager.connection_lost(self, exc)

    # Grab value before setting _manager to None.
    handler_cancellation = self._manager.handler_cancellation

    self.force_close()
    super().connection_lost(exc)
    self._manager = None
    self._request_factory = None
    self._request_handler = None
    self._parser = None

    if self._keepalive_handle is not None:
        self._keepalive_handle.cancel()

    if self._current_request is not None:
        # A clean close (exc is None) is still reported to the request
        # as a connection reset.
        if exc is None:
            exc = ConnectionResetError("Connection lost")
        self._current_request._cancel(exc)

    # The handler task is only cancelled when the manager opted in.
    if handler_cancellation and self._task_handler is not None:
        self._task_handler.cancel()

    self._task_handler = None

    if self._payload_parser is not None:
        self._payload_parser.feed_eof()
        self._payload_parser = None
def set_parser(
    self,
    parser: WebSocketReader,
    data_received_cb: Callable[[], None] | None = None,
) -> None:
    """Install a payload parser and replay any bytes buffered before it was set.

    :param parser: parser that receives all further incoming data.
    :param data_received_cb: optional callback invoked by ``data_received``
        before each chunk is fed to ``parser``.
    """
    assert self._payload_parser is None

    self._payload_parser = parser
    self._data_received_cb = data_received_cb

    buffered = self._message_tail
    if not buffered:
        return
    # Hand over data that arrived between the upgrade and this call.
    parser.feed_data(buffered)
    self._message_tail = b""
def eof_received(self) -> None:
    """Handle EOF from the peer; intentionally does nothing and returns None."""
    pass
def data_received(self, data: bytes) -> None:
    """Route incoming bytes to the HTTP parser, the tail buffer or the payload parser.

    Data is dropped once the connection is closing. Otherwise:

    * no payload parser and not upgraded: feed the HTTP parser, queue every
      resulting message (a parse failure is queued as a 400 ``_ErrInfo``),
      wake ``start()`` and pause reading when the queue is full;
    * upgraded but no payload parser yet: append to ``_message_tail``;
    * payload parser set: feed it, and close when it reports EOF.
    """
    if self._force_close or self._close:
        return
    # parse http messages
    messages: Sequence[_MsgType]
    if self._payload_parser is None and not self._upgraded:
        assert self._parser is not None
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except HttpProcessingError as exc:
            # Queue the failure so start() answers it through handle_error().
            messages = [
                (_ErrInfo(status=400, exc=exc, message=exc.message), EMPTY_PAYLOAD)
            ]
            upgraded = False
            tail = b""

        for msg, payload in messages:
            self._request_count += 1
            self._messages.append((msg, payload))

        waiter = self._waiter
        if messages and waiter is not None and not waiter.done():
            # don't set result twice
            waiter.set_result(None)

        # Queue full: pause the transport (the parser already stopped
        # emitting). start() resumes as it drains the queue.
        if (
            not self._msg_queue_paused
            and len(self._messages) >= self._max_msg_queue_size
        ):
            self._pause_msg_queue_reading()

        self._upgraded = upgraded
        if upgraded and tail:
            self._message_tail = tail

    # no parser, just store
    elif self._payload_parser is None and self._upgraded and data:
        self._message_tail += data

    # feed payload
    elif data:
        if self._data_received_cb is not None:
            self._data_received_cb()
        # The returned tail is not used here.
        eof, tail = self._payload_parser.feed_data(data)
        if eof:
            self.close()
def _reading_paused_for_msg_queue(self) -> bool:
    """Return True while reading is paused because the message queue filled up."""
    return self._msg_queue_paused
def _pause_msg_queue_reading(self) -> None:
    """Mark the queue as paused and ask the transport, if any, to stop reading."""
    # The flag is set even when there is no transport to pause.
    self._msg_queue_paused = True
    if self.transport is not None:
        try:
            self.transport.pause_reading()
        except PAUSE_RESUME_READING_ERRORS:
            # Transport lacks flow control; nothing to pause. Intentionally
            # ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress).
            pass
def _resume_msg_queue_reading(self) -> None:
    """Refill the queue from buffered data and resume reading if it stays below the limit."""
    if not self._upgraded:
        # Reparse buffered pipelined requests while still marked paused so
        # a refill past the limit does not re-pause an already-paused
        # transport; only resume below once it stayed under the limit.
        self.data_received(b"")
        if len(self._messages) >= self._max_msg_queue_size:
            return
    self._msg_queue_paused = False
    # Leave the transport paused when reading is also paused for another
    # reason (tracked by _reading_paused).
    if not self._reading_paused and self.transport is not None:
        try:
            self.transport.resume_reading()
        except PAUSE_RESUME_READING_ERRORS:
            # Transport lacks flow control; nothing to resume. Intentionally
            # ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress).
            pass
def keep_alive(self, val: bool) -> None:
    """Switch keep-alive mode and drop any pending keep-alive timer.

    :param bool val: new state.
    """
    self._keepalive = val
    pending = self._keepalive_handle
    if not pending:
        return
    pending.cancel()
    self._keepalive_handle = None
def close(self) -> None:
    """Request a graceful close.

    Flags the connection so no further pipelined messages are accepted,
    and cancels the future ``start()`` may be waiting on for the next
    request.
    """
    self._close = True
    idle_waiter = self._waiter
    if idle_waiter:
        idle_waiter.cancel()
def force_close(self) -> None:
    """Close the connection immediately: cancel the idle waiter and drop the transport."""
    self._force_close = True
    idle_waiter = self._waiter
    if idle_waiter:
        idle_waiter.cancel()
    transport = self.transport
    if transport is None:
        return
    transport.close()
    self.transport = None
async def log_access(
    self,
    request: BaseRequest,
    response: StreamResponse,
    request_start: float | None,
) -> None:
    """Emit an access-log record, unless logging is disabled or no logger is configured."""
    if not self._logging_enabled or self.access_logger is None:
        return
    if TYPE_CHECKING:
        assert request_start is not None
    await self.access_logger.log(request, response, request_start)
def log_debug(self, *args: Any, **kw: Any) -> None:
    """Forward to ``self.logger.debug`` only when the event loop is in debug mode."""
    if self._loop.get_debug():
        self.logger.debug(*args, **kw)
def log_exception(self, *args: Any, **kw: Any) -> None:
    """Forward all arguments to ``self.logger.exception``."""
    self.logger.exception(*args, **kw)
def _process_keepalive(self) -> None:
    """Keep-alive timer callback: close an idle connection whose deadline has passed."""
    self._keepalive_handle = None
    if self._force_close or not self._keepalive:
        return

    current = self._loop.time()
    deadline = self._next_keepalive_close_time
    if current < deadline:
        # The deadline was pushed back after this timer was armed; re-arm it.
        self._keepalive_handle = self._loop.call_at(deadline, self._process_keepalive)
        return

    # Only close when start() is idle, i.e. waiting for the next request.
    idle_waiter = self._waiter
    if idle_waiter and not idle_waiter.done():
        self.force_close()
async def _handle_request(
    self,
    request: _Request,
    start_time: float | None,
    request_handler: Callable[[_Request], Awaitable[StreamResponse]],
) -> tuple[StreamResponse, bool]:
    """Run ``request_handler`` for one request and send its response.

    The outcome of the handler decides what is sent:

    * normal return: the returned response is finished;
    * ``HTTPException``: a ``Response`` is built from its status, reason,
      text, headers and cookies;
    * ``asyncio.TimeoutError``: a 504 from ``handle_error``;
    * any other ``Exception``: a 500 from ``handle_error``;
    * ``asyncio.CancelledError``: re-raised, nothing is sent.

    :param request: request object passed to the handler.
    :param start_time: loop time at which handling began, or None when
        access logging is disabled; forwarded to ``finish_response``.
    :param request_handler: coroutine function producing the response.
    :returns: ``(response, reset)`` as returned by ``finish_response``.
    """
    self._request_in_progress = True
    try:
        try:
            self._current_request = request
            resp = await request_handler(request)
        finally:
            self._current_request = None
    except HTTPException as exc:
        resp = Response(
            status=exc.status, reason=exc.reason, text=exc.text, headers=exc.headers
        )
        resp._cookies = exc._cookies
        resp, reset = await self.finish_response(request, resp, start_time)
    except asyncio.CancelledError:
        raise
    except asyncio.TimeoutError as exc:
        self.log_debug("Request handler timed out.", exc_info=exc)
        resp = self.handle_error(request, 504)
        resp, reset = await self.finish_response(request, resp, start_time)
    except Exception as exc:
        resp = self.handle_error(request, 500, exc)
        resp, reset = await self.finish_response(request, resp, start_time)
    else:
        resp, reset = await self.finish_response(request, resp, start_time)
    finally:
        self._request_in_progress = False
        # shutdown() may have left this future cancelled (its wait timed out
        # or was cancelled and the attribute has not been cleared yet) or
        # already resolved; set_result() on a done future raises
        # InvalidStateError, so only resolve it while it is still pending.
        handler_waiter = self._handler_waiter
        if handler_waiter is not None and not handler_waiter.done():
            handler_waiter.set_result(None)

    return resp, reset
async def start(self) -> None:
    """Process incoming requests on this connection until it should close.

    Each iteration waits for a queued message (created by
    ``data_received``), builds a request through ``_request_factory`` and
    runs ``_handle_request`` for it in a separate task. Parse failures
    queued as ``_ErrInfo`` are answered through an error handler instead of
    ``_request_handler``. After the response, any unread request payload is
    drained for up to ``_lingering_time`` seconds. The loop continues only
    while the response asked for keep-alive and the connection is not
    closing; on exit the transport is closed unless it was force-closed.
    """
    loop = self._loop
    manager = self._manager
    assert manager is not None
    keepalive_timeout = self._keepalive_timeout
    resp = None
    assert self._request_factory is not None
    assert self._request_handler is not None

    while not self._force_close:
        if not self._messages:
            try:
                # wait for next request
                self._waiter = loop.create_future()
                await self._waiter
            finally:
                self._waiter = None

        message, payload = self._messages.popleft()

        # Free a parser slot; resume reading once drained to low water so
        # pipelining keeps flowing while this request is handled.
        # no branch: _parser is only None after connection_lost, whose path
        # exits this loop, so the None case is not reachably exercisable.
        if self._parser is not None:  # pragma: no branch
            self._parser.message_consumed()
            if (
                self._msg_queue_paused
                and len(self._messages) <= self._msg_queue_resume_size
            ):
                self._resume_msg_queue_reading()

        # time is only fetched if logging is enabled as otherwise
        # its thrown away and never used.
        start = loop.time() if self._logging_enabled else None

        manager.requests_count += 1
        writer = StreamWriter(self, loop)
        if not isinstance(message, _ErrInfo):
            request_handler = self._request_handler
        else:
            # make request_factory work
            request_handler = self._make_error_handler(message)
            message = ERROR

        # Important don't hold a reference to the current task
        # as on traceback it will prevent the task from being
        # collected and will cause a memory leak.
        request = self._request_factory(
            message,
            payload,
            self,
            writer,
            self._task_handler or asyncio.current_task(loop),  # type: ignore[arg-type]
        )
        try:
            # a new task is used for copy context vars (#3406)
            coro = self._handle_request(request, start, request_handler)
            if sys.version_info >= (3, 12):
                task = asyncio.Task(coro, loop=loop, eager_start=True)
            else:
                task = loop.create_task(coro)
            try:
                resp, reset = await task
            except ConnectionError:
                self.log_debug("Ignored premature client disconnection")
                break

            # Drop the processed task from asyncio.Task.all_tasks() early
            del task
            # reset is True when finish_response() hit a ConnectionError.
            if reset:
                self.log_debug("Ignored premature client disconnection 2")
                break

            # notify server about keep-alive
            self._keepalive = bool(resp.keep_alive)

            # check payload
            if not payload.is_eof():
                lingering_time = self._lingering_time
                # Could be force closed while awaiting above tasks.
                if not self._force_close and lingering_time:  # type: ignore[redundant-expr]
                    self.log_debug(
                        "Start lingering close timer for %s sec.", lingering_time
                    )

                    now = loop.time()
                    end_t = now + lingering_time

                    try:
                        while not payload.is_eof() and now < end_t:
                            async with ceil_timeout(end_t - now):
                                # read and ignore
                                await payload.readany()
                            now = loop.time()
                    except (asyncio.CancelledError, asyncio.TimeoutError):
                        # Re-raise only when this task itself is being
                        # cancelled (checked on Python 3.11+ only).
                        if (
                            sys.version_info >= (3, 11)
                            and (t := asyncio.current_task())
                            and t.cancelling()
                        ):
                            raise

                # if payload still uncompleted
                if not payload.is_eof() and not self._force_close:
                    self.log_debug("Uncompleted request.")
                    self.close()

            # Any later read of this payload fails with PayloadAccessError.
            payload.set_exception(_PAYLOAD_ACCESS_ERROR)

        except asyncio.CancelledError:
            self.log_debug("Ignored premature client disconnection")
            self.force_close()
            raise
        except Exception as exc:
            self.log_exception("Unhandled exception", exc_info=exc)
            self.force_close()
        except BaseException:
            self.force_close()
            raise
        finally:
            request._task = None  # type: ignore[assignment]  # Break reference cycle in case of exception
            if self.transport is None and resp is not None:
                self.log_debug("Ignored premature client disconnection.")

        if self._keepalive and not self._close and not self._force_close:
            # start keep-alive timer
            close_time = loop.time() + keepalive_timeout
            self._next_keepalive_close_time = close_time
            # An already armed timer is kept; _process_keepalive()
            # reschedules itself to the updated deadline when it fires early.
            if self._keepalive_handle is None:
                self._keepalive_handle = loop.call_at(
                    close_time, self._process_keepalive
                )
        else:
            break

    # remove handler, close transport if no handlers left
    if not self._force_close:
        self._task_handler = None
        if self.transport is not None:
            self.transport.close()
async def finish_response(
    self, request: BaseRequest, resp: StreamResponse, start_time: float | None
) -> tuple[StreamResponse, bool]:
    """Prepare the response and write_eof, then log access.

    This has to
    be called within the context of any exception so the access logger
    can get exception information.

    Before sending, the parser is switched back out of upgraded mode and
    any bytes held in ``_message_tail`` are parsed and queued. If ``resp``
    has no ``prepare`` attribute (for example the handler returned None),
    the problem is logged and a 500 response is sent instead.

    :returns: ``(response, reset)`` where ``response`` is the response that
        was actually sent and ``reset`` is True if a ``ConnectionError``
        occurred while sending it (the client disconnected prematurely).
    """
    request._finish()
    if self._parser is not None:
        self._parser.set_upgraded(False)
        self._upgraded = False
        if self._message_tail:
            messages, _upgraded, tail = self._parser.feed_data(self._message_tail)
            self._message_tail = tail
            for msg, payload in messages:
                self._request_count += 1
                self._messages.append((msg, payload))
            # This shouldn't be possible. If a future refactor results in this
            # failing, then the code may need to be updated to set the waiter.
            assert self._waiter is None
    try:
        prepare_meth = resp.prepare
    except AttributeError:
        if resp is None:
            self.log_exception("Missing return statement on request handler")  # type: ignore[unreachable]
        else:
            self.log_exception(
                f"Web-handler should return a response instance, got {resp!r}"
            )
        exc = HTTPInternalServerError()
        resp = Response(
            status=exc.status, reason=exc.reason, text=exc.text, headers=exc.headers
        )
        prepare_meth = resp.prepare
    try:
        await prepare_meth(request)
        await resp.write_eof()
    except ConnectionError:
        # The access record is still written for a request whose client
        # went away.
        await self.log_access(request, resp, start_time)
        return resp, True

    await self.log_access(request, resp, start_time)
    return resp, False
def handle_error(
    self,
    request: BaseRequest,
    status: int = 500,
    exc: BaseException | None = None,
    message: str | None = None,
) -> StreamResponse:
    """Handle errors.

    Returns HTTP response with specific status code. Logs additional
    information. It always closes current connection.

    :param request: request being answered; its ``remote``, ``writer`` and
        ``headers`` are read.
    :param status: status code of the generated response.
    :param exc: exception passed to the logger as ``exc_info``.
    :param message: response text for statuses other than 500; for a 500
        the text is generated here and this argument is ignored.
    :returns: a ``Response`` on which ``force_close()`` has been called.
    :raises ConnectionError: if part of a response was already written.
    """
    if self._request_count == 1 and isinstance(exc, BadHttpMethod):
        # BadHttpMethod is common when a client sends non-HTTP
        # or encrypted traffic to an HTTP port. This is expected
        # to happen when connected to the public internet so we log
        # it at the debug level as to not fill logs with noise.
        self.logger.debug(
            "Error handling request from %s", request.remote, exc_info=exc
        )
    else:
        self.log_exception(
            "Error handling request from %s", request.remote, exc_info=exc
        )

    # some data already got sent, connection is broken
    if request.writer.output_size > 0:
        raise ConnectionError(
            "Response is sent already, cannot send another response "
            "with the error message"
        )

    ct = "text/plain"
    if status == HTTPStatus.INTERNAL_SERVER_ERROR:
        title = f"{HTTPStatus.INTERNAL_SERVER_ERROR.value} {HTTPStatus.INTERNAL_SERVER_ERROR.phrase}"
        msg = HTTPStatus.INTERNAL_SERVER_ERROR.description
        tb = None
        # The traceback is only included when the loop runs in debug mode.
        if self._loop.get_debug():
            with suppress(Exception):
                tb = traceback.format_exc()

        if "text/html" in request.headers.get("Accept", ""):
            if tb:
                # Escape before embedding the traceback in the HTML body.
                tb = html_escape(tb)
                msg = f"<h2>Traceback:</h2>\n<pre>{tb}</pre>"
            message = (
                "<html><head>"
                f"<title>{title}</title>"
                f"</head><body>\n<h1>{title}</h1>"
                f"\n{msg}\n</body></html>\n"
            )
            ct = "text/html"
        else:
            if tb:
                msg = tb
            message = title + "\n\n" + msg

    resp = Response(status=status, text=message, content_type=ct)
    resp.force_close()

    return resp
def _make_error_handler(
    self, err_info: _ErrInfo
) -> Callable[[BaseRequest], Awaitable[StreamResponse]]:
    """Return a request handler that answers with ``handle_error`` for ``err_info``.

    Used by ``start()`` so that a request which failed parsing goes through
    the same handling path as a regular request.
    """
    async def handler(request: BaseRequest) -> StreamResponse:
        return self.handle_error(
            request, err_info.status, err_info.exc, err_info.message
        )

    return handler