Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_protocol.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

441 statements  

1import asyncio 

2import asyncio.streams 

3import sys 

4import traceback 

5from collections import deque 

6from collections.abc import Awaitable, Callable, Sequence 

7from contextlib import suppress 

8from html import escape as html_escape 

9from http import HTTPStatus 

10from logging import Logger 

11from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, Union, cast 

12 

13import yarl 

14from propcache import under_cached_property 

15 

16from .abc import AbstractAccessLogger, AbstractAsyncAccessLogger, AbstractStreamWriter 

17from .base_protocol import PAUSE_RESUME_READING_ERRORS, BaseProtocol 

18from .helpers import DEFAULT_CHUNK_SIZE, ceil_timeout, frozen_dataclass_decorator 

19from .http import ( 

20 HttpProcessingError, 

21 HttpRequestParser, 

22 HttpVersion10, 

23 RawRequestMessage, 

24 StreamWriter, 

25 WebSocketReader, 

26) 

27from .http_exceptions import BadHttpMethod 

28from .log import access_logger, server_logger 

29from .streams import EMPTY_PAYLOAD, StreamReader 

30from .tcp_helpers import tcp_keepalive 

31from .web_exceptions import HTTPException, HTTPInternalServerError 

32from .web_log import AccessLogger 

33from .web_request import BaseRequest 

34from .web_response import Response, StreamResponse 

35 

36__all__ = ("RequestHandler", "RequestPayloadError", "PayloadAccessError") 

37 

38# Max parsed-but-unhandled pipelined requests buffered per connection before 

39# reading is paused. Bounds memory a client can pin by keeping one handler busy 

40# and pipelining behind it; reading resumes as the queue drains. 

41MAX_MSG_QUEUE_SIZE = 32 

42 

43if TYPE_CHECKING: 

44 import ssl 

45 

46 from .web_server import Server 

47 

48 

49_Request = TypeVar("_Request", bound=BaseRequest) 

50_RequestFactory = Callable[ 

51 [ 

52 RawRequestMessage, 

53 StreamReader, 

54 "RequestHandler[_Request]", 

55 AbstractStreamWriter, 

56 "asyncio.Task[None]", 

57 ], 

58 _Request, 

59] 

60 

61_RequestHandler = Callable[[_Request], Awaitable[StreamResponse]] 

62_AnyAbstractAccessLogger = Union[ 

63 type[AbstractAsyncAccessLogger], 

64 type[AbstractAccessLogger], 

65] 

66 

67ERROR = RawRequestMessage( 

68 "UNKNOWN", 

69 "/", 

70 HttpVersion10, 

71 {}, # type: ignore[arg-type] 

72 {}, # type: ignore[arg-type] 

73 True, 

74 None, 

75 False, 

76 False, 

77 yarl.URL("/"), 

78) 

79 

80 

81class RequestPayloadError(Exception): 

82 """Payload parsing error.""" 

83 

84 

85class PayloadAccessError(Exception): 

86 """Payload was accessed after response was sent.""" 

87 

88 

89_PAYLOAD_ACCESS_ERROR = PayloadAccessError() 

90 

91 

92class AccessLoggerWrapper(AbstractAsyncAccessLogger): 

93 """Wrap an AbstractAccessLogger so it behaves like an AbstractAsyncAccessLogger.""" 

94 

95 __slots__ = ("access_logger", "_loop") 

96 

97 def __init__( 

98 self, access_logger: AbstractAccessLogger, loop: asyncio.AbstractEventLoop 

99 ) -> None: 

100 self.access_logger = access_logger 

101 self._loop = loop 

102 super().__init__() 

103 

104 async def log( 

105 self, request: BaseRequest, response: StreamResponse, request_start: float 

106 ) -> None: 

107 self.access_logger.log(request, response, self._loop.time() - request_start) 

108 

109 @property 

110 def enabled(self) -> bool: 

111 """Check if logger is enabled.""" 

