Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/process.py: 21%
177 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1#
2# Copyright 2011 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""Utilities for working with multiple processes, including both forking
17the server into multiple processes and managing subprocesses.
18"""
20import os
21import multiprocessing
22import signal
23import subprocess
24import sys
25import time
27from binascii import hexlify
29from tornado.concurrent import (
30 Future,
31 future_set_result_unless_cancelled,
32 future_set_exception_unless_cancelled,
33)
34from tornado import ioloop
35from tornado.iostream import PipeIOStream
36from tornado.log import gen_log
38import typing
39from typing import Optional, Any, Callable
41if typing.TYPE_CHECKING:
42 from typing import List # noqa: F401
# Re-export this exception for convenience.
# Callers can catch ``tornado.process.CalledProcessError`` (raised by
# ``Subprocess.wait_for_exit``) without importing ``subprocess`` themselves.
CalledProcessError = subprocess.CalledProcessError
def cpu_count() -> int:
    """Returns the number of processors on this machine."""
    # ``multiprocessing`` may be ``None`` on platforms where it could not
    # be imported; in that case assume a single processor.
    if multiprocessing is None:
        return 1
    try:
        count = multiprocessing.cpu_count()
    except NotImplementedError:
        pass
    else:
        return count
    # Fall back to the POSIX sysconf interface where available.
    try:
        count = os.sysconf("SC_NPROCESSORS_CONF")  # type: ignore
    except (AttributeError, ValueError):
        pass
    else:
        return count
    gen_log.error("Could not detect number of processors; assuming 1")
    return 1
64def _reseed_random() -> None:
65 if "random" not in sys.modules:
66 return
67 import random
69 # If os.urandom is available, this method does the same thing as
70 # random.seed (at least as of python 2.6). If os.urandom is not
71 # available, we mix in the pid in addition to a timestamp.
72 try:
73 seed = int(hexlify(os.urandom(16)), 16)
74 except NotImplementedError:
75 seed = int(time.time() * 1000) ^ os.getpid()
76 random.seed(seed)
# Task id assigned to the current process by ``fork_processes``; stays
# ``None`` in processes not created by it (see ``task_id()``).
_task_id = None
def fork_processes(
    num_processes: Optional[int], max_restarts: Optional[int] = None
) -> int:
    """Starts multiple worker processes.

    If ``num_processes`` is None or <= 0, we detect the number of cores
    available on this machine and fork that number of child
    processes. If ``num_processes`` is given and > 0, we fork that
    specific number of sub-processes.

    Since we use processes and not threads, there is no shared memory
    between any server code.

    Note that multiple processes are not compatible with the autoreload
    module (or the ``autoreload=True`` option to `tornado.web.Application`
    which defaults to True when ``debug=True``).
    When using multiple processes, no IOLoops can be created or
    referenced until after the call to ``fork_processes``.

    In each child process, ``fork_processes`` returns its *task id*, a
    number between 0 and ``num_processes``. Processes that exit
    abnormally (due to a signal or non-zero exit status) are restarted
    with the same id (up to ``max_restarts`` times). In the parent
    process, ``fork_processes`` calls ``sys.exit(0)`` after all child
    processes have exited normally.

    max_restarts defaults to 100.

    Availability: Unix
    """
    if sys.platform == "win32":
        # The exact form of this condition matters to mypy; it understands
        # if but not assert in this context.
        raise Exception("fork not available on windows")
    if max_restarts is None:
        max_restarts = 100
    global _task_id
    # fork_processes may only be called once per process.
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    gen_log.info("Starting %d processes", num_processes)
    # Maps child pid -> task id, so an abnormally-exited child can be
    # restarted with the same id.
    children = {}

    def start_child(i: int) -> Optional[int]:
        # Fork one worker: returns the task id ``i`` in the child and
        # ``None`` in the parent (which records the pid).
        pid = os.fork()
        if pid == 0:
            # child process
            _reseed_random()
            global _task_id
            _task_id = i
            return i
        else:
            children[pid] = i
            return None

    for i in range(num_processes):
        id = start_child(i)
        if id is not None:
            # We are in a child: return immediately so the caller can
            # start its IOLoop.
            return id
    # Parent: supervise the children, restarting any that die abnormally.
    num_restarts = 0
    while children:
        pid, status = os.wait()
        if pid not in children:
            # Not one of our direct children (e.g. a reparented process).
            continue
        id = children.pop(pid)
        if os.WIFSIGNALED(status):
            gen_log.warning(
                "child %d (pid %d) killed by signal %d, restarting",
                id,
                pid,
                os.WTERMSIG(status),
            )
        elif os.WEXITSTATUS(status) != 0:
            gen_log.warning(
                "child %d (pid %d) exited with status %d, restarting",
                id,
                pid,
                os.WEXITSTATUS(status),
            )
        else:
            # Clean exit: do not restart this task id.
            gen_log.info("child %d (pid %d) exited normally", id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(id)
        if new_id is not None:
            # We are in a freshly restarted child.
            return new_id
    # All child processes exited cleanly, so exit the master process
    # instead of just returning to right after the call to
    # fork_processes (which will probably just start up another IOLoop
    # unless the caller checks the return value).
    sys.exit(0)
def task_id() -> Optional[int]:
    """Returns the current task id, if any.

    Returns None if this process was not created by `fork_processes`.
    """
    # Reading a module-level name needs no ``global`` declaration.
    return _task_id
class Subprocess(object):
    """Wraps ``subprocess.Popen`` with IOStream support.

    The constructor is the same as ``subprocess.Popen`` with the following
    additions:

    * ``stdin``, ``stdout``, and ``stderr`` may have the value
      ``tornado.process.Subprocess.STREAM``, which will make the corresponding
      attribute of the resulting Subprocess a `.PipeIOStream`. If this option
      is used, the caller is responsible for closing the streams when done
      with them.

    The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
    ``wait_for_exit`` methods do not work on Windows. There is
    therefore no reason to use this class instead of
    ``subprocess.Popen`` on that platform.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been
       removed.
    """

    # Unique sentinel; pass as stdin/stdout/stderr to get a PipeIOStream.
    STREAM = object()

    # Class-level SIGCHLD bookkeeping shared by all instances
    # (see ``initialize``, ``_cleanup``, and ``_try_cleanup_process``).
    _initialized = False
    _waiting = {}  # type: ignore
    _old_sigchld = None

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self.io_loop = ioloop.IOLoop.current()
        # All FDs we create should be closed on error; those in to_close
        # should be closed in the parent process on success.
        pipe_fds = []  # type: List[int]
        to_close = []  # type: List[int]
        if kwargs.get("stdin") is Subprocess.STREAM:
            # Child reads from in_r; the parent writes through self.stdin.
            in_r, in_w = os.pipe()
            kwargs["stdin"] = in_r
            pipe_fds.extend((in_r, in_w))
            to_close.append(in_r)
            self.stdin = PipeIOStream(in_w)
        if kwargs.get("stdout") is Subprocess.STREAM:
            # Child writes to out_w; the parent reads through self.stdout.
            out_r, out_w = os.pipe()
            kwargs["stdout"] = out_w
            pipe_fds.extend((out_r, out_w))
            to_close.append(out_w)
            self.stdout = PipeIOStream(out_r)
        if kwargs.get("stderr") is Subprocess.STREAM:
            # Child writes to err_w; the parent reads through self.stderr.
            err_r, err_w = os.pipe()
            kwargs["stderr"] = err_w
            pipe_fds.extend((err_r, err_w))
            to_close.append(err_w)
            self.stderr = PipeIOStream(err_r)
        try:
            self.proc = subprocess.Popen(*args, **kwargs)
        except:
            # Deliberate bare except: close our pipe FDs on *any* failure
            # (including BaseException) before re-raising, to avoid leaks.
            for fd in pipe_fds:
                os.close(fd)
            raise
        # On success, close only the child's ends of the pipes.
        for fd in to_close:
            os.close(fd)
        self.pid = self.proc.pid
        for attr in ["stdin", "stdout", "stderr"]:
            if not hasattr(self, attr):  # don't clobber streams set above
                setattr(self, attr, getattr(self.proc, attr))
        self._exit_callback = None  # type: Optional[Callable[[int], None]]
        self.returncode = None  # type: Optional[int]

    def set_exit_callback(self, callback: Callable[[int], None]) -> None:
        """Runs ``callback`` when this process exits.

        The callback takes one argument, the return code of the process.

        This method uses a ``SIGCHLD`` handler, which is a global setting
        and may conflict if you have other libraries trying to handle the
        same signal. If you are using more than one ``IOLoop`` it may
        be necessary to call `Subprocess.initialize` first to designate
        one ``IOLoop`` to run the signal handlers.

        In many cases a close callback on the stdout or stderr streams
        can be used as an alternative to an exit callback if the
        signal handler is causing a problem.

        Availability: Unix
        """
        self._exit_callback = callback
        Subprocess.initialize()
        Subprocess._waiting[self.pid] = self
        # The child may have exited before the handler was registered;
        # poll once now so we don't wait for a SIGCHLD that already fired.
        Subprocess._try_cleanup_process(self.pid)

    def wait_for_exit(self, raise_error: bool = True) -> "Future[int]":
        """Returns a `.Future` which resolves when the process exits.

        Usage::

            ret = yield proc.wait_for_exit()

        This is a coroutine-friendly alternative to `set_exit_callback`
        (and a replacement for the blocking `subprocess.Popen.wait`).

        By default, raises `subprocess.CalledProcessError` if the process
        has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
        to suppress this behavior and return the exit status without raising.

        .. versionadded:: 4.2

        Availability: Unix
        """
        future = Future()  # type: Future[int]

        def callback(ret: int) -> None:
            if ret != 0 and raise_error:
                # Unfortunately we don't have the original args any more.
                future_set_exception_unless_cancelled(
                    future, CalledProcessError(ret, "unknown")
                )
            else:
                future_set_result_unless_cancelled(future, ret)

        self.set_exit_callback(callback)
        return future

    @classmethod
    def initialize(cls) -> None:
        """Initializes the ``SIGCHLD`` handler.

        The signal handler is run on an `.IOLoop` to avoid locking issues.
        Note that the `.IOLoop` used for signal handling need not be the
        same one used by individual Subprocess objects (as long as the
        ``IOLoops`` are each running in separate threads).

        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been
           removed.

        Availability: Unix
        """
        if cls._initialized:
            return
        io_loop = ioloop.IOLoop.current()
        # Save the previous handler so ``uninitialize`` can restore it.
        cls._old_sigchld = signal.signal(
            signal.SIGCHLD,
            lambda sig, frame: io_loop.add_callback_from_signal(cls._cleanup),
        )
        cls._initialized = True

    @classmethod
    def uninitialize(cls) -> None:
        """Removes the ``SIGCHLD`` handler."""
        if not cls._initialized:
            return
        signal.signal(signal.SIGCHLD, cls._old_sigchld)
        cls._initialized = False

    @classmethod
    def _cleanup(cls) -> None:
        # SIGCHLD handler body: reap any of our registered children that
        # have exited.  Iterate over a copy because _try_cleanup_process
        # mutates _waiting.
        for pid in list(cls._waiting.keys()):  # make a copy
            cls._try_cleanup_process(pid)

    @classmethod
    def _try_cleanup_process(cls, pid: int) -> None:
        # Non-blocking reap of a single child; dispatches the exit status
        # to the owning Subprocess's IOLoop if the child has exited.
        try:
            ret_pid, status = os.waitpid(pid, os.WNOHANG)  # type: ignore
        except ChildProcessError:
            # Already reaped elsewhere (e.g. by Popen.wait).
            return
        if ret_pid == 0:
            # Child still running; nothing to do yet.
            return
        assert ret_pid == pid
        subproc = cls._waiting.pop(pid)
        subproc.io_loop.add_callback_from_signal(subproc._set_returncode, status)

    def _set_returncode(self, status: int) -> None:
        if sys.platform == "win32":
            self.returncode = -1
        else:
            # Decode the raw os.wait status: negative signal number if
            # killed by a signal, otherwise the exit status.
            if os.WIFSIGNALED(status):
                self.returncode = -os.WTERMSIG(status)
            else:
                assert os.WIFEXITED(status)
                self.returncode = os.WEXITSTATUS(status)
        # We've taken over wait() duty from the subprocess.Popen
        # object. If we don't inform it of the process's return code,
        # it will log a warning at destruction in python 3.6+.
        self.proc.returncode = self.returncode
        if self._exit_callback:
            callback = self._exit_callback
            # Clear before invoking so the callback fires at most once.
            self._exit_callback = None
            callback(self.returncode)