Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/process.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Copyright 2011 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""Utilities for working with multiple processes, including both forking
17the server into multiple processes and managing subprocesses.
18"""
20import asyncio
21import os
22import multiprocessing
23import signal
24import subprocess
25import sys
26import time
28from binascii import hexlify
30from tornado.concurrent import (
31 Future,
32 future_set_result_unless_cancelled,
33 future_set_exception_unless_cancelled,
34)
35from tornado import ioloop
36from tornado.iostream import PipeIOStream
37from tornado.log import gen_log
39import typing
40from typing import Optional, Any, Callable
42if typing.TYPE_CHECKING:
43 from typing import List # noqa: F401
# Re-export this exception for convenience.
# `Subprocess.wait_for_exit` raises it on a non-zero exit status, so
# callers can catch it as either ``tornado.process.CalledProcessError``
# or ``subprocess.CalledProcessError`` (they are the same class).
CalledProcessError = subprocess.CalledProcessError
def cpu_count() -> int:
    """Returns the number of processors on this machine.

    Tries `multiprocessing.cpu_count` first and falls back to
    ``os.sysconf("SC_NPROCESSORS_CONF")``; if neither is available,
    logs an error and assumes a single processor.
    """
    # NOTE: a historical ``if multiprocessing is None`` guard was removed
    # here: ``multiprocessing`` is imported unconditionally at the top of
    # this module, so the guard was unreachable dead code.
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        pass
    try:
        # sysconf may be absent (AttributeError on some platforms) or
        # raise ValueError when the name is unknown.
        return os.sysconf("SC_NPROCESSORS_CONF")  # type: ignore
    except (AttributeError, ValueError):
        pass
    gen_log.error("Could not detect number of processors; assuming 1")
    return 1
65def _reseed_random() -> None:
66 if "random" not in sys.modules:
67 return
68 import random
70 # If os.urandom is available, this method does the same thing as
71 # random.seed (at least as of python 2.6). If os.urandom is not
72 # available, we mix in the pid in addition to a timestamp.
73 try:
74 seed = int(hexlify(os.urandom(16)), 16)
75 except NotImplementedError:
76 seed = int(time.time() * 1000) ^ os.getpid()
77 random.seed(seed)
# Task id assigned to this process by fork_processes(); None in the
# parent process and in processes not created by fork_processes().
_task_id = None  # type: Optional[int]
def fork_processes(
    num_processes: Optional[int], max_restarts: Optional[int] = None
) -> int:
    """Starts multiple worker processes.

    If ``num_processes`` is None or <= 0, we detect the number of cores
    available on this machine and fork that number of child
    processes. If ``num_processes`` is given and > 0, we fork that
    specific number of sub-processes.

    Since we use processes and not threads, there is no shared memory
    between any server code.

    Note that multiple processes are not compatible with the autoreload
    module (or the ``autoreload=True`` option to `tornado.web.Application`
    which defaults to True when ``debug=True``).
    When using multiple processes, no IOLoops can be created or
    referenced until after the call to ``fork_processes``.

    In each child process, ``fork_processes`` returns its *task id*, a
    number between 0 and ``num_processes``. Processes that exit
    abnormally (due to a signal or non-zero exit status) are restarted
    with the same id (up to ``max_restarts`` times). In the parent
    process, ``fork_processes`` calls ``sys.exit(0)`` after all child
    processes have exited normally.

    max_restarts defaults to 100.

    Availability: Unix
    """
    if sys.platform == "win32":
        # The exact form of this condition matters to mypy; it understands
        # if but not assert in this context.
        raise Exception("fork not available on windows")
    if max_restarts is None:
        max_restarts = 100
    global _task_id
    # fork_processes may only be called once per process (a child would
    # otherwise inherit a non-None _task_id from its parent).
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    gen_log.info("Starting %d processes", num_processes)
    # Parent-side bookkeeping: maps child pid -> task id.
    children = {}

    def start_child(i: int) -> Optional[int]:
        # Fork one worker. Returns the task id ``i`` in the child and
        # None in the parent (which records the child's pid instead).
        pid = os.fork()
        if pid == 0:
            # child process
            _reseed_random()
            global _task_id
            _task_id = i
            return i
        else:
            children[pid] = i
            return None

    for i in range(num_processes):
        id = start_child(i)
        if id is not None:
            # We are in a freshly-forked child: hand its task id back
            # to the caller immediately.
            return id
    # From here on we are the parent: reap children and restart any
    # that exited abnormally, up to max_restarts total restarts.
    num_restarts = 0
    while children:
        pid, status = os.wait()
        if pid not in children:
            # A grandchild or other process we didn't start; ignore.
            continue
        id = children.pop(pid)
        if os.WIFSIGNALED(status):
            gen_log.warning(
                "child %d (pid %d) killed by signal %d, restarting",
                id,
                pid,
                os.WTERMSIG(status),
            )
        elif os.WEXITSTATUS(status) != 0:
            gen_log.warning(
                "child %d (pid %d) exited with status %d, restarting",
                id,
                pid,
                os.WEXITSTATUS(status),
            )
        else:
            # Clean exit: do not restart this task id.
            gen_log.info("child %d (pid %d) exited normally", id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(id)
        if new_id is not None:
            # We are in a restarted child: return its task id.
            return new_id
    # All child processes exited cleanly, so exit the master process
    # instead of just returning to right after the call to
    # fork_processes (which will probably just start up another IOLoop
    # unless the caller checks the return value).
    sys.exit(0)
def task_id() -> Optional[int]:
    """Return the current task id, if any.

    Returns None if this process was not created by `fork_processes`.
    """
    # Read-only access to the module global; no ``global`` statement needed.
    return _task_id
class Subprocess(object):
    """Wraps ``subprocess.Popen`` with IOStream support.

    The constructor is the same as ``subprocess.Popen`` with the following
    additions:

    * ``stdin``, ``stdout``, and ``stderr`` may have the value
      ``tornado.process.Subprocess.STREAM``, which will make the corresponding
      attribute of the resulting Subprocess a `.PipeIOStream`. If this option
      is used, the caller is responsible for closing the streams when done
      with them.

    The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
    ``wait_for_exit`` methods do not work on Windows. There is
    therefore no reason to use this class instead of
    ``subprocess.Popen`` on that platform.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been
       removed.
    """

    # Sentinel value: pass as stdin/stdout/stderr to get a PipeIOStream
    # attached to the corresponding pipe instead of a plain fd.
    STREAM = object()

    # Whether the process-wide SIGCHLD handler has been installed.
    _initialized = False
    # Maps child pid -> Subprocess instance awaiting exit notification.
    _waiting = {}  # type: ignore

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self.io_loop = ioloop.IOLoop.current()
        # All FDs we create should be closed on error; those in to_close
        # should be closed in the parent process on success.
        pipe_fds = []  # type: List[int]
        to_close = []  # type: List[int]
        if kwargs.get("stdin") is Subprocess.STREAM:
            # Child reads in_r; parent writes through a PipeIOStream on in_w.
            in_r, in_w = os.pipe()
            kwargs["stdin"] = in_r
            pipe_fds.extend((in_r, in_w))
            to_close.append(in_r)
            self.stdin = PipeIOStream(in_w)
        if kwargs.get("stdout") is Subprocess.STREAM:
            # Child writes out_w; parent reads through a PipeIOStream on out_r.
            out_r, out_w = os.pipe()
            kwargs["stdout"] = out_w
            pipe_fds.extend((out_r, out_w))
            to_close.append(out_w)
            self.stdout = PipeIOStream(out_r)
        if kwargs.get("stderr") is Subprocess.STREAM:
            # Child writes err_w; parent reads through a PipeIOStream on err_r.
            err_r, err_w = os.pipe()
            kwargs["stderr"] = err_w
            pipe_fds.extend((err_r, err_w))
            to_close.append(err_w)
            self.stderr = PipeIOStream(err_r)
        try:
            self.proc = subprocess.Popen(*args, **kwargs)
        except:
            # Popen failed: close every fd we created so nothing leaks.
            for fd in pipe_fds:
                os.close(fd)
            raise
        # Success: close the child's ends of the pipes in this process.
        for fd in to_close:
            os.close(fd)
        self.pid = self.proc.pid
        for attr in ["stdin", "stdout", "stderr"]:
            if not hasattr(self, attr):  # don't clobber streams set above
                setattr(self, attr, getattr(self.proc, attr))
        self._exit_callback = None  # type: Optional[Callable[[int], None]]
        self.returncode = None  # type: Optional[int]

    def set_exit_callback(self, callback: Callable[[int], None]) -> None:
        """Runs ``callback`` when this process exits.

        The callback takes one argument, the return code of the process.

        This method uses a ``SIGCHLD`` handler, which is a global setting
        and may conflict if you have other libraries trying to handle the
        same signal. If you are using more than one ``IOLoop`` it may
        be necessary to call `Subprocess.initialize` first to designate
        one ``IOLoop`` to run the signal handlers.

        In many cases a close callback on the stdout or stderr streams
        can be used as an alternative to an exit callback if the
        signal handler is causing a problem.

        Availability: Unix
        """
        self._exit_callback = callback
        Subprocess.initialize()
        Subprocess._waiting[self.pid] = self
        # The child may already have exited before the handler was
        # registered; poll once immediately so we don't miss it.
        Subprocess._try_cleanup_process(self.pid)

    def wait_for_exit(self, raise_error: bool = True) -> "Future[int]":
        """Returns a `.Future` which resolves when the process exits.

        Usage::

            ret = yield proc.wait_for_exit()

        This is a coroutine-friendly alternative to `set_exit_callback`
        (and a replacement for the blocking `subprocess.Popen.wait`).

        By default, raises `subprocess.CalledProcessError` if the process
        has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
        to suppress this behavior and return the exit status without raising.

        .. versionadded:: 4.2

        Availability: Unix
        """
        future = Future()  # type: Future[int]

        def callback(ret: int) -> None:
            if ret != 0 and raise_error:
                # Unfortunately we don't have the original args any more.
                future_set_exception_unless_cancelled(
                    future, CalledProcessError(ret, "unknown")
                )
            else:
                future_set_result_unless_cancelled(future, ret)

        self.set_exit_callback(callback)
        return future

    @classmethod
    def initialize(cls) -> None:
        """Initializes the ``SIGCHLD`` handler.

        The signal handler is run on an `.IOLoop` to avoid locking issues.
        Note that the `.IOLoop` used for signal handling need not be the
        same one used by individual Subprocess objects (as long as the
        ``IOLoops`` are each running in separate threads).

        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been
           removed.

        Availability: Unix
        """
        if cls._initialized:
            return
        loop = asyncio.get_event_loop()
        loop.add_signal_handler(signal.SIGCHLD, cls._cleanup)
        cls._initialized = True

    @classmethod
    def uninitialize(cls) -> None:
        """Removes the ``SIGCHLD`` handler."""
        if not cls._initialized:
            return
        loop = asyncio.get_event_loop()
        loop.remove_signal_handler(signal.SIGCHLD)
        cls._initialized = False

    @classmethod
    def _cleanup(cls) -> None:
        # SIGCHLD handler: poll every pid we are waiting on, since the
        # signal does not tell us which child exited.
        for pid in list(cls._waiting.keys()):  # make a copy
            cls._try_cleanup_process(pid)

    @classmethod
    def _try_cleanup_process(cls, pid: int) -> None:
        # Non-blocking reap of one child; dispatches _set_returncode on
        # the owning instance's IOLoop when the child has exited.
        try:
            ret_pid, status = os.waitpid(pid, os.WNOHANG)  # type: ignore
        except ChildProcessError:
            # Already reaped elsewhere (e.g. by Popen.wait); nothing to do.
            return
        if ret_pid == 0:
            # Child still running.
            return
        assert ret_pid == pid
        subproc = cls._waiting.pop(pid)
        # Hop to the instance's IOLoop: the callback must not run in
        # signal-handler context.
        subproc.io_loop.add_callback(subproc._set_returncode, status)

    def _set_returncode(self, status: int) -> None:
        # Translate the raw os.waitpid status into a Popen-style
        # returncode (negative signal number or exit status).
        if sys.platform == "win32":
            self.returncode = -1
        else:
            if os.WIFSIGNALED(status):
                self.returncode = -os.WTERMSIG(status)
            else:
                assert os.WIFEXITED(status)
                self.returncode = os.WEXITSTATUS(status)
        # We've taken over wait() duty from the subprocess.Popen
        # object. If we don't inform it of the process's return code,
        # it will log a warning at destruction in python 3.6+.
        self.proc.returncode = self.returncode
        if self._exit_callback:
            # Clear before calling so the callback fires at most once.
            callback = self._exit_callback
            self._exit_callback = None
            callback(self.returncode)