Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/worker.py: 6%

129 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

1"""Async gunicorn worker for aiohttp.web""" 

2 

3import asyncio 

4import os 

5import re 

6import signal 

7import sys 

8from types import FrameType 

9from typing import Any, Awaitable, Callable, Optional, Union # noqa 

10 

11from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat 

12from gunicorn.workers import base 

13 

14from aiohttp import web 

15 

16from .helpers import set_result 

17from .web_app import Application 

18from .web_log import AccessLogger 

19 

20try: 

21 import ssl 

22 

23 SSLContext = ssl.SSLContext 

24except ImportError: # pragma: no cover 

25 ssl = None # type: ignore[assignment] 

26 SSLContext = object # type: ignore[misc,assignment] 

27 

28 

29__all__ = ("GunicornWebWorker", "GunicornUVLoopWebWorker", "GunicornTokioWebWorker") 

30 

31 

32class GunicornWebWorker(base.Worker): # type: ignore[misc,no-any-unimported] 

33 

34 DEFAULT_AIOHTTP_LOG_FORMAT = AccessLogger.LOG_FORMAT 

35 DEFAULT_GUNICORN_LOG_FORMAT = GunicornAccessLogFormat.default 

36 

37 def __init__(self, *args: Any, **kw: Any) -> None: # pragma: no cover 

38 super().__init__(*args, **kw) 

39 

40 self._task: Optional[asyncio.Task[None]] = None 

41 self.exit_code = 0 

42 self._notify_waiter: Optional[asyncio.Future[bool]] = None 

43 

44 def init_process(self) -> None: 

45 # create new event_loop after fork 

46 asyncio.get_event_loop().close() 

47 

48 self.loop = asyncio.new_event_loop() 

49 asyncio.set_event_loop(self.loop) 

50 

51 super().init_process() 

52 

53 def run(self) -> None: 

54 self._task = self.loop.create_task(self._run()) 

55 

56 try: # ignore all finalization problems 

57 self.loop.run_until_complete(self._task) 

58 except Exception: 

59 self.log.exception("Exception in gunicorn worker") 

60 self.loop.run_until_complete(self.loop.shutdown_asyncgens()) 

61 self.loop.close() 

62 

63 sys.exit(self.exit_code) 

64 

65 async def _run(self) -> None: 

66 runner = None 

67 if isinstance(self.wsgi, Application): 

68 app = self.wsgi 

69 elif asyncio.iscoroutinefunction(self.wsgi): 

70 wsgi = await self.wsgi() 

71 if isinstance(wsgi, web.AppRunner): 

72 runner = wsgi 

73 app = runner.app 

74 else: 

75 app = wsgi 

76 else: 

77 raise RuntimeError( 

78 "wsgi app should be either Application or " 

79 "async function returning Application, got {}".format(self.wsgi) 

80 ) 

81 

82 if runner is None: 

83 access_log = self.log.access_log if self.cfg.accesslog else None 

84 runner = web.AppRunner( 

85 app, 

86 logger=self.log, 

87 keepalive_timeout=self.cfg.keepalive, 

88 access_log=access_log, 

89 access_log_format=self._get_valid_log_format( 

90 self.cfg.access_log_format 

91 ), 

92 ) 

93 await runner.setup() 

94 

95 ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None 

96 

97 runner = runner 

98 assert runner is not None 

99 server = runner.server 

100 assert server is not None 

101 for sock in self.sockets: 

102 site = web.SockSite( 

103 runner, 

104 sock, 

105 ssl_context=ctx, 

106 shutdown_timeout=self.cfg.graceful_timeout / 100 * 95, 

107 ) 

108 await site.start() 

109 

110 # If our parent changed then we shut down. 

111 pid = os.getpid() 

112 try: 

113 while self.alive: # type: ignore[has-type] 

114 self.notify() 

115 

116 cnt = server.requests_count 

117 if self.cfg.max_requests and cnt > self.cfg.max_requests: 

118 self.alive = False 

119 self.log.info("Max requests, shutting down: %s", self) 

120 

121 elif pid == os.getpid() and self.ppid != os.getppid(): 

122 self.alive = False 

123 self.log.info("Parent changed, shutting down: %s", self) 

124 else: 

125 await self._wait_next_notify() 

126 except BaseException: 

127 pass 

128 

129 await runner.cleanup() 

130 

131 def _wait_next_notify(self) -> "asyncio.Future[bool]": 

132 self._notify_waiter_done() 

133 

134 loop = self.loop 

135 assert loop is not None 

136 self._notify_waiter = waiter = loop.create_future() 

137 self.loop.call_later(1.0, self._notify_waiter_done, waiter) 

138 

139 return waiter 

140 

141 def _notify_waiter_done( 

142 self, waiter: Optional["asyncio.Future[bool]"] = None 

143 ) -> None: 

144 if waiter is None: 

