Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/base_subprocess.py: 21%

196 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import collections 

2import subprocess 

3import warnings 

4 

5from . import protocols 

6from . import transports 

7from .log import logger 

8 

9 

10class BaseSubprocessTransport(transports.SubprocessTransport): 

11 

12 def __init__(self, loop, protocol, args, shell, 

13 stdin, stdout, stderr, bufsize, 

14 waiter=None, extra=None, **kwargs): 

15 super().__init__(extra) 

16 self._closed = False 

17 self._protocol = protocol 

18 self._loop = loop 

19 self._proc = None 

20 self._pid = None 

21 self._returncode = None 

22 self._exit_waiters = [] 

23 self._pending_calls = collections.deque() 

24 self._pipes = {} 

25 self._finished = False 

26 

27 if stdin == subprocess.PIPE: 

28 self._pipes[0] = None 

29 if stdout == subprocess.PIPE: 

30 self._pipes[1] = None 

31 if stderr == subprocess.PIPE: 

32 self._pipes[2] = None 

33 

34 # Create the child process: set the _proc attribute 

35 try: 

36 self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, 

37 stderr=stderr, bufsize=bufsize, **kwargs) 

38 except: 

39 self.close() 

40 raise 

41 

42 self._pid = self._proc.pid 

43 self._extra['subprocess'] = self._proc 

44 

45 if self._loop.get_debug(): 

46 if isinstance(args, (bytes, str)): 

47 program = args 

48 else: 

49 program = args[0] 

50 logger.debug('process %r created: pid %s', 

51 program, self._pid) 

52 

53 self._loop.create_task(self._connect_pipes(waiter)) 

54 

55 def __repr__(self): 

56 info = [self.__class__.__name__] 

57 if self._closed: 

58 info.append('closed') 

59 if self._pid is not None: 

60 info.append(f'pid={self._pid}') 

61 if self._returncode is not None: 

62 info.append(f'returncode={self._returncode}') 

63 elif self._pid is not None: 

64 info.append('running') 

65 else: 

66 info.append('not started') 

67 

68 stdin = self._pipes.get(0) 

69 if stdin is not None: 

70 info.append(f'stdin={stdin.pipe}') 

71 

72 stdout = self._pipes.get(1) 

73 stderr = self._pipes.get(2) 

74 if stdout is not None and stderr is stdout: 

75 info.append(f'stdout=stderr={stdout.pipe}') 

76 else: 

77 if stdout is not None: 

78 info.append(f'stdout={stdout.pipe}') 

79 if stderr is not None: 

80 info.append(f'stderr={stderr.pipe}') 

81 

82 return '<{}>'.format(' '.join(info)) 

83 

84 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 

85 raise NotImplementedError 

86 

87 def set_protocol(self, protocol): 

88 self._protocol = protocol 

89 

90 def get_protocol(self): 

91 return self._protocol 

92 

93 def is_closing(self): 

94 return self._closed 

95 

96 def close(self): 

97 if self._closed: 

98 return 

99 self._closed = True 

100 

101 for proto in self._pipes.values(): 

102 if proto is None: 

103 continue 

104 proto.pipe.close() 

105 

106 if (self._proc is not None and 

107 # has the child process finished? 

108 self._returncode is None and 

109 # the child process has finished, but the 

110 # transport hasn't been notified yet? 

111 self._proc.poll() is None): 

112 

113 if self._loop.get_debug(): 

114 logger.warning('Close running child process: kill %r', self) 

115 

116 try: 

117 self._proc.kill() 

118 except ProcessLookupError: 

119 pass 

120 

121 # Don't clear the _proc reference yet: _post_init() may still run 

122 

123 def __del__(self, _warn=warnings.warn): 

124 if not self._closed: 

125 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

126 self.close() 

127 

128 def get_pid(self): 

129 return self._pid 

130 

131 def get_returncode(self): 

132 return self._returncode 

133 

134 def get_pipe_transport(self, fd): 

135 if fd in self._pipes: 

136 return self._pipes[fd].pipe 

137 else: 

138 return None 

139 

140 def _check_proc(self): 

141 if self._proc is None: 

142 raise ProcessLookupError() 

143 

144 def send_signal(self, signal): 

145 self._check_proc() 

146 self._proc.send_signal(signal) 

147 

148 def terminate(self): 

149 self._check_proc() 

150 self._proc.terminate() 

151 

152 def kill(self): 

153 self._check_proc() 

154 self._proc.kill() 

155 

156 async def _connect_pipes(self, waiter): 

