Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/face/testing.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

213 statements  

1# Design (and some implementation) of this owes heavily to Click's 

2# CliRunner (TODO: bring in license?) 

3 

4"""Porting notes: 

5 

6* EchoingStdin.read1() needed to exist for py3 and raw_input 

7* Not sure why the isolate context manager deals in byte streams and 

8 then relegates to the Result to do late encoding (in properties no 

9 less). This is especially troublesome because sys.stdout/stderr 

10 isn't the same stream as stdout/stderr as returned by the context 

11 manager. (see the extra flush calls in run's finally block.) Is 

12 it just for parity with py2? There was a related bug, sys.stdout was 

13 flushed, but not sys.stderr, which caused py3's error_bytes to come 

14 through as blank. 

15* sys.stderr had to be flushed, too, on py3 (in invoke's finally) 

16* Result.exception was redundant with exc_info 

17* Result.stderr raised a ValueError when stderr was empty, not just 

18 when it wasn't captured. 

19* Instead of isolated_filesystem, I just added chdir to run, 

20 because pytest already does temporary directories. 

21* Removed echo_stdin (stdin never echoes, as it wouldn't with subprocess) 

22 

23""" 

24 

25import os 

26import sys 

27import shlex 

28import getpass 

29import contextlib 

30from subprocess import list2cmdline 

31from functools import partial 

32 

33try: 

34 from collections.abc import Container 

35except ImportError: 

36 from collections import Container 

37 

38PY2 = sys.version_info[0] == 2 

39 

40if PY2: 

41 from cStringIO import StringIO 

42else: 

43 import io 

44 unicode = str 

45 

46 

47from boltons.setutils import complement 

48 

49 

def _make_input_stream(input, encoding):
    """Build the in-memory byte stream that stands in for stdin.

    Args:
        input: Text, bytes, or ``None``. ``None`` yields an empty
            stream; text is encoded to bytes using *encoding*.
        encoding (str): Codec name used when *input* is text.

    Returns:
        A ``StringIO`` on Python 2, otherwise an :class:`io.BytesIO`,
        wrapping the resulting bytes.

    Raises:
        TypeError: If *input* is not bytes, text, or ``None``.
    """
    if input is None:
        input = b''
    elif isinstance(input, unicode):
        input = input.encode(encoding)
    elif not isinstance(input, bytes):
        # Wrap in a 1-tuple: a bare tuple on the right of % would be
        # unpacked as format arguments and garble this message.
        raise TypeError('expected bytes, text, or None, not: %r' % (input,))
    if PY2:
        return StringIO(input)
    return io.BytesIO(input)

60 

61 

def _fake_getpass(prompt='Password: ', stream=None):
    """Stand-in for :func:`getpass.getpass` that reads from ``sys.stdin``.

    The prompt (if non-empty) is written to *stream*, defaulting to
    ``sys.stderr``, and flushed. One line is then read from
    ``sys.stdin`` and returned without its trailing newline.

    Raises:
        EOFError: If ``sys.stdin`` has no more data.
    """
    out = stream if stream else sys.stderr
    source = sys.stdin
    text = str(prompt)
    if text:
        out.write(text)
        out.flush()
    entered = source.readline()
    if not entered:
        raise EOFError
    return entered[:-1] if entered[-1] == '\n' else entered

76 

77 

