Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/face/testing.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

196 statements  

1# Design (and some implementation) of this owes heavily to Click's 

2# CliRunner (TODO: bring in license?) 

3 

4"""Porting notes: 

5 

6* EchoingStdin.read1() needed to exist for py3 and raw_input 

7* Not sure why the isolate context manager deals in byte streams and 

8 then relegates to the Result to do late encoding (in properties no 

9 less). This is especially troublesome because sys.stdout/stderr 

10 isn't the same stream as stdout/stderr as returned by the context 

11 manager. (see the extra flush calls in run's finally block.) Is 

12 it just for parity with py2? There was a related bug, sys.stdout was 

13 flushed, but not sys.stderr, which caused py3's error_bytes to come 

14 through as blank. 

15* sys.stderr had to be flushed, too, on py3 (in invoke's finally) 

16* Result.exception was redundant with exc_info 

17* Result.stderr raised a ValueError when stderr was empty, not just 

18 when it wasn't captured. 

19* Instead of isolated_filesystem, I just added chdir to run, 

20 because pytest already does temporary directories. 

21* Removed echo_stdin (stdin never echoes, as it wouldn't with subprocess)

22 

23""" 

24 

25import io 

26import os 

27import sys 

28import shlex 

29import getpass 

30import contextlib 

31from subprocess import list2cmdline 

32from functools import partial 

33from collections.abc import Container 

34 

35from boltons.setutils import complement 

36 

37 

38def _make_input_stream(input, encoding): 

39 if input is None: 

40 input = b'' 

41 elif isinstance(input, str): 

42 input = input.encode(encoding) 

43 elif not isinstance(input, bytes): 

44 raise TypeError(f'expected bytes, text, or None, not: {input!r}') 

45 return io.BytesIO(input) 

46 

47 

48def _fake_getpass(prompt='Password: ', stream=None): 

49 if not stream: 

50 stream = sys.stderr 

51 input = sys.stdin 

52 prompt = str(prompt) 

53 if prompt: 

54 stream.write(prompt) 

55 stream.flush() 

56 line = input.readline() 

57 if not line: 

58 raise EOFError 

59 if line[-1] == '\n': 

60 line = line[:-1] 

61 return line 

62 

63 

