Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/magics/script.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

211 statements  

1"""Magic functions for running cells in various scripts.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import asyncio 

7import asyncio.exceptions 

8import atexit 

9import errno 

10import os 

11import signal 

12import sys 

13import time 

14from codecs import getincrementaldecoder 

15from subprocess import CalledProcessError 

16from threading import Thread 

17 

18from traitlets import Any, Dict, List, default 

19 

20from IPython.core import magic_arguments 

21from IPython.core.async_helpers import _AsyncIOProxy 

22from IPython.core.magic import Magics, cell_magic, line_magic, magics_class 

23from IPython.utils.process import arg_split 

24 

25#----------------------------------------------------------------------------- 

26# Magic implementation classes 

27#----------------------------------------------------------------------------- 

28 

29def script_args(f): 

30 """single decorator for adding script args""" 

31 args = [ 

32 magic_arguments.argument( 

33 '--out', type=str, 

34 help="""The variable in which to store stdout from the script. 

35 If the script is backgrounded, this will be the stdout *pipe*, 

36 instead of the stderr text itself and will not be auto closed. 

37 """ 

38 ), 

39 magic_arguments.argument( 

40 '--err', type=str, 

41 help="""The variable in which to store stderr from the script. 

42 If the script is backgrounded, this will be the stderr *pipe*, 

43 instead of the stderr text itself and will not be autoclosed. 

44 """ 

45 ), 

46 magic_arguments.argument( 

47 '--bg', action="store_true", 

48 help="""Whether to run the script in the background. 

49 If given, the only way to see the output of the command is 

50 with --out/err. 

51 """ 

52 ), 

53 magic_arguments.argument( 

54 '--proc', type=str, 

55 help="""The variable in which to store Popen instance. 

56 This is used only when --bg option is given. 

57 """ 

58 ), 

59 magic_arguments.argument( 

60 '--no-raise-error', action="store_false", dest='raise_error', 

61 help="""Whether you should raise an error message in addition to 

62 a stream on stderr if you get a nonzero exit code. 

63 """, 

64 ), 

65 ] 

66 for arg in args: 

67 f = arg(f) 

68 return f 

69 

70 

71class RaiseAfterInterrupt(Exception): 

72 pass 

73 

74 

75@magics_class 

76class ScriptMagics(Magics): 

77 """Magics for talking to scripts 

78  

79 This defines a base `%%script` cell magic for running a cell 

80 with a program in a subprocess, and registers a few top-level 

81 magics that call %%script with common interpreters. 

82 """ 

83 

84 event_loop = Any( 

85 help=""" 

86 The event loop on which to run subprocesses 

87 

88 Not the main event loop, 

89 because we want to be able to make blocking calls 

90 and have certain requirements we don't want to impose on the main loop. 

91 """ 

92 ) 

93 

94 script_magics: List = List( 

95 help="""Extra script cell magics to define 

96  

97 This generates simple wrappers of `%%script foo` as `%%foo`. 

98  

99 If you want to add script magics that aren't on your path, 

100 specify them in script_paths 

101 """, 

102 ).tag(config=True) 

103 

104 @default('script_magics') 

105 def _script_magics_default(self): 

106 """default to a common list of programs""" 

107 

108 defaults = [ 

109 'sh', 

110 'bash', 

111 'perl', 

112 'ruby', 

113 'python', 

114 'python2', 

115 'python3', 

116 'pypy', 

117 ] 

118 if os.name == 'nt': 

119 defaults.extend([ 

120 'cmd', 

121 ]) 

122 

123 return defaults 

124 

125 script_paths = Dict( 

126 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby' 

127  

128 Only necessary for items in script_magics where the default path will not 

129 find the right interpreter. 

130 """ 

131 ).tag(config=True) 

132 

133 def __init__(self, shell=None): 

134 super(ScriptMagics, self).__init__(shell=shell) 

135 self._generate_script_magics() 

136 self.bg_processes = [] 

137 atexit.register(self.kill_bg_processes) 

138 

139 def __del__(self): 

140 self.kill_bg_processes() 

141 

142 def _generate_script_magics(self): 

143 cell_magics = self.magics['cell'] 

144 for name in self.script_magics: 