112 return self.access_logger.enabled 

113 

114 

115@frozen_dataclass_decorator 

116class _ErrInfo: 

117 status: int 

118 exc: BaseException 

119 message: str 

120 

121 

122_MsgType = tuple[RawRequestMessage | _ErrInfo, StreamReader] 

123 

124 

125class RequestHandler(BaseProtocol, Generic[_Request]): 

126 """HTTP protocol implementation. 

127 

128 RequestHandler handles incoming HTTP request. It reads request line, 

129 request headers and request payload and calls handle_request() method. 

130 By default it always returns with 404 response. 

131 

132 RequestHandler handles errors in incoming request, like bad 

133 status line, bad headers or incomplete payload. If any error occurs, 

134 connection gets closed. 

135 

136 keepalive_timeout -- number of seconds before closing 

137 keep-alive connection 

138 

139 tcp_keepalive -- TCP keep-alive is on, default is on 

140 

141 logger -- custom logger object 

142 

143 access_log_class -- custom class for access_logger 

144 

145 access_log -- custom logging object 

146 

147 access_log_format -- access log format string 

148 

149 loop -- Optional event loop 

150 

151 max_line_size -- Optional maximum header line size 

152 

153 max_field_size -- Optional maximum header field size 

154 

155 timeout_ceil_threshold -- Optional value to specify 

156 threshold to ceil() timeout 

157 values 

158 

159 """ 

160 

161 __slots__ = ( 

162 "max_field_size", 

163 "max_headers", 

164 "max_line_size", 

165 "_request_count", 

166 "_keepalive", 

167 "_manager", 

168 "_request_handler", 

169 "_request_factory", 

170 "_tcp_keepalive", 

171 "_next_keepalive_close_time", 

172 "_keepalive_handle", 

173 "_keepalive_timeout", 

174 "_lingering_time", 

175 "_messages", 

176 "_max_msg_queue_size", 

177 "_msg_queue_resume_size", 

178 "_msg_queue_paused", 

179 "_message_tail", 

180 "_handler_waiter", 

181 "_waiter", 

182 "_task_handler", 

183 "_payload_parser", 

184 "_data_received_cb", 

185 "logger", 

186 "access_log", 

187 "access_logger", 

188 "_close", 

189 "_force_close", 

190 "_current_request", 

191 "_timeout_ceil_threshold", 

192 "_request_in_progress", 

193 "_logging_enabled", 

194 "_cache", 

195 ) 

196 

197 def __init__( 

198 self, 

199 manager: "Server[_Request]", 

200 *, 

201 loop: asyncio.AbstractEventLoop, 

202 # Default should be high enough that it's likely longer than a reverse proxy. 

203 keepalive_timeout: float = 3630, 

204 tcp_keepalive: bool = True, 

205 logger: Logger = server_logger, 

206 access_log_class: _AnyAbstractAccessLogger = AccessLogger, 

207 access_log: Logger | None = access_logger, 

208 access_log_format: str = AccessLogger.LOG_FORMAT, 

209 max_line_size: int = 8190, 

210 max_headers: int = 128, 

211 max_field_size: int = 8190, 

212 lingering_time: float = 10.0, 

213 read_bufsize: int = DEFAULT_CHUNK_SIZE, 

214 auto_decompress: bool = True, 

215 timeout_ceil_threshold: float = 5, 

216 ): 

217 self._max_msg_queue_size = MAX_MSG_QUEUE_SIZE 

218 # Low-water mark: resume reading once the queue drains to half the limit 

219 # so we refill in batches instead of churning pause/resume per request. 

220 self._msg_queue_resume_size = MAX_MSG_QUEUE_SIZE // 2 

221 # Set before super().__init__ so _reading_paused_for_msg_queue() is safe 

222 # if BaseProtocol ever triggers a resume during init. 

223 self._msg_queue_paused = False 

