Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/worker.py: 7%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

138 statements  

1"""Async gunicorn worker for aiohttp.web""" 

2 

3import asyncio 

4import inspect 

5import os 

6import re 

7import signal 

8import sys 

9from types import FrameType 

10from typing import TYPE_CHECKING, Any, Optional 

11 

12from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat 

13from gunicorn.workers import base 

14 

15from aiohttp import web 

16 

17from .helpers import set_result 

18from .web_app import Application 

19from .web_log import AccessLogger 

20 

# Resolve the ``ssl`` module and the ``SSLContext`` alias used in annotations.
# Type checkers always see the real module; at runtime the import is optional
# and both names degrade to placeholders when ``ssl`` cannot be imported.
if TYPE_CHECKING:
    import ssl

    SSLContext = ssl.SSLContext
else:
    try:
        import ssl
    except ImportError:  # pragma: no cover
        # No ssl support available: keep the names defined so the module
        # still imports; callers check ``ssl is None`` before using it.
        ssl = None  # type: ignore[assignment]
        SSLContext = object  # type: ignore[misc,assignment]
    else:
        SSLContext = ssl.SSLContext

33 

34 

35__all__ = ("GunicornWebWorker", "GunicornUVLoopWebWorker") 

36 

37 

class GunicornWebWorker(base.Worker):  # type: ignore[misc,no-any-unimported]
    """Gunicorn worker that serves an aiohttp application on an asyncio loop.

    ``self.wsgi`` must be either an ``Application`` instance or an async
    function returning an ``Application`` or a ``web.AppRunner``.  The worker
    creates its own event loop after the fork, starts one ``web.SockSite``
    per socket in ``self.sockets`` and keeps calling ``self.notify()`` until
    it is asked to stop.
    """

    DEFAULT_AIOHTTP_LOG_FORMAT = AccessLogger.LOG_FORMAT
    DEFAULT_GUNICORN_LOG_FORMAT = GunicornAccessLogFormat.default

    def __init__(self, *args: Any, **kw: Any) -> None:  # pragma: no cover
        """Initialise the base worker and this worker's bookkeeping state."""
        super().__init__(*args, **kw)

        # Task wrapping ``_run()``; created in ``run()``.
        self._task: asyncio.Task[None] | None = None
        # Process exit status passed to ``sys.exit()`` at the end of ``run()``.
        self.exit_code = 0
        # Future the main loop is currently sleeping on, if any.
        self._notify_waiter: asyncio.Future[bool] | None = None

    def init_process(self) -> None:
        """Replace any inherited event loop with a fresh one, then continue
        with the base class process initialisation.
        """
        # create new event_loop after fork
        try:
            asyncio.get_event_loop().close()
        except RuntimeError:
            # No loop was running
            pass

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        super().init_process()

    def run(self) -> None:
        """Run ``_run()`` to completion, finalise the loop and exit.

        Always terminates the process via ``sys.exit(self.exit_code)``.
        """
        self._task = self.loop.create_task(self._run())

        try:  # ignore all finalization problems
            self.loop.run_until_complete(self._task)
        except Exception:
            self.log.exception("Exception in gunicorn worker")
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.close()

        sys.exit(self.exit_code)

    async def _run(self) -> None:
        """Set up the runner, serve on every socket and heartbeat until stopped.

        Raises:
            RuntimeError: if ``self.wsgi`` is neither an ``Application`` nor
                a coroutine function.
        """
        runner = None
        if isinstance(self.wsgi, Application):
            app = self.wsgi
        elif inspect.iscoroutinefunction(self.wsgi) or (
            sys.version_info < (3, 14) and asyncio.iscoroutinefunction(self.wsgi)
        ):
            wsgi = await self.wsgi()
            if isinstance(wsgi, web.AppRunner):
                # The factory handed us a ready-made runner; use it as is.
                runner = wsgi
                app = runner.app
            else:
                app = wsgi
        else:
            raise RuntimeError(
                "wsgi app should be either Application or "
                f"async function returning Application, got {self.wsgi}"
            )

        if runner is None:
            access_log = self.log.access_log if self.cfg.accesslog else None
            runner = web.AppRunner(
                app,
                logger=self.log,
                keepalive_timeout=self.cfg.keepalive,
                access_log=access_log,
                access_log_format=self._get_valid_log_format(
                    self.cfg.access_log_format
                ),
                # 95% of gunicorn's graceful timeout.
                shutdown_timeout=self.cfg.graceful_timeout / 100 * 95,
            )
        await runner.setup()

        # From here on the runner holds resources, so release them on every
        # exit path (normal stop, start-up failure or cancellation).
        try:
            ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None

            server = runner.server
            assert server is not None
            for sock in self.sockets:
                site = web.SockSite(
                    runner,
                    sock,
                    ssl_context=ctx,
                )
                await site.start()

            # If our parent changed then we shut down.
            pid = os.getpid()
            try:
                while self.alive:  # type: ignore[has-type]
                    self.notify()

                    cnt = server.requests_count
                    if self.max_requests and cnt > self.max_requests:
                        self.alive = False
                        self.log.info("Max requests, shutting down: %s", self)

                    elif pid == os.getpid() and self.ppid != os.getppid():
                        self.alive = False
                        self.log.info("Parent changed, shutting down: %s", self)
                    else:
                        await self._wait_next_notify()
            except Exception:
                # Still best-effort: a heartbeat failure ends the loop and the
                # worker shuts down, but the reason is now recorded.
                self.log.exception(
                    "Unexpected error in worker loop, shutting down: %s", self
                )
        finally:
            await runner.cleanup()

    def _wait_next_notify(self) -> "asyncio.Future[bool]":
        """Return a future that is resolved after one second at the latest.

        Any previously pending waiter is resolved first.  The returned future
        can be resolved earlier through ``_notify_waiter_done()``.
        """
        self._notify_waiter_done()

        loop = self.loop
        assert loop is not None
        self._notify_waiter = waiter = loop.create_future()
        loop.call_later(1.0, self._notify_waiter_done, waiter)

        return waiter

    def _notify_waiter_done(
        self, waiter: Optional["asyncio.Future[bool]"] = None
    ) -> None:
        """Resolve ``waiter`` (default: the current waiter) with ``True``.

        The stored waiter is cleared only when it is the one being resolved,
        so a late timer callback for an old future leaves a newer one alone.
        """
        if waiter is None:
            waiter = self._notify_waiter
        if waiter is not None:
            # NOTE(review): ``set_result`` comes from ``.helpers``; presumably
            # it tolerates an already-completed future -- confirm there.
            set_result(waiter, True)

        if waiter is self._notify_waiter:
            self._notify_waiter = None

    def init_signals(self) -> None:
        """Install the worker's signal handlers through the event loop API."""
        # Handlers not defined in this class (handle_exit, handle_winch,
        # handle_usr1) are expected to come from the base worker.
        handlers = (
            (signal.SIGQUIT, self.handle_quit),
            (signal.SIGTERM, self.handle_exit),
            (signal.SIGINT, self.handle_quit),
            (signal.SIGWINCH, self.handle_winch),
            (signal.SIGUSR1, self.handle_usr1),
            (signal.SIGABRT, self.handle_abort),
        )
        for sig, handler in handlers:
            # Each handler is called as ``handler(sig, None)``.
            self.loop.add_signal_handler(sig, handler, sig, None)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)

        # Reset SIGCHLD to default so Gunicorn doesn't swallow subprocess
        # return codes. Without this, workers inherit the master arbiter's
        # SIGCHLD handler, causing spurious "Worker exited" errors when
        # application code spawns subprocesses.
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def handle_quit(self, sig: int, frame: FrameType | None) -> None:
        """Ask the main loop to stop and wake it up immediately."""
        self.alive = False

        # worker_int callback
        self.cfg.worker_int(self)

        # wakeup closing process
        self._notify_waiter_done()

    def handle_abort(self, sig: int, frame: FrameType | None) -> None:
        """Mark the worker as failed, run the abort hook and exit with 1."""
        self.alive = False
        self.exit_code = 1
        self.cfg.worker_abort(self)
        sys.exit(1)

    @staticmethod
    def _create_ssl_context(cfg: Any) -> "SSLContext":
        """Creates SSLContext instance for usage in asyncio.create_server.

        The context is built from ``cfg.ssl_version``, ``cfg.certfile``,
        ``cfg.keyfile`` and ``cfg.cert_reqs``; ``cfg.ca_certs`` and
        ``cfg.ciphers`` are applied only when set.

        See ssl.SSLSocket.__init__ for more details.

        Raises:
            RuntimeError: if the ``ssl`` module is not available.
        """
        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")

        ctx = ssl.SSLContext(cfg.ssl_version)
        ctx.load_cert_chain(cfg.certfile, cfg.keyfile)
        ctx.verify_mode = cfg.cert_reqs
        if cfg.ca_certs:
            ctx.load_verify_locations(cfg.ca_certs)
        if cfg.ciphers:
            ctx.set_ciphers(cfg.ciphers)
        return ctx

    def _get_valid_log_format(self, source_format: str) -> str:
        """Translate the configured access log format for aiohttp.

        Gunicorn's default format is replaced by aiohttp's default; any other
        format is returned unchanged.

        Raises:
            ValueError: if ``source_format`` contains a ``%(name)`` style
                placeholder.
        """
        if source_format == self.DEFAULT_GUNICORN_LOG_FORMAT:
            return self.DEFAULT_AIOHTTP_LOG_FORMAT
        elif re.search(r"%\([^\)]+\)", source_format):
            raise ValueError(
                "Gunicorn's style options in form of `%(name)s` are not "
                "supported for the log formatting. Please use aiohttp's "
                "format specification to configure access log formatting: "
                "http://docs.aiohttp.org/en/stable/logging.html"
                "#format-specification"
            )
        else:
            return source_format

247 

248 

class GunicornUVLoopWebWorker(GunicornWebWorker):
    """Variant of ``GunicornWebWorker`` that installs the uvloop policy."""

    def init_process(self) -> None:
        """Switch asyncio to the uvloop policy, then run the parent set-up."""
        import uvloop

        # Dispose of whatever loop is currently installed before the policy
        # is swapped out.
        try:
            stale_loop = asyncio.get_event_loop()
            stale_loop.close()
        except RuntimeError:
            # No loop was running
            pass

        # With the uvloop policy in place every asyncio.get_event_loop()
        # call will create an instance of the uvloop event loop.
        policy = uvloop.EventLoopPolicy()
        asyncio.set_event_loop_policy(policy)

        super().init_process()