Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/face/command.py: 14%

182 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:23 +0000

1 

2from __future__ import print_function 

3 

4import sys 

5from collections import OrderedDict 

6 

7from face.utils import unwrap_text, get_rdep_map, echo 

8from face.errors import ArgumentParseError, CommandLineError, UsageError 

9from face.parser import Parser, Flag 

10from face.helpers import HelpHandler 

11from face.middleware import (inject, 

12 get_arg_names, 

13 is_middleware, 

14 face_middleware, 

15 check_middleware, 

16 get_middleware_chain, 

17 _BUILTIN_PROVIDES) 

18 

19from boltons.strutils import camel2under 

20from boltons.iterutils import unique 

21 

22 

23def _get_default_name(func): 

24 from functools import partial 

25 if isinstance(func, partial): 

26 func = func.func # just one level of partial for now 

27 

28 # func_name on py2, __name__ on py3 

29 ret = getattr(func, 'func_name', getattr(func, '__name__', None)) # most functions hit this 

30 

31 if ret is None: 

32 ret = camel2under(func.__class__.__name__).lower() # callable instances, etc. 

33 

34 return ret 

35 

36 

37def _docstring_to_doc(func): 

38 doc = func.__doc__ 

39 if not doc: 

40 return '' 

41 

42 unwrapped = unwrap_text(doc) 

43 try: 

44 ret = [g for g in unwrapped.splitlines() if g][0] 

45 except IndexError: 

46 ret = '' 

47 

48 return ret 

49 

50 

def default_print_error(msg):
    """Default error printer used by ``Command.run()``: hands *msg* to
    ``echo.err`` and returns whatever that call returns.
    """
    result = echo.err(msg)
    return result

53 

54 

# Shared HelpHandler instance, used by Command.__init__ whenever the
# caller does not pass an explicit ``help`` keyword argument.
DEFAULT_HELP_HANDLER = HelpHandler()

56 

57 

58# TODO: should name really go here? 