224 parser = HttpRequestParser( 

225 self, 

226 loop, 

227 read_bufsize, 

228 max_line_size=max_line_size, 

229 max_field_size=max_field_size, 

230 max_headers=max_headers, 

231 payload_exception=RequestPayloadError, 

232 auto_decompress=auto_decompress, 

233 max_msg_queue_size=MAX_MSG_QUEUE_SIZE, 

234 ) 

235 super().__init__(loop, parser) 

236 

237 # _request_count is the number of requests processed with the same connection. 

238 self._request_count = 0 

239 self._keepalive = False 

240 self._current_request: _Request | None = None 

241 self._manager: Server[_Request] | None = manager 

242 self._request_handler: _RequestHandler[_Request] | None = ( 

243 manager.request_handler 

244 ) 

245 self._request_factory: _RequestFactory[_Request] | None = ( 

246 manager.request_factory 

247 ) 

248 

249 self.max_line_size = max_line_size 

250 self.max_headers = max_headers 

251 self.max_field_size = max_field_size 

252 

253 self._tcp_keepalive = tcp_keepalive 

254 # placeholder to be replaced on keepalive timeout setup 

255 self._next_keepalive_close_time = 0.0 

256 self._keepalive_handle: asyncio.Handle | None = None 

257 self._keepalive_timeout = keepalive_timeout 

258 self._lingering_time = float(lingering_time) 

259 

260 self._messages: deque[_MsgType] = deque() 

261 self._message_tail = b"" 

262 self._data_received_cb: Callable[[], None] | None = None 

263 

264 self._waiter: asyncio.Future[None] | None = None 

265 self._handler_waiter: asyncio.Future[None] | None = None 

266 self._task_handler: asyncio.Task[None] | None = None 

267 self._payload_parser: Any = None 

268 

269 self._timeout_ceil_threshold: float = 5 

270 try: 

271 self._timeout_ceil_threshold = float(timeout_ceil_threshold) 

272 except (TypeError, ValueError): 

273 pass 

274 

275 self.logger = logger 

276 self.access_log = access_log 

277 if access_log: 

278 if issubclass(access_log_class, AbstractAsyncAccessLogger): 

279 self.access_logger: AbstractAsyncAccessLogger | None = ( 

280 access_log_class() 

281 ) 

282 else: 

283 access_logger = access_log_class(access_log, access_log_format) 

284 self.access_logger = AccessLoggerWrapper( 

285 access_logger, 

286 self._loop, 

287 ) 

288 self._logging_enabled = self.access_logger.enabled 

289 else: 

290 self.access_logger = None 

291 self._logging_enabled = False 

292 

293 self._close = False 

294 self._force_close = False 

295 self._request_in_progress = False 

296 self._cache: dict[str, Any] = {} 

297 

298 def __repr__(self) -> str: 

299 return "<{} {}>".format( 

300 self.__class__.__name__, 

301 "connected" if self.transport is not None else "disconnected", 

302 ) 

303 

304 @under_cached_property 

305 def ssl_context(self) -> Optional["ssl.SSLContext"]: 

306 """Return SSLContext if available.""" 

307 return ( 

308 None 

309 if self.transport is None 

310 else self.transport.get_extra_info("sslcontext") 

311 ) 

312 

313 @under_cached_property 

314 def peername( 

315 self, 

316 ) -> str | tuple[str, int, int, int] | tuple[str, int] | None: 

317 """Return peername if available.""" 

318 return ( 

319 None 

320 if self.transport is None 

321 else self.transport.get_extra_info("peername") 

322 ) 

323 

324 @under_cached_property 

325 def sockname( 

326 self, 

327 ) -> str | tuple[str, int, int, int] | tuple[str, int] | None: 

328 """Return sockname if available.""" 

329 return ( 

330 None 

331 if self.transport is None 

332 else self.transport.get_extra_info("sockname") 

333 ) 

334 

335 @property 

336 def keepalive_timeout(self) -> float: 

337 return self._keepalive_timeout 

338 

