1"""Magic functions for running cells in various scripts."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import asyncio
7import asyncio.exceptions
8import atexit
9import errno
10import os
11import signal
12import sys
13import time
14from subprocess import CalledProcessError
15from threading import Thread
16
17from traitlets import Any, Dict, List, default
18
19from IPython.core import magic_arguments
20from IPython.core.async_helpers import _AsyncIOProxy
21from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
22from IPython.utils.process import arg_split
23
24#-----------------------------------------------------------------------------
25# Magic implementation classes
26#-----------------------------------------------------------------------------
27
def script_args(f):
    """Decorate a magic with the command-line arguments shared by %%script.

    Adds ``--out``, ``--err``, ``--bg``, ``--proc`` and ``--no-raise-error``
    (stored as ``raise_error``) to *f* via ``magic_arguments.argument`` and
    returns the decorated function.
    """
    args = [
        magic_arguments.argument(
            '--out', type=str,
            help="""The variable in which to store stdout from the script.
            If the script is backgrounded, this will be the stdout *pipe*,
            instead of the stdout text itself and will not be auto closed.
            """
        ),
        magic_arguments.argument(
            '--err', type=str,
            help="""The variable in which to store stderr from the script.
            If the script is backgrounded, this will be the stderr *pipe*,
            instead of the stderr text itself and will not be autoclosed.
            """
        ),
        magic_arguments.argument(
            '--bg', action="store_true",
            help="""Whether to run the script in the background.
            If given, the only way to see the output of the command is
            with --out/err.
            """
        ),
        magic_arguments.argument(
            '--proc', type=str,
            help="""The variable in which to store Popen instance.
            This is used only when --bg option is given.
            """
        ),
        magic_arguments.argument(
            '--no-raise-error', action="store_false", dest='raise_error',
            help="""Whether you should raise an error message in addition to
            a stream on stderr if you get a nonzero exit code.
            """,
        ),
    ]
    # Apply in list order, exactly as stacking the decorators would.
    for arg in args:
        f = arg(f)
    return f
68
69
class RaiseAfterInterrupt(Exception):
    """Signal that a script subprocess was interrupted and an error should follow."""
72
73
@magics_class
class ScriptMagics(Magics):
    """Magics for talking to scripts

    This defines a base `%%script` cell magic for running a cell
    with a program in a subprocess, and registers a few top-level
    magics that call %%script with common interpreters.
    """

    event_loop = Any(
        help="""
        The event loop on which to run subprocesses

        Not the main event loop,
        because we want to be able to make blocking calls
        and have certain requirements we don't want to impose on the main loop.
        """
    )

    script_magics: List = List(
        help="""Extra script cell magics to define

        This generates simple wrappers of `%%script foo` as `%%foo`.

        If you want to add script magics that aren't on your path,
        specify them in script_paths
        """,
    ).tag(config=True)

    @default('script_magics')
    def _script_magics_default(self):
        """default to a common list of programs"""

        defaults = [
            'sh',
            'bash',
            'perl',
            'ruby',
            'python',
            'python2',
            'python3',
            'pypy',
        ]
        if os.name == 'nt':
            defaults.extend([
                'cmd',
            ])

        return defaults

    script_paths = Dict(
        help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'

        Only necessary for items in script_magics where the default path will not
        find the right interpreter.
        """
    ).tag(config=True)

    def __init__(self, shell=None):
        super(ScriptMagics, self).__init__(shell=shell)
        # Set up bookkeeping before anything else that might raise, so that
        # __del__ / the atexit hook never see a half-initialised instance.
        self.bg_processes = []
        # Strong references to the asyncio tasks driving `--bg` scripts:
        # the event loop only keeps weak references to tasks it runs.
        self._bg_tasks = set()
        self._generate_script_magics()
        atexit.register(self.kill_bg_processes)

    def __del__(self):
        # __init__ may have failed before bg_processes was assigned.
        if getattr(self, "bg_processes", None):
            self.kill_bg_processes()

    def _generate_script_magics(self):
        """Register one `%%name` cell magic per entry in ``script_magics``."""
        cell_magics = self.magics['cell']
        for name in self.script_magics:
            cell_magics[name] = self._make_script_magic(name)

    def _make_script_magic(self, name):
        """make a named magic, that calls %%script with a particular program"""
        # expand to explicit path if necessary:
        script = self.script_paths.get(name, name)

        @magic_arguments.magic_arguments()
        @script_args
        def named_script_magic(line, cell):
            # if line, add it as cl-flags
            if line:
                line = "%s %s" % (script, line)
            else:
                line = script
            return self.shebang(line, cell)

        # write a basic docstring:
        named_script_magic.__doc__ = \
        """%%{name} script magic

        Run cells with {script} in a subprocess.

        This is a shortcut for `%%script {script}`
        """.format(name=name, script=script)

        return named_script_magic

    @magic_arguments.magic_arguments()
    @script_args
    @cell_magic("script")
    def shebang(self, line, cell):
        """Run a cell via a shell command

        The `%%script` line is like the #! line of script,
        specifying a program (bash, perl, ruby, etc.) with which to run.

        The rest of the cell is run by that program.

        .. versionchanged:: 9.0
          Interrupting the script executed without `--bg` will end in
          raising an exception (unless `--no-raise-error` is passed).

        Examples
        --------
        ::

            In [1]: %%script bash
               ...: for i in 1 2 3; do
               ...:   echo $i
               ...: done
            1
            2
            3
        """
        import codecs

        # Create the event loop in which to run script magics
        # this operates on a background thread
        if self.event_loop is None:
            if sys.platform == "win32":
                # don't override the current policy,
                # just create an event loop
                event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
            else:
                event_loop = asyncio.new_event_loop()
            self.event_loop = event_loop

            # start the loop in a background thread
            asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
            asyncio_thread.start()
        else:
            event_loop = self.event_loop

        def in_thread(coro):
            """Call a coroutine on the asyncio thread"""
            return asyncio.run_coroutine_threadsafe(coro, event_loop).result()

        async def _handle_stream(stream, stream_arg, file_object):
            """Echo *stream* to *file_object*, or capture it all in user_ns.

            When *stream_arg* names a variable, the complete decoded output is
            stored there once the stream hits EOF (previously only the last
            chunk survived). An incremental decoder keeps multi-byte UTF-8
            characters intact when they straddle a read boundary.
            """
            decoder = codecs.getincrementaldecoder("utf8")(errors="replace")
            captured = []
            while True:
                data = await stream.read(100)
                # final=True at EOF flushes any dangling partial sequence.
                text = decoder.decode(data, final=not data)
                if text:
                    if stream_arg:
                        captured.append(text)
                    else:
                        file_object.write(text)
                        file_object.flush()
                if not data:
                    break
            if stream_arg:
                self.shell.user_ns[stream_arg] = "".join(captured)

        async def _stream_communicate(process, cell):
            process.stdin.write(cell)
            process.stdin.close()
            stdout_task = asyncio.create_task(
                _handle_stream(process.stdout, args.out, sys.stdout)
            )
            stderr_task = asyncio.create_task(
                _handle_stream(process.stderr, args.err, sys.stderr)
            )
            await asyncio.wait([stdout_task, stderr_task])
            await process.wait()

        async def _wait_for_exit(timeout):
            """Wait up to *timeout* s for ``p``; return its returncode or None.

            The timeout is handled here, on the loop thread, so the caller
            sees ``None`` instead of a TimeoutError (which on 3.11+ is an
            OSError and used to be swallowed before terminate/kill ran).
            """
            try:
                await asyncio.wait_for(p.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                pass
            return p.returncode

        def _stop_process():
            """Escalate SIGINT -> terminate -> kill; return a status message."""
            p.send_signal(signal.SIGINT)
            if in_thread(_wait_for_exit(0.1)) is not None:
                return "Process was interrupted."
            p.terminate()
            if in_thread(_wait_for_exit(0.1)) is not None:
                return "Process was terminated."
            p.kill()
            return "Process was killed."

        argv = arg_split(line, posix=not sys.platform.startswith("win"))
        args, cmd = self.shebang.parser.parse_known_args(argv)

        try:
            p = in_thread(
                asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.PIPE,
                )
            )
        except OSError as e:
            if e.errno == errno.ENOENT:
                print("Couldn't find program: %r" % cmd[0])
                return
            else:
                raise

        if not cell.endswith('\n'):
            cell += '\n'
        cell = cell.encode('utf8', 'replace')
        if args.bg:
            self.bg_processes.append(p)
            self._gc_bg_processes()
            to_close = []
            if args.out:
                self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
            else:
                to_close.append(p.stdout)
            if args.err:
                self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
            else:
                to_close.append(p.stderr)

            def _start_background_task():
                # Runs on the loop thread; keep a strong reference until done.
                task = event_loop.create_task(self._run_script(p, cell, to_close))
                self._bg_tasks.add(task)
                task.add_done_callback(self._bg_tasks.discard)

            event_loop.call_soon_threadsafe(_start_background_task)
            if args.proc:
                proc_proxy = _AsyncIOProxy(p, event_loop)
                proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
                proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
                self.shell.user_ns[args.proc] = proc_proxy
            return

        try:
            in_thread(_stream_communicate(p, cell))
        except KeyboardInterrupt:
            try:
                print(_stop_process())
            except OSError:
                # e.g. ProcessLookupError: the process is already gone;
                # fall through to the return-code check below.
                pass
            except Exception as e:
                print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
                if args.raise_error:
                    raise CalledProcessError(p.returncode, cell) from None
                return
            if not args.raise_error:
                return

        if args.raise_error and p.returncode != 0:
            # If we get here and p.returncode is still None, we must have
            # killed it but not yet seen its return code. We don't wait for it,
            # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
            rc = p.returncode or -9
            raise CalledProcessError(rc, cell)

    shebang.__skip_doctest__ = os.name != "posix"

    async def _run_script(self, p, cell, to_close):
        """callback for running the script in the background

        Feeds *cell* to the script's stdin, drains the pipes in *to_close*
        (those not handed to the user via --out/--err), waits for exit and
        finally prunes ``bg_processes``.
        """
        try:
            try:
                p.stdin.write(cell)
                await p.stdin.drain()
                p.stdin.close()
                await p.stdin.wait_closed()
            except (BrokenPipeError, ConnectionResetError):
                # The script exited (or closed stdin) without reading all input.
                p.stdin.close()
            # asyncio read pipes have no close, so drain them -- concurrently
            # and *before* waiting: a script blocked on a full pipe would
            # otherwise never exit and p.wait() would hang forever.
            await asyncio.gather(*(s.read() for s in to_close))
            await p.wait()
        finally:
            self._gc_bg_processes()

    @line_magic("killbgscripts")
    def killbgscripts(self, _nouse_=''):
        """Kill all BG processes started by %%script and its family."""
        self.kill_bg_processes()
        print("All background processes were killed.")

    def kill_bg_processes(self):
        """Kill all BG processes which are still running.

        Escalates SIGINT -> terminate -> kill, pausing 0.1 s after each of the
        first two steps and stopping as soon as no tracked process remains.
        """
        stop_steps = (
            lambda proc: proc.send_signal(signal.SIGINT),
            lambda proc: proc.terminate(),
            lambda proc: proc.kill(),
        )
        last = len(stop_steps) - 1
        for i, stop in enumerate(stop_steps):
            if not self.bg_processes:
                return
            for proc in self.bg_processes:
                if proc.returncode is None:
                    try:
                        stop(proc)
                    except Exception:
                        # Best effort: the process may already be gone, or
                        # the signal may be unsupported on this platform.
                        pass
            if i < last:
                time.sleep(0.1)
            self._gc_bg_processes()

    def _gc_bg_processes(self):
        """Forget tracked background processes that have already exited."""
        self.bg_processes = [p for p in self.bg_processes if p.returncode is None]