Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/IPython/core/magics/script.py: 22%

180 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1"""Magic functions for running cells in various scripts.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import asyncio 

7import atexit 

8import errno 

9import os 

10import signal 

11import sys 

12import time 

13from subprocess import CalledProcessError 

14from threading import Thread 

15 

16from traitlets import Any, Dict, List, default 

17 

18from IPython.core import magic_arguments 

19from IPython.core.async_helpers import _AsyncIOProxy 

20from IPython.core.magic import Magics, cell_magic, line_magic, magics_class 

21from IPython.utils.process import arg_split 

22 

23#----------------------------------------------------------------------------- 

24# Magic implementation classes 

25#----------------------------------------------------------------------------- 

26 

27def script_args(f): 

28 """single decorator for adding script args""" 

29 args = [ 

30 magic_arguments.argument( 

31 '--out', type=str, 

32 help="""The variable in which to store stdout from the script. 

33 If the script is backgrounded, this will be the stdout *pipe*, 

34 instead of the stderr text itself and will not be auto closed. 

35 """ 

36 ), 

37 magic_arguments.argument( 

38 '--err', type=str, 

39 help="""The variable in which to store stderr from the script. 

40 If the script is backgrounded, this will be the stderr *pipe*, 

41 instead of the stderr text itself and will not be autoclosed. 

42 """ 

43 ), 

44 magic_arguments.argument( 

45 '--bg', action="store_true", 

46 help="""Whether to run the script in the background. 

47 If given, the only way to see the output of the command is 

48 with --out/err. 

49 """ 

50 ), 

51 magic_arguments.argument( 

52 '--proc', type=str, 

53 help="""The variable in which to store Popen instance. 

54 This is used only when --bg option is given. 

55 """ 

56 ), 

57 magic_arguments.argument( 

58 '--no-raise-error', action="store_false", dest='raise_error', 

59 help="""Whether you should raise an error message in addition to 

60 a stream on stderr if you get a nonzero exit code. 

61 """, 

62 ), 

63 ] 

64 for arg in args: 

65 f = arg(f) 

66 return f 

67 

68 

69@magics_class 

70class ScriptMagics(Magics): 

71 """Magics for talking to scripts 

72  

73 This defines a base `%%script` cell magic for running a cell 

74 with a program in a subprocess, and registers a few top-level 

75 magics that call %%script with common interpreters. 

76 """ 

77 

78 event_loop = Any( 

79 help=""" 

80 The event loop on which to run subprocesses 

81 

82 Not the main event loop, 

83 because we want to be able to make blocking calls 

84 and have certain requirements we don't want to impose on the main loop. 

85 """ 

86 ) 

87 

88 script_magics = List( 

89 help="""Extra script cell magics to define 

90  

91 This generates simple wrappers of `%%script foo` as `%%foo`. 

92  

93 If you want to add script magics that aren't on your path, 

94 specify them in script_paths 

95 """, 

96 ).tag(config=True) 

97 @default('script_magics') 

98 def _script_magics_default(self): 

99 """default to a common list of programs""" 

100 

101 defaults = [ 

102 'sh', 

103 'bash', 

104 'perl', 

105 'ruby', 

106 'python', 

107 'python2', 

108 'python3', 

109 'pypy', 

110 ] 

111 if os.name == 'nt': 

112 defaults.extend([ 

113 'cmd', 

114 ]) 

115 

116 return defaults 

117 

118 script_paths = Dict( 

119 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby' 

120  

121 Only necessary for items in script_magics where the default path will not 

122 find the right interpreter. 

123 """ 

124 ).tag(config=True) 

125 

126 def __init__(self, shell=None): 

127 super(ScriptMagics, self).__init__(shell=shell) 

128 self._generate_script_magics() 

129 self.bg_processes = [] 

130 atexit.register(self.kill_bg_processes) 

131 

132 def __del__(self): 

133 self.kill_bg_processes() 

134 

135 def _generate_script_magics(self): 

136 cell_magics = self.magics['cell'] 

137 for name in self.script_magics: 

138 cell_magics[name] = self._make_script_magic(name) 

139 

140 def _make_script_magic(self, name): 

141 """make a named magic, that calls %%script with a particular program""" 

142 # expand to explicit path if necessary: 

143 script = self.script_paths.get(name, name) 

144 

145 @magic_arguments.magic_arguments() 

146 @script_args 

147 def named_script_magic(line, cell): 

148 # if line, add it as cl-flags 

149 if line: 

150 line = "%s %s" % (script, line) 

151 else: 

152 line = script 

153 return self.shebang(line, cell) 

154 

155 # write a basic docstring: 

156 named_script_magic.__doc__ = \ 

157 """%%{name} script magic 

158  

159 Run cells with {script} in a subprocess. 

160  

161 This is a shortcut for `%%script {script}` 

162 """.format(**locals()) 

163 

164 return named_script_magic 

165 

166 @magic_arguments.magic_arguments() 

167 @script_args 

168 @cell_magic("script") 

169 def shebang(self, line, cell): 

170 """Run a cell via a shell command 

171 

172 The `%%script` line is like the #! line of script, 

173 specifying a program (bash, perl, ruby, etc.) with which to run. 

174 

175 The rest of the cell is run by that program. 

176 

177 Examples 

178 -------- 

179 :: 

180 

181 In [1]: %%script bash 

182 ...: for i in 1 2 3; do 

183 ...: echo $i 

184 ...: done 

185 1 

186 2 

187 3 

188 """ 

189 

190 # Create the event loop in which to run script magics 

191 # this operates on a background thread 

