Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/worker.py: 7%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Async gunicorn worker for aiohttp.web"""
3import asyncio
4import inspect
5import os
6import re
7import signal
8import sys
9from types import FrameType
10from typing import TYPE_CHECKING, Any, Optional
12from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat
13from gunicorn.workers import base
15from aiohttp import web
17from .helpers import set_result
18from .web_app import Application
19from .web_log import AccessLogger
21if TYPE_CHECKING:
22 import ssl
24 SSLContext = ssl.SSLContext
25else:
26 try:
27 import ssl
29 SSLContext = ssl.SSLContext
30 except ImportError: # pragma: no cover
31 ssl = None # type: ignore[assignment]
32 SSLContext = object # type: ignore[misc,assignment]
35__all__ = ("GunicornWebWorker", "GunicornUVLoopWebWorker")
38class GunicornWebWorker(base.Worker): # type: ignore[misc,no-any-unimported]
39 DEFAULT_AIOHTTP_LOG_FORMAT = AccessLogger.LOG_FORMAT
40 DEFAULT_GUNICORN_LOG_FORMAT = GunicornAccessLogFormat.default
42 def __init__(self, *args: Any, **kw: Any) -> None: # pragma: no cover
43 super().__init__(*args, **kw)
45 self._task: Optional[asyncio.Task[None]] = None
46 self.exit_code = 0
47 self._notify_waiter: Optional[asyncio.Future[bool]] = None
49 def init_process(self) -> None:
50 # create new event_loop after fork
51 try:
52 asyncio.get_event_loop().close()
53 except RuntimeError:
54 # No loop was running
55 pass
57 self.loop = asyncio.new_event_loop()
58 asyncio.set_event_loop(self.loop)
60 super().init_process()
62 def run(self) -> None:
63 self._task = self.loop.create_task(self._run())
65 try: # ignore all finalization problems
66 self.loop.run_until_complete(self._task)
67 except Exception:
68 self.log.exception("Exception in gunicorn worker")
69 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
70 self.loop.close()
72 sys.exit(self.exit_code)
74 async def _run(self) -> None:
75 runner = None
76 if isinstance(self.wsgi, Application):
77 app = self.wsgi
78 elif inspect.iscoroutinefunction(self.wsgi) or (
79 sys.version_info < (3, 14) and asyncio.iscoroutinefunction(self.wsgi)
80 ):
81 wsgi = await self.wsgi()
82 if isinstance(wsgi, web.AppRunner):
83 runner = wsgi
84 app = runner.app
85 else:
86 app = wsgi
87 else:
88 raise RuntimeError(
89 "wsgi app should be either Application or "
90 "async function returning Application, got {}".format(self.wsgi)
91 )
93 if runner is None:
94 access_log = self.log.access_log if self.cfg.accesslog else None
95 runner = web.AppRunner(
96 app,
97 logger=self.log,
98 keepalive_timeout=self.cfg.keepalive,
99 access_log=access_log,
100 access_log_format=self._get_valid_log_format(
101 self.cfg.access_log_format
102 ),
103 shutdown_timeout=self.cfg.graceful_timeout / 100 * 95,
104 )
105 await runner.setup()
107 ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None
109 runner = runner
110 assert runner is not None
111 server = runner.server
112 assert server is not None
113 for sock in self.sockets:
114 site = web.SockSite(
115 runner,
116 sock,
117 ssl_context=ctx,
118 )
119 await site.start()
121 # If our parent changed then we shut down.
122 pid = os.getpid()
123 try:
124 while self.alive: # type: ignore[has-type]
125 self.notify()
127 cnt = server.requests_count
128 if self.max_requests and cnt > self.max_requests:
129 self.alive = False
130 self.log.info("Max requests, shutting down: %s", self)
132 elif pid == os.getpid() and self.ppid != os.getppid():
133 self.alive = False
134 self.log.info("Parent changed, shutting down: %s", self)
135 else:
136 await self._wait_next_notify()
137 except BaseException:
138 pass
140 await runner.cleanup()
142 def _wait_next_notify(self) -> "asyncio.Future[bool]":
143 self._notify_waiter_done()
145 loop = self.loop
146 assert loop is not None
147 self._notify_waiter = waiter = loop.create_future()
148 self.loop.call_later(1.0, self._notify_waiter_done, waiter)
150 return waiter
152 def _notify_waiter_done(
153 self, waiter: Optional["asyncio.Future[bool]"] = None
154 ) -> None:
155 if waiter is None:
156 waiter = self._notify_waiter
157 if waiter is not None:
158 set_result(waiter, True)
160 if waiter is self._notify_waiter:
161 self._notify_waiter = None
163 def init_signals(self) -> None:
164 # Set up signals through the event loop API.
166 self.loop.add_signal_handler(
167 signal.SIGQUIT, self.handle_quit, signal.SIGQUIT, None
168 )
170 self.loop.add_signal_handler(
171 signal.SIGTERM, self.handle_exit, signal.SIGTERM, None
172 )
174 self.loop.add_signal_handler(
175 signal.SIGINT, self.handle_quit, signal.SIGINT, None
176 )
178 self.loop.add_signal_handler(
179 signal.SIGWINCH, self.handle_winch, signal.SIGWINCH, None
180 )
182 self.loop.add_signal_handler(
183 signal.SIGUSR1, self.handle_usr1, signal.SIGUSR1, None
184 )
186 self.loop.add_signal_handler(
187 signal.SIGABRT, self.handle_abort, signal.SIGABRT, None
188 )
190 # Don't let SIGTERM and SIGUSR1 disturb active requests
191 # by interrupting system calls
192 signal.siginterrupt(signal.SIGTERM, False)
193 signal.siginterrupt(signal.SIGUSR1, False)
194 # Reset signals so Gunicorn doesn't swallow subprocess return codes
195 # See: https://github.com/aio-libs/aiohttp/issues/6130
197 def handle_quit(self, sig: int, frame: Optional[FrameType]) -> None:
198 self.alive = False
200 # worker_int callback
201 self.cfg.worker_int(self)
203 # wakeup closing process
204 self._notify_waiter_done()
206 def handle_abort(self, sig: int, frame: Optional[FrameType]) -> None:
207 self.alive = False
208 self.exit_code = 1
209 self.cfg.worker_abort(self)
210 sys.exit(1)
212 @staticmethod
213 def _create_ssl_context(cfg: Any) -> "SSLContext":
214 """Creates SSLContext instance for usage in asyncio.create_server.
216 See ssl.SSLSocket.__init__ for more details.
217 """
218 if ssl is None: # pragma: no cover
219 raise RuntimeError("SSL is not supported.")
221 ctx = ssl.SSLContext(cfg.ssl_version)
222 ctx.load_cert_chain(cfg.certfile, cfg.keyfile)
223 ctx.verify_mode = cfg.cert_reqs
224 if cfg.ca_certs:
225 ctx.load_verify_locations(cfg.ca_certs)
226 if cfg.ciphers:
227 ctx.set_ciphers(cfg.ciphers)
228 return ctx
230 def _get_valid_log_format(self, source_format: str) -> str:
231 if source_format == self.DEFAULT_GUNICORN_LOG_FORMAT:
232 return self.DEFAULT_AIOHTTP_LOG_FORMAT
233 elif re.search(r"%\([^\)]+\)", source_format):
234 raise ValueError(
235 "Gunicorn's style options in form of `%(name)s` are not "
236 "supported for the log formatting. Please use aiohttp's "
237 "format specification to configure access log formatting: "
238 "http://docs.aiohttp.org/en/stable/logging.html"
239 "#format-specification"
240 )
241 else:
242 return source_format
245class GunicornUVLoopWebWorker(GunicornWebWorker):
246 def init_process(self) -> None:
247 import uvloop
249 # Close any existing event loop before setting a
250 # new policy.
251 try:
252 asyncio.get_event_loop().close()
253 except RuntimeError:
254 # No loop was running
255 pass
257 # Setup uvloop policy, so that every
258 # asyncio.get_event_loop() will create an instance
259 # of uvloop event loop.
260 asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
262 super().init_process()