Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/subprocess.py: 0%

163 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1__all__ = 'create_subprocess_exec', 'create_subprocess_shell' 

2 

3import subprocess 

4import warnings 

5 

6from . import events 

7from . import protocols 

8from . import streams 

9from . import tasks 

10from .log import logger 

11 

12 

13PIPE = subprocess.PIPE 

14STDOUT = subprocess.STDOUT 

15DEVNULL = subprocess.DEVNULL 

16 

17 

18class SubprocessStreamProtocol(streams.FlowControlMixin, 

19 protocols.SubprocessProtocol): 

20 """Like StreamReaderProtocol, but for a subprocess.""" 

21 

22 def __init__(self, limit, loop): 

23 super().__init__(loop=loop) 

24 self._limit = limit 

25 self.stdin = self.stdout = self.stderr = None 

26 self._transport = None 

27 self._process_exited = False 

28 self._pipe_fds = [] 

29 self._stdin_closed = self._loop.create_future() 

30 

31 def __repr__(self): 

32 info = [self.__class__.__name__] 

33 if self.stdin is not None: 

34 info.append(f'stdin={self.stdin!r}') 

35 if self.stdout is not None: 

36 info.append(f'stdout={self.stdout!r}') 

37 if self.stderr is not None: 

38 info.append(f'stderr={self.stderr!r}') 

39 return '<{}>'.format(' '.join(info)) 

40 

41 def connection_made(self, transport): 

42 self._transport = transport 

43 

44 stdout_transport = transport.get_pipe_transport(1) 

45 if stdout_transport is not None: 

46 self.stdout = streams.StreamReader(limit=self._limit, 

47 loop=self._loop) 

48 self.stdout.set_transport(stdout_transport) 

49 self._pipe_fds.append(1) 

50 

51 stderr_transport = transport.get_pipe_transport(2) 

52 if stderr_transport is not None: 

53 self.stderr = streams.StreamReader(limit=self._limit, 

54 loop=self._loop) 

55 self.stderr.set_transport(stderr_transport) 

56 self._pipe_fds.append(2) 

57 

58 stdin_transport = transport.get_pipe_transport(0) 

59 if stdin_transport is not None: 

60 self.stdin = streams.StreamWriter(stdin_transport, 

61 protocol=self, 

62 reader=None, 

63 loop=self._loop) 

64 

65 def pipe_data_received(self, fd, data): 

66 if fd == 1: 

67 reader = self.stdout 

68 elif fd == 2: 

69 reader = self.stderr 

70 else: 

71 reader = None 

72 if reader is not None: 

73 reader.feed_data(data) 

74 

75 def pipe_connection_lost(self, fd, exc): 

76 if fd == 0: 

77 pipe = self.stdin 

78 if pipe is not None: 

79 pipe.close() 

80 self.connection_lost(exc) 

81 if exc is None: 

82 self._stdin_closed.set_result(None) 

83 else: 

84 self._stdin_closed.set_exception(exc) 

85 return 

86 if fd == 1: 

87 reader = self.stdout 

88 elif fd == 2: 

89 reader = self.stderr 

90 else: 

91 reader = None 

92 if reader is not None: 

93 if exc is None: 

94 reader.feed_eof() 

95 else: 

96 reader.set_exception(exc) 

97 

98 if fd in self._pipe_fds: 

99 self._pipe_fds.remove(fd) 

100 self._maybe_close_transport() 

101 

102 def process_exited(self): 

103 self._process_exited = True 

104 self._maybe_close_transport() 

105 

106 def _maybe_close_transport(self): 

107 if len(self._pipe_fds) == 0 and self._process_exited: 

108 self._transport.close() 

109 self._transport = None 

110 

111 def _get_close_waiter(self, stream): 

112 if stream is self.stdin: 

113 return self._stdin_closed 

114 

115 

116class Process: 

117 def __init__(self, transport, protocol, loop): 

118 self._transport = transport 

119 self._protocol = protocol 

120 self._loop = loop 

121 self.stdin = protocol.stdin 

