Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/worker.py: 8%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

131 statements  

1"""Async gunicorn worker for aiohttp.web""" 

2 

3import asyncio 

4import inspect 

5import os 

6import re 

7import signal 

8import sys 

9from types import FrameType 

10from typing import TYPE_CHECKING, Any, Optional 

11 

12from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat 

13from gunicorn.workers import base 

14 

15from aiohttp import web 

16 

17from .helpers import set_result 

18from .web_app import Application 

19from .web_log import AccessLogger 

20 

21if TYPE_CHECKING: 

22 import ssl 

23 

24 SSLContext = ssl.SSLContext 

25else: 

26 try: 

27 import ssl 

28 

29 SSLContext = ssl.SSLContext 

30 except ImportError: # pragma: no cover 

31 ssl = None # type: ignore[assignment] 

32 SSLContext = object # type: ignore[misc,assignment] 

33 

34 

35__all__ = ("GunicornWebWorker", "GunicornUVLoopWebWorker") 

36 

37 

class GunicornWebWorker(base.Worker):  # type: ignore[misc,no-any-unimported]
    """Gunicorn worker that serves an aiohttp application on an asyncio loop.

    ``self.wsgi`` must be either an ``Application`` instance or an async
    function; the awaited result of that function is used as the application
    unless it is a ``web.AppRunner``, in which case the runner is used as is.
    """

    DEFAULT_AIOHTTP_LOG_FORMAT = AccessLogger.LOG_FORMAT
    DEFAULT_GUNICORN_LOG_FORMAT = GunicornAccessLogFormat.default

    def __init__(self, *args: Any, **kw: Any) -> None:  # pragma: no cover
        """Forward all arguments to the base worker and reset local state."""
        super().__init__(*args, **kw)

        # Task wrapping ``_run()``; assigned in ``run()``.
        self._task: Optional[asyncio.Task[None]] = None
        # Status handed to ``sys.exit()`` at the end of ``run()``.
        self.exit_code = 0
        # Future the supervision loop is currently waiting on, if any.
        self._notify_waiter: Optional[asyncio.Future[bool]] = None

    def init_process(self) -> None:
        """Install a fresh event loop after fork, then run the base init."""
        # Close whatever loop is current before creating a new one.
        # ``get_event_loop()`` raises RuntimeError when there is no current
        # loop (Python 3.14+ no longer creates one implicitly); in that case
        # there is nothing to close.  Errors from ``close()`` itself still
        # propagate, exactly as before.
        try:
            stale_loop = asyncio.get_event_loop()
        except RuntimeError:
            pass
        else:
            stale_loop.close()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        super().init_process()

    def run(self) -> None:
        """Drive ``_run()`` to completion, close the loop and exit the process.

        Always ends with ``sys.exit(self.exit_code)``.
        """
        self._task = self.loop.create_task(self._run())

        try:  # ignore all finalization problems
            self.loop.run_until_complete(self._task)
        except Exception:
            self.log.exception("Exception in gunicorn worker")
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.close()

        sys.exit(self.exit_code)

    async def _run(self) -> None:
        """Set up the runner, start one site per socket and supervise.

        The supervision loop calls ``self.notify()`` on every iteration and
        stops when ``self.alive`` becomes false, when the served request count
        exceeds ``self.max_requests``, or when the parent pid has changed.
        The runner is cleaned up before returning.

        Raises:
            RuntimeError: ``self.wsgi`` is neither an ``Application`` nor a
                coroutine function.
        """
        runner = None
        if isinstance(self.wsgi, Application):
            app = self.wsgi
        elif inspect.iscoroutinefunction(self.wsgi) or (
            # asyncio's own check is only consulted below Python 3.14.
            sys.version_info < (3, 14)
            and asyncio.iscoroutinefunction(self.wsgi)
        ):
            wsgi = await self.wsgi()
            if isinstance(wsgi, web.AppRunner):
                runner = wsgi
                app = runner.app
            else:
                app = wsgi
        else:
            raise RuntimeError(
                "wsgi app should be either Application or "
                "async function returning Application, got {}".format(self.wsgi)
            )

        if runner is None:
            # No ready-made runner was supplied: build one from gunicorn's
            # configuration.
            access_log = self.log.access_log if self.cfg.accesslog else None
            runner = web.AppRunner(
                app,
                logger=self.log,
                keepalive_timeout=self.cfg.keepalive,
                access_log=access_log,
                access_log_format=self._get_valid_log_format(
                    self.cfg.access_log_format
                ),
                # Use 95% of gunicorn's graceful timeout for the shutdown.
                shutdown_timeout=self.cfg.graceful_timeout / 100 * 95,
            )
        await runner.setup()

        ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None

        assert runner is not None
        server = runner.server
        assert server is not None
        for sock in self.sockets:
            site = web.SockSite(
                runner,
                sock,
                ssl_context=ctx,
            )
            await site.start()

        # If our parent changed then we shut down.
        pid = os.getpid()
        try:
            while self.alive:  # type: ignore[has-type]
                self.notify()

                cnt = server.requests_count
                if self.max_requests and cnt > self.max_requests:
                    self.alive = False
                    self.log.info("Max requests, shutting down: %s", self)

                elif pid == os.getpid() and self.ppid != os.getppid():
                    self.alive = False
                    self.log.info("Parent changed, shutting down: %s", self)
                else:
                    await self._wait_next_notify()
        except BaseException:
            # Deliberately best-effort: whatever interrupts the loop, fall
            # through so that the runner below is still cleaned up.
            pass

        await runner.cleanup()

    def _wait_next_notify(self) -> "asyncio.Future[bool]":
        """Return a future resolved after one second or on an early wake-up.

        Any previously pending waiter is resolved first.
        """
        self._notify_waiter_done()

        loop = self.loop
        assert loop is not None
        self._notify_waiter = waiter = loop.create_future()
        loop.call_later(1.0, self._notify_waiter_done, waiter)

        return waiter

    def _notify_waiter_done(
        self, waiter: Optional["asyncio.Future[bool]"] = None
    ) -> None:
        """Resolve ``waiter`` (default: the current waiter) with ``True``.

        The stored waiter is cleared only when it is the one being resolved,
        so a timer firing for an older waiter leaves the current one alone.
        """
        if waiter is None:
            waiter = self._notify_waiter
        if waiter is not None:
            set_result(waiter, True)

        if waiter is self._notify_waiter:
            self._notify_waiter = None

    def init_signals(self) -> None:
        """Register the worker's signal handlers through the event loop API."""
        handlers = (
            (signal.SIGQUIT, self.handle_quit),
            (signal.SIGTERM, self.handle_exit),
            (signal.SIGINT, self.handle_quit),
            (signal.SIGWINCH, self.handle_winch),
            (signal.SIGUSR1, self.handle_usr1),
            (signal.SIGABRT, self.handle_abort),
        )
        for signum, handler in handlers:
            # Handlers take ``(sig, frame)``; no frame is available here, so
            # ``None`` is passed in its place.
            self.loop.add_signal_handler(signum, handler, signum, None)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)
        # NOTE(review): the original comment here announced a signal reset
        # "so Gunicorn doesn't swallow subprocess return codes"
        # (https://github.com/aio-libs/aiohttp/issues/6130), but no statement
        # follows it in this method. Presumably the reset is no longer
        # needed -- confirm against that issue before adding one.

    def handle_quit(self, sig: int, frame: Optional[FrameType]) -> None:
        """Stop the supervision loop and wake it up so shutdown starts now."""
        self.alive = False

        # worker_int callback
        self.cfg.worker_int(self)

        # wakeup closing process
        self._notify_waiter_done()

    def handle_abort(self, sig: int, frame: Optional[FrameType]) -> None:
        """Mark the worker dead, call the abort hook and exit with status 1."""
        self.alive = False
        self.exit_code = 1
        self.cfg.worker_abort(self)
        sys.exit(1)

    @staticmethod
    def _create_ssl_context(cfg: Any) -> "SSLContext":
        """Create an SSLContext instance for usage in asyncio.create_server.

        The context is configured from ``cfg``: ``ssl_version``, ``certfile``,
        ``keyfile``, ``cert_reqs`` and, when set, ``ca_certs`` and ``ciphers``.

        See ssl.SSLSocket.__init__ for more details.

        Raises:
            RuntimeError: the ``ssl`` module is not available.
        """
        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")

        ctx = ssl.SSLContext(cfg.ssl_version)
        ctx.load_cert_chain(cfg.certfile, cfg.keyfile)
        ctx.verify_mode = cfg.cert_reqs
        if cfg.ca_certs:
            ctx.load_verify_locations(cfg.ca_certs)
        if cfg.ciphers:
            ctx.set_ciphers(cfg.ciphers)
        return ctx

    def _get_valid_log_format(self, source_format: str) -> str:
        """Translate gunicorn's access log format setting for aiohttp.

        Gunicorn's default format is replaced by aiohttp's default; any other
        format is returned unchanged.

        Raises:
            ValueError: ``source_format`` contains a ``%(name)`` style field.
        """
        if source_format == self.DEFAULT_GUNICORN_LOG_FORMAT:
            return self.DEFAULT_AIOHTTP_LOG_FORMAT

        if re.search(r"%\([^\)]+\)", source_format):
            raise ValueError(
                "Gunicorn's style options in form of `%(name)s` are not "
                "supported for the log formatting. Please use aiohttp's "
                "format specification to configure access log formatting: "
                "http://docs.aiohttp.org/en/stable/logging.html"
                "#format-specification"
            )

        return source_format

240 

241 

class GunicornUVLoopWebWorker(GunicornWebWorker):
    """``GunicornWebWorker`` variant that switches asyncio to uvloop."""

    def init_process(self) -> None:
        """Install the uvloop event loop policy, then run the base init."""
        import uvloop

        # Close any existing event loop before setting a new policy.
        # ``get_event_loop()`` raises RuntimeError when there is no current
        # loop (Python 3.14+ no longer creates one implicitly); in that case
        # there is nothing to close.  Errors from ``close()`` itself still
        # propagate, exactly as before.
        try:
            stale_loop = asyncio.get_event_loop()
        except RuntimeError:
            pass
        else:
            stale_loop.close()

        # Setup uvloop policy, so that every
        # asyncio.get_event_loop() will create an instance
        # of uvloop event loop.
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

        super().init_process()