1"""Magic functions for running cells in various scripts."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import asyncio
7import asyncio.exceptions
8import atexit
9import errno
10import os
11import signal
12import sys
13import time
14from codecs import getincrementaldecoder
15from subprocess import CalledProcessError
16from threading import Thread
17
18from traitlets import Any, Dict, List, default
19
20from IPython.core import magic_arguments
21from IPython.core.async_helpers import _AsyncIOProxy
22from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
23from IPython.utils.process import arg_split
24
25#-----------------------------------------------------------------------------
26# Magic implementation classes
27#-----------------------------------------------------------------------------
28
def script_args(f):
    """Single decorator for adding the script-magic arguments to ``f``.

    Applies, in order, the ``--out``, ``--err``, ``--bg``, ``--proc`` and
    ``--no-raise-error`` argument decorators to the given magic function.

    Parameters
    ----------
    f : callable
        The magic function to decorate.

    Returns
    -------
    callable
        The result of passing ``f`` through every argument decorator.
    """
    args = [
        magic_arguments.argument(
            '--out', type=str,
            help="""The variable in which to store stdout from the script.
            If the script is backgrounded, this will be the stdout *pipe*,
            instead of the stdout text itself and will not be auto closed.
            """
        ),
        magic_arguments.argument(
            '--err', type=str,
            help="""The variable in which to store stderr from the script.
            If the script is backgrounded, this will be the stderr *pipe*,
            instead of the stderr text itself and will not be autoclosed.
            """
        ),
        magic_arguments.argument(
            '--bg', action="store_true",
            help="""Whether to run the script in the background.
            If given, the only way to see the output of the command is
            with --out/err.
            """
        ),
        magic_arguments.argument(
            '--proc', type=str,
            help="""The variable in which to store Popen instance.
            This is used only when --bg option is given.
            """
        ),
        magic_arguments.argument(
            # store_false: passing the flag turns ``raise_error`` off.
            '--no-raise-error', action="store_false", dest='raise_error',
            help="""Whether you should raise an error message in addition to
            a stream on stderr if you get a nonzero exit code.
            """,
        ),
    ]
    for arg in args:
        f = arg(f)
    return f
69
70
class RaiseAfterInterrupt(Exception):
    """Marker exception used while handling an interrupted script.

    It carries no data of its own; it only derives from ``Exception``.
    """
73
74
@magics_class
class ScriptMagics(Magics):
    """Magics for talking to scripts

    This defines a base `%%script` cell magic for running a cell
    with a program in a subprocess, and registers a few top-level
    magics that call %%script with common interpreters.
    """

    event_loop = Any(
        help="""
        The event loop on which to run subprocesses

        Not the main event loop,
        because we want to be able to make blocking calls
        and have certain requirements we don't want to impose on the main loop.
        """
    )

    script_magics: List = List(
        help="""Extra script cell magics to define

        This generates simple wrappers of `%%script foo` as `%%foo`.

        If you want to add script magics that aren't on your path,
        specify them in script_paths
        """,
    ).tag(config=True)

    @default('script_magics')
    def _script_magics_default(self):
        """default to a common list of programs"""

        defaults = [
            'sh',
            'bash',
            'perl',
            'ruby',
            'python',
            'python2',
            'python3',
            'pypy',
        ]
        if os.name == 'nt':
            defaults.extend([
                'cmd',
            ])

        return defaults

    script_paths = Dict(
        help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'

        Only necessary for items in script_magics where the default path will not
        find the right interpreter.
        """
    ).tag(config=True)

    def __init__(self, shell=None):
        """Register the per-program magics and arrange for cleanup at exit."""
        super().__init__(shell=shell)
        self._generate_script_magics()
        # Processes started with ``--bg`` that have not been seen to exit yet.
        self.bg_processes = []
        atexit.register(self.kill_bg_processes)

    def __del__(self):
        # ``bg_processes`` does not exist if ``__init__`` failed part-way,
        # so look it up defensively instead of raising from a finalizer.
        if getattr(self, "bg_processes", None):
            self.kill_bg_processes()

    def _generate_script_magics(self):
        """Install one ``%%name`` cell magic per entry of ``script_magics``."""
        cell_magics = self.magics['cell']
        for name in self.script_magics:
            cell_magics[name] = self._make_script_magic(name)

    def _make_script_magic(self, name):
        """make a named magic, that calls %%script with a particular program"""
        # expand to explicit path if necessary:
        script = self.script_paths.get(name, name)

        @magic_arguments.magic_arguments()
        @script_args
        def named_script_magic(line, cell):
            # if line, add it as cl-flags
            if line:
                line = "%s %s" % (script, line)
            else:
                line = script
            return self.shebang(line, cell)

        # write a basic docstring:
        named_script_magic.__doc__ = \
        """%%{name} script magic

        Run cells with {script} in a subprocess.

        This is a shortcut for `%%script {script}`
        """.format(name=name, script=script)

        return named_script_magic

    @magic_arguments.magic_arguments()
    @script_args
    @cell_magic("script")
    def shebang(self, line, cell):
        """Run a cell via a shell command

        The `%%script` line is like the #! line of script,
        specifying a program (bash, perl, ruby, etc.) with which to run.

        The rest of the cell is run by that program.

        .. versionchanged:: 9.0
            Interrupting the script executed without `--bg` will end in
            raising an exception (unless `--no-raise-error` is passed).

        Examples
        --------
        ::

            In [1]: %%script bash
               ...: for i in 1 2 3; do
               ...:   echo $i
               ...: done
            1
            2
            3
        """

        # Create the event loop in which to run script magics
        # this operates on a background thread
        if self.event_loop is None:
            if sys.platform == "win32":
                # don't override the current policy,
                # just create an event loop
                event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
            else:
                event_loop = asyncio.new_event_loop()
            self.event_loop = event_loop

            # start the loop in a background thread
            asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
            asyncio_thread.start()
        else:
            event_loop = self.event_loop

        def in_thread(coro):
            """Call a coroutine on the asyncio thread"""
            return asyncio.run_coroutine_threadsafe(coro, event_loop).result()

        async def _readchunk(stream):
            """Read up to 100 bytes from ``stream``; ``b""`` means EOF."""
            try:
                return await stream.read(100)
            except asyncio.exceptions.IncompleteReadError as e:
                return e.partial
            except asyncio.exceptions.LimitOverrunError as e:
                return await stream.read(e.consumed)

        async def _handle_stream(stream, stream_arg, file_object):
            """Decode ``stream`` until EOF and deliver the text.

            When ``stream_arg`` names a variable, the complete decoded output
            is stored in ``user_ns`` under that name; otherwise each piece of
            text is written to ``file_object`` and flushed as it arrives.
            """
            decoder = getincrementaldecoder("utf-8")(errors="replace")
            captured = []
            try:
                while True:
                    raw = await _readchunk(stream)
                    # EOF is an empty *read*. An empty *decode* only means a
                    # multi-byte character is still incomplete.
                    at_eof = not raw
                    # final=True at EOF flushes any dangling partial sequence.
                    text = decoder.decode(raw, final=at_eof)
                    if text:
                        if stream_arg:
                            captured.append(text)
                        else:
                            file_object.write(text)
                            file_object.flush()
                    if at_eof:
                        break
            finally:
                # Store whatever was collected, even if reading failed.
                if stream_arg:
                    self.shell.user_ns[stream_arg] = "".join(captured)

        async def _stream_communicate(process, cell):
            """Feed ``cell`` to stdin, relay both output pipes, await exit."""
            process.stdin.write(cell)
            process.stdin.close()
            stdout_task = asyncio.create_task(
                _handle_stream(process.stdout, args.out, sys.stdout)
            )
            stderr_task = asyncio.create_task(
                _handle_stream(process.stderr, args.err, sys.stderr)
            )
            await asyncio.wait([stdout_task, stderr_task])
            await process.wait()

        argv = arg_split(line, posix=not sys.platform.startswith("win"))
        args, cmd = self.shebang.parser.parse_known_args(argv)

        try:
            p = in_thread(
                asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.PIPE,
                )
            )
        except OSError as e:
            if e.errno == errno.ENOENT:
                print("Couldn't find program: %r" % cmd[0])
                return
            else:
                raise

        if not cell.endswith('\n'):
            cell += '\n'
        cell = cell.encode('utf8', 'replace')
        if args.bg:
            self.bg_processes.append(p)
            self._gc_bg_processes()
            # pipes the user did not ask for; drained by _run_script
            to_close = []
            if args.out:
                self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
            else:
                to_close.append(p.stdout)
            if args.err:
                self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
            else:
                to_close.append(p.stderr)
            event_loop.call_soon_threadsafe(
                lambda: asyncio.Task(self._run_script(p, cell, to_close))
            )
            if args.proc:
                proc_proxy = _AsyncIOProxy(p, event_loop)
                proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
                proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
                self.shell.user_ns[args.proc] = proc_proxy
            return

        def exited_within_grace(timeout=0.1):
            """Wait up to ``timeout`` seconds; return True if ``p`` has exited.

            ``asyncio.wait_for`` raises on timeout. That is the expected
            "still running" outcome here, not an error, so it is absorbed and
            the caller can escalate to the next signal.
            """
            try:
                in_thread(asyncio.wait_for(p.wait(), timeout=timeout))
            except (asyncio.TimeoutError, TimeoutError):
                pass
            return p.returncode is not None

        try:
            in_thread(_stream_communicate(p, cell))
        except KeyboardInterrupt:
            # Escalate SIGINT -> terminate -> kill, pausing between steps.
            try:
                p.send_signal(signal.SIGINT)
                if exited_within_grace():
                    print("Process was interrupted.")
                    if args.raise_error:
                        raise RaiseAfterInterrupt()
                    else:
                        return
                p.terminate()
                if exited_within_grace():
                    print("Process was terminated.")
                    if args.raise_error:
                        raise RaiseAfterInterrupt()
                    else:
                        return
                p.kill()
                print("Process was killed.")
                if args.raise_error:
                    raise RaiseAfterInterrupt()
            except RaiseAfterInterrupt:
                pass
            except OSError:
                pass
            except Exception as e:
                print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
            if args.raise_error:
                # After kill() the exit status may not have been collected
                # yet. We don't wait for it, in case the process is stuck in
                # uninterruptible sleep. -9 = SIGKILL
                rc = p.returncode if p.returncode is not None else -9
                raise CalledProcessError(rc, cell) from None
            else:
                return

        if args.raise_error and p.returncode != 0:
            # If we get here and p.returncode is still None, we must have
            # killed it but not yet seen its return code. We don't wait for it,
            # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
            rc = p.returncode or -9
            raise CalledProcessError(rc, cell)

    shebang.__skip_doctest__ = os.name != "posix"

    async def _run_script(self, p, cell, to_close):
        """callback for running the script in the background

        Writes ``cell`` to the process' stdin, closes it, waits for the
        process to exit, then reads the streams in ``to_close`` to the end.
        """

        p.stdin.write(cell)
        await p.stdin.drain()
        p.stdin.close()
        await p.stdin.wait_closed()
        await p.wait()
        # asyncio read pipes have no close
        # but we should drain the data anyway
        # NOTE(review): draining starts only after exit; a script writing a
        # lot to an uncaptured pipe may stall before then -- confirm.
        for s in to_close:
            await s.read()
        self._gc_bg_processes()

    @line_magic("killbgscripts")
    def killbgscripts(self, _nouse_=''):
        """Kill all BG processes started by %%script and its family."""
        self.kill_bg_processes()
        print("All background processes were killed.")

    def kill_bg_processes(self):
        """Kill all BG processes which are still running.

        Escalates through SIGINT, ``terminate()`` and ``kill()``, sleeping
        0.1 s after each of the first two so processes can exit, and stops
        as soon as no tracked process is left running.
        """
        if not self.bg_processes:
            return
        escalation = (
            (lambda proc: proc.send_signal(signal.SIGINT), 0.1),
            (lambda proc: proc.terminate(), 0.1),
            (lambda proc: proc.kill(), 0),
        )
        for stop, grace in escalation:
            for p in self.bg_processes:
                if p.returncode is None:
                    try:
                        stop(p)
                    except Exception:
                        # best effort: signalling can fail (e.g. the process
                        # is already gone); keep going with the others
                        pass
            if grace:
                time.sleep(grace)
            self._gc_bg_processes()
            if not self.bg_processes:
                return

    def _gc_bg_processes(self):
        """Drop processes whose ``returncode`` is set from ``bg_processes``."""
        self.bg_processes = [p for p in self.bg_processes if p.returncode is None]