1"""Magic functions for running cells in various scripts."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import asyncio
7import asyncio.exceptions
8import atexit
9import errno
10import os
11import signal
12import sys
13import time
14from codecs import getincrementaldecoder
15from subprocess import CalledProcessError
16from threading import Thread
17
18from traitlets import Any, Dict, List, default
19
20from IPython.core import magic_arguments
21from IPython.core.async_helpers import _AsyncIOProxy
22from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
23from IPython.utils.process import arg_split
24
25#-----------------------------------------------------------------------------
26# Magic implementation classes
27#-----------------------------------------------------------------------------
28
def script_args(f):
    """single decorator for adding script args

    Applies the ``--out``, ``--err``, ``--bg``, ``--proc`` and
    ``--no-raise-error`` argument declarations to `f`, one after the other,
    and returns the result of the last application.
    """
    args = [
        magic_arguments.argument(
            '--out', type=str,
            help="""The variable in which to store stdout from the script.
            If the script is backgrounded, this will be the stdout *pipe*,
            instead of the stdout text itself and will not be auto closed.
            """
        ),
        magic_arguments.argument(
            '--err', type=str,
            help="""The variable in which to store stderr from the script.
            If the script is backgrounded, this will be the stderr *pipe*,
            instead of the stderr text itself and will not be autoclosed.
            """
        ),
        magic_arguments.argument(
            '--bg', action="store_true",
            help="""Whether to run the script in the background.
            If given, the only way to see the output of the command is
            with --out/err.
            """
        ),
        magic_arguments.argument(
            '--proc', type=str,
            help="""The variable in which to store Popen instance.
            This is used only when --bg option is given.
            """
        ),
        magic_arguments.argument(
            # store_false: the parsed ``raise_error`` attribute is False only
            # when the flag is passed on the magic line.
            '--no-raise-error', action="store_false", dest='raise_error',
            help="""Whether you should raise an error message in addition to
            a stream on stderr if you get a nonzero exit code.
            """,
        ),
    ]
    # Apply in list order so the argument declarations keep their order.
    for arg in args:
        f = arg(f)
    return f
69
70
class RaiseAfterInterrupt(Exception):
    """Internal control-flow marker used while handling an interrupted script.

    Raised and caught within the ``KeyboardInterrupt`` handling of
    ``ScriptMagics.shebang``; it carries no data of its own.
    """
73
74
@magics_class
class ScriptMagics(Magics):
    """Magics for talking to scripts

    This defines a base `%%script` cell magic for running a cell
    with a program in a subprocess, and registers a few top-level
    magics that call %%script with common interpreters.
    """

    event_loop = Any(
        help="""
        The event loop on which to run subprocesses

        Not the main event loop,
        because we want to be able to make blocking calls
        and have certain requirements we don't want to impose on the main loop.
        """
    )

    script_magics: List = List(
        help="""Extra script cell magics to define

        This generates simple wrappers of `%%script foo` as `%%foo`.

        If you want to add script magics that aren't on your path,
        specify them in script_paths
        """,
    ).tag(config=True)

    @default('script_magics')
    def _script_magics_default(self):
        """default to a common list of programs"""

        defaults = [
            'sh',
            'bash',
            'perl',
            'ruby',
            'python',
            'python2',
            'python3',
            'pypy',
        ]
        if os.name == 'nt':
            defaults.extend([
                'cmd',
            ])

        return defaults

    script_paths = Dict(
        help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'

        Only necessary for items in script_magics where the default path will not
        find the right interpreter.
        """
    ).tag(config=True)

    def __init__(self, shell=None):
        """Register the per-program magics and arrange background cleanup."""
        super().__init__(shell=shell)
        # Set before anything else that may fail, so that __del__ /
        # kill_bg_processes can always read the attribute.
        self.bg_processes = []
        self._generate_script_magics()
        atexit.register(self.kill_bg_processes)

    def __del__(self):
        self.kill_bg_processes()

    def _generate_script_magics(self):
        """Install one cell magic per entry of `script_magics`."""
        cell_magics = self.magics['cell']
        for name in self.script_magics:
            cell_magics[name] = self._make_script_magic(name)

    def _make_script_magic(self, name):
        """make a named magic, that calls %%script with a particular program"""
        # expand to explicit path if necessary:
        script = self.script_paths.get(name, name)

        @magic_arguments.magic_arguments()
        @script_args
        def named_script_magic(line, cell):
            # if line, add it as cl-flags
            if line:
                line = "%s %s" % (script, line)
            else:
                line = script
            return self.shebang(line, cell)

        # write a basic docstring:
        named_script_magic.__doc__ = \
        """%%{name} script magic
        
        Run cells with {script} in a subprocess.
        
        This is a shortcut for `%%script {script}`
        """.format(name=name, script=script)

        return named_script_magic

    @magic_arguments.magic_arguments()
    @script_args
    @cell_magic("script")
    def shebang(self, line, cell):
        """Run a cell via a shell command

        The `%%script` line is like the #! line of script,
        specifying a program (bash, perl, ruby, etc.) with which to run.

        The rest of the cell is run by that program.

        .. versionchanged:: 9.0
          Interrupting the script executed without `--bg` will end in
          raising an exception (unless `--no-raise-error` is passed).

        Examples
        --------
        ::

            In [1]: %%script bash
               ...: for i in 1 2 3; do
               ...:   echo $i
               ...: done
            1
            2
            3
        """
        # Local import: only needed to recognise a timed-out bounded wait.
        from concurrent.futures import TimeoutError as FutureTimeoutError

        # Create the event loop in which to run script magics
        # this operates on a background thread
        if self.event_loop is None:
            if sys.platform == "win32":
                # don't override the current policy,
                # just create an event loop
                event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
            else:
                event_loop = asyncio.new_event_loop()
            self.event_loop = event_loop

            # start the loop in a background thread
            asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
            asyncio_thread.start()
        else:
            event_loop = self.event_loop

        def in_thread(coro):
            """Call a coroutine on the asyncio thread"""
            return asyncio.run_coroutine_threadsafe(coro, event_loop).result()

        async def _readchunk(stream):
            """Read up to 100 bytes from `stream`; empty bytes means EOF."""
            try:
                return await stream.read(100)
            except asyncio.exceptions.IncompleteReadError as e:
                return e.partial
            except asyncio.exceptions.LimitOverrunError as e:
                return await stream.read(e.consumed)

        async def _handle_stream(stream, stream_arg, file_object):
            """Forward `stream` as text until EOF.

            Text is appended to ``user_ns[stream_arg]`` when `stream_arg` is
            set, otherwise written to `file_object`.
            """
            decoder = getincrementaldecoder("utf-8")(errors="replace")
            while True:
                data = await _readchunk(stream)
                # EOF must be judged on the raw bytes: a read that delivers
                # only part of a multi-byte character decodes to "" although
                # more output is still to come.
                at_eof = not data
                # final=True at EOF flushes any incomplete trailing sequence
                # instead of silently dropping it.
                chunk = decoder.decode(data, final=at_eof)
                if chunk:
                    if stream_arg:
                        self.shell.user_ns[stream_arg] += chunk
                    else:
                        file_object.write(chunk)
                        file_object.flush()
                if at_eof:
                    break

        async def _stream_communicate(process, cell):
            """Feed `cell` to stdin, relay stdout/stderr, wait for exit."""
            process.stdin.write(cell)
            process.stdin.close()
            stdout_task = asyncio.create_task(
                _handle_stream(process.stdout, args.out, sys.stdout)
            )
            stderr_task = asyncio.create_task(
                _handle_stream(process.stderr, args.err, sys.stderr)
            )
            await asyncio.wait([stdout_task, stderr_task])
            await process.wait()

        argv = arg_split(line, posix=not sys.platform.startswith("win"))
        args, cmd = self.shebang.parser.parse_known_args(argv)

        if args.out:
            self.shell.user_ns[args.out] = ""
        if args.err:
            self.shell.user_ns[args.err] = ""

        try:
            p = in_thread(
                asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.PIPE,
                )
            )
        except OSError as e:
            if e.errno == errno.ENOENT:
                print("Couldn't find program: %r" % cmd[0])
                return
            else:
                raise

        def _exited_within_grace(timeout=0.1):
            """Wait at most `timeout` seconds for `p`; True if it has exited.

            ``asyncio.wait_for`` raises on timeout. That must not abort the
            SIGINT -> terminate -> kill escalation below, so a timeout is
            simply reported as "still running".
            """
            try:
                in_thread(asyncio.wait_for(p.wait(), timeout=timeout))
            except (asyncio.TimeoutError, FutureTimeoutError, TimeoutError):
                pass
            return p.returncode is not None

        if not cell.endswith('\n'):
            cell += '\n'
        cell = cell.encode('utf8', 'replace')
        if args.bg:
            self.bg_processes.append(p)
            self._gc_bg_processes()
            # Pipes not handed to the user are drained by _run_script.
            to_close = []
            if args.out:
                self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
            else:
                to_close.append(p.stdout)
            if args.err:
                self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
            else:
                to_close.append(p.stderr)
            event_loop.call_soon_threadsafe(
                lambda: asyncio.Task(self._run_script(p, cell, to_close))
            )
            if args.proc:
                proc_proxy = _AsyncIOProxy(p, event_loop)
                proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
                proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
                self.shell.user_ns[args.proc] = proc_proxy
            return

        try:
            in_thread(_stream_communicate(p, cell))
        except KeyboardInterrupt:
            try:
                p.send_signal(signal.SIGINT)
                if _exited_within_grace():
                    print("Process was interrupted.")
                    if args.raise_error:
                        raise RaiseAfterInterrupt()
                    else:
                        return
                p.terminate()
                if _exited_within_grace():
                    print("Process was terminated.")
                    if args.raise_error:
                        raise RaiseAfterInterrupt()
                    else:
                        return
                p.kill()
                print("Process was killed.")
                if args.raise_error:
                    raise RaiseAfterInterrupt()
            except RaiseAfterInterrupt:
                pass
            except OSError:
                pass
            except Exception as e:
                print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
            if args.raise_error:
                raise CalledProcessError(p.returncode, cell) from None
            else:
                return

        if args.raise_error and p.returncode != 0:
            # If we get here and p.returncode is still None, we must have
            # killed it but not yet seen its return code. We don't wait for it,
            # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
            rc = p.returncode or -9
            raise CalledProcessError(rc, cell)

    shebang.__skip_doctest__ = os.name != "posix"

    async def _run_script(self, p, cell, to_close):
        """callback for running the script in the background

        Writes `cell` to the stdin of `p`, waits for the process to exit,
        then reads the streams in `to_close` to exhaustion.
        """
        # The child may exit or close stdin before consuming its input; that
        # must not prevent reaping the process and draining the pipes below.
        try:
            p.stdin.write(cell)
            await p.stdin.drain()
        except (BrokenPipeError, ConnectionResetError):
            pass
        p.stdin.close()
        try:
            await p.stdin.wait_closed()
        except (BrokenPipeError, ConnectionResetError):
            pass
        await p.wait()
        # asyncio read pipes have no close
        # but we should drain the data anyway
        for s in to_close:
            await s.read()
        self._gc_bg_processes()

    @line_magic("killbgscripts")
    def killbgscripts(self, _nouse_=''):
        """Kill all BG processes started by %%script and its family."""
        self.kill_bg_processes()
        print("All background processes were killed.")

    def kill_bg_processes(self):
        """Kill all BG processes which are still running.

        Escalates from SIGINT through ``terminate()`` to ``kill()``, pausing
        0.1 seconds after each of the first two stages and stopping as soon
        as no tracked process is left running.
        """
        escalation = (
            lambda proc: proc.send_signal(signal.SIGINT),
            lambda proc: proc.terminate(),
            lambda proc: proc.kill(),
        )
        final_stage = len(escalation) - 1
        for stage, stop in enumerate(escalation):
            if not self.bg_processes:
                return
            for p in self.bg_processes:
                if p.returncode is None:
                    try:
                        stop(p)
                    except Exception:
                        # Best effort: a failure to signal one process must
                        # not stop the others from being signalled.
                        pass
            if stage != final_stage:
                # Give the processes a moment to exit before escalating.
                time.sleep(0.1)
            self._gc_bg_processes()

    def _gc_bg_processes(self):
        """Forget tracked processes whose return code is already known."""
        self.bg_processes = [p for p in self.bg_processes if p.returncode is None]