339 async def shutdown(self, timeout: float | None = 15.0) -> None: 

340 """Do worker process exit preparations. 

341 

342 We need to clean up everything and stop accepting requests. 

343 It is especially important for keep-alive connections. 

344 """ 

345 self._force_close = True 

346 

347 if self._keepalive_handle is not None: 

348 self._keepalive_handle.cancel() 

349 

350 # Wait for graceful handler completion 

351 if self._request_in_progress: 

352 # The future is only created when we are shutting 

353 # down while the handler is still processing a request 

354 # to avoid creating a future for every request. 

355 self._handler_waiter = self._loop.create_future() 

356 try: 

357 async with ceil_timeout(timeout): 

358 await self._handler_waiter 

359 except (asyncio.CancelledError, asyncio.TimeoutError): 

360 self._handler_waiter = None 

361 if ( 

362 sys.version_info >= (3, 11) 

363 and (task := asyncio.current_task()) 

364 and task.cancelling() 

365 ): 

366 raise 

367 # Then cancel handler and wait 

368 try: 

369 async with ceil_timeout(timeout): 

370 if self._current_request is not None: 

371 self._current_request._cancel(asyncio.CancelledError()) 

372 

373 if self._task_handler is not None and not self._task_handler.done(): 

374 await asyncio.shield(self._task_handler) 

375 except (asyncio.CancelledError, asyncio.TimeoutError): 

376 if ( 

377 sys.version_info >= (3, 11) 

378 and (task := asyncio.current_task()) 

379 and task.cancelling() 

380 ): 

381 raise 

382 

383 # force-close non-idle handler 

384 if self._task_handler is not None: 

385 self._task_handler.cancel() 

386 

387 self.force_close() 

388 

389 def connection_made(self, transport: asyncio.BaseTransport) -> None: 

390 super().connection_made(transport) 

391 

392 real_transport = cast(asyncio.Transport, transport) 

393 if self._tcp_keepalive: 

394 tcp_keepalive(real_transport) 

395 

396 assert self._manager is not None 

397 self._manager.connection_made(self, real_transport) 

398 

399 loop = self._loop 

400 if sys.version_info >= (3, 12): 

401 task = asyncio.Task(self.start(), loop=loop, eager_start=True) 

402 else: 

403 task = loop.create_task(self.start()) 

404 self._task_handler = task 

405 

406 def connection_lost(self, exc: BaseException | None) -> None: 

407 if self._manager is None: 

408 return 

409 self._manager.connection_lost(self, exc) 

410 

411 # Grab value before setting _manager to None. 

412 handler_cancellation = self._manager.handler_cancellation 

413 

414 self.force_close() 

415 super().connection_lost(exc) 

416 self._manager = None 

417 self._request_factory = None 

418 self._request_handler = None 

419 self._parser = None 

420 

421 if self._keepalive_handle is not None: 

422 self._keepalive_handle.cancel() 

423 

424 if self._current_request is not None: 

425 if exc is None: 

426 exc = ConnectionResetError("Connection lost") 

427 self._current_request._cancel(exc) 

428 

429 if handler_cancellation and self._task_handler is not None: 

430 self._task_handler.cancel() 

431 

432 self._task_handler = None 

433 

434 if self._payload_parser is not None: 

435 self._payload_parser.feed_eof() 

436 self._payload_parser = None 

437 

438 def set_parser( 

439 self, 

440 parser: WebSocketReader, 

441 data_received_cb: Callable[[], None] | None = None, 

442 ) -> None: 

443 assert self._payload_parser is None 

444 

445 self._payload_parser = parser 

446 self._data_received_cb = data_received_cb 

447 

448 if self._message_tail: 

449 self._payload_parser.feed_data(self._message_tail) 

450 self._message_tail = b"" 

451 

452 def eof_received(self) -> None: 

453 pass 

454 

455 def data_received(self, data: bytes) -> None: 

456 if self._force_close or self._close: 

457 return 

458 # parse http messages 