class RunResult(object):
    """Returned from :meth:`CommandChecker.run()`, complete with the
    relevant inputs and outputs of the run.

    Instances of this object are especially valuable for verifying
    expected output via the :attr:`~RunResult.stdout` and
    :attr:`~RunResult.stderr` attributes.

    API modeled after :class:`subprocess.CompletedProcess` for
    familiarity and porting of tests.

    Args:
        args: The arguments the command was run with.
        input: The stdin content supplied to the run, if any.
        exit_code: The exit code the run terminated with.
        stdout_bytes (bytes): Raw captured standard output.
        stderr_bytes (bytes): Raw captured standard error, or ``None``
            when stderr was not captured separately.
        exc_info: ``sys.exc_info()``-style triple if the run raised.
        checker: The originating checker; its ``encoding`` attribute
            is used to decode output. When ``None`` (or lacking an
            encoding), ``'utf8'`` is used.
    """

    def __init__(self, args, input, exit_code, stdout_bytes, stderr_bytes, exc_info=None, checker=None):
        self.args = args
        self.input = input
        self.checker = checker
        self.stdout_bytes = stdout_bytes
        self.stderr_bytes = stderr_bytes
        self.exit_code = exit_code  # integer

        # if an exception occurred:
        self.exc_info = exc_info
        # TODO: exc_info won't be available in subprocess... maybe the
        # text of a parsed-out traceback? But in general, tracebacks
        # aren't usually part of CLI UX...

    def _decode(self, data):
        # checker defaults to None in __init__, so don't assume it is
        # there; fall back to the same encoding CommandChecker uses.
        encoding = getattr(self.checker, 'encoding', None) or 'utf8'
        return data.decode(encoding, 'replace').replace('\r\n', '\n')

    @property
    def exception(self):
        """Exception instance, if an uncaught error was raised.
        Equivalent to ``run_res.exc_info[1]``, but more readable."""
        return self.exc_info[1] if self.exc_info else None

    @property
    def returncode(self):  # for parity with subprocess.CompletedProcess
        "Alias of :attr:`exit_code`, for parity with :class:`subprocess.CompletedProcess`"
        return self.exit_code

    @property
    def stdout(self):
        """The text output ("stdout") of the command, as a decoded
        string with ``\\r\\n`` normalized to ``\\n``. See
        :attr:`stdout_bytes` for the bytestring.
        """
        return self._decode(self.stdout_bytes)

    @property
    def stderr(self):
        """The error output ("stderr") of the command, as a decoded
        string with ``\\r\\n`` normalized to ``\\n``. See
        :attr:`stderr_bytes` for the bytestring.

        Raises:
            ValueError: If stderr was not captured separately, i.e.
                :attr:`stderr_bytes` is ``None``.
        """
        if self.stderr_bytes is None:
            raise ValueError("stderr not separately captured")
        return self._decode(self.stderr_bytes)

    def __repr__(self):
        # very similar to subprocess.CompletedProcess repr
        parts = ['args=%r' % (self.args,),
                 'returncode=%r' % (self.returncode,)]
        if self.stdout_bytes:
            parts.append('stdout=%r' % (self.stdout,))
        if self.stderr_bytes is not None:
            parts.append('stderr=%r' % (self.stderr,))
        if self.exception:
            parts.append('exception=%r' % (self.exception,))
        return "%s(%s)" % (self.__class__.__name__, ', '.join(parts))

149 

150 

151 

def _get_exp_code_text(exp_codes):
    """Describe the expected exit codes for use in an error message.

    Args:
        exp_codes: Container of expected exit codes. Sized containers
            (tuples, lists, sets, ...) are listed directly. Containers
            without a usable ``len()`` are passed through
            ``complement()`` and described by the codes they exclude.

    Returns:
        str: ``repr`` of the single code, ``'one of (...)'`` for
        several codes, ``'any code but ...'`` for the complement case,
        or ``repr(exp_codes)`` when the codes cannot be enumerated.
    """
    try:
        codes_len = len(exp_codes)
    except Exception:
        try:
            comp_codes = tuple(complement(exp_codes))
        except Exception:
            return repr(exp_codes)
        excluded = comp_codes[0] if len(comp_codes) == 1 else comp_codes
        # 1-tuple wrapping keeps a multi-code tuple from being unpacked
        # as %-format arguments.
        return 'any code but %r' % (excluded,)

    try:
        # Materialize rather than index: sets are valid containers of
        # exit codes but do not support exp_codes[0].
        codes = tuple(exp_codes)
    except Exception:
        return repr(exp_codes)
    if codes_len == 1 and len(codes) == 1:
        return repr(codes[0])
    return 'one of %r' % (codes,)

165 

166 

class CheckError(AssertionError):
    """Rarely raised directly, :exc:`CheckError` is automatically
    raised when a :meth:`CommandChecker.run()` call does not terminate
    with an expected error code.

    This error attempts to format the stdout, stderr, and stdin of the
    run for easier debugging.

    Args:
        result: The :class:`RunResult` of the failed run; kept on the
            exception as ``result``.
        exit_codes: The container of exit codes that were expected.
    """
    def __init__(self, result, exit_codes):
        self.result = result
        exp_code = _get_exp_code_text(exit_codes)
        # args may be None when the command was run without arguments
        cmdline = list2cmdline(result.args or ())
        parts = ['Got exit code %r (expected %s) when running command: %s'
                 % (result.exit_code, exp_code, cmdline)]
        if result.stdout:
            parts.extend(['\nstdout = """\n', result.stdout, '"""\n'])
        if result.stderr_bytes:
            parts.extend(['\nstderr = """\n', result.stderr, '"""\n'])
        if result.input:
            stdin_text = result.input
            if isinstance(stdin_text, bytes) and not isinstance(stdin_text, str):
                # Python 3 bytes input cannot be concatenated with the
                # text message; decode it the same lenient way output is.
                encoding = getattr(getattr(result, 'checker', None),
                                   'encoding', None) or 'utf8'
                stdin_text = stdin_text.decode(encoding, 'replace')
            parts.extend(['\nstdin = """\n', stdin_text, '"""\n'])
        AssertionError.__init__(self, ''.join(parts))

