Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/face/testing.py: 18%
212 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:23 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:23 +0000
# Design (and some implementation) of this owes heavily to Click's
# CliRunner (TODO: bring in license?)

"""Porting notes:

* EchoingStdin.read1() needed to exist for py3 and raw_input
* Not sure why the isolate context manager deals in byte streams and
  then relegates to the Result to do late encoding (in properties no
  less). This is especially troublesome because sys.stdout/stderr
  isn't the same stream as stdout/stderr as returned by the context
  manager. (see the extra flush calls in run's finally block.) Is
  it just for parity with py2? There was a related bug, sys.stdout was
  flushed, but not sys.stderr, which caused py3's error_bytes to come
  through as blank.
* sys.stderr had to be flushed, too, on py3 (in invoke's finally)
* Result.exception was redundant with exc_info
* Result.stderr raised a ValueError when stderr was empty, not just
  when it wasn't captured.
* Instead of isolated_filesystem, I just added chdir to run,
  because pytest already does temporary directories.
* Removed echo_stdin (stdin never echoes, as it wouldn't with subprocess)

"""
25import os
26import sys
27import shlex
28import getpass
29import contextlib
30from subprocess import list2cmdline
31from functools import partial
33try:
34 from collections.abc import Container
35except ImportError:
36 from collections import Container
38PY2 = sys.version_info[0] == 2
40if PY2:
41 from cStringIO import StringIO
42else:
43 import io
44 unicode = str
47from boltons.setutils import complement
def _make_input_stream(input, encoding):
    """Build the in-memory byte stream that stands in for stdin.

    ``None`` yields an empty stream, text is encoded using *encoding*,
    and bytes are used unchanged. Any other type raises TypeError.
    """
    if input is None:
        payload = b''
    elif isinstance(input, unicode):
        payload = input.encode(encoding)
    elif isinstance(input, bytes):
        payload = input
    else:
        raise TypeError('expected bytes, text, or None, not: %r' % input)

    # Only the branch matching the running interpreter is evaluated, so
    # the name that was not imported for this Python version is never touched.
    stream_factory = StringIO if PY2 else io.BytesIO
    return stream_factory(payload)
def _fake_getpass(prompt='Password: ', stream=None):
    """Replacement for getpass.getpass that reads one line from sys.stdin.

    The prompt (if non-empty after str()) is written to *stream*, which
    falls back to sys.stderr when not supplied. One trailing newline is
    stripped from the line read. Raises EOFError when stdin has no more
    data.
    """
    out = stream or sys.stderr
    source = sys.stdin
    text = str(prompt)
    if text:
        out.write(text)
        out.flush()

    entered = source.readline()
    if not entered:
        raise EOFError
    return entered[:-1] if entered[-1] == '\n' else entered
class RunResult(object):
    """Holds the captured result of CommandChecker running a command.

    Attributes:
        args: the argument list the command was run with.
        input: the stdin payload supplied for the run (may be None).
        checker: the CommandChecker that produced this result, if any;
            its ``encoding`` is used to decode the captured bytes.
        stdout_bytes / stderr_bytes: raw captured output. ``stderr_bytes``
            is None when stderr was not captured separately.
        exit_code: integer exit code of the run.
        exc_info: ``sys.exc_info()`` triple if the command raised.
    """

    def __init__(self, args, input, exit_code, stdout_bytes, stderr_bytes, exc_info=None, checker=None):
        self.args = args
        self.input = input
        self.checker = checker
        self.stdout_bytes = stdout_bytes
        self.stderr_bytes = stderr_bytes
        self.exit_code = exit_code  # integer

        # if an exception occurred:
        self.exc_info = exc_info
        # TODO: exc_info won't be available in subprocess... maybe the
        # text of a parsed-out traceback? But in general, tracebacks
        # aren't usually part of CLI UX...

    @property
    def _encoding(self):
        # checker is optional (defaults to None), so fall back to the
        # same encoding CommandChecker uses rather than failing with an
        # AttributeError on None.
        return getattr(self.checker, 'encoding', None) or 'utf8'

    @property
    def exception(self):
        "The exception instance raised by the command, or None."
        return self.exc_info[1] if self.exc_info else None

    @property
    def returncode(self):  # for parity with subprocess.CompletedProcess
        return self.exit_code

    @property
    def stdout(self):
        "The standard output as unicode string."
        return (self.stdout_bytes
                .decode(self._encoding, 'replace')
                .replace('\r\n', '\n'))

    @property
    def stderr(self):
        "The standard error as unicode string."
        if self.stderr_bytes is None:
            raise ValueError("stderr not separately captured")
        return (self.stderr_bytes
                .decode(self._encoding, 'replace')
                .replace('\r\n', '\n'))

    def __repr__(self):
        # very similar to subprocess.CompleteProcess repr
        args = ['args={!r}'.format(self.args),
                'returncode={!r}'.format(self.returncode)]
        if self.stdout_bytes:
            args.append('stdout=%r' % (self.stdout,))
        if self.stderr_bytes is not None:
            args.append('stderr=%r' % (self.stderr,))
        if self.exception:
            args.append('exception=%r' % (self.exception,))
        return "%s(%s)" % (self.__class__.__name__, ', '.join(args))

    # TODO:
    # def check(self):
    #     if self.exit_code != 0:
    #         raise CheckError(repr(self))
