Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/worker.py: 6%
126 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1"""Async gunicorn worker for aiohttp.web"""
3import asyncio
4import os
5import re
6import signal
7import sys
8from types import FrameType
9from typing import Any, Awaitable, Callable, Optional, Union # noqa
11from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat
12from gunicorn.workers import base
14from aiohttp import web
16from .helpers import set_result
17from .web_app import Application
18from .web_log import AccessLogger
# ``ssl`` is optional: when the interpreter was built without it, keep the
# module name bound to ``None`` and alias ``SSLContext`` to ``object`` so the
# annotations further down the module still resolve.
try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None  # type: ignore[assignment]
    SSLContext = object  # type: ignore[misc,assignment]
else:
    SSLContext = ssl.SSLContext
29__all__ = ("GunicornWebWorker", "GunicornUVLoopWebWorker")
class GunicornWebWorker(base.Worker):  # type: ignore[misc,no-any-unimported]
    """Gunicorn worker that serves an aiohttp application on an asyncio loop.

    ``self.wsgi`` must be either an ``Application`` instance or a coroutine
    function whose result is an ``Application`` or a ``web.AppRunner``.
    """

    DEFAULT_AIOHTTP_LOG_FORMAT = AccessLogger.LOG_FORMAT
    DEFAULT_GUNICORN_LOG_FORMAT = GunicornAccessLogFormat.default

    def __init__(self, *args: Any, **kw: Any) -> None:  # pragma: no cover
        """Forward all arguments to the base worker and reset local state."""
        super().__init__(*args, **kw)

        # Task wrapping ``_run()``; created by ``run()``.
        self._task: Optional[asyncio.Task[None]] = None
        # Status handed to ``sys.exit()`` at the end of ``run()``.
        self.exit_code = 0
        # Future the serving loop is currently sleeping on, if any.
        self._notify_waiter: Optional[asyncio.Future[bool]] = None

    def init_process(self) -> None:
        """Install a fresh event loop, then run the base initialisation."""
        # create new event_loop after fork
        asyncio.get_event_loop().close()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        super().init_process()

    def run(self) -> None:
        """Run ``_run()`` to completion, close the loop and exit the process.

        Exceptions raised by the serving task are logged, not re-raised.
        The method always ends in ``sys.exit(self.exit_code)``.
        """
        self._task = self.loop.create_task(self._run())

        try:  # ignore all finalization problems
            self.loop.run_until_complete(self._task)
        except Exception:
            self.log.exception("Exception in gunicorn worker")
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.close()

        sys.exit(self.exit_code)

    async def _run(self) -> None:
        """Start one site per listening socket and serve until told to stop.

        The loop ends when ``self.alive`` becomes false, when the served
        request count exceeds ``self.max_requests`` (if set), or when the
        parent process id no longer matches ``self.ppid``.  The runner is
        cleaned up afterwards.

        Raises:
            RuntimeError: if ``self.wsgi`` is neither an ``Application`` nor
                a coroutine function.
        """
        runner = None
        if isinstance(self.wsgi, Application):
            app = self.wsgi
        elif asyncio.iscoroutinefunction(self.wsgi):
            wsgi = await self.wsgi()
            if isinstance(wsgi, web.AppRunner):
                # The factory already built a runner; reuse it as-is.
                runner = wsgi
                app = runner.app
            else:
                app = wsgi
        else:
            raise RuntimeError(
                "wsgi app should be either Application or "
                "async function returning Application, got {}".format(self.wsgi)
            )

        if runner is None:
            access_log = self.log.access_log if self.cfg.accesslog else None
            runner = web.AppRunner(
                app,
                logger=self.log,
                keepalive_timeout=self.cfg.keepalive,
                access_log=access_log,
                access_log_format=self._get_valid_log_format(
                    self.cfg.access_log_format
                ),
                # 95% of gunicorn's configured graceful timeout.
                shutdown_timeout=self.cfg.graceful_timeout / 100 * 95,
            )
        await runner.setup()

        ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None

        assert runner is not None
        server = runner.server
        assert server is not None
        for sock in self.sockets:
            site = web.SockSite(
                runner,
                sock,
                ssl_context=ctx,
            )
            await site.start()

        # If our parent changed then we shut down.
        pid = os.getpid()
        try:
            while self.alive:  # type: ignore[has-type]
                self.notify()

                cnt = server.requests_count
                if self.max_requests and cnt > self.max_requests:
                    self.alive = False
                    self.log.info("Max requests, shutting down: %s", self)

                elif pid == os.getpid() and self.ppid != os.getppid():
                    self.alive = False
                    self.log.info("Parent changed, shutting down: %s", self)
                else:
                    await self._wait_next_notify()
        except BaseException:
            # Anything raised inside the loop (cancellation included) is
            # swallowed here; execution continues with the cleanup below.
            pass

        await runner.cleanup()

    def _wait_next_notify(self) -> "asyncio.Future[bool]":
        """Return a future that is resolved after one second or on wake-up.

        Any previously pending waiter is resolved first, so at most one
        waiter is tracked in ``self._notify_waiter`` at a time.
        """
        self._notify_waiter_done()

        loop = self.loop
        assert loop is not None
        self._notify_waiter = waiter = loop.create_future()
        loop.call_later(1.0, self._notify_waiter_done, waiter)

        return waiter

    def _notify_waiter_done(
        self, waiter: Optional["asyncio.Future[bool]"] = None
    ) -> None:
        """Resolve ``waiter`` (default: the current waiter) with ``True``.

        ``self._notify_waiter`` is cleared only when the resolved future is
        the one currently tracked, so a late timer callback for an older
        future does not drop a newer waiter.
        """
        if waiter is None:
            waiter = self._notify_waiter
        if waiter is not None:
            set_result(waiter, True)

        if waiter is self._notify_waiter:
            self._notify_waiter = None

    def init_signals(self) -> None:
        """Register the worker's signal handlers on the event loop."""
        # Set up signals through the event loop API.  Each handler is
        # called with ``(signum, None)``, i.e. without a frame object.
        handlers = (
            (signal.SIGQUIT, self.handle_quit),
            (signal.SIGTERM, self.handle_exit),
            (signal.SIGINT, self.handle_quit),
            (signal.SIGWINCH, self.handle_winch),
            (signal.SIGUSR1, self.handle_usr1),
            (signal.SIGABRT, self.handle_abort),
        )
        for signum, handler in handlers:
            self.loop.add_signal_handler(signum, handler, signum, None)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)
        # Reset signals so Gunicorn doesn't swallow subprocess return codes
        # See: https://github.com/aio-libs/aiohttp/issues/6130
        # NOTE(review): no statement follows the comment above in this
        # method -- confirm whether a signal reset is still required here.

    def handle_quit(self, sig: int, frame: Optional[FrameType]) -> None:
        """Ask the serving loop to stop and wake it up immediately."""
        self.alive = False

        # worker_int callback
        self.cfg.worker_int(self)

        # wakeup closing process
        self._notify_waiter_done()

    def handle_abort(self, sig: int, frame: Optional[FrameType]) -> None:
        """Mark the worker as failed, run ``worker_abort`` and exit with 1."""
        self.alive = False
        self.exit_code = 1
        self.cfg.worker_abort(self)
        sys.exit(1)

    @staticmethod
    def _create_ssl_context(cfg: Any) -> "SSLContext":
        """Creates SSLContext instance for usage in asyncio.create_server.

        See ssl.SSLSocket.__init__ for more details.

        Reads ``ssl_version``, ``certfile``, ``keyfile``, ``cert_reqs``,
        ``ca_certs`` and ``ciphers`` from ``cfg``.

        Raises:
            RuntimeError: if the ``ssl`` module is unavailable.
        """
        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")

        ctx = ssl.SSLContext(cfg.ssl_version)
        ctx.load_cert_chain(cfg.certfile, cfg.keyfile)
        ctx.verify_mode = cfg.cert_reqs
        if cfg.ca_certs:
            ctx.load_verify_locations(cfg.ca_certs)
        if cfg.ciphers:
            ctx.set_ciphers(cfg.ciphers)
        return ctx

    def _get_valid_log_format(self, source_format: str) -> str:
        """Translate the configured access-log format for aiohttp.

        Gunicorn's default format maps to aiohttp's default; any other
        format is returned unchanged unless it contains a ``%(name)``
        style placeholder.

        Raises:
            ValueError: if ``source_format`` uses ``%(name)s`` placeholders.
        """
        if source_format == self.DEFAULT_GUNICORN_LOG_FORMAT:
            return self.DEFAULT_AIOHTTP_LOG_FORMAT
        if re.search(r"%\([^\)]+\)", source_format):
            raise ValueError(
                "Gunicorn's style options in form of `%(name)s` are not "
                "supported for the log formatting. Please use aiohttp's "
                "format specification to configure access log formatting: "
                "http://docs.aiohttp.org/en/stable/logging.html"
                "#format-specification"
            )
        return source_format
class GunicornUVLoopWebWorker(GunicornWebWorker):
    """``GunicornWebWorker`` variant that installs the uvloop loop policy."""

    def init_process(self) -> None:
        """Switch asyncio to uvloop, then defer to the parent initialisation."""
        import uvloop

        # The current loop must be closed before the policy is replaced.
        stale_loop = asyncio.get_event_loop()
        stale_loop.close()

        # With the uvloop policy installed, every subsequent
        # asyncio.get_event_loop() call creates a uvloop event loop.
        uvloop_policy = uvloop.EventLoopPolicy()
        asyncio.set_event_loop_policy(uvloop_policy)

        super().init_process()