Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/magics/script.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

207 statements  

1"""Magic functions for running cells in various scripts.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import asyncio 

7import asyncio.exceptions 

8import atexit 

9import errno 

10import os 

11import signal 

12import sys 

13import time 

14from codecs import getincrementaldecoder 

15from subprocess import CalledProcessError 

16from threading import Thread 

17 

18from traitlets import Any, Dict, List, default 

19 

20from IPython.core import magic_arguments 

21from IPython.core.async_helpers import _AsyncIOProxy 

22from IPython.core.magic import Magics, cell_magic, line_magic, magics_class 

23from IPython.utils.process import arg_split 

24 

25#----------------------------------------------------------------------------- 

26# Magic implementation classes 

27#----------------------------------------------------------------------------- 

28 

29def script_args(f): 

30 """single decorator for adding script args""" 

31 args = [ 

32 magic_arguments.argument( 

33 '--out', type=str, 

34 help="""The variable in which to store stdout from the script. 

35 If the script is backgrounded, this will be the stdout *pipe*, 

36 instead of the stderr text itself and will not be auto closed. 

37 """ 

38 ), 

39 magic_arguments.argument( 

40 '--err', type=str, 

41 help="""The variable in which to store stderr from the script. 

42 If the script is backgrounded, this will be the stderr *pipe*, 

43 instead of the stderr text itself and will not be autoclosed. 

44 """ 

45 ), 

46 magic_arguments.argument( 

47 '--bg', action="store_true", 

48 help="""Whether to run the script in the background. 

49 If given, the only way to see the output of the command is 

50 with --out/err. 

51 """ 

52 ), 

53 magic_arguments.argument( 

54 '--proc', type=str, 

55 help="""The variable in which to store Popen instance. 

56 This is used only when --bg option is given. 

57 """ 

58 ), 

59 magic_arguments.argument( 

60 '--no-raise-error', action="store_false", dest='raise_error', 

61 help="""Whether you should raise an error message in addition to 

62 a stream on stderr if you get a nonzero exit code. 

63 """, 

64 ), 

65 ] 

66 for arg in args: 

67 f = arg(f) 

68 return f 

69 

70 

71class RaiseAfterInterrupt(Exception): 

72 pass 

73 

74 

75@magics_class 

76class ScriptMagics(Magics): 

77 """Magics for talking to scripts 

78  

79 This defines a base `%%script` cell magic for running a cell 

80 with a program in a subprocess, and registers a few top-level 

81 magics that call %%script with common interpreters. 

82 """ 

83 

84 event_loop = Any( 

85 help=""" 

86 The event loop on which to run subprocesses 

87 

88 Not the main event loop, 

89 because we want to be able to make blocking calls 

90 and have certain requirements we don't want to impose on the main loop. 

91 """ 

92 ) 

93 

94 script_magics: List = List( 

95 help="""Extra script cell magics to define 

96  

97 This generates simple wrappers of `%%script foo` as `%%foo`. 

98  

99 If you want to add script magics that aren't on your path, 

100 specify them in script_paths 

101 """, 

102 ).tag(config=True) 

103 

104 @default('script_magics') 

105 def _script_magics_default(self): 

106 """default to a common list of programs""" 

107 

108 defaults = [ 

109 'sh', 

110 'bash', 

111 'perl', 

112 'ruby', 

113 'python', 

114 'python2', 

115 'python3', 

116 'pypy', 

117 ] 

118 if os.name == 'nt': 

119 defaults.extend([ 

120 'cmd', 

121 ]) 

122 

123 return defaults 

124 

125 script_paths = Dict( 

126 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby' 

127  

128 Only necessary for items in script_magics where the default path will not 

129 find the right interpreter. 

130 """ 

131 ).tag(config=True) 

132 

133 def __init__(self, shell=None): 

134 super(ScriptMagics, self).__init__(shell=shell) 

135 self._generate_script_magics() 

136 self.bg_processes = [] 

137 atexit.register(self.kill_bg_processes) 

138 

139 def __del__(self): 

140 self.kill_bg_processes() 

141 

142 def _generate_script_magics(self): 

143 cell_magics = self.magics['cell'] 