193 

194 

class CommandChecker(object):
    """Face's main testing interface.

    Wrap your :class:`Command` instance in a :class:`CommandChecker`,
    :meth:`~CommandChecker.run()` commands with arguments, and get
    :class:`RunResult` objects to validate your Command's behavior.

    Args:

       cmd: The :class:`Command` instance to test.
       env (dict): An optional base environment to use for subsequent
          calls issued through this checker. Defaults to ``{}``.
       chdir (str): A default path to execute this checker's commands
          in. Great for temporary directories to ensure test isolation.
       mix_stderr (bool): Set to ``True`` to capture stderr into
          stdout. This makes it easier to verify order of standard
          output and errors. If ``True``, this checker's results'
          ``stderr_bytes`` will be set to ``None``. Defaults to ``False``.
       reraise (bool): Reraise uncaught exceptions from within *cmd*'s
          endpoint functions, instead of returning a :class:`RunResult`
          instance. Defaults to ``False``.

    Attribute access of the form ``fail_<code>[_<code>...]`` (e.g.
    ``checker.fail_1`` or ``checker.fail_1_2``) returns
    :meth:`~CommandChecker.fail` pre-bound to those expected exit codes.

    """
    def __init__(self, cmd, env=None, chdir=None, mix_stderr=False, reraise=False):
        self.cmd = cmd
        self.base_env = env or {}
        self.reraise = reraise
        self.mix_stderr = mix_stderr
        self.encoding = 'utf8'  # not clear if this should be an arg yet
        self.chdir = chdir

    @contextlib.contextmanager
    def _isolate(self, input=None, env=None, chdir=None):
        """Context manager that swaps in temporary stdin/stdout/stderr,
        environment variables, working directory, and ``getpass.getpass``
        for the duration of a run, restoring them all on exit.

        Yields:
            A ``(stdout, stderr)`` pair of byte buffers receiving the
            captured output. ``stderr`` is ``None`` when *mix_stderr*
            is set, since errors then land in the stdout buffer.
        """
        old_cwd = os.getcwd()
        old_stdin, old_stdout, old_stderr = sys.stdin, sys.stdout, sys.stderr
        old_getpass = getpass.getpass

        tmp_stdin = _make_input_stream(input, self.encoding)

        full_env = dict(self.base_env)

        chdir = chdir or self.chdir
        if env:
            full_env.update(env)

        bytes_error = None  # stays None when stderr is mixed into stdout
        if PY2:
            tmp_stdout = bytes_output = StringIO()
            if self.mix_stderr:
                tmp_stderr = tmp_stdout
            else:
                bytes_error = tmp_stderr = StringIO()
        else:
            bytes_output = io.BytesIO()
            tmp_stdin = io.TextIOWrapper(tmp_stdin, encoding=self.encoding)
            tmp_stdout = io.TextIOWrapper(
                bytes_output, encoding=self.encoding)
            if self.mix_stderr:
                tmp_stderr = tmp_stdout
            else:
                bytes_error = io.BytesIO()
                tmp_stderr = io.TextIOWrapper(
                    bytes_error, encoding=self.encoding)

        old_env = {}
        try:
            _sync_env(os.environ, full_env, old_env)
            if chdir:
                os.chdir(str(chdir))
            sys.stdin, sys.stdout, sys.stderr = tmp_stdin, tmp_stdout, tmp_stderr
            getpass.getpass = _fake_getpass

            yield (bytes_output, bytes_error)
        finally:
            if chdir:
                os.chdir(old_cwd)

            _sync_env(os.environ, old_env)

            # The text wrappers buffer independently of the byte
            # buffers handed to the caller, so both must be flushed
            # for the captured bytes to be complete.
            tmp_stdout.flush()
            tmp_stderr.flush()
            sys.stdin, sys.stdout, sys.stderr = old_stdin, old_stdout, old_stderr
            getpass.getpass = old_getpass

    def fail(self, *a, **kw):
        """Convenience method around :meth:`~CommandChecker.run()`, with the
        same signature, except that this will raise a
        :exc:`CheckError` if the command completes with exit code
        ``0``.
        """
        kw.setdefault('exit_code', complement({0}))
        return self.run(*a, **kw)

    def __getattr__(self, name):
        """Provide the dynamic ``fail_<code>[_<code>...]`` shortcuts.

        Only invoked for names not found through normal lookup.

        Raises:
            AttributeError: For any name not starting with ``fail_``,
                or whose suffix is not underscore-separated integers.
        """
        if not name.startswith('fail_'):
            # object defines no __getattr__ to delegate to, so raise the
            # standard error directly instead of going through super().
            raise AttributeError('%r object has no attribute %r'
                                 % (type(self).__name__, name))
        _, _, code_str = name.partition('fail_')
        try:
            code = [int(cs) for cs in code_str.split('_')]
        except Exception:
            raise AttributeError('fail_* shortcuts must end in integers, not %r'
                                 % code_str)
        return partial(self.fail, exit_code=code)

    def run(self, args, input=None, env=None, chdir=None, exit_code=0):
        """The :meth:`run` method acts as the primary entrypoint to the
        :class:`CommandChecker` instance. Pass arguments as a list or
        string, and receive a :class:`RunResult` with which to verify
        your command's output.

        If the arguments do not result in an expected *exit_code*, a
        :exc:`CheckError` will be raised.

        Args:

           args: A list or string representing arguments, as one might
              find in :attr:`sys.argv` or at the command line.
           input (str): A string (or list of lines) to be passed to
              the command's stdin. Used for testing
              :func:`~face.prompt` interactions, among others.
           env (dict): A mapping of environment variables to apply on
              top of the :class:`CommandChecker`'s base env vars.
           chdir (str): A string (or stringifiable path) path to
              switch to before running the command. Defaults to
              ``None`` (runs in current directory).
           exit_code (int): An integer or list of integer exit codes
              expected from running the command with *args*. If the
              actual exit code does not match *exit_code*,
              :exc:`CheckError` is raised. Set to ``None`` to disable
              this behavior and always return
              :class:`RunResult`. Defaults to ``0``.

        Raises:
            TypeError: If *exit_code* is not ``None``, an int, or a
               container.
            CheckError: If the actual exit code is not an expected one.

        .. note::

           At this time, :meth:`run` interacts with global process
           state, and is not designed for parallel usage.

        """
        if isinstance(input, (list, tuple)):
            input = '\n'.join(input)
        if exit_code is None:
            exit_codes = ()
        elif isinstance(exit_code, int):
            exit_codes = (exit_code,)
        elif not isinstance(exit_code, Container):
            raise TypeError('expected exit_code to be None, int, or'
                            ' Container of ints, representing expected'
                            ' exit_codes, not: %r' % (exit_code,))
        else:
            exit_codes = exit_code
        with self._isolate(input=input, env=env, chdir=chdir) as (stdout, stderr):
            exc_info = None
            exit_code = 0

            if isinstance(args, (str, unicode)):
                args = shlex.split(args)

            try:
                self.cmd.run(args or ())
            except SystemExit as se:
                exc_info = sys.exc_info()
                exit_code = se.code if se.code is not None else 0
                if not isinstance(exit_code, int):
                    # Mirror the interpreter: a non-integer exit "code"
                    # is a message for stderr, and the status is 1.
                    sys.stderr.write('%s\n' % (exit_code,))
                    exit_code = 1
            except Exception:
                if self.reraise:
                    raise
                exit_code = -1  # TODO: something better?
                exc_info = sys.exc_info()
            finally:
                # sys.stdout/stderr are the temporary text wrappers here;
                # flush them so the byte buffers hold everything written.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout_bytes = stdout.getvalue()
                stderr_bytes = stderr.getvalue() if not self.mix_stderr else None

        run_res = RunResult(checker=self,
                            args=args,
                            input=input,
                            stdout_bytes=stdout_bytes,
                            stderr_bytes=stderr_bytes,
                            exit_code=exit_code,
                            exc_info=exc_info)
        if exit_codes and exit_code not in exit_codes:
            exc = CheckError(run_res, exit_codes)
            raise exc
        return run_res

381 

382 

383# syncing os.environ (as opposed to modifying a copy and setting it 

384# back) takes care of cases when someone has a reference to environ 

def _sync_env(env, new, backup=None):
    """Apply the entries of *new* onto *env* in place.

    A value of ``None`` in *new* means "remove this key from *env*";
    removal is best-effort and failures are ignored. When *backup* is
    given, the previous value of every touched key (``None`` if it was
    absent) is recorded in it first, so passing that mapping back as
    *new* later undoes the change.

    Returns:
        The *backup* mapping that was passed in (possibly ``None``).
    """
    if PY2:
        # py2 expects bytes in os.environ
        def _as_bytes(item):
            return item.encode('utf8') if isinstance(item, unicode) else item

        new = dict((_as_bytes(k), _as_bytes(v)) for k, v in new.items())

    keep_backup = backup is not None
    for name, val in new.items():
        if keep_backup:
            backup[name] = env.get(name)
        if val is None:
            try:
                del env[name]
            except Exception:
                pass
        else:
            env[name] = val
    return backup