Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/IPython/core/magics/script.py: 22%

188 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1"""Magic functions for running cells in various scripts.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import asyncio 

7import asyncio.exceptions 

8import atexit 

9import errno 

10import os 

11import signal 

12import sys 

13import time 

14from subprocess import CalledProcessError 

15from threading import Thread 

16 

17from traitlets import Any, Dict, List, default 

18 

19from IPython.core import magic_arguments 

20from IPython.core.async_helpers import _AsyncIOProxy 

21from IPython.core.magic import Magics, cell_magic, line_magic, magics_class 

22from IPython.utils.process import arg_split 

23 

24#----------------------------------------------------------------------------- 

25# Magic implementation classes 

26#----------------------------------------------------------------------------- 

27 

def script_args(f):
    """Single decorator for adding the shared script-magic arguments.

    Wraps ``f`` with one ``magic_arguments.argument`` decorator per option
    (``--out``, ``--err``, ``--bg``, ``--proc``, ``--no-raise-error``) and
    returns the decorated function.

    Parameters
    ----------
    f : callable
        The magic function to decorate.

    Returns
    -------
    callable
        ``f`` after every argument decorator has been applied to it.
    """
    args = [
        magic_arguments.argument(
            '--out', type=str,
            help="""The variable in which to store stdout from the script.
            If the script is backgrounded, this will be the stdout *pipe*,
            instead of the stdout text itself and will not be auto closed.
            """
        ),
        magic_arguments.argument(
            '--err', type=str,
            help="""The variable in which to store stderr from the script.
            If the script is backgrounded, this will be the stderr *pipe*,
            instead of the stderr text itself and will not be autoclosed.
            """
        ),
        magic_arguments.argument(
            '--bg', action="store_true",
            help="""Whether to run the script in the background.
            If given, the only way to see the output of the command is
            with --out/err.
            """
        ),
        magic_arguments.argument(
            '--proc', type=str,
            help="""The variable in which to store Popen instance.
            This is used only when --bg option is given.
            """
        ),
        magic_arguments.argument(
            '--no-raise-error', action="store_false", dest='raise_error',
            help="""Whether you should raise an error message in addition to
            a stream on stderr if you get a nonzero exit code.
            """,
        ),
    ]
    # Apply the argument decorators in list order, each wrapping the result
    # of the previous one.
    for arg in args:
        f = arg(f)
    return f

68 

69 

@magics_class
class ScriptMagics(Magics):
    """Magics for talking to scripts

    This defines a base `%%script` cell magic for running a cell
    with a program in a subprocess, and registers a few top-level
    magics that call %%script with common interpreters.
    """

    event_loop = Any(
        help="""
        The event loop on which to run subprocesses

        Not the main event loop,
        because we want to be able to make blocking calls
        and have certain requirements we don't want to impose on the main loop.
        """
    )

    script_magics = List(
        help="""Extra script cell magics to define

        This generates simple wrappers of `%%script foo` as `%%foo`.

        If you want to add script magics that aren't on your path,
        specify them in script_paths
        """,
    ).tag(config=True)

    @default('script_magics')
    def _script_magics_default(self):
        """default to a common list of programs"""

        defaults = [
            'sh',
            'bash',
            'perl',
            'ruby',
            'python',
            'python2',
            'python3',
            'pypy',
        ]
        if os.name == 'nt':
            defaults.extend([
                'cmd',
            ])

        return defaults

    script_paths = Dict(
        help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'

        Only necessary for items in script_magics where the default path will not
        find the right interpreter.
        """
    ).tag(config=True)

    def __init__(self, shell=None):
        """Register the per-program magics and arrange background cleanup."""
        super(ScriptMagics, self).__init__(shell=shell)
        # Assigned before anything else that may fail so the cleanup hooks
        # below always find the attribute.
        self.bg_processes = []
        self._generate_script_magics()
        atexit.register(self.kill_bg_processes)

    def __del__(self):
        # kill_bg_processes tolerates a partially initialised instance.
        self.kill_bg_processes()

    def _generate_script_magics(self):
        """Install one `%%<name>` cell magic per entry in ``script_magics``."""
        cell_magics = self.magics['cell']
        for name in self.script_magics:
            cell_magics[name] = self._make_script_magic(name)

    def _make_script_magic(self, name):
        """make a named magic, that calls %%script with a particular program"""
        # expand to explicit path if necessary:
        script = self.script_paths.get(name, name)

        @magic_arguments.magic_arguments()
        @script_args
        def named_script_magic(line, cell):
            # if line, add it as cl-flags
            if line:
                line = "%s %s" % (script, line)
            else:
                line = script
            return self.shebang(line, cell)

        # write a basic docstring:
        named_script_magic.__doc__ = \
        """%%{name} script magic

        Run cells with {script} in a subprocess.

        This is a shortcut for `%%script {script}`
        """.format(name=name, script=script)

        return named_script_magic

    @magic_arguments.magic_arguments()
    @script_args
    @cell_magic("script")
    def shebang(self, line, cell):
        """Run a cell via a shell command

        The `%%script` line is like the #! line of script,
        specifying a program (bash, perl, ruby, etc.) with which to run.

        The rest of the cell is run by that program.

        Examples
        --------
        ::

            In [1]: %%script bash
               ...: for i in 1 2 3; do
               ...:   echo $i
               ...: done
            1
            2
            3
        """

        # Create the event loop in which to run script magics
        # this operates on a background thread
        if self.event_loop is None:
            if sys.platform == "win32":
                # don't override the current policy,
                # just create an event loop
                event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
            else:
                event_loop = asyncio.new_event_loop()
            self.event_loop = event_loop

            # start the loop in a background thread
            asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
            asyncio_thread.start()
        else:
            event_loop = self.event_loop

        def in_thread(coro):
            """Call a coroutine on the asyncio thread and block for its result"""
            return asyncio.run_coroutine_threadsafe(coro, event_loop).result()

        async def _readchunk(stream):
            """Read up to and including the next newline from ``stream``.

            Falls back to whatever partial data is available at EOF, or to
            the already-buffered bytes when a line exceeds the stream limit.
            """
            try:
                return await stream.readuntil(b"\n")
            except asyncio.exceptions.IncompleteReadError as e:
                return e.partial
            except asyncio.exceptions.LimitOverrunError as e:
                return await stream.read(e.consumed)

        async def _handle_stream(stream, stream_arg, file_object):
            """Forward ``stream`` to ``file_object`` or capture it in user_ns.

            When ``stream_arg`` names a variable, the complete decoded output
            is accumulated there (an empty string if nothing is produced);
            otherwise each chunk is written to ``file_object`` and flushed.
            """
            captured = ""
            if stream_arg:
                # Always define the variable so a script without output does
                # not leave a stale value from an earlier run behind.
                self.shell.user_ns[stream_arg] = captured
            while True:
                chunk = (await _readchunk(stream)).decode("utf8", errors="replace")
                if not chunk:
                    break
                if stream_arg:
                    # Accumulate rather than overwrite, so the variable holds
                    # the whole output and not just the last line read.
                    captured += chunk
                    self.shell.user_ns[stream_arg] = captured
                else:
                    file_object.write(chunk)
                    file_object.flush()

        async def _stream_communicate(process, cell):
            """Feed ``cell`` to the process and pump both output streams."""
            process.stdin.write(cell)
            process.stdin.close()
            stdout_task = asyncio.create_task(
                _handle_stream(process.stdout, args.out, sys.stdout)
            )
            stderr_task = asyncio.create_task(
                _handle_stream(process.stderr, args.err, sys.stderr)
            )
            await asyncio.wait([stdout_task, stderr_task])
            await process.wait()

        async def _exited_within(process, timeout):
            """Wait up to ``timeout`` seconds; report whether the process exited.

            ``asyncio.wait_for`` raises a timeout error when the deadline
            passes; that is the expected "still running" outcome here, so it
            is absorbed instead of aborting the signal escalation.
            """
            try:
                await asyncio.wait_for(process.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                pass
            return process.returncode is not None

        argv = arg_split(line, posix=not sys.platform.startswith("win"))
        args, cmd = self.shebang.parser.parse_known_args(argv)

        try:
            p = in_thread(
                asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.PIPE,
                )
            )
        except OSError as e:
            if e.errno == errno.ENOENT:
                print("Couldn't find program: %r" % cmd[0])
                return
            else:
                raise

        if not cell.endswith('\n'):
            cell += '\n'
        cell = cell.encode('utf8', 'replace')
        if args.bg:
            self.bg_processes.append(p)
            self._gc_bg_processes()
            # Pipes not handed to the user are drained once the process ends.
            to_close = []
            if args.out:
                self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
            else:
                to_close.append(p.stdout)
            if args.err:
                self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
            else:
                to_close.append(p.stderr)
            event_loop.call_soon_threadsafe(
                lambda: asyncio.Task(self._run_script(p, cell, to_close))
            )
            if args.proc:
                proc_proxy = _AsyncIOProxy(p, event_loop)
                proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
                proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
                self.shell.user_ns[args.proc] = proc_proxy
            return

        try:
            in_thread(_stream_communicate(p, cell))
        except KeyboardInterrupt:
            # Escalate SIGINT -> terminate -> kill, giving the process a
            # short grace period after each of the first two steps.
            try:
                p.send_signal(signal.SIGINT)
                if in_thread(_exited_within(p, 0.1)):
                    print("Process is interrupted.")
                    return
                p.terminate()
                if in_thread(_exited_within(p, 0.1)):
                    print("Process is terminated.")
                    return
                p.kill()
                print("Process is killed.")
            except OSError:
                pass
            except Exception as e:
                print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
            return

        if args.raise_error and p.returncode != 0:
            # If we get here and p.returncode is still None, we must have
            # killed it but not yet seen its return code. We don't wait for it,
            # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
            rc = p.returncode or -9
            raise CalledProcessError(rc, cell)

    shebang.__skip_doctest__ = os.name != "posix"

    async def _run_script(self, p, cell, to_close):
        """callback for running the script in the background"""

        p.stdin.write(cell)
        await p.stdin.drain()
        p.stdin.close()
        await p.stdin.wait_closed()
        await p.wait()
        # asyncio read pipes have no close
        # but we should drain the data anyway
        for s in to_close:
            await s.read()
        self._gc_bg_processes()

    @line_magic("killbgscripts")
    def killbgscripts(self, _nouse_=''):
        """Kill all BG processes started by %%script and its family."""
        self.kill_bg_processes()
        print("All background processes were killed.")

    def kill_bg_processes(self):
        """Kill all BG processes which are still running.

        Sends SIGINT, then ``terminate()``, then ``kill()`` to every process
        that has no return code yet, pausing 0.1s after each of the first two
        steps and stopping as soon as no tracked process remains.
        """
        # getattr: this also runs from __del__, possibly on an instance whose
        # __init__ failed before bg_processes was assigned.
        if not getattr(self, "bg_processes", None):
            return

        escalation = (
            lambda proc: proc.send_signal(signal.SIGINT),
            lambda proc: proc.terminate(),
            lambda proc: proc.kill(),
        )
        final_step = len(escalation) - 1
        for step, stop in enumerate(escalation):
            for p in self.bg_processes:
                if p.returncode is None:
                    try:
                        stop(p)
                    except Exception:
                        # Best effort: the process may already be gone or the
                        # signal may be unsupported on this platform.
                        pass
            if step != final_step:
                # Grace period before escalating to the next, harsher step.
                time.sleep(0.1)
            self._gc_bg_processes()
            if not self.bg_processes:
                return

    def _gc_bg_processes(self):
        """Forget background processes that already have a return code."""
        self.bg_processes = [p for p in self.bg_processes if p.returncode is None]