145 cell_magics[name] = self._make_script_magic(name) 

146 

147 def _make_script_magic(self, name): 

148 """make a named magic, that calls %%script with a particular program""" 

149 # expand to explicit path if necessary: 

150 script = self.script_paths.get(name, name) 

151 

152 @magic_arguments.magic_arguments() 

153 @script_args 

154 def named_script_magic(line, cell): 

155 # if line, add it as cl-flags 

156 if line: 

157 line = "%s %s" % (script, line) 

158 else: 

159 line = script 

160 return self.shebang(line, cell) 

161 

162 # write a basic docstring: 

163 named_script_magic.__doc__ = \ 

164 """%%{name} script magic 

165  

166 Run cells with {script} in a subprocess. 

167  

168 This is a shortcut for `%%script {script}` 

169 """.format(**locals()) 

170 

171 return named_script_magic 

172 

173 @magic_arguments.magic_arguments() 

174 @script_args 

175 @cell_magic("script") 

176 def shebang(self, line, cell): 

177 """Run a cell via a shell command 

178 

179 The `%%script` line is like the #! line of script, 

180 specifying a program (bash, perl, ruby, etc.) with which to run. 

181 

182 The rest of the cell is run by that program. 

183 

184 .. versionchanged:: 9.0 

185 Interrupting the script executed without `--bg` will end in 

186 raising an exception (unless `--no-raise-error` is passed). 

187 

188 Examples 

189 -------- 

190 :: 

191 

192 In [1]: %%script bash 

193 ...: for i in 1 2 3; do 

194 ...: echo $i 

195 ...: done 

196 1 

197 2 

198 3 

199 """ 

200 

201 # Create the event loop in which to run script magics 

202 # this operates on a background thread 

203 if self.event_loop is None: 

204 if sys.platform == "win32": 

205 # don't override the current policy, 

206 # just create an event loop 

207 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop() 

208 else: 

209 event_loop = asyncio.new_event_loop() 

210 self.event_loop = event_loop 

211 

212 # start the loop in a background thread 

213 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True) 

214 asyncio_thread.start() 

215 else: 

216 event_loop = self.event_loop 

217 

218 def in_thread(coro): 

219 """Call a coroutine on the asyncio thread""" 

220 return asyncio.run_coroutine_threadsafe(coro, event_loop).result() 

221 

222 async def _readchunk(stream): 

223 try: 

224 return await stream.read(100) 

225 except asyncio.exceptions.IncompleteReadError as e: 

226 return e.partial 

227 except asyncio.exceptions.LimitOverrunError as e: 

228 return await stream.read(e.consumed) 

229 

230 async def _handle_stream(stream, stream_arg, file_object): 

231 should_break = False 

232 decoder = getincrementaldecoder("utf-8")(errors="replace") 

233 while True: 

234 chunk = decoder.decode(await _readchunk(stream)) 

235 if not chunk: 

236 break 

237 chunk = decoder.decode("", final=True) 

238 should_break = True 

239 if stream_arg: 

240 self.shell.user_ns[stream_arg] += chunk 

241 else: 

242 file_object.write(chunk) 

243 file_object.flush() 

244 if should_break: 

245 break 

246 

247 async def _stream_communicate(process, cell): 

248 process.stdin.write(cell) 

249 process.stdin.close() 

250 stdout_task = asyncio.create_task( 

251 _handle_stream(process.stdout, args.out, sys.stdout) 

252 ) 

253 stderr_task = asyncio.create_task( 

254 _handle_stream(process.stderr, args.err, sys.stderr) 

255 ) 

256 await asyncio.wait([stdout_task, stderr_task]) 

257 await process.wait() 

258 

259 argv = arg_split(line, posix=not sys.platform.startswith("win")) 

260 args, cmd = self.shebang.parser.parse_known_args(argv) 

261 

262 if args.out: 

263 self.shell.user_ns[args.out] = "" 

264 if args.err: 

265 self.shell.user_ns[args.err] = "" 

266 

267 try: 

268 p = in_thread( 

269 asyncio.create_subprocess_exec( 

270 *cmd, 

271 stdout=asyncio.subprocess.PIPE, 

272 stderr=asyncio.subprocess.PIPE, 

273 stdin=asyncio.subprocess.PIPE, 

274 ) 

275 ) 