459 messages: Sequence[_MsgType] 

460 if self._payload_parser is None and not self._upgraded: 

461 assert self._parser is not None 

462 try: 

463 messages, upgraded, tail = self._parser.feed_data(data) 

464 except HttpProcessingError as exc: 

465 messages = [ 

466 (_ErrInfo(status=400, exc=exc, message=exc.message), EMPTY_PAYLOAD) 

467 ] 

468 upgraded = False 

469 tail = b"" 

470 

471 for msg, payload in messages: 

472 self._request_count += 1 

473 self._messages.append((msg, payload)) 

474 

475 waiter = self._waiter 

476 if messages and waiter is not None and not waiter.done(): 

477 # don't set result twice 

478 waiter.set_result(None) 

479 

480 # Queue full: pause the transport (the parser already stopped 

481 # emitting). start() resumes as it drains the queue. 

482 if ( 

483 not self._msg_queue_paused 

484 and len(self._messages) >= self._max_msg_queue_size 

485 ): 

486 self._pause_msg_queue_reading() 

487 

488 self._upgraded = upgraded 

489 if upgraded and tail: 

490 self._message_tail = tail 

491 

492 # no parser, just store 

493 elif self._payload_parser is None and self._upgraded and data: 

494 self._message_tail += data 

495 

496 # feed payload 

497 elif data: 

498 if self._data_received_cb is not None: 

499 self._data_received_cb() 

500 eof, tail = self._payload_parser.feed_data(data) 

501 if eof: 

502 self.close() 

503 

504 def _reading_paused_for_msg_queue(self) -> bool: 

505 return self._msg_queue_paused 

506 

507 def _pause_msg_queue_reading(self) -> None: 

508 self._msg_queue_paused = True 

509 if self.transport is not None: 

510 try: 

511 self.transport.pause_reading() 

512 except PAUSE_RESUME_READING_ERRORS: 

513 # Transport lacks flow control; nothing to pause. Intentionally 

514 # ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress). 

515 pass 

516 

517 def _resume_msg_queue_reading(self) -> None: 

518 if not self._upgraded: 

519 # Reparse buffered pipelined requests while still marked paused so 

520 # a refill past the limit does not re-pause an already-paused 

521 # transport; only resume below once it stayed under the limit. 

522 self.data_received(b"") 

523 if len(self._messages) >= self._max_msg_queue_size: 

524 return 

525 self._msg_queue_paused = False 

526 if not self._reading_paused and self.transport is not None: 

527 try: 

528 self.transport.resume_reading() 

529 except PAUSE_RESUME_READING_ERRORS: 

530 # Transport lacks flow control; nothing to resume. Intentionally 

531 # ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress). 

532 pass 

533 

534 def keep_alive(self, val: bool) -> None: 

535 """Set keep-alive connection mode. 

536 

537 :param bool val: new state. 

538 """ 

539 self._keepalive = val 

540 if self._keepalive_handle: 

541 self._keepalive_handle.cancel() 

542 self._keepalive_handle = None 

543 

544 def close(self) -> None: 

545 """Close connection. 

546 

547 Stop accepting new pipelining messages and close 

548 connection when handlers done processing messages. 

549 """ 

550 self._close = True 

551 if self._waiter: 

552 self._waiter.cancel() 

553 

554 def force_close(self) -> None: 

555 """Forcefully close connection.""" 

556 self._force_close = True 

557 if self._waiter: 

558 self._waiter.cancel() 

559 if self.transport is not None: 

560 self.transport.close() 

561 self.transport = None 

562 

563 async def log_access( 

564 self, 

565 request: BaseRequest, 

566 response: StreamResponse, 

567 request_start: float | None, 

568 ) -> None: 

569 if self._logging_enabled and self.access_logger is not None: 

570 if TYPE_CHECKING: 

571 assert request_start is not None 

572 await self.access_logger.log(request, response, request_start) 

573 