class RunResult:
    """Returned from :meth:`CommandChecker.run()`, complete with the
    relevant inputs and outputs of the run.

    Instances of this object are especially valuable for verifying
    expected output via the :attr:`~RunResult.stdout` and
    :attr:`~RunResult.stderr` attributes.

    API modeled after :class:`subprocess.CompletedProcess` for
    familiarity and porting of tests.

    Args:
        args: The arguments the command was run with.
        input: The input that was passed to the command's stdin.
        exit_code (int): The exit code the run terminated with.
        stdout_bytes (bytes): Raw captured standard output.
        stderr_bytes (bytes): Raw captured standard error, or ``None``
            if stderr was not captured separately.
        exc_info: An optional ``sys.exc_info()``-style triple for an
            exception raised during the run.
        checker: The :class:`CommandChecker` that produced this result.
            Its ``encoding`` attribute is used to decode output; when no
            checker is given, ``'utf8'`` is used.
    """

    def __init__(self, args, input, exit_code, stdout_bytes, stderr_bytes, exc_info=None, checker=None):
        self.args = args
        self.input = input
        self.checker = checker
        self.stdout_bytes = stdout_bytes
        self.stderr_bytes = stderr_bytes
        self.exit_code = exit_code  # integer

        # if an exception occurred:
        self.exc_info = exc_info
        # TODO: exc_info won't be available in subprocess... maybe the
        # text of a parsed-out traceback? But in general, tracebacks
        # aren't usually part of CLI UX...

    def _decode(self, data):
        """Decode captured bytes to text, normalizing ``\\r\\n`` to ``\\n``."""
        # checker defaults to None in __init__, so fall back to utf8
        # rather than failing on ``None.encoding``.
        encoding = getattr(self.checker, 'encoding', None) or 'utf8'
        return data.decode(encoding, 'replace').replace('\r\n', '\n')

    @property
    def exception(self):
        """Exception instance, if an uncaught error was raised.
        Equivalent to ``run_res.exc_info[1]``, but more readable."""
        return self.exc_info[1] if self.exc_info else None

    @property
    def returncode(self):  # for parity with subprocess.CompletedProcess
        "Alias of :attr:`exit_code`, for parity with :class:`subprocess.CompletedProcess`"
        return self.exit_code

    @property
    def stdout(self):
        """The text output ("stdout") of the command, as a decoded
        string. See :attr:`stdout_bytes` for the bytestring.
        """
        return self._decode(self.stdout_bytes)

    @property
    def stderr(self):
        """The error output ("stderr") of the command, as a decoded
        string. See :attr:`stderr_bytes` for the bytestring.

        Raises:
            ValueError: If stderr was not captured separately, i.e.
                :attr:`stderr_bytes` is ``None`` (as it is when
                *mix_stderr* is ``True`` on the
                :class:`~face.CommandChecker`).
        """
        if self.stderr_bytes is None:
            raise ValueError("stderr not separately captured")
        return self._decode(self.stderr_bytes)

    def __repr__(self):
        # very similar to subprocess.CompletedProcess repr
        args = [f'args={self.args!r}',
                f'returncode={self.returncode!r}']
        if self.stdout_bytes:
            args.append(f'stdout={self.stdout!r}')
        # stderr is shown whenever it was captured, even if empty
        if self.stderr_bytes is not None:
            args.append(f'stderr={self.stderr!r}')
        if self.exception:
            args.append(f'exception={self.exception!r}')
        return f"{self.__class__.__name__}({', '.join(args)})"

135 

136 

137 

138def _get_exp_code_text(exp_codes): 

139 try: 

140 codes_len = len(exp_codes) 

141 except Exception: 

142 comp_codes = complement(exp_codes) 

143 try: 

144 comp_codes = tuple(comp_codes) 

145 return f'any code but {comp_codes[0] if len(comp_codes) == 1 else comp_codes!r}' 

146 except Exception: 

147 return repr(exp_codes) 

148 if codes_len == 1: 

149 return repr(exp_codes[0]) 

150 return f'one of {tuple(exp_codes)!r}' 

151 

152 

class CheckError(AssertionError):
    """Rarely raised directly, :exc:`CheckError` is automatically
    raised when a :meth:`CommandChecker.run()` call does not terminate
    with an expected error code.

    This error attempts to format the stdout, stderr, and stdin of the
    run for easier debugging.

    Args:
        result: The :class:`RunResult` of the offending run, kept on
            the exception as ``result``.
        exit_codes: The container of exit codes that were expected.
    """
    def __init__(self, result, exit_codes):
        self.result = result
        exp_code = _get_exp_code_text(exit_codes)
        # run() accepts args=None (treated as no arguments), so guard
        # against handing None to list2cmdline.
        cmdline = list2cmdline(result.args or ())
        parts = ['Got exit code %r (expected %s) when running command: %s'
                 % (result.exit_code, exp_code, cmdline)]
        if result.stdout:
            parts.append('\nstdout = """\n' + result.stdout + '"""\n')
        if result.stderr_bytes:
            parts.append('\nstderr = """\n' + result.stderr + '"""\n')
        if result.input:
            stdin_text = result.input
            if isinstance(stdin_text, bytes):
                # stdin may have been passed as bytes; decode it so it
                # can be embedded in the (text) message.
                checker = getattr(result, 'checker', None)
                encoding = getattr(checker, 'encoding', None) or 'utf8'
                stdin_text = stdin_text.decode(encoding, 'replace')
            parts.append('\nstdin = """\n' + stdin_text + '"""\n')
        AssertionError.__init__(self, ''.join(parts))

179 

180 

