Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/worker.py: 8%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Async gunicorn worker for aiohttp.web"""
3import asyncio
4import inspect
5import os
6import re
7import signal
8import sys
9from types import FrameType
10from typing import TYPE_CHECKING, Any, Optional
12from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat
13from gunicorn.workers import base
15from aiohttp import web
17from .helpers import set_result
18from .web_app import Application
19from .web_log import AccessLogger
# Resolve ``ssl`` and the ``SSLContext`` alias used in annotations below.
# Type checkers always see the real module; at runtime the import is
# optional so interpreters built without SSL support can still load
# this module.
if TYPE_CHECKING:
    import ssl

    SSLContext = ssl.SSLContext
else:
    try:
        import ssl
    except ImportError:  # pragma: no cover
        # No SSL available: keep both names bound to placeholders.
        ssl = None  # type: ignore[assignment]
        SSLContext = object  # type: ignore[misc,assignment]
    else:
        SSLContext = ssl.SSLContext
35__all__ = ("GunicornWebWorker", "GunicornUVLoopWebWorker")
class GunicornWebWorker(base.Worker):  # type: ignore[misc,no-any-unimported]
    """Gunicorn worker that serves an aiohttp application on an asyncio loop.

    The configured ``self.wsgi`` object must be either an ``Application``
    or a coroutine function whose result is an ``Application`` or a
    ``web.AppRunner``.
    """

    DEFAULT_AIOHTTP_LOG_FORMAT = AccessLogger.LOG_FORMAT
    DEFAULT_GUNICORN_LOG_FORMAT = GunicornAccessLogFormat.default

    def __init__(self, *args: Any, **kw: Any) -> None:  # pragma: no cover
        """Forward all arguments to the base worker and reset local state."""
        super().__init__(*args, **kw)

        # Task wrapping ``_run()``; created in ``run()``.
        self._task: Optional[asyncio.Task[None]] = None
        # Status passed to ``sys.exit`` at the end of ``run()``.
        self.exit_code = 0
        # Future the serve loop is currently sleeping on, if any.
        self._notify_waiter: Optional[asyncio.Future[bool]] = None

    def init_process(self) -> None:
        """Replace the current event loop with a fresh one, then delegate."""
        # create new event_loop after fork
        asyncio.get_event_loop().close()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        super().init_process()

    def run(self) -> None:
        """Drive ``_run()`` to completion, close the loop and exit.

        Raises:
            SystemExit: always, carrying ``self.exit_code``.
        """
        self._task = self.loop.create_task(self._run())

        try:  # ignore all finalization problems
            self.loop.run_until_complete(self._task)
        except Exception:
            self.log.exception("Exception in gunicorn worker")
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.close()

        sys.exit(self.exit_code)

    async def _run(self) -> None:
        """Start a site on every socket and serve until told to stop.

        The loop ends when ``self.alive`` becomes false, which happens
        here when the request count exceeds ``self.max_requests`` or the
        parent pid changes. ``runner.cleanup()`` is awaited afterwards.

        Raises:
            RuntimeError: if ``self.wsgi`` is neither an ``Application``
                nor a coroutine function.
        """
        runner = None
        if isinstance(self.wsgi, Application):
            app = self.wsgi
        elif inspect.iscoroutinefunction(self.wsgi) or (
            sys.version_info < (3, 14) and asyncio.iscoroutinefunction(self.wsgi)
        ):
            wsgi = await self.wsgi()
            if isinstance(wsgi, web.AppRunner):
                # The factory built its own runner; use it as is.
                runner = wsgi
                app = runner.app
            else:
                app = wsgi
        else:
            raise RuntimeError(
                "wsgi app should be either Application or "
                "async function returning Application, got {}".format(self.wsgi)
            )

        if runner is None:
            access_log = self.log.access_log if self.cfg.accesslog else None
            runner = web.AppRunner(
                app,
                logger=self.log,
                keepalive_timeout=self.cfg.keepalive,
                access_log=access_log,
                access_log_format=self._get_valid_log_format(
                    self.cfg.access_log_format
                ),
                # 95% of the configured graceful timeout.
                shutdown_timeout=self.cfg.graceful_timeout / 100 * 95,
            )
        await runner.setup()

        ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None

        assert runner is not None
        server = runner.server
        assert server is not None
        for sock in self.sockets:
            site = web.SockSite(
                runner,
                sock,
                ssl_context=ctx,
            )
            await site.start()

        # If our parent changed then we shut down.
        pid = os.getpid()
        try:
            while self.alive:  # type: ignore[has-type]
                self.notify()

                cnt = server.requests_count
                if self.max_requests and cnt > self.max_requests:
                    self.alive = False
                    self.log.info("Max requests, shutting down: %s", self)

                elif pid == os.getpid() and self.ppid != os.getppid():
                    self.alive = False
                    self.log.info("Parent changed, shutting down: %s", self)
                else:
                    await self._wait_next_notify()
        except Exception:
            # Best effort: still fall through to cleanup, but leave a
            # trace so an unexpected failure is not completely invisible.
            self.log.debug("Worker serve loop stopped by an exception", exc_info=True)
        except BaseException:
            # Anything else is swallowed on purpose so that cleanup
            # below always runs.
            pass

        await runner.cleanup()

    def _wait_next_notify(self) -> "asyncio.Future[bool]":
        """Return a future that resolves after one second or on wake-up.

        Any previously pending waiter is resolved first.
        """
        self._notify_waiter_done()

        loop = self.loop
        assert loop is not None
        self._notify_waiter = waiter = loop.create_future()
        loop.call_later(1.0, self._notify_waiter_done, waiter)

        return waiter

    def _notify_waiter_done(
        self, waiter: Optional["asyncio.Future[bool]"] = None
    ) -> None:
        """Resolve ``waiter`` (default: the current one) and forget it.

        ``self._notify_waiter`` is cleared only when it is the future
        being resolved, so a stale timer callback does not drop a newer
        waiter.
        """
        if waiter is None:
            waiter = self._notify_waiter
        if waiter is not None:
            set_result(waiter, True)

        if waiter is self._notify_waiter:
            self._notify_waiter = None

    def init_signals(self) -> None:
        """Register the worker's signal handlers on the event loop."""
        # Set up signals through the event loop API.
        # handle_exit, handle_winch and handle_usr1 are not defined in this
        # class; presumably inherited from base.Worker -- TODO confirm.

        self.loop.add_signal_handler(
            signal.SIGQUIT, self.handle_quit, signal.SIGQUIT, None
        )

        self.loop.add_signal_handler(
            signal.SIGTERM, self.handle_exit, signal.SIGTERM, None
        )

        self.loop.add_signal_handler(
            signal.SIGINT, self.handle_quit, signal.SIGINT, None
        )

        self.loop.add_signal_handler(
            signal.SIGWINCH, self.handle_winch, signal.SIGWINCH, None
        )

        self.loop.add_signal_handler(
            signal.SIGUSR1, self.handle_usr1, signal.SIGUSR1, None
        )

        self.loop.add_signal_handler(
            signal.SIGABRT, self.handle_abort, signal.SIGABRT, None
        )

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)
        # NOTE(review): a comment here used to announce a signal reset
        # ("so Gunicorn doesn't swallow subprocess return codes", see
        # https://github.com/aio-libs/aiohttp/issues/6130) but no statement
        # in this method performs one; presumably the reset is no longer
        # required -- confirm before relying on it.

    def handle_quit(self, sig: int, frame: Optional[FrameType]) -> None:
        """Stop the serve loop and run the ``worker_int`` callback."""
        self.alive = False

        # worker_int callback
        self.cfg.worker_int(self)

        # wakeup closing process
        self._notify_waiter_done()

    def handle_abort(self, sig: int, frame: Optional[FrameType]) -> None:
        """Mark the worker as failed, run ``worker_abort`` and exit.

        Raises:
            SystemExit: always, with status 1.
        """
        self.alive = False
        self.exit_code = 1
        self.cfg.worker_abort(self)
        sys.exit(1)

    @staticmethod
    def _create_ssl_context(cfg: Any) -> "SSLContext":
        """Build an SSLContext from the SSL-related settings on ``cfg``.

        Reads ``ssl_version``, ``certfile``, ``keyfile``, ``cert_reqs``,
        ``ca_certs`` and ``ciphers``; the last two are applied only when
        truthy.

        Raises:
            RuntimeError: if the ``ssl`` module is unavailable.
        """
        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")

        ctx = ssl.SSLContext(cfg.ssl_version)
        ctx.load_cert_chain(cfg.certfile, cfg.keyfile)
        ctx.verify_mode = cfg.cert_reqs
        if cfg.ca_certs:
            ctx.load_verify_locations(cfg.ca_certs)
        if cfg.ciphers:
            ctx.set_ciphers(cfg.ciphers)
        return ctx

    def _get_valid_log_format(self, source_format: str) -> str:
        """Translate the configured access-log format for aiohttp.

        Gunicorn's default format maps to aiohttp's default; any other
        format is returned unchanged.

        Raises:
            ValueError: if the format contains ``%(name)`` style fields.
        """
        if source_format == self.DEFAULT_GUNICORN_LOG_FORMAT:
            return self.DEFAULT_AIOHTTP_LOG_FORMAT
        elif re.search(r"%\([^\)]+\)", source_format):
            raise ValueError(
                "Gunicorn's style options in form of `%(name)s` are not "
                "supported for the log formatting. Please use aiohttp's "
                "format specification to configure access log formatting: "
                "http://docs.aiohttp.org/en/stable/logging.html"
                "#format-specification"
            )
        else:
            return source_format
class GunicornUVLoopWebWorker(GunicornWebWorker):
    """Worker variant that installs the uvloop event loop policy first."""

    def init_process(self) -> None:
        """Swap in uvloop's policy, then run the parent initialisation."""
        import uvloop

        # The loop that exists at this point must be closed before the
        # policy is replaced.
        previous_loop = asyncio.get_event_loop()
        previous_loop.close()

        # With the uvloop policy installed, every later
        # asyncio.get_event_loop() call creates a uvloop event loop.
        uvloop_policy = uvloop.EventLoopPolicy()
        asyncio.set_event_loop_policy(uvloop_policy)

        super().init_process()