class Command(Parser):
    def __init__(self, func, name=None, doc=None, **kwargs):
        """The central type in the face framework. Instantiate a Command,
        populate it with flags and subcommands, and then call
        command.run() to execute your CLI.

        Note that only the first three constructor arguments are
        positional, the rest are keyword-only.

        Args:
           func (callable): The function called when this command is
              run with an argv that contains no subcommands.
           name (str): The name of this command, used when this
              command is included as a subcommand. (Defaults to name
              of function)
           doc (str): A description or message that appears in various
              help outputs.
           flags (list): A list of Flag instances to initialize the
              Command with. Flags can always be added later with the
              .add() method.
           posargs (bool): Pass True if the command takes positional
              arguments. Defaults to False. Can also pass a PosArgSpec
              instance.
           post_posargs (bool): Pass True if the command takes
              additional positional arguments after a conventional '--'
              specifier.
           flagfile: Forwarded unchanged to the Parser constructor;
              defaults to True when omitted. See the Parser docs for
              the accepted values.
           help (bool): Pass False to disable the automatically added
              --help flag. Defaults to True. Also accepts a HelpHandler
              instance, see those docs for more details.
           middlewares (list): A list of @face_middleware decorated
              callables which participate in dispatch. Also addable
              via the .add() method. See Middleware docs for more
              details.

        Raises:
           TypeError: If unrecognized keyword arguments are passed.
           ValueError: If neither a handler function nor a help
              handler is set.

        """
        name = name if name is not None else _get_default_name(func)

        if doc is None:
            doc = _docstring_to_doc(func)

        # TODO: default posargs if none by inspecting func
        super(Command, self).__init__(name, doc,
                                      flags=kwargs.pop('flags', None),
                                      posargs=kwargs.pop('posargs', None),
                                      post_posargs=kwargs.pop('post_posargs', None),
                                      flagfile=kwargs.pop('flagfile', True))

        _help = kwargs.pop('help', DEFAULT_HELP_HANDLER)
        self.help_handler = _help

        # The maps below are keyed on subcommand path tuples; the empty
        # tuple is this command itself.
        # TODO: if func is callable, check that "next_" isn't taken
        self._path_func_map = OrderedDict()
        self._path_func_map[()] = func

        middlewares = list(kwargs.pop('middlewares', None) or [])
        self._path_mw_map = OrderedDict()
        self._path_mw_map[()] = []
        self._path_wrapped_map = OrderedDict()
        self._path_wrapped_map[()] = func
        for mw in middlewares:
            self.add_middleware(mw)

        # every recognized keyword has been popped by now
        if kwargs:
            raise TypeError('unexpected keyword arguments: %r' % sorted(kwargs.keys()))

        if _help:
            if _help.flag:
                self.add(_help.flag)
            if _help.subcmd:
                self.add(_help.func, _help.subcmd)  # for 'help' as a subcmd

        if not func and not _help:
            raise ValueError('Command requires a handler function or help handler'
                             ' to be set, not: %r' % func)

    @property
    def func(self):
        """The handler function registered for this command itself
        (the empty subcommand path)."""
        return self._path_func_map[()]

    def add(self, *a, **kw):
        """Add a flag, subcommand, or middleware to this Command.

        If the first argument is a callable, this method constructs a
        Command from it and the remaining arguments, all of which are
        optional. See the Command docs for full details on names
        and defaults.

        If the first argument is a string, this method constructs a
        Flag from that flag string and the rest of the method
        arguments, all of which are optional. See the Flag docs for
        more options.

        If the argument is already an instance of Flag or Command, an
        exception is only raised on conflicting subcommands and
        flags. See add_command for details.

        Middleware is only added if it is already decorated with
        @face_middleware. Use .add_middleware() for automatic wrapping
        of callables.

        Returns the Command or Flag that was added. When a middleware
        is added, the return value of add_middleware() is passed
        through.

        """
        # TODO: need to check for middleware provides names + flag names
        # conflict

        target = a[0]

        if is_middleware(target):
            return self.add_middleware(target)

        subcmd = target
        if (not isinstance(subcmd, Command) and callable(subcmd)) or subcmd is None:
            subcmd = Command(*a, **kw)  # attempt to construct a new subcmd

        if isinstance(subcmd, Command):
            self.add_command(subcmd)
            return subcmd

        flag = target
        if not isinstance(flag, Flag):
            flag = Flag(*a, **kw)  # attempt to construct a Flag from arguments
        super(Command, self).add(flag)

        return flag

    def add_command(self, subcmd):
        """Add a Command, and all of its subcommands, as a subcommand of this
        Command.

        Middleware from the current command is layered on top of the
        subcommand's. An exception may be raised if there are
        conflicting middlewares or subcommand names.

        Raises:
           TypeError: If *subcmd* is not a Command instance.
        """
        if not isinstance(subcmd, Command):
            raise TypeError('expected Command instance, not: %r' % subcmd)
        self_mw = self._path_mw_map[()]
        super(Command, self).add(subcmd)
        # map in new functions
        for path in self.subprs_map:
            if path not in self._path_func_map:
                # path[0] is the new subcommand's own name, so path[1:]
                # is the same endpoint relative to subcmd
                self._path_func_map[path] = subcmd._path_func_map[path[1:]]
                sub_mw = subcmd._path_mw_map[path[1:]]
                self._path_mw_map[path] = self_mw + sub_mw  # TODO: check for conflicts

    def add_middleware(self, mw):
        """Add a single middleware to this command. Outermost middleware
        should be added first. Remember: first added, first called.

        Callables not yet marked as middleware are wrapped with
        face_middleware() first. Flags declared on the middleware are
        added to this command, and the middleware is prepended to the
        middleware list of every path registered so far.

        """
        if not is_middleware(mw):
            mw = face_middleware(mw)
        check_middleware(mw)

        for flag in mw._face_flags:
            self.add(flag)

        # only existing keys are reassigned, so the dict does not change
        # size while it is being iterated
        for path, mws in self._path_mw_map.items():
            self._path_mw_map[path] = [mw] + mws  # TODO: check for conflicts

    def get_flag_map(self, path=(), with_hidden=True):
        """Command's get_flag_map differs from Parser's in that it filters
        the flag map to just the flags used by the endpoint at the
        associated subcommand *path*.

        The flagfile flag and this command's help flag (when one is
        configured) are always kept.
        """
        flag_map = super(Command, self).get_flag_map(path=path, with_hidden=with_hidden)
        dep_names = self.get_dep_names(path)
        if 'args_' in dep_names or 'flags_' in dep_names:
            # the argument parse result and flag dict both capture
            # _all_ the flags, so for functions accepting these
            # arguments we bypass filtering.

            # Also note that by setting an argument default in the
            # function definition, the dependency becomes "weak", and
            # this bypassing of filtering will not trigger, unless
            # another function in the chain has a non-default,
            # "strong" dependency. This behavior is especially useful
            # for middleware.

            # TODO: add decorator for the corner case where a function
            # accepts these arguments and doesn't use them all.
            return OrderedDict(flag_map)

        # help_handler is falsy when the Command was built with
        # help=False; guard the attribute access so filtering does not
        # fail with an AttributeError in that configuration.
        help_handler = self.help_handler
        help_flag = help_handler.flag if help_handler else None

        return OrderedDict([(k, f) for k, f in flag_map.items()
                            if f.name in dep_names
                            or f is self.flagfile_flag
                            or (help_flag is not None and f is help_flag)])

    def get_dep_names(self, path=()):
        """Get a list of the names of all required arguments of a command (and
        any associated middleware).

        By specifying *path*, the same can be done for any subcommand.

        Returns a sorted list of names, or an empty list when no
        handler function is registered for *path*.
        """
        func = self._path_func_map[path]
        if not func:
            return []  # for when no handler is specified

        mws = self._path_mw_map[path]

        # start out with all args of handler function, which gets stronger dependencies
        required_args = set(get_arg_names(func, only_required=False))
        # separate copy: required_args is extended below, while the
        # handler's own entry in dep_map must stay as it is
        dep_map = {func: set(required_args)}
        for mw in mws:
            arg_names = set(get_arg_names(mw, only_required=True))
            for provide in mw._face_provides:
                dep_map[provide] = arg_names
            if not mw._face_optional:
                # all non-optional middlewares get their args required, too.
                required_args.update(arg_names)

        rdep_map = get_rdep_map(dep_map)

        recursive_required_args = rdep_map[func].union(required_args)

        return sorted(recursive_required_args)

    def prepare(self, paths=None):
        """Compile and validate one or more subcommands to ensure all
        dependencies are met. Call this once all flags, subcommands,
        and middlewares have been added (using .add()).

        This method is automatically called by .run() method, but it
        only does so for the specific subcommand being invoked. More
        conscientious users may want to call this method with no
        arguments to validate that all subcommands are ready for
        execution.

        Args:
           paths (iterable): Subcommand path tuples to prepare.
              Defaults to every registered path.

        Raises:
           NameError: Re-raised from building the middleware chain,
              with the offending path appended to the message.
        """
        # TODO: also pre-execute help formatting to make sure all
        # values are sane there, too
        if paths is None:
            paths = list(self._path_func_map)

        for path in paths:
            func = self._path_func_map[path]
            if func is None:
                continue  # handled by run()

            prs = self.subprs_map[path] if path else self
            provides = []
            if prs.posargs.provides:
                provides += [prs.posargs.provides]
            if prs.post_posargs.provides:
                provides += [prs.post_posargs.provides]

            deps = self.get_dep_names(path)
            flag_names = [f.name for f in self.get_flags(path=path)]
            all_mws = self._path_mw_map[path]

            # filter out unused middlewares: optional ones are kept only
            # if something depends on a name they provide
            mws = [mw for mw in all_mws if not mw._face_optional
                   or any(p in deps for p in mw._face_provides)]
            provides += _BUILTIN_PROVIDES + flag_names
            try:
                wrapped = get_middleware_chain(mws, func, provides)
            except NameError as ne:
                ne.args = (ne.args[0] + ' (in path: %r)' % (path,),)
                raise

            self._path_wrapped_map[path] = wrapped

    @staticmethod
    def _help_flag_passed(cmd, prs_res):
        """Return True if *cmd* has a help handler with a help flag and
        that flag is set in the parse result *prs_res*.

        Handles commands without a help handler (help=False) and help
        handlers without a flag, so callers never dereference a
        missing flag.
        """
        handler = cmd.help_handler
        if not handler or not prs_res.flags:
            return False
        help_flag = handler.flag
        if not help_flag:
            return False
        return bool(prs_res.flags.get(help_flag.name))

    def run(self, argv=None, extras=None, print_error=None):
        """Parses arguments and dispatches to the appropriate subcommand
        handler. If there is a parse error due to invalid user input,
        an error is printed and a CommandLineError is raised. If not
        caught, a CommandLineError will exit the process, typically
        with status code 1. Also handles dispatching to the
        appropriate HelpHandler, if configured.

        Defaults to handling the arguments on the command line
        (``sys.argv``), but can also be explicitly passed arguments
        via the *argv* parameter.

        Args:
           argv (list): A sequence of strings representing the
              command-line arguments. Defaults to ``sys.argv``.
           extras (dict): A map of additional arguments to be made
              available to the subcommand's handler function.
           print_error (callable): The function that formats/prints
              error messages before program exit on CLI errors.
              None or True selects the default printer; other falsy
              values disable printing.

        Returns:
           Whatever the dispatched handler (or help handler) returns.

        Raises:
           TypeError: If *print_error* is truthy but not callable.
           CommandLineError: If argument parsing fails and help was
              not requested.
           UsageError: Re-raised from the handler after its formatted
              message has been printed.

        .. note:: To ensure that the Command is configured properly, call
                  .prepare() before calling .run().

        """
        if print_error is None or print_error is True:
            print_error = default_print_error
        elif print_error and not callable(print_error):
            raise TypeError('expected callable for print_error, not %r'
                            % print_error)

        kwargs = dict(extras) if extras else {}
        kwargs['print_error_'] = print_error  # TODO: print_error_ in builtin provides?

        try:
            prs_res = self.parse(argv=argv)
        except ArgumentParseError as ape:
            prs_res = ape.prs_res

            # even if parsing failed, check if the caller was trying to access the help flag
            scope = prs_res.to_cmd_scope()
            cmd = scope['subcommand_']
            if self._help_flag_passed(cmd, prs_res):
                kwargs.update(scope)
                return inject(cmd.help_handler.func, kwargs)

            msg = 'error: ' + self.name
            if prs_res.subcmds:
                msg += ' ' + ' '.join(prs_res.subcmds)

            # args attribute, nothing to do with cmdline args this is
            # the standard-issue Exception
            e_msg = ape.args[0] if ape.args else ''
            if e_msg:
                msg += ': ' + e_msg
            cle = CommandLineError(msg)
            if print_error:
                print_error(msg)
            raise cle

        kwargs.update(prs_res.to_cmd_scope())

        # default in case no middlewares have been installed
        func = self._path_func_map[prs_res.subcmds]

        cmd = kwargs['subcommand_']
        # fall back to help when there is no handler for this path, or
        # when the help flag was passed explicitly
        if cmd.help_handler and (not func or self._help_flag_passed(cmd, prs_res)):
            return inject(cmd.help_handler.func, kwargs)
        elif not func:  # pragma: no cover
            raise RuntimeError('expected command handler or help handler to be set')

        self.prepare(paths=[prs_res.subcmds])
        wrapped = self._path_wrapped_map.get(prs_res.subcmds, func)

        try:
            ret = inject(wrapped, kwargs)
        except UsageError as ue:
            if print_error:
                print_error(ue.format_message())
            raise
        return ret