def _get_exp_code_text(exp_codes):
    """Describe the expected exit codes for use in an error message.

    * A sized container with one code gives that code's repr.
    * Any other sized container gives ``'one of (<codes>)'``.
    * A container with no usable length (e.g. a complement set) is
      described by what it excludes: ``'any code but <code(s)>'``.
    * If none of that works, the container's own repr is returned.
    """
    try:
        len(exp_codes)
    except Exception:
        # No usable length; try to describe the container by its complement.
        try:
            comp_codes = tuple(complement(exp_codes))
            excluded = comp_codes[0] if len(comp_codes) == 1 else comp_codes
            # wrap in a 1-tuple so a tuple of codes is formatted as a
            # single value instead of being taken as the format args
            return 'any code but %r' % (excluded,)
        except Exception:
            return repr(exp_codes)

    try:
        # materialize via iteration rather than indexing, so unordered
        # containers such as sets are supported too
        codes = tuple(exp_codes)
    except TypeError:
        return repr(exp_codes)
    if len(codes) == 1:
        return repr(codes[0])
    return 'one of %r' % (codes,)
class CheckError(AssertionError):
    """Raised when a command's exit code is not among the expected codes.

    The message includes the actual and expected exit codes, the command
    line, and any captured stdout, stderr, and stdin. The originating
    result object is available as ``self.result``.
    """

    def __init__(self, result, exit_codes):
        self.result = result
        exp_code = _get_exp_code_text(exit_codes)
        # args may be None/empty when the command was run without arguments
        msg = ('Got exit code %r (expected %s) when running command: %s'
               % (result.exit_code, exp_code, list2cmdline(result.args or ())))
        if result.stdout:
            msg += '\nstdout = """\n'
            msg += result.stdout
            msg += '"""\n'
        if result.stderr_bytes:
            msg += '\nstderr = """\n'
            msg += result.stderr
            msg += '"""\n'
        if result.input:
            stdin_text = result.input
            # input may legitimately be bytes; on py3 those can't be
            # concatenated onto a text message, so decode them first.
            # (On py2 bytes is str, and this branch is skipped.)
            if isinstance(stdin_text, bytes) and not isinstance(stdin_text, str):
                encoding = getattr(getattr(result, 'checker', None),
                                   'encoding', None) or 'utf8'
                stdin_text = stdin_text.decode(encoding, 'replace')
            msg += '\nstdin = """\n'
            msg += stdin_text
            msg += '"""\n'
        AssertionError.__init__(self, msg)