122 self.stdout = protocol.stdout 

123 self.stderr = protocol.stderr 

124 self.pid = transport.get_pid() 

125 

126 def __repr__(self): 

127 return f'<{self.__class__.__name__} {self.pid}>' 

128 

129 @property 

130 def returncode(self): 

131 return self._transport.get_returncode() 

132 

133 async def wait(self): 

134 """Wait until the process exit and return the process return code.""" 

135 return await self._transport._wait() 

136 

137 def send_signal(self, signal): 

138 self._transport.send_signal(signal) 

139 

140 def terminate(self): 

141 self._transport.terminate() 

142 

143 def kill(self): 

144 self._transport.kill() 

145 

146 async def _feed_stdin(self, input): 

147 debug = self._loop.get_debug() 

148 self.stdin.write(input) 

149 if debug: 

150 logger.debug( 

151 '%r communicate: feed stdin (%s bytes)', self, len(input)) 

152 try: 

153 await self.stdin.drain() 

154 except (BrokenPipeError, ConnectionResetError) as exc: 

155 # communicate() ignores BrokenPipeError and ConnectionResetError 

156 if debug: 

157 logger.debug('%r communicate: stdin got %r', self, exc) 

158 

159 if debug: 

160 logger.debug('%r communicate: close stdin', self) 

161 self.stdin.close() 

162 

163 async def _noop(self): 

164 return None 

165 

166 async def _read_stream(self, fd): 

167 transport = self._transport.get_pipe_transport(fd) 

168 if fd == 2: 

169 stream = self.stderr 

170 else: 

171 assert fd == 1 

172 stream = self.stdout 

173 if self._loop.get_debug(): 

174 name = 'stdout' if fd == 1 else 'stderr' 

175 logger.debug('%r communicate: read %s', self, name) 

176 output = await stream.read() 

177 if self._loop.get_debug(): 

178 name = 'stdout' if fd == 1 else 'stderr' 

179 logger.debug('%r communicate: close %s', self, name) 

180 transport.close() 

181 return output 

182 

183 async def communicate(self, input=None): 

184 if input is not None: 

185 stdin = self._feed_stdin(input) 

186 else: 

187 stdin = self._noop() 

188 if self.stdout is not None: 

189 stdout = self._read_stream(1) 

190 else: 

191 stdout = self._noop() 

192 if self.stderr is not None: 

193 stderr = self._read_stream(2) 

194 else: 

195 stderr = self._noop() 

196 stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr, 

197 loop=self._loop) 

198 await self.wait() 

199 return (stdout, stderr) 

200 

201 

202async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, 

203 loop=None, limit=streams._DEFAULT_LIMIT, 

204 **kwds): 

205 if loop is None: 

206 loop = events.get_event_loop() 

207 else: 

208 warnings.warn("The loop argument is deprecated since Python 3.8 " 

209 "and scheduled for removal in Python 3.10.", 

210 DeprecationWarning, 

211 stacklevel=2 

212 ) 

213 

214 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 

215 loop=loop) 

216 transport, protocol = await loop.subprocess_shell( 

217 protocol_factory, 

218 cmd, stdin=stdin, stdout=stdout, 

219 stderr=stderr, **kwds) 

220 return Process(transport, protocol, loop) 

221 

222 

223async def create_subprocess_exec(program, *args, stdin=None, stdout=None, 

224 stderr=None, loop=None, 

225 limit=streams._DEFAULT_LIMIT, **kwds): 

226 if loop is None: 

227 loop = events.get_event_loop() 

228 else: 

229 warnings.warn("The loop argument is deprecated since Python 3.8 " 

230 "and scheduled for removal in Python 3.10.", 

231 DeprecationWarning, 

232 stacklevel=2 

233 ) 

234 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 

235 loop=loop) 

236 transport, protocol = await loop.subprocess_exec( 

237 protocol_factory, 

238 program, *args, 

239 stdin=stdin, stdout=stdout, 

240 stderr=stderr, **kwds) 

241 return Process(transport, protocol, loop)