Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/face/testing.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Design (and some implementation) of this owes heavily to Click's
2# CliRunner (TODO: bring in license?)
4"""Porting notes:
6* EchoingStdin.read1() needed to exist for py3 and raw_input
7* Not sure why the isolate context manager deals in byte streams and
8 then relegates to the Result to do late encoding (in properties no
9 less). This is especially troublesome because sys.stdout/stderr
10 isn't the same stream as stdout/stderr as returned by the context
11 manager. (see the extra flush calls in run's finally block.) Is
12 it just for parity with py2? There was a related bug, sys.stdout was
13 flushed, but not sys.stderr, which caused py3's error_bytes to come
14 through as blank.
15* sys.stderr had to be flushed, too, on py3 (in invoke's finally)
16* Result.exception was redundant with exc_info
17* Result.stderr raised a ValueError when stderr was empty, not just
18 when it wasn't captured.
19* Instead of isolated_filesystem, I just added chdir to run,
20 because pytest already does temporary directories.
21* Removed echo_stdin (stdin never echoes, as it wouldn't with subprocess)
23"""
25import io
26import os
27import sys
28import shlex
29import getpass
30import contextlib
31from subprocess import list2cmdline
32from functools import partial
33from collections.abc import Container
35from boltons.setutils import complement
def _make_input_stream(input, encoding):
    """Build the byte stream that stands in for the command's stdin.

    Bytes are used verbatim, text is encoded with *encoding*, and
    ``None`` produces an empty stream. Any other type raises
    :exc:`TypeError`.
    """
    if isinstance(input, bytes):
        payload = input
    elif isinstance(input, str):
        payload = input.encode(encoding)
    elif input is None:
        payload = b''
    else:
        raise TypeError(f'expected bytes, text, or None, not: {input!r}')
    return io.BytesIO(payload)
def _fake_getpass(prompt='Password: ', stream=None):
    """Replacement for :func:`getpass.getpass` that reads ``sys.stdin``.

    A non-empty *prompt* is written to *stream* (``sys.stderr`` when
    *stream* is falsy) and flushed. One line is then read from
    ``sys.stdin`` and returned with a single trailing newline removed.

    Raises:
        EOFError: if stdin has no more data.
    """
    out = stream or sys.stderr
    source = sys.stdin
    text = str(prompt)
    if text:
        out.write(text)
        out.flush()
    entered = source.readline()
    if not entered:
        raise EOFError
    return entered[:-1] if entered[-1] == '\n' else entered
class RunResult:
    """Returned from :meth:`CommandChecker.run()`, complete with the
    relevant inputs and outputs of the run.

    Instances of this object are especially valuable for verifying
    expected output via the :attr:`~RunResult.stdout` and
    :attr:`~RunResult.stderr` attributes.

    API modeled after :class:`subprocess.CompletedProcess` for
    familiarity and porting of tests.

    Args:
        args: The arguments the command was run with.
        input: The stdin content supplied to the run, if any.
        exit_code: The exit code the run finished with.
        stdout_bytes (bytes): Raw captured standard output.
        stderr_bytes (bytes): Raw captured standard error, or ``None``
            when stderr was not captured separately.
        exc_info: A ``sys.exc_info()``-style triple if an exception
            ended the run, else ``None``.
        checker: The object whose ``encoding`` attribute is used to
            decode the captured bytes. When omitted (or lacking an
            encoding), ``'utf8'`` is used.
    """

    def __init__(self, args, input, exit_code, stdout_bytes, stderr_bytes, exc_info=None, checker=None):
        self.args = args
        self.input = input
        self.checker = checker
        self.stdout_bytes = stdout_bytes
        self.stderr_bytes = stderr_bytes
        self.exit_code = exit_code  # integer

        # if an exception occurred:
        self.exc_info = exc_info
        # TODO: exc_info won't be available in subprocess... maybe the
        # text of a parsed-out traceback? But in general, tracebacks
        # aren't usually part of CLI UX...

    def _decode(self, raw):
        """Decode captured bytes to text, normalizing CRLF to LF."""
        # checker is optional in __init__, so fall back to utf8 rather
        # than failing with AttributeError on a None checker.
        encoding = getattr(self.checker, 'encoding', None) or 'utf8'
        return raw.decode(encoding, 'replace').replace('\r\n', '\n')

    @property
    def exception(self):
        """Exception instance, if an uncaught error was raised.
        Equivalent to ``run_res.exc_info[1]``, but more readable."""
        return self.exc_info[1] if self.exc_info else None

    @property
    def returncode(self):  # for parity with subprocess.CompletedProcess
        "Alias of :attr:`exit_code`, for parity with :class:`subprocess.CompletedProcess`"
        return self.exit_code

    @property
    def stdout(self):
        """The text output ("stdout") of the command, as a decoded
        string. See :attr:`stdout_bytes` for the bytestring.
        """
        return self._decode(self.stdout_bytes)

    @property
    def stderr(self):
        """The error output ("stderr") of the command, as a decoded
        string. See :attr:`stderr_bytes` for the bytestring.

        Raises:
            ValueError: if :attr:`stderr_bytes` is ``None``, i.e.
                stderr was not captured separately (as when
                *mix_stderr* is ``True`` on the
                :class:`~face.CommandChecker`).
        """
        if self.stderr_bytes is None:
            raise ValueError("stderr not separately captured")
        return self._decode(self.stderr_bytes)

    def __repr__(self):
        # very similar to subprocess.CompleteProcess repr
        args = [f'args={self.args!r}',
                f'returncode={self.returncode!r}']
        if self.stdout_bytes:
            args.append(f'stdout={self.stdout!r}')
        if self.stderr_bytes is not None:
            args.append(f'stderr={self.stderr!r}')
        if self.exception:
            args.append(f'exception={self.exception!r}')
        return f"{self.__class__.__name__}({', '.join(args)})"