144 for name in self.script_magics: 

145 cell_magics[name] = self._make_script_magic(name) 

146 

147 def _make_script_magic(self, name): 

148 """make a named magic, that calls %%script with a particular program""" 

149 # expand to explicit path if necessary: 

150 script = self.script_paths.get(name, name) 

151 

152 @magic_arguments.magic_arguments() 

153 @script_args 

154 def named_script_magic(line, cell): 

155 # if line, add it as cl-flags 

156 if line: 

157 line = "%s %s" % (script, line) 

158 else: 

159 line = script 

160 return self.shebang(line, cell) 

161 

162 # write a basic docstring: 

163 named_script_magic.__doc__ = \ 

164 """%%{name} script magic 

165  

166 Run cells with {script} in a subprocess. 

167  

168 This is a shortcut for `%%script {script}` 

169 """.format(**locals()) 

170 

171 return named_script_magic 

172 

173 @magic_arguments.magic_arguments() 

174 @script_args 

175 @cell_magic("script") 

176 def shebang(self, line, cell): 

177 """Run a cell via a shell command 

178 

179 The `%%script` line is like the #! line of script, 

180 specifying a program (bash, perl, ruby, etc.) with which to run. 

181 

182 The rest of the cell is run by that program. 

183 

184 .. versionchanged:: 9.0 

185 Interrupting the script executed without `--bg` will end in 

186 raising an exception (unless `--no-raise-error` is passed). 

187 

188 Examples 

189 -------- 

190 :: 

191 

192 In [1]: %%script bash 

193 ...: for i in 1 2 3; do 

194 ...: echo $i 

195 ...: done 

196 1 

197 2 

198 3 

199 """ 

200 

201 # Create the event loop in which to run script magics 

202 # this operates on a background thread 

203 if self.event_loop is None: 

204 if sys.platform == "win32": 

205 # don't override the current policy, 

206 # just create an event loop 

207 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop() 

208 else: 

209 event_loop = asyncio.new_event_loop() 

210 self.event_loop = event_loop 

211 

212 # start the loop in a background thread 

213 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True) 

214 asyncio_thread.start() 

215 else: 

216 event_loop = self.event_loop 

217 

218 def in_thread(coro): 

219 """Call a coroutine on the asyncio thread""" 

220 return asyncio.run_coroutine_threadsafe(coro, event_loop).result() 

221 

222 async def _readchunk(stream): 

223 try: 

224 return await stream.read(100) 

225 except asyncio.exceptions.IncompleteReadError as e: 

226 return e.partial 

227 except asyncio.exceptions.LimitOverrunError as e: 

228 return await stream.read(e.consumed) 

229 

230 async def _handle_stream(stream, stream_arg, file_object): 

231 should_break = False 

232 decoder = getincrementaldecoder("utf-8")(errors="replace") 

233 while True: 

234 chunk = decoder.decode(await _readchunk(stream)) 

235 if not chunk: 

236 break 

237 chunk = decoder.decode("", final=True) 

238 should_break = True 

239 if stream_arg: 

240 self.shell.user_ns[stream_arg] = chunk 

241 else: 

242 file_object.write(chunk) 

243 file_object.flush() 

244 if should_break: 

245 break 

246 

247 async def _stream_communicate(process, cell): 

248 process.stdin.write(cell) 

249 process.stdin.close() 

250 stdout_task = asyncio.create_task( 

251 _handle_stream(process.stdout, args.out, sys.stdout) 

252 ) 

253 stderr_task = asyncio.create_task( 

254 _handle_stream(process.stderr, args.err, sys.stderr) 

255 ) 

256 await asyncio.wait([stdout_task, stderr_task]) 

257 await process.wait() 

258 

259 argv = arg_split(line, posix=not sys.platform.startswith("win")) 

260 args, cmd = self.shebang.parser.parse_known_args(argv) 

261 

262 try: 

263 p = in_thread( 

264 asyncio.create_subprocess_exec( 

265 *cmd, 

266 stdout=asyncio.subprocess.PIPE, 

267 stderr=asyncio.subprocess.PIPE, 

268 stdin=asyncio.subprocess.PIPE, 

269 ) 

270 ) 