574 def log_debug(self, *args: Any, **kw: Any) -> None: 

575 if self._loop.get_debug(): 

576 self.logger.debug(*args, **kw) 

577 

578 def log_exception(self, *args: Any, **kw: Any) -> None: 

579 self.logger.exception(*args, **kw) 

580 

581 def _process_keepalive(self) -> None: 

582 self._keepalive_handle = None 

583 if self._force_close or not self._keepalive: 

584 return 

585 

586 loop = self._loop 

587 now = loop.time() 

588 close_time = self._next_keepalive_close_time 

589 if now < close_time: 

590 # Keep alive close check fired too early, reschedule 

591 self._keepalive_handle = loop.call_at(close_time, self._process_keepalive) 

592 return 

593 

594 # handler in idle state 

595 if self._waiter and not self._waiter.done(): 

596 self.force_close() 

597 

598 async def _handle_request( 

599 self, 

600 request: _Request, 

601 start_time: float | None, 

602 request_handler: Callable[[_Request], Awaitable[StreamResponse]], 

603 ) -> tuple[StreamResponse, bool]: 

604 self._request_in_progress = True 

605 try: 

606 try: 

607 self._current_request = request 

608 resp = await request_handler(request) 

609 finally: 

610 self._current_request = None 

611 except HTTPException as exc: 

612 resp = Response( 

613 status=exc.status, reason=exc.reason, text=exc.text, headers=exc.headers 

614 ) 

615 resp._cookies = exc._cookies 

616 resp, reset = await self.finish_response(request, resp, start_time) 

617 except asyncio.CancelledError: 

618 raise 

619 except asyncio.TimeoutError as exc: 

620 self.log_debug("Request handler timed out.", exc_info=exc) 

621 resp = self.handle_error(request, 504) 

622 resp, reset = await self.finish_response(request, resp, start_time) 

623 except Exception as exc: 

624 resp = self.handle_error(request, 500, exc) 

625 resp, reset = await self.finish_response(request, resp, start_time) 

626 else: 

627 resp, reset = await self.finish_response(request, resp, start_time) 

628 finally: 

629 self._request_in_progress = False 

630 if self._handler_waiter is not None: 

631 self._handler_waiter.set_result(None) 

632 

633 return resp, reset 

634 

635 async def start(self) -> None: 

636 """Process incoming request. 

637 

638 It reads request line, request headers and request payload, then 

639 calls handle_request() method. Subclass has to override 

640 handle_request(). start() handles various exceptions in request 

641 or response handling. Connection is being closed always unless 

642 keep_alive(True) specified. 

643 """ 

644 loop = self._loop 

645 manager = self._manager 

646 assert manager is not None 

647 keepalive_timeout = self._keepalive_timeout 

648 resp = None 

649 assert self._request_factory is not None 

650 assert self._request_handler is not None 

651 

652 while not self._force_close: 

653 if not self._messages: 

654 try: 

655 # wait for next request 

656 self._waiter = loop.create_future() 

657 await self._waiter 

658 finally: 

659 self._waiter = None 

660 

661 message, payload = self._messages.popleft() 

662 

663 # Free a parser slot; resume reading once drained to low water so 

664 # pipelining keeps flowing while this request is handled. 

665 # no branch: _parser is only None after connection_lost, whose path 

666 # exits this loop, so the None case is not reachably exercisable. 

667 if self._parser is not None: # pragma: no branch 

668 self._parser.message_consumed() 

669 if ( 

670 self._msg_queue_paused 

671 and len(self._messages) <= self._msg_queue_resume_size 

672 ): 

673 self._resume_msg_queue_reading() 

674 

675 # time is only fetched if logging is enabled as otherwise 

676 # its thrown away and never used. 

677 start = loop.time() if self._logging_enabled else None 

678 

679 manager.requests_count += 1 

680 writer = StreamWriter(self, loop) 

681 if not isinstance(message, _ErrInfo): 

682 request_handler = self._request_handler 

