Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/IPython/core/magics/script.py: 22%
188 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
1"""Magic functions for running cells in various scripts."""
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
6import asyncio
7import asyncio.exceptions
8import atexit
9import errno
10import os
11import signal
12import sys
13import time
14from subprocess import CalledProcessError
15from threading import Thread
17from traitlets import Any, Dict, List, default
19from IPython.core import magic_arguments
20from IPython.core.async_helpers import _AsyncIOProxy
21from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
22from IPython.utils.process import arg_split
24#-----------------------------------------------------------------------------
25# Magic implementation classes
26#-----------------------------------------------------------------------------
28def script_args(f):
29 """single decorator for adding script args"""
30 args = [
31 magic_arguments.argument(
32 '--out', type=str,
33 help="""The variable in which to store stdout from the script.
34 If the script is backgrounded, this will be the stdout *pipe*,
35 instead of the stderr text itself and will not be auto closed.
36 """
37 ),
38 magic_arguments.argument(
39 '--err', type=str,
40 help="""The variable in which to store stderr from the script.
41 If the script is backgrounded, this will be the stderr *pipe*,
42 instead of the stderr text itself and will not be autoclosed.
43 """
44 ),
45 magic_arguments.argument(
46 '--bg', action="store_true",
47 help="""Whether to run the script in the background.
48 If given, the only way to see the output of the command is
49 with --out/err.
50 """
51 ),
52 magic_arguments.argument(
53 '--proc', type=str,
54 help="""The variable in which to store Popen instance.
55 This is used only when --bg option is given.
56 """
57 ),
58 magic_arguments.argument(
59 '--no-raise-error', action="store_false", dest='raise_error',
60 help="""Whether you should raise an error message in addition to
61 a stream on stderr if you get a nonzero exit code.
62 """,
63 ),
64 ]
65 for arg in args:
66 f = arg(f)
67 return f
70@magics_class
71class ScriptMagics(Magics):
72 """Magics for talking to scripts
74 This defines a base `%%script` cell magic for running a cell
75 with a program in a subprocess, and registers a few top-level
76 magics that call %%script with common interpreters.
77 """
79 event_loop = Any(
80 help="""
81 The event loop on which to run subprocesses
83 Not the main event loop,
84 because we want to be able to make blocking calls
85 and have certain requirements we don't want to impose on the main loop.
86 """
87 )
89 script_magics = List(
90 help="""Extra script cell magics to define
92 This generates simple wrappers of `%%script foo` as `%%foo`.
94 If you want to add script magics that aren't on your path,
95 specify them in script_paths
96 """,
97 ).tag(config=True)
98 @default('script_magics')
99 def _script_magics_default(self):
100 """default to a common list of programs"""
102 defaults = [
103 'sh',
104 'bash',
105 'perl',
106 'ruby',
107 'python',
108 'python2',
109 'python3',
110 'pypy',
111 ]
112 if os.name == 'nt':
113 defaults.extend([
114 'cmd',
115 ])
117 return defaults
119 script_paths = Dict(
120 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
122 Only necessary for items in script_magics where the default path will not
123 find the right interpreter.
124 """
125 ).tag(config=True)
127 def __init__(self, shell=None):
128 super(ScriptMagics, self).__init__(shell=shell)
129 self._generate_script_magics()
130 self.bg_processes = []
131 atexit.register(self.kill_bg_processes)
133 def __del__(self):
134 self.kill_bg_processes()
136 def _generate_script_magics(self):
137 cell_magics = self.magics['cell']
138 for name in self.script_magics:
139 cell_magics[name] = self._make_script_magic(name)
141 def _make_script_magic(self, name):
142 """make a named magic, that calls %%script with a particular program"""
143 # expand to explicit path if necessary:
144 script = self.script_paths.get(name, name)
146 @magic_arguments.magic_arguments()
147 @script_args
148 def named_script_magic(line, cell):
149 # if line, add it as cl-flags
150 if line:
151 line = "%s %s" % (script, line)
152 else:
153 line = script
154 return self.shebang(line, cell)
156 # write a basic docstring:
157 named_script_magic.__doc__ = \
158 """%%{name} script magic
160 Run cells with {script} in a subprocess.
162 This is a shortcut for `%%script {script}`
163 """.format(**locals())
165 return named_script_magic
167 @magic_arguments.magic_arguments()
168 @script_args
169 @cell_magic("script")
170 def shebang(self, line, cell):
171 """Run a cell via a shell command
173 The `%%script` line is like the #! line of script,
174 specifying a program (bash, perl, ruby, etc.) with which to run.
176 The rest of the cell is run by that program.
178 Examples
179 --------
180 ::
182 In [1]: %%script bash
183 ...: for i in 1 2 3; do
184 ...: echo $i
185 ...: done
186 1
187 2
188 3
189 """
191 # Create the event loop in which to run script magics
192 # this operates on a background thread
193 if self.event_loop is None:
194 if sys.platform == "win32":
195 # don't override the current policy,
196 # just create an event loop
197 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
198 else:
199 event_loop = asyncio.new_event_loop()
200 self.event_loop = event_loop
202 # start the loop in a background thread
203 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
204 asyncio_thread.start()
205 else:
206 event_loop = self.event_loop
208 def in_thread(coro):
209 """Call a coroutine on the asyncio thread"""
210 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
212 async def _readchunk(stream):
213 try:
214 return await stream.readuntil(b"\n")
215 except asyncio.exceptions.IncompleteReadError as e:
216 return e.partial
217 except asyncio.exceptions.LimitOverrunError as e:
218 return await stream.read(e.consumed)
220 async def _handle_stream(stream, stream_arg, file_object):
221 while True:
222 chunk = (await _readchunk(stream)).decode("utf8", errors="replace")
223 if not chunk:
224 break
225 if stream_arg:
226 self.shell.user_ns[stream_arg] = chunk
227 else:
228 file_object.write(chunk)
229 file_object.flush()
231 async def _stream_communicate(process, cell):
232 process.stdin.write(cell)
233 process.stdin.close()
234 stdout_task = asyncio.create_task(
235 _handle_stream(process.stdout, args.out, sys.stdout)
236 )
237 stderr_task = asyncio.create_task(
238 _handle_stream(process.stderr, args.err, sys.stderr)
239 )
240 await asyncio.wait([stdout_task, stderr_task])
241 await process.wait()
243 argv = arg_split(line, posix=not sys.platform.startswith("win"))
244 args, cmd = self.shebang.parser.parse_known_args(argv)
246 try:
247 p = in_thread(
248 asyncio.create_subprocess_exec(
249 *cmd,
250 stdout=asyncio.subprocess.PIPE,
251 stderr=asyncio.subprocess.PIPE,
252 stdin=asyncio.subprocess.PIPE,
253 )
254 )
255 except OSError as e:
256 if e.errno == errno.ENOENT:
257 print("Couldn't find program: %r" % cmd[0])
258 return
259 else:
260 raise
262 if not cell.endswith('\n'):
263 cell += '\n'
264 cell = cell.encode('utf8', 'replace')
265 if args.bg:
266 self.bg_processes.append(p)
267 self._gc_bg_processes()
268 to_close = []
269 if args.out:
270 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
271 else:
272 to_close.append(p.stdout)
273 if args.err:
274 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
275 else:
276 to_close.append(p.stderr)
277 event_loop.call_soon_threadsafe(
278 lambda: asyncio.Task(self._run_script(p, cell, to_close))
279 )
280 if args.proc:
281 proc_proxy = _AsyncIOProxy(p, event_loop)
282 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
283 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
284 self.shell.user_ns[args.proc] = proc_proxy
285 return
287 try:
288 in_thread(_stream_communicate(p, cell))
289 except KeyboardInterrupt:
290 try:
291 p.send_signal(signal.SIGINT)
292 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
293 if p.returncode is not None:
294 print("Process is interrupted.")
295 return
296 p.terminate()
297 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
298 if p.returncode is not None:
299 print("Process is terminated.")
300 return
301 p.kill()
302 print("Process is killed.")
303 except OSError:
304 pass
305 except Exception as e:
306 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
307 return
309 if args.raise_error and p.returncode != 0:
310 # If we get here and p.returncode is still None, we must have
311 # killed it but not yet seen its return code. We don't wait for it,
312 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
313 rc = p.returncode or -9
314 raise CalledProcessError(rc, cell)
316 shebang.__skip_doctest__ = os.name != "posix"
318 async def _run_script(self, p, cell, to_close):
319 """callback for running the script in the background"""
321 p.stdin.write(cell)
322 await p.stdin.drain()
323 p.stdin.close()
324 await p.stdin.wait_closed()
325 await p.wait()
326 # asyncio read pipes have no close
327 # but we should drain the data anyway
328 for s in to_close:
329 await s.read()
330 self._gc_bg_processes()
332 @line_magic("killbgscripts")
333 def killbgscripts(self, _nouse_=''):
334 """Kill all BG processes started by %%script and its family."""
335 self.kill_bg_processes()
336 print("All background processes were killed.")
338 def kill_bg_processes(self):
339 """Kill all BG processes which are still running."""
340 if not self.bg_processes:
341 return
342 for p in self.bg_processes:
343 if p.returncode is None:
344 try:
345 p.send_signal(signal.SIGINT)
346 except:
347 pass
348 time.sleep(0.1)
349 self._gc_bg_processes()
350 if not self.bg_processes:
351 return
352 for p in self.bg_processes:
353 if p.returncode is None:
354 try:
355 p.terminate()
356 except:
357 pass
358 time.sleep(0.1)
359 self._gc_bg_processes()
360 if not self.bg_processes:
361 return
362 for p in self.bg_processes:
363 if p.returncode is None:
364 try:
365 p.kill()
366 except:
367 pass
368 self._gc_bg_processes()
370 def _gc_bg_processes(self):
371 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]