Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/face/testing.py: 18%

212 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:23 +0000

1# Design (and some implementation) of this owes heavily to Click's 

2# CliRunner (TODO: bring in license?) 

3 

4"""Porting notes: 

5 

6* EchoingStdin.read1() needed to exist for py3 and raw_input 

7* Not sure why the isolate context manager deals in byte streams and 

8 then relegates to the Result to do late encoding (in properties no 

9 less). This is especially troublesome because sys.stdout/stderr 

10 isn't the same stream as stdout/stderr as returned by the context 

11 manager. (see the extra flush calls in run's finally block.) Is 

12 it just for parity with py2? There was a related bug, sys.stdout was 

13 flushed, but not sys.stderr, which caused py3's error_bytes to come 

14 through as blank. 

15* sys.stderr had to be flushed, too, on py3 (in invoke's finally) 

16* Result.exception was redundant with exc_info 

17* Result.stderr raised a ValueError when stderr was empty, not just 

18 when it wasn't captured. 

19* Instead of isolated_filesystem, I just added chdir to run, 

20 because pytest already does temporary directories. 

21* Removed echo_stdin (stdin never echos, as it wouldn't with subprocess) 

22 

23""" 

24 

25import os 

26import sys 

27import shlex 

28import getpass 

29import contextlib 

30from subprocess import list2cmdline 

31from functools import partial 

32 

33try: 

34 from collections.abc import Container 

35except ImportError: 

36 from collections import Container 

37 

38PY2 = sys.version_info[0] == 2 

39 

40if PY2: 

41 from cStringIO import StringIO 

42else: 

43 import io 

44 unicode = str 

45 

46 

47from boltons.setutils import complement 

48 

49 

50def _make_input_stream(input, encoding): 

51 if input is None: 

52 input = b'' 

53 elif isinstance(input, unicode): 

54 input = input.encode(encoding) 

55 elif not isinstance(input, bytes): 

56 raise TypeError('expected bytes, text, or None, not: %r' % input) 

57 if PY2: 

58 return StringIO(input) 

59 return io.BytesIO(input) 

60 

61 

62def _fake_getpass(prompt='Password: ', stream=None): 

63 if not stream: 

64 stream = sys.stderr 

65 input = sys.stdin 

66 prompt = str(prompt) 

67 if prompt: 

68 stream.write(prompt) 

69 stream.flush() 

70 line = input.readline() 

71 if not line: 

72 raise EOFError 

73 if line[-1] == '\n': 

74 line = line[:-1] 

75 return line 

76 

77 

78class RunResult(object): 

79 "Holds the captured result of CommandChecker running a command." 

80 

81 def __init__(self, args, input, exit_code, stdout_bytes, stderr_bytes, exc_info=None, checker=None): 

82 self.args = args 

83 self.input = input 

84 self.checker = checker 

85 self.stdout_bytes = stdout_bytes 

86 self.stderr_bytes = stderr_bytes 

87 self.exit_code = exit_code # integer 

88 

89 # if an exception occurred: 

90 self.exc_info = exc_info 

91 # TODO: exc_info won't be available in subprocess... maybe the 

92 # text of a parsed-out traceback? But in general, tracebacks 

93 # aren't usually part of CLI UX... 

94 

95 @property 

96 def exception(self): 

97 return self.exc_info[1] if self.exc_info else None 

98 

99 @property 

100 def returncode(self): # for parity with subprocess.CompletedProcess 

101 return self.exit_code 

102 

103 @property 

104 def stdout(self): 

105 "The standard output as unicode string." 

106 return (self.stdout_bytes 

107 .decode(self.checker.encoding, 'replace') 

108 .replace('\r\n', '\n')) 

109 

110 @property 

111 def stderr(self): 

112 "The standard error as unicode string." 

113 if self.stderr_bytes is None: 

114 raise ValueError("stderr not separately captured") 