145 waiter = self._notify_waiter 

146 if waiter is not None: 

147 set_result(waiter, True) 

148 

149 if waiter is self._notify_waiter: 

150 self._notify_waiter = None 

151 

152 def init_signals(self) -> None: 

153 # Set up signals through the event loop API. 

154 

155 self.loop.add_signal_handler( 

156 signal.SIGQUIT, self.handle_quit, signal.SIGQUIT, None 

157 ) 

158 

159 self.loop.add_signal_handler( 

160 signal.SIGTERM, self.handle_exit, signal.SIGTERM, None 

161 ) 

162 

163 self.loop.add_signal_handler( 

164 signal.SIGINT, self.handle_quit, signal.SIGINT, None 

165 ) 

166 

167 self.loop.add_signal_handler( 

168 signal.SIGWINCH, self.handle_winch, signal.SIGWINCH, None 

169 ) 

170 

171 self.loop.add_signal_handler( 

172 signal.SIGUSR1, self.handle_usr1, signal.SIGUSR1, None 

173 ) 

174 

175 self.loop.add_signal_handler( 

176 signal.SIGABRT, self.handle_abort, signal.SIGABRT, None 

177 ) 

178 

179 # Don't let SIGTERM and SIGUSR1 disturb active requests 

180 # by interrupting system calls 

181 signal.siginterrupt(signal.SIGTERM, False) 

182 signal.siginterrupt(signal.SIGUSR1, False) 

183 # Reset signals so Gunicorn doesn't swallow subprocess return codes 

184 # See: https://github.com/aio-libs/aiohttp/issues/6130 

185 if sys.version_info < (3, 8): 

186 # Starting from Python 3.8, 

187 # the default child watcher is ThreadedChildWatcher. 

188 # The watcher doesn't depend on SIGCHLD signal, 

189 # there is no need to reset it. 

190 signal.signal(signal.SIGCHLD, signal.SIG_DFL) 

191 

192 def handle_quit(self, sig: int, frame: FrameType) -> None: 

193 self.alive = False 

194 

195 # worker_int callback 

196 self.cfg.worker_int(self) 

197 

198 # wakeup closing process 

199 self._notify_waiter_done() 

200 

201 def handle_abort(self, sig: int, frame: FrameType) -> None: 

202 self.alive = False 

203 self.exit_code = 1 

204 self.cfg.worker_abort(self) 

205 sys.exit(1) 

206 

207 @staticmethod 

208 def _create_ssl_context(cfg: Any) -> "SSLContext": 

209 """Creates SSLContext instance for usage in asyncio.create_server. 

210 

211 See ssl.SSLSocket.__init__ for more details. 

212 """ 

213 if ssl is None: # pragma: no cover 

214 raise RuntimeError("SSL is not supported.") 

215 

216 ctx = ssl.SSLContext(cfg.ssl_version) 

217 ctx.load_cert_chain(cfg.certfile, cfg.keyfile) 

218 ctx.verify_mode = cfg.cert_reqs 

219 if cfg.ca_certs: 

220 ctx.load_verify_locations(cfg.ca_certs) 

221 if cfg.ciphers: 

222 ctx.set_ciphers(cfg.ciphers) 

223 return ctx 

224 

225 def _get_valid_log_format(self, source_format: str) -> str: 

226 if source_format == self.DEFAULT_GUNICORN_LOG_FORMAT: 

227 return self.DEFAULT_AIOHTTP_LOG_FORMAT 

228 elif re.search(r"%\([^\)]+\)", source_format): 

229 raise ValueError( 

230 "Gunicorn's style options in form of `%(name)s` are not " 

231 "supported for the log formatting. Please use aiohttp's " 

232 "format specification to configure access log formatting: " 

233 "http://docs.aiohttp.org/en/stable/logging.html" 

234 "#format-specification" 

235 ) 

236 else: 

237 return source_format 

238 

239 

240class GunicornUVLoopWebWorker(GunicornWebWorker): 

241 def init_process(self) -> None: 

242 import uvloop 

243 

244 # Close any existing event loop before setting a 

245 # new policy. 

246 asyncio.get_event_loop().close() 

247 

248 # Setup uvloop policy, so that every 

249 # asyncio.get_event_loop() will create an instance 

250 # of uvloop event loop. 

251 asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) 

252 

253 super().init_process() 

254 

255 

256class GunicornTokioWebWorker(GunicornWebWorker): 

257 def init_process(self) -> None: # pragma: no cover 

258 import tokio 

259 

260 # Close any existing event loop before setting a 

261 # new policy. 

262 asyncio.get_event_loop().close() 

263 

264 # Setup tokio policy, so that every 

265 # asyncio.get_event_loop() will create an instance 

266 # of tokio event loop. 

267 asyncio.set_event_loop_policy(tokio.EventLoopPolicy()) 

268 

269 super().init_process()