def _get_exp_code_text(exp_codes):
    """Describe the expected exit codes for use in an error message.

    Args:
        exp_codes: The collection of acceptable exit codes.

    Returns:
        str: ``repr`` of the single code when there is exactly one,
        ``'one of (...)'`` for other sized collections, and
        ``'any code but ...'`` when *exp_codes* has no usable length
        but its complement can be listed.
    """
    try:
        codes_len = len(exp_codes)
    except Exception:
        # No length available; presumably a complement set such as the
        # one CommandChecker.fail() builds, so describe what it excludes.
        comp_codes = complement(exp_codes)
        try:
            comp_codes = tuple(comp_codes)
            return f'any code but {comp_codes[0] if len(comp_codes) == 1 else comp_codes!r}'
        except Exception:
            return repr(exp_codes)
    # Materialize once: sets and other unordered containers are valid
    # expected-code collections but cannot be indexed directly.
    codes = tuple(exp_codes)
    if codes_len == 1:
        return repr(codes[0])
    return f'one of {codes!r}'
class CheckError(AssertionError):
    """Rarely raised directly, :exc:`CheckError` is automatically
    raised when a :meth:`CommandChecker.run()` call does not terminate
    with an expected error code.

    This error attempts to format the stdout, stderr, and stdin of the
    run for easier debugging.

    Args:
        result: The :class:`RunResult` of the failed check; kept on the
            exception as ``result``.
        exit_codes: The exit codes that would have been accepted.
    """
    def __init__(self, result, exit_codes):
        self.result = result
        exp_code = _get_exp_code_text(exit_codes)
        # args may be None/empty when the command was run without
        # arguments; list2cmdline needs an iterable.
        msg = ('Got exit code %r (expected %s) when running command: %s'
               % (result.exit_code, exp_code, list2cmdline(result.args or ())))
        if result.stdout:
            msg += '\nstdout = """\n'
            msg += result.stdout
            msg += '"""\n'
        if result.stderr_bytes:
            msg += '\nstderr = """\n'
            msg += result.stderr
            msg += '"""\n'

        stdin_text = result.input
        if isinstance(stdin_text, bytes):
            # stdin may have been supplied as bytes; decode it so that
            # building the message cannot itself fail with TypeError.
            encoding = getattr(result.checker, 'encoding', None) or 'utf8'
            stdin_text = stdin_text.decode(encoding, 'replace')
        if stdin_text:
            msg += '\nstdin = """\n'
            msg += stdin_text
            msg += '"""\n'
        AssertionError.__init__(self, msg)