115 return (self.stderr_bytes 

116 .decode(self.checker.encoding, 'replace') 

117 .replace('\r\n', '\n')) 

118 

119 def __repr__(self): 

120 # very similar to subprocess.CompleteProcess repr 

121 args = ['args={!r}'.format(self.args), 

122 'returncode={!r}'.format(self.returncode)] 

123 if self.stdout_bytes: 

124 args.append('stdout=%r' % (self.stdout,)) 

125 if self.stderr_bytes is not None: 

126 args.append('stderr=%r' % (self.stderr,)) 

127 if self.exception: 

128 args.append('exception=%r' % (self.exception,)) 

129 return "%s(%s)" % (self.__class__.__name__, ', '.join(args)) 

130 

131 # TODO: 

132 # def check(self): 

133 # if self.exit_code != 0: 

134 # raise CheckError(repr(self)) 

135 

136 

137def _get_exp_code_text(exp_codes): 

138 try: 

139 codes_len = len(exp_codes) 

140 except Exception: 

141 comp_codes = complement(exp_codes) 

142 try: 

143 comp_codes = tuple(comp_codes) 

144 return 'any code but %r' % (comp_codes[0] if len(comp_codes) == 1 else comp_codes) 

145 except Exception: 

146 return repr(exp_codes) 

147 if codes_len == 1: 

148 return repr(exp_codes[0]) 

149 return 'one of %r' % (tuple(exp_codes),) 

150 

151 

152class CheckError(AssertionError): 

153 def __init__(self, result, exit_codes): 

154 self.result = result 

155 exp_code = _get_exp_code_text(exit_codes) 

156 msg = ('Got exit code %r (expected %s) when running command: %s' 

157 % (result.exit_code, exp_code, list2cmdline(result.args))) 

158 if result.stdout: 

159 msg += '\nstdout = """\n' 

160 msg += result.stdout 

161 msg += '"""\n' 

162 if result.stderr_bytes: 

163 msg += '\nstderr = """\n' 

164 msg += result.stderr 

165 msg += '"""\n' 

166 if result.input: 

167 msg += '\nstdin = """\n' 

168 msg += result.input 

169 msg += '"""\n' 

170 AssertionError.__init__(self, msg) 

171 

172 

173class CommandChecker(object): 

174 def __init__(self, cmd, env=None, chdir=None, mix_stderr=False, reraise=False): 

175 self.cmd = cmd 

176 self.base_env = env or {} 

177 self.reraise = reraise 

178 self.mix_stderr = mix_stderr 

179 self.encoding = 'utf8' # not clear if this should be an arg yet 

180 self.chdir = chdir 

181 

182 @contextlib.contextmanager 

183 def _isolate(self, input=None, env=None, chdir=None): 

184 old_cwd = os.getcwd() 

185 old_stdin, old_stdout, old_stderr = sys.stdin, sys.stdout, sys.stderr 

186 old_getpass = getpass.getpass 

187 

188 tmp_stdin = _make_input_stream(input, self.encoding) 

189 

190 full_env = dict(self.base_env) 

191 

192 chdir = chdir or self.chdir 

193 if env: 

194 full_env.update(env) 

195 

196 if PY2: 

197 tmp_stdout = bytes_output = StringIO() 

198 if self.mix_stderr: 

199 tmp_stderr = tmp_stdout 

200 else: 

201 bytes_error = tmp_stderr = StringIO() 

202 else: 

203 bytes_output = io.BytesIO() 

204 tmp_stdin = io.TextIOWrapper(tmp_stdin, encoding=self.encoding) 

205 tmp_stdout = io.TextIOWrapper( 

206 bytes_output, encoding=self.encoding) 

207 if self.mix_stderr: 

208 tmp_stderr = tmp_stdout 

209 else: 

210 bytes_error = io.BytesIO() 

211 tmp_stderr = io.TextIOWrapper( 

212 bytes_error, encoding=self.encoding) 

213 

214 old_env = {} 

215 try: 