683 else: 

684 # make request_factory work 

685 request_handler = self._make_error_handler(message) 

686 message = ERROR 

687 

688 # Important don't hold a reference to the current task 

689 # as on traceback it will prevent the task from being 

690 # collected and will cause a memory leak. 

691 request = self._request_factory( 

692 message, 

693 payload, 

694 self, 

695 writer, 

696 self._task_handler or asyncio.current_task(loop), # type: ignore[arg-type] 

697 ) 

698 try: 

699 # a new task is used for copy context vars (#3406) 

700 coro = self._handle_request(request, start, request_handler) 

701 if sys.version_info >= (3, 12): 

702 task = asyncio.Task(coro, loop=loop, eager_start=True) 

703 else: 

704 task = loop.create_task(coro) 

705 try: 

706 resp, reset = await task 

707 except ConnectionError: 

708 self.log_debug("Ignored premature client disconnection") 

709 break 

710 

711 # Drop the processed task from asyncio.Task.all_tasks() early 

712 del task 

713 if reset: 

714 self.log_debug("Ignored premature client disconnection 2") 

715 break 

716 

717 # notify server about keep-alive 

718 self._keepalive = bool(resp.keep_alive) 

719 

720 # check payload 

721 if not payload.is_eof(): 

722 lingering_time = self._lingering_time 

723 # Could be force closed while awaiting above tasks. 

724 if not self._force_close and lingering_time: # type: ignore[redundant-expr] 

725 self.log_debug( 

726 "Start lingering close timer for %s sec.", lingering_time 

727 ) 

728 

729 now = loop.time() 

730 end_t = now + lingering_time 

731 

732 try: 

733 while not payload.is_eof() and now < end_t: 

734 async with ceil_timeout(end_t - now): 

735 # read and ignore 

736 await payload.readany() 

737 now = loop.time() 

738 except (asyncio.CancelledError, asyncio.TimeoutError): 

739 if ( 

740 sys.version_info >= (3, 11) 

741 and (t := asyncio.current_task()) 

742 and t.cancelling() 

743 ): 

744 raise 

745 

746 # if payload still uncompleted 

747 if not payload.is_eof() and not self._force_close: 

748 self.log_debug("Uncompleted request.") 

749 self.close() 

750 

751 payload.set_exception(_PAYLOAD_ACCESS_ERROR) 

752 

753 except asyncio.CancelledError: 

754 self.log_debug("Ignored premature client disconnection") 

755 self.force_close() 

756 raise 

757 except Exception as exc: 

758 self.log_exception("Unhandled exception", exc_info=exc) 

759 self.force_close() 

760 except BaseException: 

761 self.force_close() 

762 raise 

763 finally: 

764 request._task = None # type: ignore[assignment] # Break reference cycle in case of exception 

765 if self.transport is None and resp is not None: 

766 self.log_debug("Ignored premature client disconnection.") 

767 

768 if self._keepalive and not self._close and not self._force_close: 

769 # start keep-alive timer 

770 close_time = loop.time() + keepalive_timeout 

771 self._next_keepalive_close_time = close_time 

772 if self._keepalive_handle is None: 

773 self._keepalive_handle = loop.call_at( 

774 close_time, self._process_keepalive 

775 ) 

776 else: 

777 break 

778 

779 # remove handler, close transport if no handlers left 

780 if not self._force_close: 

781 self._task_handler = None 

782 if self.transport is not None: 

783 self.transport.close() 

784 

785 async def finish_response( 

786 self, request: BaseRequest, resp: StreamResponse, start_time: float | None 

787 ) -> tuple[StreamResponse, bool]: 

788 """Prepare the response and write_eof, then log access. 

789 

790 This has to 

791 be called within the context of any exception so the access logger 

792 can get exception information. Returns True if the client disconnects 

793 prematurely. 

794 """ 

795 request._finish() 

796 if self._parser is not None: 

797 self._parser.set_upgraded(False) 