class CommandChecker:
    """Face's main testing interface.

    Wrap your :class:`Command` instance in a :class:`CommandChecker`,
    :meth:`~CommandChecker.run()` commands with arguments, and get
    :class:`RunResult` objects to validate your Command's behavior.

    Args:

       cmd: The :class:`Command` instance to test.
       env (dict): An optional base environment to use for subsequent
          calls issued through this checker. Defaults to ``{}``.
       chdir (str): A default path to execute this checker's commands
          in. Great for temporary directories to ensure test isolation.
       mix_stderr (bool): Set to ``True`` to capture stderr into
          stdout. This makes it easier to verify order of standard
          output and errors. If ``True``, this checker's results'
          ``stderr_bytes`` will be set to ``None``. Defaults to ``False``.
       reraise (bool): Reraise uncaught exceptions from within *cmd*'s
          endpoint functions, instead of returning a :class:`RunResult`
          instance. Defaults to ``False``.

    """
    def __init__(self, cmd, env=None, chdir=None, mix_stderr=False, reraise=False):
        self.cmd = cmd
        self.base_env = env or {}
        self.reraise = reraise
        self.mix_stderr = mix_stderr
        self.encoding = 'utf8'  # not clear if this should be an arg yet
        self.chdir = chdir

    @contextlib.contextmanager
    def _isolate(self, input=None, env=None, chdir=None):
        """Temporarily swap process-global state for a single run.

        Replaces ``sys.stdin``/``sys.stdout``/``sys.stderr`` with
        in-memory streams, patches ``getpass.getpass``, applies the
        merged environment to ``os.environ``, and optionally changes
        directory. Everything is restored on exit.

        Yields:
            tuple: ``(stdout_buffer, stderr_buffer)`` as
            :class:`io.BytesIO` objects; the second item is ``None``
            when *mix_stderr* is set.
        """
        old_cwd = os.getcwd()
        old_stdin, old_stdout, old_stderr = sys.stdin, sys.stdout, sys.stderr
        old_getpass = getpass.getpass

        raw_stdin = _make_input_stream(input, self.encoding)

        full_env = dict(self.base_env)
        if env:
            full_env.update(env)
        chdir = chdir or self.chdir

        bytes_output = io.BytesIO()
        tmp_stdin = io.TextIOWrapper(raw_stdin, encoding=self.encoding)
        tmp_stdout = io.TextIOWrapper(bytes_output, encoding=self.encoding)
        if self.mix_stderr:
            bytes_error = None
            tmp_stderr = tmp_stdout
        else:
            bytes_error = io.BytesIO()
            tmp_stderr = io.TextIOWrapper(bytes_error, encoding=self.encoding)

        old_env = {}
        try:
            _sync_env(os.environ, full_env, old_env)
            if chdir:
                os.chdir(str(chdir))
            sys.stdin, sys.stdout, sys.stderr = tmp_stdin, tmp_stdout, tmp_stderr
            getpass.getpass = _fake_getpass

            yield (bytes_output, bytes_error)
        finally:
            try:
                # The text wrappers are separate objects from the byte
                # buffers handed to the caller, so flush them explicitly.
                tmp_stdout.flush()
                tmp_stderr.flush()
            finally:
                # Restore global state even if a flush fails (e.g. the
                # command closed its stdout), so one bad run cannot leave
                # the process with hijacked streams.
                sys.stdin, sys.stdout, sys.stderr = old_stdin, old_stdout, old_stderr
                getpass.getpass = old_getpass
                _sync_env(os.environ, old_env)
                if chdir:
                    os.chdir(old_cwd)

    def fail(self, *a, **kw):
        """Convenience method around :meth:`~CommandChecker.run()`, with the
        same signature, except that this will raise a
        :exc:`CheckError` if the command completes with exit code
        ``0``.
        """
        kw.setdefault('exit_code', complement({0}))
        return self.run(*a, **kw)

    def __getattr__(self, name):
        """Provide ``fail_<code>[_<code>...]`` shortcuts.

        ``checker.fail_1_2(args)`` is equivalent to
        ``checker.fail(args, exit_code=[1, 2])``. Any other missing
        attribute raises a regular :exc:`AttributeError`.
        """
        if not name.startswith('fail_'):
            # object defines no __getattr__ to delegate to, so raise the
            # standard error directly with a message naming the attribute.
            raise AttributeError(
                f'{type(self).__name__!r} object has no attribute {name!r}')
        _, _, code_str = name.partition('fail_')
        try:
            code = [int(cs) for cs in code_str.split('_')]
        except ValueError:
            raise AttributeError('fail_* shortcuts must end in integers, not %r'
                                 % code_str) from None
        return partial(self.fail, exit_code=code)

    def run(self, args, input=None, env=None, chdir=None, exit_code=0):
        """The :meth:`run` method acts as the primary entrypoint to the
        :class:`CommandChecker` instance. Pass arguments as a list or
        string, and receive a :class:`RunResult` with which to verify
        your command's output.

        If the arguments do not result in an expected *exit_code*, a
        :exc:`CheckError` will be raised.

        Args:

           args: A list or string representing arguments, as one might
             find in :attr:`sys.argv` or at the command line.
           input (str): A string (or list of lines) to be passed to
             the command's stdin. Used for testing
             :func:`~face.prompt` interactions, among others.
           env (dict): A mapping of environment variables to apply on
             top of the :class:`CommandChecker`'s base env vars.
           chdir (str): A string (or stringifiable path) path to
             switch to before running the command. Defaults to
             ``None`` (runs in current directory).
           exit_code (int): An integer or list of integer exit codes
             expected from running the command with *args*. If the
             actual exit code does not match *exit_code*,
             :exc:`CheckError` is raised. Set to ``None`` to disable
             this behavior and always return
             :class:`RunResult`. Defaults to ``0``.

        Raises:
           TypeError: if *exit_code* is not ``None``, an int, or a
             container.
           CheckError: if the run's exit code is not among the
             expected ones.

        .. note::

           At this time, :meth:`run` interacts with global process
           state, and is not designed for parallel usage.

        """
        if isinstance(input, (list, tuple)):
            input = '\n'.join(input)
        if exit_code is None:
            exit_codes = ()
        elif isinstance(exit_code, int):
            exit_codes = (exit_code,)
        elif not isinstance(exit_code, Container):
            raise TypeError('expected exit_code to be None, int, or'
                            ' Container of ints, representing expected'
                            ' exit_codes, not: %r' % (exit_code,))
        else:
            exit_codes = exit_code
        with self._isolate(input=input, env=env, chdir=chdir) as (stdout, stderr):
            exc_info = None
            exit_code = 0

            if isinstance(args, str):
                args = shlex.split(args)

            try:
                self.cmd.run(args or ())
            except SystemExit as se:
                exc_info = sys.exc_info()
                exit_code = se.code if se.code is not None else 0
                if not isinstance(exit_code, int):
                    # Same as the interpreter: a non-integer exit "code"
                    # is printed to stderr and the status becomes 1.
                    sys.stderr.write(f'{exit_code}\n')
                    exit_code = 1
            except Exception:
                if self.reraise:
                    raise
                exit_code = -1  # TODO: something better?
                exc_info = sys.exc_info()
            finally:
                # Flush the swapped-in text streams before reading the
                # underlying byte buffers.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout_bytes = stdout.getvalue()
                stderr_bytes = stderr.getvalue() if not self.mix_stderr else None

        run_res = RunResult(checker=self,
                            args=args,
                            input=input,
                            stdout_bytes=stdout_bytes,
                            stderr_bytes=stderr_bytes,
                            exit_code=exit_code,
                            exc_info=exc_info)
        if exit_codes and exit_code not in exit_codes:
            exc = CheckError(run_res, exit_codes)
            raise exc
        return run_res
# syncing os.environ (as opposed to modifying a copy and setting it
# back) takes care of cases when someone has a reference to environ
def _sync_env(env, new, backup=None):
    """Apply the entries of *new* to *env* in place.

    A value of ``None`` in *new* means "remove this key"; removal is
    best-effort and never raises. When *backup* is given, the previous
    value of every touched key (``None`` if it was absent) is recorded
    in it, so passing that mapping back as *new* undoes the change.

    Returns:
        The *backup* mapping that was passed in (possibly ``None``).
    """
    keep_backup = backup is not None
    for name in new:
        target = new[name]
        if keep_backup:
            backup[name] = env.get(name)
        if target is None:
            try:
                del env[name]
            except Exception:
                pass
        else:
            env[name] = target
    return backup