Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/face/testing.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Design (and some implementation) of this owes heavily to Click's
2# CliRunner (TODO: bring in license?)
4"""Porting notes:
6* EchoingStdin.read1() needed to exist for py3 and raw_input
7* Not sure why the isolate context manager deals in byte streams and
8 then relegates to the Result to do late encoding (in properties no
9 less). This is especially troublesome because sys.stdout/stderr
10 isn't the same stream as stdout/stderr as returned by the context
11 manager. (see the extra flush calls in run's finally block.) Is
12 it just for parity with py2? There was a related bug, sys.stdout was
13 flushed, but not sys.stderr, which caused py3's error_bytes to come
14 through as blank.
15* sys.stderr had to be flushed, too, on py3 (in invoke's finally)
16* Result.exception was redundant with exc_info
17* Result.stderr raised a ValueError when stderr was empty, not just
18 when it wasn't captured.
19* Instead of isolated_filesystem, I just added chdir to run,
20 because pytest already does temporary directories.
21* Removed echo_stdin (stdin never echos, as it wouldn't with subprocess)
23"""
25import os
26import sys
27import shlex
28import getpass
29import contextlib
30from subprocess import list2cmdline
31from functools import partial
33try:
34 from collections.abc import Container
35except ImportError:
36 from collections import Container
38PY2 = sys.version_info[0] == 2
40if PY2:
41 from cStringIO import StringIO
42else:
43 import io
44 unicode = str
47from boltons.setutils import complement
50def _make_input_stream(input, encoding):
51 if input is None:
52 input = b''
53 elif isinstance(input, unicode):
54 input = input.encode(encoding)
55 elif not isinstance(input, bytes):
56 raise TypeError('expected bytes, text, or None, not: %r' % input)
57 if PY2:
58 return StringIO(input)
59 return io.BytesIO(input)
62def _fake_getpass(prompt='Password: ', stream=None):
63 if not stream:
64 stream = sys.stderr
65 input = sys.stdin
66 prompt = str(prompt)
67 if prompt:
68 stream.write(prompt)
69 stream.flush()
70 line = input.readline()
71 if not line:
72 raise EOFError
73 if line[-1] == '\n':
74 line = line[:-1]
75 return line
78class RunResult(object):
79 """Returned from :meth:`CommandChecker.run()`, complete with the
80 relevant inputs and outputs of the run.
82 Instances of this object are especially valuable for verifying
83 expected output via the :attr:`~RunResult.stdout` and
84 :attr:`~RunResult.stderr` attributes.
86 API modeled after :class:`subprocess.CompletedProcess` for
87 familiarity and porting of tests.
89 """
91 def __init__(self, args, input, exit_code, stdout_bytes, stderr_bytes, exc_info=None, checker=None):
92 self.args = args
93 self.input = input
94 self.checker = checker
95 self.stdout_bytes = stdout_bytes
96 self.stderr_bytes = stderr_bytes
97 self.exit_code = exit_code # integer
99 # if an exception occurred:
100 self.exc_info = exc_info
101 # TODO: exc_info won't be available in subprocess... maybe the
102 # text of a parsed-out traceback? But in general, tracebacks
103 # aren't usually part of CLI UX...
105 @property
106 def exception(self):
107 """Exception instance, if an uncaught error was raised.
108 Equivalent to ``run_res.exc_info[1]``, but more readable."""
109 return self.exc_info[1] if self.exc_info else None
111 @property
112 def returncode(self): # for parity with subprocess.CompletedProcess
113 "Alias of :attr:`exit_code`, for parity with :class:`subprocess.CompletedProcess`"
114 return self.exit_code
116 @property
117 def stdout(self):
118 """The text output ("stdout") of the command, as a decoded
119 string. See :attr:`stdout_bytes` for the bytestring.
120 """
121 return (self.stdout_bytes
122 .decode(self.checker.encoding, 'replace')
123 .replace('\r\n', '\n'))
125 @property
126 def stderr(self):
127 """The error output ("stderr") of the command, as a decoded
128 string. See :attr:`stderr_bytes` for the bytestring. May be
129 ``None`` if *mix_stderr* was set to ``True`` in the
130 :class:`~face.CommandChecker`.
131 """
132 if self.stderr_bytes is None:
133 raise ValueError("stderr not separately captured")
134 return (self.stderr_bytes
135 .decode(self.checker.encoding, 'replace')
136 .replace('\r\n', '\n'))
138 def __repr__(self):
139 # very similar to subprocess.CompleteProcess repr
140 args = ['args={!r}'.format(self.args),
141 'returncode={!r}'.format(self.returncode)]
142 if self.stdout_bytes:
143 args.append('stdout=%r' % (self.stdout,))
144 if self.stderr_bytes is not None:
145 args.append('stderr=%r' % (self.stderr,))
146 if self.exception:
147 args.append('exception=%r' % (self.exception,))
148 return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
152def _get_exp_code_text(exp_codes):
153 try:
154 codes_len = len(exp_codes)
155 except Exception:
156 comp_codes = complement(exp_codes)
157 try:
158 comp_codes = tuple(comp_codes)
159 return 'any code but %r' % (comp_codes[0] if len(comp_codes) == 1 else comp_codes)
160 except Exception:
161 return repr(exp_codes)
162 if codes_len == 1:
163 return repr(exp_codes[0])
164 return 'one of %r' % (tuple(exp_codes),)
167class CheckError(AssertionError):
168 """Rarely raised directly, :exc:`CheckError` is automatically
169 raised when a :meth:`CommandChecker.run()` call does not terminate
170 with an expected error code.
172 This error attempts to format the stdout, stderr, and stdin of the
173 run for easier debugging.
174 """
175 def __init__(self, result, exit_codes):
176 self.result = result
177 exp_code = _get_exp_code_text(exit_codes)
178 msg = ('Got exit code %r (expected %s) when running command: %s'
179 % (result.exit_code, exp_code, list2cmdline(result.args)))
180 if result.stdout:
181 msg += '\nstdout = """\n'
182 msg += result.stdout
183 msg += '"""\n'
184 if result.stderr_bytes:
185 msg += '\nstderr = """\n'
186 msg += result.stderr
187 msg += '"""\n'
188 if result.input:
189 msg += '\nstdin = """\n'
190 msg += result.input
191 msg += '"""\n'
192 AssertionError.__init__(self, msg)
195class CommandChecker(object):
196 """Face's main testing interface.
198 Wrap your :class:`Command` instance in a :class:`CommandChecker`,
199 :meth:`~CommandChecker.run()` commands with arguments, and get
200 :class:`RunResult` objects to validate your Command's behavior.
202 Args:
204 cmd: The :class:`Command` instance to test.
205 env (dict): An optional base environment to use for subsequent
206 calls issued through this checker. Defaults to ``{}``.
207 chdir (str): A default path to execute this checker's commands
208 in. Great for temporary directories to ensure test isolation.
209 mix_stderr (bool): Set to ``True`` to capture stderr into
210 stdout. This makes it easier to verify order of standard
211 output and errors. If ``True``, this checker's results'
212 error_bytes will be set to ``None``. Defaults to ``False``.
213 reraise (bool): Reraise uncaught exceptions from within *cmd*'s
214 endpoint functions, instead of returning a :class:`RunResult`
215 instance. Defaults to ``False``.
217 """
218 def __init__(self, cmd, env=None, chdir=None, mix_stderr=False, reraise=False):
219 self.cmd = cmd
220 self.base_env = env or {}
221 self.reraise = reraise
222 self.mix_stderr = mix_stderr
223 self.encoding = 'utf8' # not clear if this should be an arg yet
224 self.chdir = chdir
226 @contextlib.contextmanager
227 def _isolate(self, input=None, env=None, chdir=None):
228 old_cwd = os.getcwd()
229 old_stdin, old_stdout, old_stderr = sys.stdin, sys.stdout, sys.stderr
230 old_getpass = getpass.getpass
232 tmp_stdin = _make_input_stream(input, self.encoding)
234 full_env = dict(self.base_env)
236 chdir = chdir or self.chdir
237 if env:
238 full_env.update(env)
240 if PY2:
241 tmp_stdout = bytes_output = StringIO()
242 if self.mix_stderr:
243 tmp_stderr = tmp_stdout
244 else:
245 bytes_error = tmp_stderr = StringIO()
246 else:
247 bytes_output = io.BytesIO()
248 tmp_stdin = io.TextIOWrapper(tmp_stdin, encoding=self.encoding)
249 tmp_stdout = io.TextIOWrapper(
250 bytes_output, encoding=self.encoding)
251 if self.mix_stderr:
252 tmp_stderr = tmp_stdout
253 else:
254 bytes_error = io.BytesIO()
255 tmp_stderr = io.TextIOWrapper(
256 bytes_error, encoding=self.encoding)
258 old_env = {}
259 try:
260 _sync_env(os.environ, full_env, old_env)
261 if chdir:
262 os.chdir(str(chdir))
263 sys.stdin, sys.stdout, sys.stderr = tmp_stdin, tmp_stdout, tmp_stderr
264 getpass.getpass = _fake_getpass
266 yield (bytes_output, bytes_error if not self.mix_stderr else None)
267 finally:
268 if chdir:
269 os.chdir(old_cwd)
271 _sync_env(os.environ, old_env)
273 # see note above
274 tmp_stdout.flush()
275 tmp_stderr.flush()
276 sys.stdin, sys.stdout, sys.stderr = old_stdin, old_stdout, old_stderr
277 getpass.getpass = old_getpass
279 return
281 def fail(self, *a, **kw):
282 """Convenience method around :meth:`~CommandChecker.run()`, with the
283 same signature, except that this will raise a
284 :exc:`CheckError` if the command completes with exit code
285 ``0``.
286 """
287 kw.setdefault('exit_code', complement(set([0])))
288 return self.run(*a, **kw)
290 def __getattr__(self, name):
291 if not name.startswith('fail_'):
292 return super(CommandChecker, self).__getattr__(name)
293 _, _, code_str = name.partition('fail_')
294 try:
295 code = [int(cs) for cs in code_str.split('_')]
296 except Exception:
297 raise AttributeError('fail_* shortcuts must end in integers, not %r'
298 % code_str)
299 return partial(self.fail, exit_code=code)
301 def run(self, args, input=None, env=None, chdir=None, exit_code=0):
302 """The :meth:`run` method acts as the primary entrypoint to the
303 :class:`CommandChecker` instance. Pass arguments as a list or
304 string, and receive a :class:`RunResult` with which to verify
305 your command's output.
307 If the arguments do not result in an expected *exit_code*, a
308 :exc:`CheckError` will be raised.
310 Args:
312 args: A list or string representing arguments, as one might
313 find in :attr:`sys.argv` or at the command line.
314 input (str): A string (or list of lines) to be passed to
315 the command's stdin. Used for testing
316 :func:`~face.prompt` interactions, among others.
317 env (dict): A mapping of environment variables to apply on
318 top of the :class:`CommandChecker`'s base env vars.
319 chdir (str): A string (or stringifiable path) path to
320 switch to before running the command. Defaults to
321 ``None`` (runs in current directory).
322 exit_code (int): An integer or list of integer exit codes
323 expected from running the command with *args*. If the
324 actual exit code does not match *exit_code*,
325 :exc:`CheckError` is raised. Set to ``None`` to disable
326 this behavior and always return
327 :class:`RunResult`. Defaults to ``0``.
329 .. note::
331 At this time, :meth:`run` interacts with global process
332 state, and is not designed for parallel usage.
334 """
335 if isinstance(input, (list, tuple)):
336 input = '\n'.join(input)
337 if exit_code is None:
338 exit_codes = ()
339 elif isinstance(exit_code, int):
340 exit_codes = (exit_code,)
341 elif not isinstance(exit_code, Container):
342 raise TypeError('expected exit_code to be None, int, or'
343 ' Container of ints, representing expected'
344 ' exit_codes, not: %r' % (exit_code,))
345 else:
346 exit_codes = exit_code
347 with self._isolate(input=input, env=env, chdir=chdir) as (stdout, stderr):
348 exc_info = None
349 exit_code = 0
351 if isinstance(args, (str, unicode)):
352 args = shlex.split(args)
354 try:
355 res = self.cmd.run(args or ())
356 except SystemExit as se:
357 exc_info = sys.exc_info()
358 exit_code = se.code if se.code is not None else 0
359 except Exception:
360 if self.reraise:
361 raise
362 exit_code = -1 # TODO: something better?
363 exc_info = sys.exc_info()
364 finally:
365 sys.stdout.flush()
366 sys.stderr.flush()
367 stdout_bytes = stdout.getvalue()
368 stderr_bytes = stderr.getvalue() if not self.mix_stderr else None
370 run_res = RunResult(checker=self,
371 args=args,
372 input=input,
373 stdout_bytes=stdout_bytes,
374 stderr_bytes=stderr_bytes,
375 exit_code=exit_code,
376 exc_info=exc_info)
377 if exit_codes and exit_code not in exit_codes:
378 exc = CheckError(run_res, exit_codes)
379 raise exc
380 return run_res
383# syncing os.environ (as opposed to modifying a copy and setting it
384# back) takes care of cases when someone has a reference to environ
385def _sync_env(env, new, backup=None):
386 if PY2:
387 # py2 expects bytes in os.environ
388 encode = lambda x: x.encode('utf8') if isinstance(x, unicode) else x
389 new = {encode(k): encode(v) for k, v in new.items()}
391 for key, value in new.items():
392 if backup is not None:
393 backup[key] = env.get(key)
394 if value is not None:
395 env[key] = value
396 continue
397 try:
398 del env[key]
399 except Exception:
400 pass
401 return backup