798 self._upgraded = False 

799 if self._message_tail: 

800 messages, _upgraded, tail = self._parser.feed_data(self._message_tail) 

801 self._message_tail = tail 

802 for msg, payload in messages: 

803 self._request_count += 1 

804 self._messages.append((msg, payload)) 

805 # This shouldn't be possible. If a future refactor results in this 

806 # failing, then the code may need to be updated to set the waiter. 

807 assert self._waiter is None 

808 try: 

809 prepare_meth = resp.prepare 

810 except AttributeError: 

811 if resp is None: 

812 self.log_exception("Missing return statement on request handler") # type: ignore[unreachable] 

813 else: 

814 self.log_exception( 

815 f"Web-handler should return a response instance, got {resp!r}" 

816 ) 

817 exc = HTTPInternalServerError() 

818 resp = Response( 

819 status=exc.status, reason=exc.reason, text=exc.text, headers=exc.headers 

820 ) 

821 prepare_meth = resp.prepare 

822 try: 

823 await prepare_meth(request) 

824 await resp.write_eof() 

825 except ConnectionError: 

826 await self.log_access(request, resp, start_time) 

827 return resp, True 

828 

829 await self.log_access(request, resp, start_time) 

830 return resp, False 

831 

832 def handle_error( 

833 self, 

834 request: BaseRequest, 

835 status: int = 500, 

836 exc: BaseException | None = None, 

837 message: str | None = None, 

838 ) -> StreamResponse: 

839 """Handle errors. 

840 

841 Returns HTTP response with specific status code. Logs additional 

842 information. It always closes current connection. 

843 """ 

844 if self._request_count == 1 and isinstance(exc, BadHttpMethod): 

845 # BadHttpMethod is common when a client sends non-HTTP 

846 # or encrypted traffic to an HTTP port. This is expected 

847 # to happen when connected to the public internet so we log 

848 # it at the debug level as to not fill logs with noise. 

849 self.logger.debug( 

850 "Error handling request from %s", request.remote, exc_info=exc 

851 ) 

852 else: 

853 self.log_exception( 

854 "Error handling request from %s", request.remote, exc_info=exc 

855 ) 

856 

857 # some data already got sent, connection is broken 

858 if request.writer.output_size > 0: 

859 raise ConnectionError( 

860 "Response is sent already, cannot send another response " 

861 "with the error message" 

862 ) 

863 

864 ct = "text/plain" 

865 if status == HTTPStatus.INTERNAL_SERVER_ERROR: 

866 title = f"{HTTPStatus.INTERNAL_SERVER_ERROR.value} {HTTPStatus.INTERNAL_SERVER_ERROR.phrase}" 

867 msg = HTTPStatus.INTERNAL_SERVER_ERROR.description 

868 tb = None 

869 if self._loop.get_debug(): 

870 with suppress(Exception): 

871 tb = traceback.format_exc() 

872 

873 if "text/html" in request.headers.get("Accept", ""): 

874 if tb: 

875 tb = html_escape(tb) 

876 msg = f"<h2>Traceback:</h2>\n<pre>{tb}</pre>" 

877 message = ( 

878 "<html><head>" 

879 f"<title>{title}</title>" 

880 f"</head><body>\n<h1>{title}</h1>" 

881 f"\n{msg}\n</body></html>\n" 

882 ) 

883 ct = "text/html" 

884 else: 

885 if tb: 

886 msg = tb 

887 message = title + "\n\n" + msg 

888 

889 resp = Response(status=status, text=message, content_type=ct) 

890 resp.force_close() 

891 

892 return resp 

893 

894 def _make_error_handler( 

895 self, err_info: _ErrInfo 

896 ) -> Callable[[BaseRequest], Awaitable[StreamResponse]]: 

897 async def handler(request: BaseRequest) -> StreamResponse: 

898 return self.handle_error( 

899 request, err_info.status, err_info.exc, err_info.message 

900 ) 

901 

902 return handler