Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/worker.py: 7%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

137 statements  

1"""Async gunicorn worker for aiohttp.web""" 

2 

3import asyncio 

4import inspect 

5import os 

6import re 

7import signal 

8import sys 

9from types import FrameType 

10from typing import TYPE_CHECKING, Any, Optional 

11 

12from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat 

13from gunicorn.workers import base 

14 

15from aiohttp import web 

16 

17from .helpers import set_result 

18from .web_app import Application 

19from .web_log import AccessLogger 

20 

21if TYPE_CHECKING: 

22 import ssl 

23 

24 SSLContext = ssl.SSLContext 

25else: 

26 try: 

27 import ssl 

28 

29 SSLContext = ssl.SSLContext 

30 except ImportError: # pragma: no cover 

31 ssl = None # type: ignore[assignment] 

32 SSLContext = object # type: ignore[misc,assignment] 

33 

34 

35__all__ = ("GunicornWebWorker", "GunicornUVLoopWebWorker") 

36 

37 

class GunicornWebWorker(base.Worker):  # type: ignore[misc,no-any-unimported]
    """Async gunicorn worker for aiohttp.web.

    The worker owns a private asyncio event loop (created in
    ``init_process``), serves ``self.wsgi`` on every socket in
    ``self.sockets`` through ``web.SockSite``, and keeps notifying the
    gunicorn arbiter until it is told to stop.
    """

    DEFAULT_AIOHTTP_LOG_FORMAT = AccessLogger.LOG_FORMAT
    DEFAULT_GUNICORN_LOG_FORMAT = GunicornAccessLogFormat.default

    def __init__(self, *args: Any, **kw: Any) -> None:  # pragma: no cover
        """Forward all arguments to the base worker and reset local state."""
        super().__init__(*args, **kw)

        # Task wrapping ``_run()``; created in ``run()``.
        self._task: Optional[asyncio.Task[None]] = None
        # Status passed to ``sys.exit()`` at the end of ``run()``.
        self.exit_code = 0
        # Future the serving loop is currently sleeping on, if any.
        self._notify_waiter: Optional[asyncio.Future[bool]] = None

    def init_process(self) -> None:
        """Replace the current event loop with a fresh one, then initialise."""
        # create new event_loop after fork
        try:
            asyncio.get_event_loop().close()
        except RuntimeError:
            # No loop was running
            pass

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        super().init_process()

    def run(self) -> None:
        """Drive ``_run()`` to completion, close the loop and exit.

        Always terminates the process via ``sys.exit(self.exit_code)``.
        """
        self._task = self.loop.create_task(self._run())

        try:  # ignore all finalization problems
            self.loop.run_until_complete(self._task)
        except Exception:
            self.log.exception("Exception in gunicorn worker")
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.close()

        sys.exit(self.exit_code)

    async def _run(self) -> None:
        """Start serving and poll once per wake-up until the worker must stop.

        ``self.wsgi`` must be an ``Application`` or a coroutine function
        whose result is an ``Application`` or a ``web.AppRunner``.

        Raises:
            RuntimeError: if ``self.wsgi`` is neither of the accepted kinds.
        """
        runner = None
        if isinstance(self.wsgi, Application):
            app = self.wsgi
        elif inspect.iscoroutinefunction(self.wsgi) or (
            sys.version_info < (3, 14) and asyncio.iscoroutinefunction(self.wsgi)
        ):
            wsgi = await self.wsgi()
            if isinstance(wsgi, web.AppRunner):
                runner = wsgi
                app = runner.app
            else:
                app = wsgi
        else:
            raise RuntimeError(
                "wsgi app should be either Application or "
                f"async function returning Application, got {self.wsgi}"
            )

        if runner is None:
            access_log = self.log.access_log if self.cfg.accesslog else None
            runner = web.AppRunner(
                app,
                logger=self.log,
                keepalive_timeout=self.cfg.keepalive,
                access_log=access_log,
                access_log_format=self._get_valid_log_format(
                    self.cfg.access_log_format
                ),
                # Use 95% of gunicorn's graceful timeout for the runner.
                shutdown_timeout=self.cfg.graceful_timeout / 100 * 95,
            )
        # Runs for a runner built above and for one returned by the factory;
        # ``runner.server`` is read right below.
        await runner.setup()

        ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None

        server = runner.server
        assert server is not None
        for sock in self.sockets:
            site = web.SockSite(
                runner,
                sock,
                ssl_context=ctx,
            )
            await site.start()

        # If our parent changed then we shut down.
        pid = os.getpid()
        try:
            while self.alive:  # type: ignore[has-type]
                self.notify()

                cnt = server.requests_count
                if self.max_requests and cnt > self.max_requests:
                    self.alive = False
                    self.log.info("Max requests, shutting down: %s", self)

                elif pid == os.getpid() and self.ppid != os.getppid():
                    self.alive = False
                    self.log.info("Parent changed, shutting down: %s", self)
                else:
                    await self._wait_next_notify()
        except BaseException:
            # Deliberate best-effort: whatever interrupted the loop, fall
            # through so that the runner is still cleaned up below.
            pass

        await runner.cleanup()

    def _wait_next_notify(self) -> "asyncio.Future[bool]":
        """Return a future that is completed after one second or on wake-up.

        Any waiter left over from a previous call is completed first.
        """
        self._notify_waiter_done()

        loop = self.loop
        assert loop is not None
        self._notify_waiter = waiter = loop.create_future()
        loop.call_later(1.0, self._notify_waiter_done, waiter)

        return waiter

    def _notify_waiter_done(
        self, waiter: Optional["asyncio.Future[bool]"] = None
    ) -> None:
        """Complete ``waiter`` (default: the current one) with ``True``.

        The stored waiter is cleared only when it is the one being completed,
        so a stale timer cannot drop a newer waiter.
        """
        if waiter is None:
            waiter = self._notify_waiter
        if waiter is not None:
            # ``set_result`` comes from ``.helpers``; presumably it skips
            # futures that are already done -- confirm there.
            set_result(waiter, True)

        if waiter is self._notify_waiter:
            self._notify_waiter = None

    def init_signals(self) -> None:
        """Register the worker's signal handlers on the event loop."""
        # Set up signals through the event loop API.  Each handler receives
        # ``(signum, None)`` in place of the usual ``(signum, frame)`` pair.
        handlers = (
            (signal.SIGQUIT, self.handle_quit),
            (signal.SIGTERM, self.handle_exit),
            (signal.SIGINT, self.handle_quit),
            (signal.SIGWINCH, self.handle_winch),
            (signal.SIGUSR1, self.handle_usr1),
            (signal.SIGABRT, self.handle_abort),
        )
        for signum, handler in handlers:
            self.loop.add_signal_handler(signum, handler, signum, None)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)
        # NOTE(review): the original comments here read "Reset signals so
        # Gunicorn doesn't swallow subprocess return codes", see
        # https://github.com/aio-libs/aiohttp/issues/6130, but no code
        # follows them -- confirm whether a reset is still required.

    def handle_quit(self, sig: int, frame: Optional[FrameType]) -> None:
        """Stop the serving loop, call the ``worker_int`` hook and wake up."""
        self.alive = False

        # worker_int callback
        self.cfg.worker_int(self)

        # wakeup closing process
        self._notify_waiter_done()

    def handle_abort(self, sig: int, frame: Optional[FrameType]) -> None:
        """Mark failure, call the ``worker_abort`` hook and exit with 1."""
        self.alive = False
        self.exit_code = 1
        self.cfg.worker_abort(self)
        sys.exit(1)

    @staticmethod
    def _create_ssl_context(cfg: Any) -> "SSLContext":
        """Creates SSLContext instance for usage in asyncio.create_server.

        See ssl.SSLSocket.__init__ for more details.

        Raises:
            RuntimeError: if the ``ssl`` module could not be imported.
        """
        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")

        ctx = ssl.SSLContext(cfg.ssl_version)
        ctx.load_cert_chain(cfg.certfile, cfg.keyfile)
        ctx.verify_mode = cfg.cert_reqs
        if cfg.ca_certs:
            ctx.load_verify_locations(cfg.ca_certs)
        if cfg.ciphers:
            ctx.set_ciphers(cfg.ciphers)
        return ctx

    def _get_valid_log_format(self, source_format: str) -> str:
        """Translate gunicorn's access-log format setting for aiohttp.

        Gunicorn's default format maps to aiohttp's default; any other
        format is returned unchanged.

        Raises:
            ValueError: if the format contains a ``%(name)`` placeholder.
        """
        if source_format == self.DEFAULT_GUNICORN_LOG_FORMAT:
            return self.DEFAULT_AIOHTTP_LOG_FORMAT
        elif re.search(r"%\([^\)]+\)", source_format):
            raise ValueError(
                "Gunicorn's style options in form of `%(name)s` are not "
                "supported for the log formatting. Please use aiohttp's "
                "format specification to configure access log formatting: "
                "http://docs.aiohttp.org/en/stable/logging.html"
                "#format-specification"
            )
        else:
            return source_format

243 

244 

class GunicornUVLoopWebWorker(GunicornWebWorker):
    """GunicornWebWorker variant that installs uvloop's event loop policy."""

    def init_process(self) -> None:
        """Install the uvloop policy, then run the parent initialisation."""
        import uvloop

        # Dispose of the currently registered loop before the policy
        # is replaced.
        try:
            stale_loop = asyncio.get_event_loop()
            stale_loop.close()
        except RuntimeError:
            # There was no loop to close.
            pass

        # Once the uvloop policy is installed, every
        # asyncio.get_event_loop() creates a uvloop event loop.
        uvloop_policy = uvloop.EventLoopPolicy()
        asyncio.set_event_loop_policy(uvloop_policy)

        super().init_process()