class CommandChecker(object):
    """Runs a command object in-process and checks its exit code.

    Each run temporarily replaces sys.stdin/stdout/stderr,
    getpass.getpass, selected os.environ entries, and optionally the
    working directory, then restores them afterwards.

    Args:
        cmd: object exposing ``run(args)``; this is what gets invoked.
        env: mapping of environment variables applied to every run.
        chdir: default directory to change into for every run.
        mix_stderr: if true, stderr is written to the same capture as
            stdout and the result's ``stderr_bytes`` is None.
        reraise: if true, non-SystemExit exceptions raised by the
            command propagate out of ``run`` instead of being recorded.
    """

    def __init__(self, cmd, env=None, chdir=None, mix_stderr=False, reraise=False):
        self.cmd = cmd
        self.base_env = env or {}
        self.reraise = reraise
        self.mix_stderr = mix_stderr
        self.encoding = 'utf8'  # not clear if this should be an arg yet
        self.chdir = chdir

    @contextlib.contextmanager
    def _isolate(self, input=None, env=None, chdir=None):
        """Context manager that swaps in fake stdio, env, cwd and getpass.

        Yields a ``(stdout_buffer, stderr_buffer)`` pair of in-memory
        byte buffers; ``stderr_buffer`` is None when ``mix_stderr`` is set.
        """
        old_cwd = os.getcwd()
        old_stdin, old_stdout, old_stderr = sys.stdin, sys.stdout, sys.stderr
        old_getpass = getpass.getpass

        tmp_stdin = _make_input_stream(input, self.encoding)

        full_env = dict(self.base_env)
        if env:
            full_env.update(env)
        chdir = chdir or self.chdir

        bytes_error = None  # stays None when stderr is mixed into stdout
        if PY2:
            tmp_stdout = bytes_output = StringIO()
            if self.mix_stderr:
                tmp_stderr = tmp_stdout
            else:
                bytes_error = tmp_stderr = StringIO()
        else:
            bytes_output = io.BytesIO()
            tmp_stdin = io.TextIOWrapper(tmp_stdin, encoding=self.encoding)
            tmp_stdout = io.TextIOWrapper(
                bytes_output, encoding=self.encoding)
            if self.mix_stderr:
                tmp_stderr = tmp_stdout
            else:
                bytes_error = io.BytesIO()
                tmp_stderr = io.TextIOWrapper(
                    bytes_error, encoding=self.encoding)

        old_env = {}
        try:
            _sync_env(os.environ, full_env, old_env)
            if chdir:
                os.chdir(str(chdir))
            sys.stdin, sys.stdout, sys.stderr = tmp_stdin, tmp_stdout, tmp_stderr
            getpass.getpass = _fake_getpass

            yield (bytes_output, bytes_error)
        finally:
            try:
                # the text wrappers buffer; flush so the byte buffers
                # handed to the caller are complete
                tmp_stdout.flush()
                tmp_stderr.flush()
            finally:
                # Restore the process-global streams first and
                # unconditionally, so a failure while flushing or
                # changing directory cannot leave them hijacked.
                sys.stdin, sys.stdout, sys.stderr = old_stdin, old_stdout, old_stderr
                getpass.getpass = old_getpass
                _sync_env(os.environ, old_env)
                if chdir:
                    os.chdir(old_cwd)

    def fail(self, *a, **kw):
        """Like run(), but by default expects any exit code other than 0."""
        kw.setdefault('exit_code', complement(set([0])))
        return self.run(*a, **kw)

    def __getattr__(self, name):
        """Provide ``fail_<code>[_<code>...]`` shortcuts.

        ``checker.fail_1_2(args)`` is ``checker.fail(args, exit_code=[1, 2])``.
        Any other missing attribute raises a regular AttributeError.
        """
        if not name.startswith('fail_'):
            # object defines no __getattr__ to delegate to, so raise the
            # conventional error directly.
            raise AttributeError('%r object has no attribute %r'
                                 % (self.__class__.__name__, name))
        code_str = name[len('fail_'):]
        try:
            code = [int(cs) for cs in code_str.split('_')]
        except ValueError:
            raise AttributeError('fail_* shortcuts must end in integers, not %r'
                                 % code_str)
        return partial(self.fail, exit_code=code)

    def run(self, args, input=None, env=None, chdir=None, exit_code=0):
        """Run the command with *args* and return a RunResult.

        Args:
            args: argument list, or a single string split with shlex.
            input: stdin content; a list/tuple of lines is joined with
                newlines.
            env: extra environment variables for this run only.
            chdir: directory to run in, overriding the checker default.
            exit_code: expected exit code: an int, a container of ints,
                or None to skip the check.

        Raises:
            TypeError: if *exit_code* is not None, an int, or a Container.
            CheckError: if the actual exit code is not an expected one.
        """
        if isinstance(input, (list, tuple)):
            input = '\n'.join(input)
        if exit_code is None:
            exit_codes = ()
        elif isinstance(exit_code, int):
            exit_codes = (exit_code,)
        elif not isinstance(exit_code, Container):
            raise TypeError('expected exit_code to be None, int, or'
                            ' Container of ints, representing expected'
                            ' exit_codes, not: %r' % (exit_code,))
        else:
            exit_codes = exit_code

        with self._isolate(input=input, env=env, chdir=chdir) as (stdout, stderr):
            exc_info = None
            actual_code = 0

            if isinstance(args, (str, unicode)):
                args = shlex.split(args)

            try:
                self.cmd.run(args or ())
            except SystemExit as se:
                exc_info = sys.exc_info()
                actual_code = se.code if se.code is not None else 0
            except Exception:
                if self.reraise:
                    raise
                actual_code = -1  # TODO: something better?
                exc_info = sys.exc_info()
            finally:
                # sys.stdout/stderr are the temporary wrappers here; flush
                # them so getvalue() below sees everything written
                sys.stdout.flush()
                sys.stderr.flush()
            stdout_bytes = stdout.getvalue()
            stderr_bytes = stderr.getvalue() if not self.mix_stderr else None

        run_res = RunResult(checker=self,
                            args=args,
                            input=input,
                            stdout_bytes=stdout_bytes,
                            stderr_bytes=stderr_bytes,
                            exit_code=actual_code,
                            exc_info=exc_info)
        if exit_codes and actual_code not in exit_codes:
            raise CheckError(run_res, exit_codes)
        return run_res
# syncing os.environ (as opposed to modifying a copy and setting it
# back) takes care of cases when someone has a reference to environ
def _sync_env(env, new, backup=None):
    """Apply the entries of *new* onto *env* in place.

    A value of None means the key is removed from *env* (errors while
    removing are ignored); any other value is assigned. When *backup*
    is given, the previous value of every touched key (None if absent)
    is recorded in it first. Returns *backup*.
    """
    if PY2:
        # py2 expects bytes in os.environ
        def _to_bytes(item):
            return item.encode('utf8') if isinstance(item, unicode) else item

        new = dict((_to_bytes(k), _to_bytes(v)) for k, v in new.items())

    for key in new:
        value = new[key]
        if backup is not None:
            backup[key] = env.get(key)
        if value is None:
            try:
                del env[key]
            except Exception:
                pass
        else:
            env[key] = value
    return backup