157 try: 

158 proc = self._proc 

159 loop = self._loop 

160 

161 if proc.stdin is not None: 

162 _, pipe = await loop.connect_write_pipe( 

163 lambda: WriteSubprocessPipeProto(self, 0), 

164 proc.stdin) 

165 self._pipes[0] = pipe 

166 

167 if proc.stdout is not None: 

168 _, pipe = await loop.connect_read_pipe( 

169 lambda: ReadSubprocessPipeProto(self, 1), 

170 proc.stdout) 

171 self._pipes[1] = pipe 

172 

173 if proc.stderr is not None: 

174 _, pipe = await loop.connect_read_pipe( 

175 lambda: ReadSubprocessPipeProto(self, 2), 

176 proc.stderr) 

177 self._pipes[2] = pipe 

178 

179 assert self._pending_calls is not None 

180 

181 loop.call_soon(self._protocol.connection_made, self) 

182 for callback, data in self._pending_calls: 

183 loop.call_soon(callback, *data) 

184 self._pending_calls = None 

185 except (SystemExit, KeyboardInterrupt): 

186 raise 

187 except BaseException as exc: 

188 if waiter is not None and not waiter.cancelled(): 

189 waiter.set_exception(exc) 

190 else: 

191 if waiter is not None and not waiter.cancelled(): 

192 waiter.set_result(None) 

193 

194 def _call(self, cb, *data): 

195 if self._pending_calls is not None: 

196 self._pending_calls.append((cb, data)) 

197 else: 

198 self._loop.call_soon(cb, *data) 

199 

200 def _pipe_connection_lost(self, fd, exc): 

201 self._call(self._protocol.pipe_connection_lost, fd, exc) 

202 self._try_finish() 

203 

204 def _pipe_data_received(self, fd, data): 

205 self._call(self._protocol.pipe_data_received, fd, data) 

206 

207 def _process_exited(self, returncode): 

208 assert returncode is not None, returncode 

209 assert self._returncode is None, self._returncode 

210 if self._loop.get_debug(): 

211 logger.info('%r exited with return code %r', self, returncode) 

212 self._returncode = returncode 

213 if self._proc.returncode is None: 

214 # asyncio uses a child watcher: copy the status into the Popen 

215 # object. On Python 3.6, it is required to avoid a ResourceWarning. 

216 self._proc.returncode = returncode 

217 self._call(self._protocol.process_exited) 

218 self._try_finish() 

219 

220 # wake up futures waiting for wait() 

221 for waiter in self._exit_waiters: 

222 if not waiter.cancelled(): 

223 waiter.set_result(returncode) 

224 self._exit_waiters = None 

225 

226 async def _wait(self): 

227 """Wait until the process exit and return the process return code. 

228 

229 This method is a coroutine.""" 

230 if self._returncode is not None: 

231 return self._returncode 

232 

233 waiter = self._loop.create_future() 

234 self._exit_waiters.append(waiter) 

235 return await waiter 

236 

237 def _try_finish(self): 

238 assert not self._finished 

239 if self._returncode is None: 

240 return 

241 if all(p is not None and p.disconnected 

242 for p in self._pipes.values()): 

243 self._finished = True 

244 self._call(self._call_connection_lost, None) 

245 

246 def _call_connection_lost(self, exc): 

247 try: 

248 self._protocol.connection_lost(exc) 

249 finally: 

250 self._loop = None 

251 self._proc = None 

252 self._protocol = None 

253 

254 

255class WriteSubprocessPipeProto(protocols.BaseProtocol): 

256 

257 def __init__(self, proc, fd): 

258 self.proc = proc 

259 self.fd = fd 

260 self.pipe = None 

261 self.disconnected = False 

262 

263 def connection_made(self, transport): 

264 self.pipe = transport 

265 

266 def __repr__(self): 

267 return f'<{self.__class__.__name__} fd={self.fd} pipe={self.pipe!r}>' 

268 

269 def connection_lost(self, exc): 

270 self.disconnected = True 

271 self.proc._pipe_connection_lost(self.fd, exc) 

272 self.proc = None 

273 

274 def pause_writing(self): 

275 self.proc._protocol.pause_writing() 

276 

277 def resume_writing(self): 

278 self.proc._protocol.resume_writing() 

279 

280 

281class ReadSubprocessPipeProto(WriteSubprocessPipeProto, 

282 protocols.Protocol): 

283 

284 def data_received(self, data): 

285 self.proc._pipe_data_received(self.fd, data)