276 except OSError as e: 

277 if e.errno == errno.ENOENT: 

278 print("Couldn't find program: %r" % cmd[0]) 

279 return 

280 else: 

281 raise 

282 

283 if not cell.endswith('\n'): 

284 cell += '\n' 

285 cell = cell.encode('utf8', 'replace') 

286 if args.bg: 

287 self.bg_processes.append(p) 

288 self._gc_bg_processes() 

289 to_close = [] 

290 if args.out: 

291 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop) 

292 else: 

293 to_close.append(p.stdout) 

294 if args.err: 

295 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop) 

296 else: 

297 to_close.append(p.stderr) 

298 event_loop.call_soon_threadsafe( 

299 lambda: asyncio.Task(self._run_script(p, cell, to_close)) 

300 ) 

301 if args.proc: 

302 proc_proxy = _AsyncIOProxy(p, event_loop) 

303 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop) 

304 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop) 

305 self.shell.user_ns[args.proc] = proc_proxy 

306 return 

307 

308 try: 

309 in_thread(_stream_communicate(p, cell)) 

310 except KeyboardInterrupt: 

311 try: 

312 p.send_signal(signal.SIGINT) 

313 in_thread(asyncio.wait_for(p.wait(), timeout=0.1)) 

314 if p.returncode is not None: 

315 print("Process was interrupted.") 

316 if args.raise_error: 

317 raise RaiseAfterInterrupt() 

318 else: 

319 return 

320 p.terminate() 

321 in_thread(asyncio.wait_for(p.wait(), timeout=0.1)) 

322 if p.returncode is not None: 

323 print("Process was terminated.") 

324 if args.raise_error: 

325 raise RaiseAfterInterrupt() 

326 else: 

327 return 

328 p.kill() 

329 print("Process was killed.") 

330 if args.raise_error: 

331 raise RaiseAfterInterrupt() 

332 except RaiseAfterInterrupt: 

333 pass 

334 except OSError: 

335 pass 

336 except Exception as e: 

337 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e)) 

338 if args.raise_error: 

339 raise CalledProcessError(p.returncode, cell) from None 

340 else: 

341 return 

342 

343 if args.raise_error and p.returncode != 0: 

344 # If we get here and p.returncode is still None, we must have 

345 # killed it but not yet seen its return code. We don't wait for it, 

346 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL 

347 rc = p.returncode or -9 

348 raise CalledProcessError(rc, cell) 

349 

350 shebang.__skip_doctest__ = os.name != "posix" 

351 

352 async def _run_script(self, p, cell, to_close): 

353 """callback for running the script in the background""" 

354 

355 p.stdin.write(cell) 

356 await p.stdin.drain() 

357 p.stdin.close() 

358 await p.stdin.wait_closed() 

359 await p.wait() 

360 # asyncio read pipes have no close 

361 # but we should drain the data anyway 

362 for s in to_close: 

363 await s.read() 

364 self._gc_bg_processes() 

365 

366 @line_magic("killbgscripts") 

367 def killbgscripts(self, _nouse_=''): 

368 """Kill all BG processes started by %%script and its family.""" 

369 self.kill_bg_processes() 

370 print("All background processes were killed.") 

371 

372 def kill_bg_processes(self): 

373 """Kill all BG processes which are still running.""" 

374 if not self.bg_processes: 

375 return 

376 for p in self.bg_processes: 

377 if p.returncode is None: 

378 try: 

379 p.send_signal(signal.SIGINT) 

380 except: 

381 pass 

382 time.sleep(0.1) 

383 self._gc_bg_processes() 

384 if not self.bg_processes: 

385 return 

386 for p in self.bg_processes: 

387 if p.returncode is None: 

388 try: 

389 p.terminate() 

390 except: 

391 pass 

392 time.sleep(0.1) 

393 self._gc_bg_processes() 

394 if not self.bg_processes: 

395 return 

396 for p in self.bg_processes: 

397 if p.returncode is None: 

398 try: 

399 p.kill() 

400 except: 

401 pass 

402 self._gc_bg_processes() 

403 

404 def _gc_bg_processes(self): 

405 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]