Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/process.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

178 statements  

1# 

2# Copyright 2011 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Utilities for working with multiple processes, including both forking 

17the server into multiple processes and managing subprocesses. 

18""" 

19 

20import asyncio 

21import os 

22import multiprocessing 

23import signal 

24import subprocess 

25import sys 

26import time 

27 

28from binascii import hexlify 

29 

30from tornado.concurrent import ( 

31 Future, 

32 future_set_result_unless_cancelled, 

33 future_set_exception_unless_cancelled, 

34) 

35from tornado import ioloop 

36from tornado.iostream import PipeIOStream 

37from tornado.log import gen_log 

38 

39import typing 

40from typing import Optional, Any, Callable 

41 

42if typing.TYPE_CHECKING: 

43 from typing import List # noqa: F401 

44 

# Re-export this exception for convenience: `Subprocess.wait_for_exit`
# reports a non-zero exit status with it, so callers can catch
# ``tornado.process.CalledProcessError`` without importing ``subprocess``.
CalledProcessError = subprocess.CalledProcessError

47 

48 

def cpu_count() -> int:
    """Returns the number of processors on this machine.

    ``multiprocessing.cpu_count`` is tried first, then
    ``os.sysconf("SC_NPROCESSORS_CONF")``. If neither produces a positive
    count, an error is logged and 1 is returned, so the result is always
    at least 1.
    """
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        pass
    try:
        count = os.sysconf("SC_NPROCESSORS_CONF")  # type: ignore
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform.
        # ValueError: the configuration name is not known.
        # OSError: the name is known but not supported by the host.
        count = -1
    # sysconf may report -1 (indeterminate) instead of raising; never hand
    # a non-positive process count back to the caller.
    if count > 0:
        return count
    gen_log.error("Could not detect number of processors; assuming 1")
    return 1

63 

64 

def _reseed_random() -> None:
    """Give the ``random`` module a fresh seed, if it has been imported.

    Does nothing when ``random`` is not already present in ``sys.modules``.
    The seed is 16 bytes from ``os.urandom``; when ``os.urandom`` is not
    implemented, a millisecond timestamp XOR-ed with the pid is used.
    """
    if "random" in sys.modules:
        import random

        try:
            # Same integer as int(hexlify(os.urandom(16)), 16).
            entropy = int.from_bytes(os.urandom(16), "big")
        except NotImplementedError:
            # No OS entropy source: mix the pid into a timestamp.
            entropy = int(time.time() * 1000) ^ os.getpid()
        random.seed(entropy)

78 

79 

# Task id of the current process. ``fork_processes`` assigns it in every
# child it forks and ``task_id()`` returns it; it stays ``None`` in a
# process that was not created by ``fork_processes``.
_task_id = None

81 

82 

def fork_processes(
    num_processes: Optional[int], max_restarts: Optional[int] = None
) -> int:
    """Starts multiple worker processes.

    When ``num_processes`` is None or <= 0, one child is forked per
    processor reported by `cpu_count`; otherwise exactly
    ``num_processes`` children are forked.

    Workers are separate processes, not threads, so no memory is shared
    between them.

    Multiple processes are not compatible with the autoreload module (or
    the ``autoreload=True`` option to `tornado.web.Application`, which
    defaults to True when ``debug=True``). When using multiple processes,
    no IOLoops can be created or referenced until after the call to
    ``fork_processes``.

    In each child, ``fork_processes`` returns that child's *task id*, one
    of ``range(num_processes)``. A child that exits abnormally (killed by
    a signal, or with a non-zero exit status) is restarted with the same
    id, up to ``max_restarts`` restarts in total; beyond that a
    ``RuntimeError`` is raised in the parent. Once every child has exited
    normally the parent calls ``sys.exit(0)``.

    max_restarts defaults to 100.

    Availability: Unix
    """
    if sys.platform == "win32":
        # The exact form of this condition matters to mypy; it understands
        # if but not assert in this context.
        raise Exception("fork not available on windows")
    restart_limit = 100 if max_restarts is None else max_restarts

    # A task id that is already set means this process has forked before.
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        worker_total = cpu_count()
    else:
        worker_total = num_processes
    gen_log.info("Starting %d processes", worker_total)
    task_by_pid = {}

    def spawn(task: int) -> Optional[int]:
        """Fork one worker; return ``task`` in the child, None in the parent."""
        global _task_id
        child_pid = os.fork()
        if child_pid != 0:
            # Parent: remember which task this pid is running.
            task_by_pid[child_pid] = task
            return None
        # Child: do not share the parent's random state with siblings.
        _reseed_random()
        _task_id = task
        return task

    for task in range(worker_total):
        if spawn(task) is not None:
            return task

    restarts = 0
    while task_by_pid:
        pid, status = os.wait()
        task = task_by_pid.pop(pid, None)
        if task is None:
            # Some child that was not started here; ignore it.
            continue
        signaled = os.WIFSIGNALED(status)
        if not signaled and os.WEXITSTATUS(status) == 0:
            gen_log.info("child %d (pid %d) exited normally", task, pid)
            continue
        if signaled:
            gen_log.warning(
                "child %d (pid %d) killed by signal %d, restarting",
                task,
                pid,
                os.WTERMSIG(status),
            )
        else:
            gen_log.warning(
                "child %d (pid %d) exited with status %d, restarting",
                task,
                pid,
                os.WEXITSTATUS(status),
            )
        restarts += 1
        if restarts > restart_limit:
            raise RuntimeError("Too many child restarts, giving up")
        if spawn(task) is not None:
            return task
    # All child processes exited cleanly, so exit the master process
    # instead of just returning to right after the call to
    # fork_processes (which will probably just start up another IOLoop
    # unless the caller checks the return value).
    sys.exit(0)

177 

178 

def task_id() -> Optional[int]:
    """Returns the current task id, if any.

    The value is None in a process that was not created by
    `fork_processes`.
    """
    # Read-only access to the module-level value; no ``global`` needed.
    return _task_id

186 

187 

class Subprocess:
    """Wraps ``subprocess.Popen`` with IOStream support.

    The constructor takes the same arguments as ``subprocess.Popen``, with
    one addition:

    * ``stdin``, ``stdout``, and ``stderr`` may be given the value
      ``tornado.process.Subprocess.STREAM``. The matching attribute of the
      resulting Subprocess is then a `.PipeIOStream`, and the caller is
      responsible for closing that stream when done with it.

    The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
    ``wait_for_exit`` methods do not work on Windows. There is
    therefore no reason to use this class instead of
    ``subprocess.Popen`` on that platform.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    """

    # Sentinel for stdin/stdout/stderr requesting a PipeIOStream.
    STREAM = object()

    # True once the SIGCHLD handler has been installed.
    _initialized = False
    # pid -> Subprocess for processes that have an exit callback pending.
    _waiting = {}  # type: ignore

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self.io_loop = ioloop.IOLoop.current()
        # All FDs we create should be closed on error; those in to_close
        # should be closed in the parent process on success.
        pipe_fds = []  # type: List[int]
        to_close = []  # type: List[int]
        # (attribute name, whether the child gets the read end of the pipe)
        for name, child_reads in (
            ("stdin", True),
            ("stdout", False),
            ("stderr", False),
        ):
            if kwargs.get(name) is not Subprocess.STREAM:
                continue
            read_fd, write_fd = os.pipe()
            if child_reads:
                child_fd, parent_fd = read_fd, write_fd
            else:
                child_fd, parent_fd = write_fd, read_fd
            kwargs[name] = child_fd
            pipe_fds.extend((read_fd, write_fd))
            to_close.append(child_fd)
            setattr(self, name, PipeIOStream(parent_fd))
        try:
            self.proc = subprocess.Popen(*args, **kwargs)
        except BaseException:
            # Launch failed: release every pipe end, then re-raise.
            for fd in pipe_fds:
                os.close(fd)
            raise
        # The child owns its ends now; drop the parent's copies.
        for fd in to_close:
            os.close(fd)
        self.pid = self.proc.pid
        for name in ("stdin", "stdout", "stderr"):
            # don't clobber streams set above
            if not hasattr(self, name):
                setattr(self, name, getattr(self.proc, name))
        self._exit_callback = None  # type: Optional[Callable[[int], None]]
        self.returncode = None  # type: Optional[int]

    def set_exit_callback(self, callback: Callable[[int], None]) -> None:
        """Runs ``callback`` when this process exits.

        ``callback`` is called with a single argument, the return code of
        the process.

        A ``SIGCHLD`` handler is used for this. That is a global setting
        and may conflict with other libraries handling the same signal.
        If you are using more than one ``IOLoop`` it may be necessary to
        call `Subprocess.initialize` first to designate one ``IOLoop`` to
        run the signal handlers.

        If the signal handler is causing a problem, a close callback on
        the stdout or stderr streams can often be used instead of an exit
        callback.

        Availability: Unix
        """
        self._exit_callback = callback
        Subprocess.initialize()
        Subprocess._waiting[self.pid] = self
        # The process may already have exited; check right away.
        Subprocess._try_cleanup_process(self.pid)

    def wait_for_exit(self, raise_error: bool = True) -> "Future[int]":
        """Returns a `.Future` which resolves when the process exits.

        Usage::

            ret = yield proc.wait_for_exit()

        This is a coroutine-friendly alternative to `set_exit_callback`
        (and a replacement for the blocking `subprocess.Popen.wait`).

        By default the future fails with `subprocess.CalledProcessError`
        when the process has a non-zero exit status. Pass
        ``wait_for_exit(raise_error=False)`` to get the exit status as the
        result instead.

        .. versionadded:: 4.2

        Availability: Unix
        """
        future = Future()  # type: Future[int]

        def on_exit(ret: int) -> None:
            if raise_error and ret != 0:
                # Unfortunately we don't have the original args any more.
                future_set_exception_unless_cancelled(
                    future, CalledProcessError(ret, "unknown")
                )
                return
            future_set_result_unless_cancelled(future, ret)

        self.set_exit_callback(on_exit)
        return future

    @classmethod
    def initialize(cls) -> None:
        """Initializes the ``SIGCHLD`` handler.

        The signal handler is run on an `.IOLoop` to avoid locking issues.
        Note that the `.IOLoop` used for signal handling need not be the
        same one used by individual Subprocess objects (as long as the
        ``IOLoops`` are each running in separate threads).

        Calling this again after the handler is installed does nothing.

        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been
           removed.

        Availability: Unix
        """
        if not cls._initialized:
            asyncio.get_event_loop().add_signal_handler(
                signal.SIGCHLD, cls._cleanup
            )
            cls._initialized = True

    @classmethod
    def uninitialize(cls) -> None:
        """Removes the ``SIGCHLD`` handler."""
        if cls._initialized:
            asyncio.get_event_loop().remove_signal_handler(signal.SIGCHLD)
            cls._initialized = False

    @classmethod
    def _cleanup(cls) -> None:
        # Snapshot the pids: _try_cleanup_process removes entries.
        for pid in tuple(cls._waiting):
            cls._try_cleanup_process(pid)

    @classmethod
    def _try_cleanup_process(cls, pid: int) -> None:
        try:
            reaped_pid, status = os.waitpid(pid, os.WNOHANG)  # type: ignore
        except ChildProcessError:
            return
        if reaped_pid != 0:
            assert reaped_pid == pid
            waiter = cls._waiting.pop(pid)
            # Deliver the status on the IOLoop the Subprocess was created on.
            waiter.io_loop.add_callback(waiter._set_returncode, status)

    def _set_returncode(self, status: int) -> None:
        if sys.platform == "win32":
            code = -1
        else:
            if os.WIFSIGNALED(status):
                # Killed by a signal: report the negated signal number.
                code = -os.WTERMSIG(status)
            else:
                assert os.WIFEXITED(status)
                code = os.WEXITSTATUS(status)
        self.returncode = code
        # We've taken over wait() duty from the subprocess.Popen
        # object. If we don't inform it of the process's return code,
        # it will log a warning at destruction in python 3.6+.
        self.proc.returncode = code
        callback = self._exit_callback
        if callback:
            # Clear the callback before invoking it.
            self._exit_callback = None
            callback(code)

365 # object. If we don't inform it of the process's return code, 

366 # it will log a warning at destruction in python 3.6+. 

367 self.proc.returncode = self.returncode 

368 if self._exit_callback: 

369 callback = self._exit_callback 

370 self._exit_callback = None 

371 callback(self.returncode)