class CommandChecker:
    """Face's main testing interface.

    Wrap your :class:`Command` instance in a :class:`CommandChecker`,
    :meth:`~CommandChecker.run()` commands with arguments, and get
    :class:`RunResult` objects to validate your Command's behavior.

    Args:

       cmd: The :class:`Command` instance to test.
       env (dict): An optional base environment to use for subsequent
         calls issued through this checker. Defaults to ``{}``.
       chdir (str): A default path to execute this checker's commands
         in. Great for temporary directories to ensure test isolation.
       mix_stderr (bool): Set to ``True`` to capture stderr into
         stdout. This makes it easier to verify order of standard
         output and errors. If ``True``, this checker's results'
         ``stderr_bytes`` will be set to ``None``. Defaults to ``False``.
       reraise (bool): Reraise uncaught exceptions from within *cmd*'s
         endpoint functions, instead of returning a :class:`RunResult`
         instance. Defaults to ``False``.

    """
    def __init__(self, cmd, env=None, chdir=None, mix_stderr=False, reraise=False):
        self.cmd = cmd
        self.base_env = env or {}
        self.reraise = reraise
        self.mix_stderr = mix_stderr
        self.encoding = 'utf8'  # not clear if this should be an arg yet
        self.chdir = chdir

    @contextlib.contextmanager
    def _isolate(self, input=None, env=None, chdir=None):
        """Temporarily swap in captured stdio, environment variables,
        working directory, and ``getpass.getpass`` for a single run.

        Yields a ``(stdout, stderr)`` pair of :class:`io.BytesIO`
        buffers receiving the encoded output; ``stderr`` is ``None``
        when this checker mixes stderr into stdout. Everything that was
        replaced is put back on exit.
        """
        old_cwd = os.getcwd()
        old_stdin, old_stdout, old_stderr = sys.stdin, sys.stdout, sys.stderr
        old_getpass = getpass.getpass

        full_env = dict(self.base_env)
        if env:
            full_env.update(env)
        chdir = chdir or self.chdir

        tmp_stdin = io.TextIOWrapper(
            _make_input_stream(input, self.encoding), encoding=self.encoding)
        bytes_output = io.BytesIO()
        tmp_stdout = io.TextIOWrapper(
            bytes_output, encoding=self.encoding)
        if self.mix_stderr:
            bytes_error = None
            tmp_stderr = tmp_stdout
        else:
            bytes_error = io.BytesIO()
            tmp_stderr = io.TextIOWrapper(
                bytes_error, encoding=self.encoding)

        old_env = {}
        try:
            _sync_env(os.environ, full_env, old_env)
            if chdir:
                os.chdir(str(chdir))
            sys.stdin, sys.stdout, sys.stderr = tmp_stdin, tmp_stdout, tmp_stderr
            getpass.getpass = _fake_getpass

            yield (bytes_output, bytes_error)
        finally:
            try:
                # Push any text still buffered in the wrappers down into
                # the byte buffers handed to the caller.
                tmp_stdout.flush()
                tmp_stderr.flush()
            finally:
                # Restore process-global state even if a flush failed
                # (e.g. the command closed its stdout), so a failing run
                # cannot leave later code with patched streams.
                sys.stdin, sys.stdout, sys.stderr = old_stdin, old_stdout, old_stderr
                getpass.getpass = old_getpass
                _sync_env(os.environ, old_env)
                if chdir:
                    os.chdir(old_cwd)

    def fail(self, *a, **kw):
        """Convenience method around :meth:`~CommandChecker.run()`, with the
        same signature, except that this will raise a
        :exc:`CheckError` if the command completes with exit code
        ``0``.
        """
        kw.setdefault('exit_code', complement({0}))
        return self.run(*a, **kw)

    def __getattr__(self, name):
        """Provide ``fail_<code>[_<code>...]`` shortcuts.

        ``checker.fail_1_2(args)`` is equivalent to
        ``checker.fail(args, exit_code=[1, 2])``. Any other missing
        attribute raises a regular :exc:`AttributeError`.
        """
        if not name.startswith('fail_'):
            # object defines no __getattr__ to delegate to, so raise the
            # conventional error directly.
            raise AttributeError('%r object has no attribute %r'
                                 % (self.__class__.__name__, name))
        code_str = name[len('fail_'):]
        try:
            code = [int(cs) for cs in code_str.split('_')]
        except ValueError:
            raise AttributeError('fail_* shortcuts must end in integers, not %r'
                                 % code_str) from None
        return partial(self.fail, exit_code=code)

    def run(self, args, input=None, env=None, chdir=None, exit_code=0):
        """The :meth:`run` method acts as the primary entrypoint to the
        :class:`CommandChecker` instance. Pass arguments as a list or
        string, and receive a :class:`RunResult` with which to verify
        your command's output.

        If the arguments do not result in an expected *exit_code*, a
        :exc:`CheckError` will be raised.

        Args:

           args: A list or string representing arguments, as one might
             find in :attr:`sys.argv` or at the command line.
           input (str): A string (or list of lines) to be passed to
             the command's stdin. Used for testing
             :func:`~face.prompt` interactions, among others.
           env (dict): A mapping of environment variables to apply on
             top of the :class:`CommandChecker`'s base env vars.
           chdir (str): A string (or stringifiable path) path to
             switch to before running the command. Defaults to
             ``None`` (runs in current directory).
           exit_code (int): An integer or list of integer exit codes
             expected from running the command with *args*. If the
             actual exit code does not match *exit_code*,
             :exc:`CheckError` is raised. Set to ``None`` to disable
             this behavior and always return
             :class:`RunResult`. Defaults to ``0``.

        Raises:
           TypeError: If *exit_code* is not ``None``, an int, or a
             container.
           CheckError: If the run ends with an unexpected exit code.

        .. note::

           At this time, :meth:`run` interacts with global process
           state, and is not designed for parallel usage.

        """
        if isinstance(input, (list, tuple)):
            input = '\n'.join(input)
        if exit_code is None:
            exit_codes = ()
        elif isinstance(exit_code, int):
            exit_codes = (exit_code,)
        elif not isinstance(exit_code, Container):
            raise TypeError('expected exit_code to be None, int, or'
                            ' Container of ints, representing expected'
                            ' exit_codes, not: %r' % (exit_code,))
        else:
            exit_codes = exit_code
        with self._isolate(input=input, env=env, chdir=chdir) as (stdout, stderr):
            exc_info = None
            exit_code = 0

            if isinstance(args, str):
                args = shlex.split(args)

            try:
                self.cmd.run(args or ())
            except SystemExit as se:
                exc_info = sys.exc_info()
                # a bare sys.exit() carries code None, which means success
                exit_code = se.code if se.code is not None else 0
            except Exception:
                if self.reraise:
                    raise
                exit_code = -1  # TODO: something better?
                exc_info = sys.exc_info()
            finally:
                # flush the text wrappers so the byte buffers read
                # below contain everything the command wrote
                sys.stdout.flush()
                sys.stderr.flush()
            stdout_bytes = stdout.getvalue()
            stderr_bytes = stderr.getvalue() if not self.mix_stderr else None

        run_res = RunResult(checker=self,
                            args=args,
                            input=input,
                            stdout_bytes=stdout_bytes,
                            stderr_bytes=stderr_bytes,
                            exit_code=exit_code,
                            exc_info=exc_info)
        if exit_codes and exit_code not in exit_codes:
            raise CheckError(run_res, exit_codes)
        return run_res

360 

361 

362# syncing os.environ (as opposed to modifying a copy and setting it 

363# back) takes care of cases when someone has a reference to environ 

364def _sync_env(env, new, backup=None): 

365 for key, value in new.items(): 

366 if backup is not None: 

367 backup[key] = env.get(key) 

368 if value is not None: 

369 env[key] = value 

370 continue 

371 try: 

372 del env[key] 

373 except Exception: 

374 pass 

375 return backup