192 if self.event_loop is None: 

193 if sys.platform == "win32": 

194 # don't override the current policy, 

195 # just create an event loop 

196 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop() 

197 else: 

198 event_loop = asyncio.new_event_loop() 

199 self.event_loop = event_loop 

200 

201 # start the loop in a background thread 

202 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True) 

203 asyncio_thread.start() 

204 else: 

205 event_loop = self.event_loop 

206 

207 def in_thread(coro): 

208 """Call a coroutine on the asyncio thread""" 

209 return asyncio.run_coroutine_threadsafe(coro, event_loop).result() 

210 

211 async def _handle_stream(stream, stream_arg, file_object): 

212 while True: 

213 line = (await stream.readline()).decode("utf8", errors="replace") 

214 if not line: 

215 break 

216 if stream_arg: 

217 self.shell.user_ns[stream_arg] = line 

218 else: 

219 file_object.write(line) 

220 file_object.flush() 

221 

222 async def _stream_communicate(process, cell): 

223 process.stdin.write(cell) 

224 process.stdin.close() 

225 stdout_task = asyncio.create_task( 

226 _handle_stream(process.stdout, args.out, sys.stdout) 

227 ) 

228 stderr_task = asyncio.create_task( 

229 _handle_stream(process.stderr, args.err, sys.stderr) 

230 ) 

231 await asyncio.wait([stdout_task, stderr_task]) 

232 await process.wait() 

233 

234 argv = arg_split(line, posix=not sys.platform.startswith("win")) 

235 args, cmd = self.shebang.parser.parse_known_args(argv) 

236 

237 try: 

238 p = in_thread( 

239 asyncio.create_subprocess_exec( 

240 *cmd, 

241 stdout=asyncio.subprocess.PIPE, 

242 stderr=asyncio.subprocess.PIPE, 

243 stdin=asyncio.subprocess.PIPE, 

244 ) 

245 ) 

246 except OSError as e: 

247 if e.errno == errno.ENOENT: 

248 print("Couldn't find program: %r" % cmd[0]) 

249 return 

250 else: 

251 raise 

252 

253 if not cell.endswith('\n'): 

254 cell += '\n' 

255 cell = cell.encode('utf8', 'replace') 

256 if args.bg: 

257 self.bg_processes.append(p) 

258 self._gc_bg_processes() 

259 to_close = [] 

260 if args.out: 

261 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop) 

262 else: 

263 to_close.append(p.stdout) 

264 if args.err: 

265 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop) 

266 else: 

267 to_close.append(p.stderr) 

268 event_loop.call_soon_threadsafe( 

269 lambda: asyncio.Task(self._run_script(p, cell, to_close)) 

270 ) 

271 if args.proc: 

272 proc_proxy = _AsyncIOProxy(p, event_loop) 

273 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop) 

274 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop) 

275 self.shell.user_ns[args.proc] = proc_proxy 

276 return 

277 

278 try: 

279 in_thread(_stream_communicate(p, cell)) 

280 except KeyboardInterrupt: 

281 try: 

282 p.send_signal(signal.SIGINT) 

283 in_thread(asyncio.wait_for(p.wait(), timeout=0.1)) 

284 if p.returncode is not None: 

285 print("Process is interrupted.") 

286 return 

287 p.terminate() 

288 in_thread(asyncio.wait_for(p.wait(), timeout=0.1)) 

289 if p.returncode is not None: 

290 print("Process is terminated.") 

291 return 

292 p.kill() 

293 print("Process is killed.") 

294 except OSError: 

295 pass 

296 except Exception as e: 

297 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e)) 

298 return 

299 

300 if args.raise_error and p.returncode != 0: 

301 # If we get here and p.returncode is still None, we must have 

302 # killed it but not yet seen its return code. We don't wait for it, 

303 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL 

304 rc = p.returncode or -9 

305 raise CalledProcessError(rc, cell) 

306 

307 shebang.__skip_doctest__ = os.name != "posix" 

308 

309 async def _run_script(self, p, cell, to_close): 

310 """callback for running the script in the background""" 

311 

312 p.stdin.write(cell) 

313 await p.stdin.drain() 

314 p.stdin.close() 

315 await p.stdin.wait_closed() 

316 await p.wait() 

317 # asyncio read pipes have no close 

318 # but we should drain the data anyway 

319 for s in to_close: 

320 await s.read() 

321 self._gc_bg_processes() 

322 

323 @line_magic("killbgscripts") 

324 def killbgscripts(self, _nouse_=''): 

325 """Kill all BG processes started by %%script and its family.""" 

326 self.kill_bg_processes() 

327 print("All background processes were killed.") 

328 

329 def kill_bg_processes(self): 

330 """Kill all BG processes which are still running.""" 

331 if not self.bg_processes: 

332 return 

333 for p in self.bg_processes: 

334 if p.returncode is None: 

335 try: 

336 p.send_signal(signal.SIGINT) 

337 except: 

338 pass 

339 time.sleep(0.1) 

340 self._gc_bg_processes() 

341 if not self.bg_processes: 

342 return 

343 for p in self.bg_processes: 

344 if p.returncode is None: 

345 try: 

346 p.terminate() 

347 except: 

348 pass 

349 time.sleep(0.1) 

350 self._gc_bg_processes() 

351 if not self.bg_processes: 

352 return 

353 for p in self.bg_processes: 

354 if p.returncode is None: 

355 try: 

356 p.kill() 

357 except: 

358 pass 

359 self._gc_bg_processes() 

360 

361 def _gc_bg_processes(self): 

362 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]