216 _sync_env(os.environ, full_env, old_env) 

217 if chdir: 

218 os.chdir(str(chdir)) 

219 sys.stdin, sys.stdout, sys.stderr = tmp_stdin, tmp_stdout, tmp_stderr 

220 getpass.getpass = _fake_getpass 

221 

222 yield (bytes_output, bytes_error if not self.mix_stderr else None) 

223 finally: 

224 if chdir: 

225 os.chdir(old_cwd) 

226 

227 _sync_env(os.environ, old_env) 

228 

229 # see note above 

230 tmp_stdout.flush() 

231 tmp_stderr.flush() 

232 sys.stdin, sys.stdout, sys.stderr = old_stdin, old_stdout, old_stderr 

233 getpass.getpass = old_getpass 

234 

235 return 

236 

237 def fail(self, *a, **kw): 

238 kw.setdefault('exit_code', complement(set([0]))) 

239 return self.run(*a, **kw) 

240 

241 def __getattr__(self, name): 

242 if not name.startswith('fail_'): 

243 return super(CommandChecker, self).__getattr__(name) 

244 _, _, code_str = name.partition('fail_') 

245 try: 

246 code = [int(cs) for cs in code_str.split('_')] 

247 except Exception: 

248 raise AttributeError('fail_* shortcuts must end in integers, not %r' 

249 % code_str) 

250 return partial(self.fail, exit_code=code) 

251 

252 def run(self, args, input=None, env=None, chdir=None, exit_code=0): 

253 if isinstance(input, (list, tuple)): 

254 input = '\n'.join(input) 

255 if exit_code is None: 

256 exit_codes = () 

257 elif isinstance(exit_code, int): 

258 exit_codes = (exit_code,) 

259 elif not isinstance(exit_code, Container): 

260 raise TypeError('expected exit_code to be None, int, or' 

261 ' Container of ints, representing expected' 

262 ' exit_codes, not: %r' % (exit_code,)) 

263 else: 

264 exit_codes = exit_code 

265 with self._isolate(input=input, env=env, chdir=chdir) as (stdout, stderr): 

266 exc_info = None 

267 exit_code = 0 

268 

269 if isinstance(args, (str, unicode)): 

270 args = shlex.split(args) 

271 

272 try: 

273 res = self.cmd.run(args or ()) 

274 except SystemExit as se: 

275 exc_info = sys.exc_info() 

276 exit_code = se.code if se.code is not None else 0 

277 except Exception: 

278 if self.reraise: 

279 raise 

280 exit_code = -1 # TODO: something better? 

281 exc_info = sys.exc_info() 

282 finally: 

283 sys.stdout.flush() 

284 sys.stderr.flush() 

285 stdout_bytes = stdout.getvalue() 

286 stderr_bytes = stderr.getvalue() if not self.mix_stderr else None 

287 

288 run_res = RunResult(checker=self, 

289 args=args, 

290 input=input, 

291 stdout_bytes=stdout_bytes, 

292 stderr_bytes=stderr_bytes, 

293 exit_code=exit_code, 

294 exc_info=exc_info) 

295 if exit_codes and exit_code not in exit_codes: 

296 exc = CheckError(run_res, exit_codes) 

297 raise exc 

298 return run_res 

299 

300 

301# syncing os.environ (as opposed to modifying a copy and setting it 

302# back) takes care of cases when someone has a reference to environ 

303def _sync_env(env, new, backup=None): 

304 if PY2: 

305 # py2 expects bytes in os.environ 

306 encode = lambda x: x.encode('utf8') if isinstance(x, unicode) else x 

307 new = {encode(k): encode(v) for k, v in new.items()} 

308 

309 for key, value in new.items(): 

310 if backup is not None: 

311 backup[key] = env.get(key) 

312 if value is not None: 

313 env[key] = value 

314 continue 

315 try: 

316 del env[key] 

317 except Exception: 

318 pass 

319 return backup