Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/process.py: 21%

177 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1# 

2# Copyright 2011 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Utilities for working with multiple processes, including both forking 

17the server into multiple processes and managing subprocesses. 

18""" 

19 

20import os 

21import multiprocessing 

22import signal 

23import subprocess 

24import sys 

25import time 

26 

27from binascii import hexlify 

28 

29from tornado.concurrent import ( 

30 Future, 

31 future_set_result_unless_cancelled, 

32 future_set_exception_unless_cancelled, 

33) 

34from tornado import ioloop 

35from tornado.iostream import PipeIOStream 

36from tornado.log import gen_log 

37 

38import typing 

39from typing import Optional, Any, Callable 

40 

41if typing.TYPE_CHECKING: 

42 from typing import List # noqa: F401 

43 

# Re-export this exception for convenience, so that code using this module
# can refer to it without importing ``subprocess`` itself.
CalledProcessError = subprocess.CalledProcessError

46 

47 

def cpu_count() -> int:
    """Returns the number of processors on this machine.

    ``multiprocessing.cpu_count`` is tried first, falling back to
    ``os.sysconf("SC_NPROCESSORS_CONF")``. If neither source yields a
    positive count, an error is logged and 1 is returned.
    """
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        pass
    try:
        count = os.sysconf("SC_NPROCESSORS_CONF")  # type: ignore
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform.
        # ValueError: the configuration name is not recognized.
        # OSError: the underlying sysconf call itself failed.
        pass
    else:
        # sysconf reports -1 for an indeterminate value; never hand a
        # non-positive processor count back to callers.
        if count > 0:
            return count
    gen_log.error("Could not detect number of processors; assuming 1")
    return 1

62 

63 

def _reseed_random() -> None:
    """Re-seeds the ``random`` module if it has already been imported.

    Nothing happens when ``random`` is absent from ``sys.modules``.
    """
    if "random" in sys.modules:
        import random

        # With os.urandom available this matches what random.seed does on
        # its own (at least as of python 2.6). Without it, fall back to a
        # millisecond timestamp mixed with the pid.
        try:
            entropy = int.from_bytes(os.urandom(16), "big")
        except NotImplementedError:
            entropy = int(time.time() * 1000) ^ os.getpid()
        random.seed(entropy)

77 

78 

# Task id of the current process. ``fork_processes`` assigns it in each child
# it forks; it stays None in any process that ``fork_processes`` did not create.
_task_id = None

80 

81 

def fork_processes(
    num_processes: Optional[int], max_restarts: Optional[int] = None
) -> int:
    """Forks a set of worker processes and supervises them.

    When ``num_processes`` is None or <= 0, the number of cores on this
    machine is detected and that many children are forked. When
    ``num_processes`` is given and > 0, exactly that many children are
    forked.

    Workers are separate processes rather than threads, so no memory is
    shared between any server code.

    Multiple processes are not compatible with the autoreload module (or
    the ``autoreload=True`` option to `tornado.web.Application`, which
    defaults to True when ``debug=True``). When using multiple processes,
    no IOLoops can be created or referenced until after the call to
    ``fork_processes``.

    In each child process, ``fork_processes`` returns its *task id*, a
    number between 0 and ``num_processes``. A child that exits abnormally
    (killed by a signal or with a non-zero exit status) is restarted with
    the same id, up to ``max_restarts`` times in total. In the parent
    process, ``fork_processes`` calls ``sys.exit(0)`` once every child has
    exited normally.

    max_restarts defaults to 100.

    Availability: Unix
    """
    if sys.platform == "win32":
        # Keep this as a plain ``if``: mypy understands that form here,
        # but not an assert in this context.
        raise Exception("fork not available on windows")
    if max_restarts is None:
        max_restarts = 100

    global _task_id
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    gen_log.info("Starting %d processes", num_processes)
    # Maps the pid of each live child to the task id it was started with.
    children = {}

    def start_child(i: int) -> Optional[int]:
        global _task_id
        pid = os.fork()
        if pid != 0:
            # Parent: remember which task this pid is running.
            children[pid] = i
            return None
        # Child: take a fresh random seed and record our task id.
        _reseed_random()
        _task_id = i
        return i

    for index in range(num_processes):
        started = start_child(index)
        if started is not None:
            # Only a child gets a task id back; hand it to the caller.
            return started

    num_restarts = 0
    while children:
        pid, status = os.wait()
        if pid not in children:
            continue
        task = children.pop(pid)
        if os.WIFSIGNALED(status):
            gen_log.warning(
                "child %d (pid %d) killed by signal %d, restarting",
                task,
                pid,
                os.WTERMSIG(status),
            )
        else:
            exit_status = os.WEXITSTATUS(status)
            if exit_status == 0:
                gen_log.info("child %d (pid %d) exited normally", task, pid)
                continue
            gen_log.warning(
                "child %d (pid %d) exited with status %d, restarting",
                task,
                pid,
                exit_status,
            )
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        restarted = start_child(task)
        if restarted is not None:
            return restarted
    # Every child exited cleanly, so exit the master process here rather
    # than returning to just after the fork_processes call (where the
    # caller would probably start another IOLoop unless it checks the
    # return value).
    sys.exit(0)

176 

177 

def task_id() -> Optional[int]:
    """Reports the task id of the current process, if it has one.

    The result is None when this process was not created by
    `fork_processes`.
    """
    return _task_id

185 

186 

class Subprocess(object):
    """Wraps ``subprocess.Popen`` with IOStream support.

    The constructor is the same as ``subprocess.Popen`` with the following
    additions:

    * ``stdin``, ``stdout``, and ``stderr`` may have the value
      ``tornado.process.Subprocess.STREAM``, which will make the corresponding
      attribute of the resulting Subprocess a `.PipeIOStream`. If this option
      is used, the caller is responsible for closing the streams when done
      with them.

    The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
    ``wait_for_exit`` methods do not work on Windows. There is
    therefore no reason to use this class instead of
    ``subprocess.Popen`` on that platform.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    """

    # Sentinel accepted for the stdin/stdout/stderr keyword arguments.
    STREAM = object()

    # True while the SIGCHLD handler set up by initialize() is in place.
    _initialized = False
    # Maps pid -> Subprocess for processes with a pending exit callback.
    _waiting = {}  # type: ignore
    # Whatever signal.signal() returned when initialize() replaced the
    # SIGCHLD handler; None if there was no Python-level handler.
    _old_sigchld = None

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Starts the process, forwarding all arguments to ``subprocess.Popen``.

        Any of ``stdin``/``stdout``/``stderr`` given as ``Subprocess.STREAM``
        is replaced by one end of a new pipe, and the other end is exposed
        on this object as a `.PipeIOStream`.
        """
        self.io_loop = ioloop.IOLoop.current()
        # All FDs we create should be closed on error; those in to_close
        # should be closed in the parent process on success.
        pipe_fds = []  # type: List[int]
        to_close = []  # type: List[int]
        if kwargs.get("stdin") is Subprocess.STREAM:
            in_r, in_w = os.pipe()
            kwargs["stdin"] = in_r
            pipe_fds.extend((in_r, in_w))
            to_close.append(in_r)
            self.stdin = PipeIOStream(in_w)
        if kwargs.get("stdout") is Subprocess.STREAM:
            out_r, out_w = os.pipe()
            kwargs["stdout"] = out_w
            pipe_fds.extend((out_r, out_w))
            to_close.append(out_w)
            self.stdout = PipeIOStream(out_r)
        if kwargs.get("stderr") is Subprocess.STREAM:
            err_r, err_w = os.pipe()
            kwargs["stderr"] = err_w
            pipe_fds.extend((err_r, err_w))
            to_close.append(err_w)
            self.stderr = PipeIOStream(err_r)
        try:
            self.proc = subprocess.Popen(*args, **kwargs)
        except BaseException:
            # Deliberately catches everything: the pipes must be released
            # whatever went wrong, and the error is always re-raised.
            for fd in pipe_fds:
                os.close(fd)
            raise
        # The child now owns its ends of the pipes; drop the parent's copies.
        for fd in to_close:
            os.close(fd)
        self.pid = self.proc.pid
        for attr in ["stdin", "stdout", "stderr"]:
            if not hasattr(self, attr):  # don't clobber streams set above
                setattr(self, attr, getattr(self.proc, attr))
        self._exit_callback = None  # type: Optional[Callable[[int], None]]
        self.returncode = None  # type: Optional[int]

    def set_exit_callback(self, callback: Callable[[int], None]) -> None:
        """Runs ``callback`` when this process exits.

        The callback takes one argument, the return code of the process.

        This method uses a ``SIGCHLD`` handler, which is a global setting
        and may conflict if you have other libraries trying to handle the
        same signal.  If you are using more than one ``IOLoop`` it may
        be necessary to call `Subprocess.initialize` first to designate
        one ``IOLoop`` to run the signal handlers.

        In many cases a close callback on the stdout or stderr streams
        can be used as an alternative to an exit callback if the
        signal handler is causing a problem.

        Availability: Unix
        """
        self._exit_callback = callback
        Subprocess.initialize()
        Subprocess._waiting[self.pid] = self
        # Check right away so an exit that already happened is not missed.
        Subprocess._try_cleanup_process(self.pid)

    def wait_for_exit(self, raise_error: bool = True) -> "Future[int]":
        """Returns a `.Future` which resolves when the process exits.

        Usage::

            ret = yield proc.wait_for_exit()

        This is a coroutine-friendly alternative to `set_exit_callback`
        (and a replacement for the blocking `subprocess.Popen.wait`).

        By default, raises `subprocess.CalledProcessError` if the process
        has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
        to suppress this behavior and return the exit status without raising.

        .. versionadded:: 4.2

        Availability: Unix
        """
        future = Future()  # type: Future[int]

        def callback(ret: int) -> None:
            if ret != 0 and raise_error:
                # Unfortunately we don't have the original args any more.
                future_set_exception_unless_cancelled(
                    future, CalledProcessError(ret, "unknown")
                )
            else:
                future_set_result_unless_cancelled(future, ret)

        self.set_exit_callback(callback)
        return future

    @classmethod
    def initialize(cls) -> None:
        """Initializes the ``SIGCHLD`` handler.

        The signal handler is run on an `.IOLoop` to avoid locking issues.
        Note that the `.IOLoop` used for signal handling need not be the
        same one used by individual Subprocess objects (as long as the
        ``IOLoops`` are each running in separate threads).

        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been
           removed.

        Availability: Unix
        """
        if cls._initialized:
            return
        io_loop = ioloop.IOLoop.current()
        # Remember the handler being replaced so uninitialize() can put it
        # back.
        cls._old_sigchld = signal.signal(
            signal.SIGCHLD,
            lambda sig, frame: io_loop.add_callback_from_signal(cls._cleanup),
        )
        cls._initialized = True

    @classmethod
    def uninitialize(cls) -> None:
        """Removes the ``SIGCHLD`` handler.

        The handler that was active before `initialize` is restored. If
        that handler was not installed from Python (``signal.signal``
        returned None for it), the default disposition is restored instead.
        """
        if not cls._initialized:
            return
        previous = cls._old_sigchld
        if previous is None:
            # signal.signal() rejects None with a TypeError, which would
            # also leave _initialized stuck at True. A handler that was not
            # installed from Python cannot be recovered, so fall back to
            # the default action.
            previous = signal.SIG_DFL
        signal.signal(signal.SIGCHLD, previous)
        cls._initialized = False

    @classmethod
    def _cleanup(cls) -> None:
        """Checks every process in ``_waiting`` for an exit status."""
        for pid in list(cls._waiting.keys()):  # make a copy
            cls._try_cleanup_process(pid)

    @classmethod
    def _try_cleanup_process(cls, pid: int) -> None:
        """Reaps ``pid`` without blocking and schedules its exit handling.

        Does nothing if the process has not exited yet or is not a child
        that can be waited on.
        """
        try:
            ret_pid, status = os.waitpid(pid, os.WNOHANG)  # type: ignore
        except ChildProcessError:
            return
        if ret_pid == 0:
            # WNOHANG: the child is still running.
            return
        assert ret_pid == pid
        subproc = cls._waiting.pop(pid)
        subproc.io_loop.add_callback_from_signal(subproc._set_returncode, status)

    def _set_returncode(self, status: int) -> None:
        """Decodes a wait status into ``returncode`` and runs the exit callback.

        A process killed by a signal gets the negated signal number;
        otherwise the exit status is used. On Windows the code is always -1.
        """
        if sys.platform == "win32":
            self.returncode = -1
        else:
            if os.WIFSIGNALED(status):
                self.returncode = -os.WTERMSIG(status)
            else:
                assert os.WIFEXITED(status)
                self.returncode = os.WEXITSTATUS(status)
        # We've taken over wait() duty from the subprocess.Popen
        # object. If we don't inform it of the process's return code,
        # it will log a warning at destruction in python 3.6+.
        self.proc.returncode = self.returncode
        if self._exit_callback:
            # Clear the callback before invoking it so it runs only once.
            callback = self._exit_callback
            self._exit_callback = None
            callback(self.returncode)