271 except OSError as e: 

272 if e.errno == errno.ENOENT: 

273 print("Couldn't find program: %r" % cmd[0]) 

274 return 

275 else: 

276 raise 

277 

278 if not cell.endswith('\n'): 

279 cell += '\n' 

280 cell = cell.encode('utf8', 'replace') 

281 if args.bg: 

282 self.bg_processes.append(p) 

283 self._gc_bg_processes() 

284 to_close = [] 

285 if args.out: 

286 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop) 

287 else: 

288 to_close.append(p.stdout) 

289 if args.err: 

290 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop) 

291 else: 

292 to_close.append(p.stderr) 

293 event_loop.call_soon_threadsafe( 

294 lambda: asyncio.Task(self._run_script(p, cell, to_close)) 

295 ) 

296 if args.proc: 

297 proc_proxy = _AsyncIOProxy(p, event_loop) 

298 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop) 

299 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop) 

300 self.shell.user_ns[args.proc] = proc_proxy 

301 return 

302 

303 try: 

304 in_thread(_stream_communicate(p, cell)) 

305 except KeyboardInterrupt: 

306 try: 

307 p.send_signal(signal.SIGINT) 

308 in_thread(asyncio.wait_for(p.wait(), timeout=0.1)) 

309 if p.returncode is not None: 

310 print("Process was interrupted.") 

311 if args.raise_error: 

312 raise RaiseAfterInterrupt() 

313 else: 

314 return 

315 p.terminate() 

316 in_thread(asyncio.wait_for(p.wait(), timeout=0.1)) 

317 if p.returncode is not None: 

318 print("Process was terminated.") 

319 if args.raise_error: 

320 raise RaiseAfterInterrupt() 

321 else: 

322 return 

323 p.kill() 

324 print("Process was killed.") 

325 if args.raise_error: 

326 raise RaiseAfterInterrupt() 

327 except RaiseAfterInterrupt: 

328 pass 

329 except OSError: 

330 pass 

331 except Exception as e: 

332 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e)) 

333 if args.raise_error: 

334 raise CalledProcessError(p.returncode, cell) from None 

335 else: 

336 return 

337 

338 if args.raise_error and p.returncode != 0: 

339 # If we get here and p.returncode is still None, we must have 

340 # killed it but not yet seen its return code. We don't wait for it, 

341 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL 

342 rc = p.returncode or -9 

343 raise CalledProcessError(rc, cell) 

344 

345 shebang.__skip_doctest__ = os.name != "posix" 

346 

347 async def _run_script(self, p, cell, to_close): 

348 """callback for running the script in the background""" 

349 

350 p.stdin.write(cell) 

351 await p.stdin.drain() 

352 p.stdin.close() 

353 await p.stdin.wait_closed() 

354 await p.wait() 

355 # asyncio read pipes have no close 

356 # but we should drain the data anyway 

357 for s in to_close: 

358 await s.read() 

359 self._gc_bg_processes() 

360 

361 @line_magic("killbgscripts") 

362 def killbgscripts(self, _nouse_=''): 

363 """Kill all BG processes started by %%script and its family.""" 

364 self.kill_bg_processes() 

365 print("All background processes were killed.") 

366 

367 def kill_bg_processes(self): 

368 """Kill all BG processes which are still running.""" 

369 if not self.bg_processes: 

370 return 

371 for p in self.bg_processes: 

372 if p.returncode is None: 

373 try: 

374 p.send_signal(signal.SIGINT) 

375 except: 

376 pass 

377 time.sleep(0.1) 

378 self._gc_bg_processes() 

379 if not self.bg_processes: 

380 return 

381 for p in self.bg_processes: 

382 if p.returncode is None: 

383 try: 

384 p.terminate() 

385 except: 

386 pass 

387 time.sleep(0.1) 

388 self._gc_bg_processes() 

389 if not self.bg_processes: 

390 return 

391 for p in self.bg_processes: 

392 if p.returncode is None: 

393 try: 

394 p.kill() 

395 except: 

396 pass 

397 self._gc_bg_processes() 

398 

399 def _gc_bg_processes(self): 

400 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]