Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/invoke/parser/parser.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

219 statements  

1import copy 

2from typing import TYPE_CHECKING, Any, Iterable, List, Optional 

3 

4try: 

5 from ..vendor.lexicon import Lexicon 

6 from ..vendor.fluidity import StateMachine, state, transition 

7except ImportError: 

8 from lexicon import Lexicon # type: ignore[no-redef] 

9 from fluidity import ( # type: ignore[no-redef] 

10 StateMachine, 

11 state, 

12 transition, 

13 ) 

14 

15from ..exceptions import ParseError 

16from ..util import debug 

17 

18if TYPE_CHECKING: 

19 from .context import ParserContext 

20 

21 

def is_flag(value: str) -> bool:
    """
    Report whether ``value`` looks like a flag token.

    Any string beginning with a dash qualifies, so short (``-f``) and long
    (``--foo``) forms both count.
    """
    dash = "-"
    return value.startswith(dash)

24 

25 

def is_long_flag(value: str) -> bool:
    """
    Report whether ``value`` looks like a long-form flag token.

    Only strings beginning with two dashes (``--foo``) qualify; a
    single-dash token such as ``-f`` does not.
    """
    double_dash = "--"
    return value.startswith(double_dash)

28 

29 

class ParseResult(List["ParserContext"]):
    """
    A ``list`` of parsed contexts carrying two extra parse-related attributes.

    - ``.remainder``: the string found after a ``--`` in any parsed argv
      list (empty string until a parser fills it in).
    - ``.unparsed``: a list of tokens that were unable to be parsed (empty
      until a parser fills it in).

    .. versionadded:: 1.0
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Let the underlying list consume whatever positional/keyword
        # arguments were supplied before attaching the extra attributes.
        super().__init__(*args, **kwargs)
        # Each instance gets its own fresh list; never shared between results.
        self.unparsed: List[str] = []
        self.remainder = ""

45 

46 

class Parser:
    """
    Create parser conscious of ``contexts`` and optional ``initial`` context.

    ``contexts`` should be an iterable of ``Context`` instances which will be
    searched when new context names are encountered during a parse. These
    Contexts determine what flags may follow them, as well as whether given
    flags take values.

    ``initial`` is optional and will be used to determine validity of "core"
    options/flags at the start of the parse run, if any are encountered.

    ``ignore_unknown`` determines what to do when contexts are found which do
    not map to any members of ``contexts``. By default it is ``False``, meaning
    any unknown contexts result in a parse error exception. If ``True``,
    encountering an unknown context halts parsing and populates the return
    value's ``.unparsed`` attribute with the remaining parse tokens.

    :raises ValueError:
        If any member of ``contexts`` has no name, or if a context name or
        alias collides with one that was already registered.

    .. versionadded:: 1.0
    """

    def __init__(
        self,
        contexts: Iterable["ParserContext"] = (),
        initial: Optional["ParserContext"] = None,
        ignore_unknown: bool = False,
    ) -> None:
        self.initial = initial
        self.contexts = Lexicon()
        self.ignore_unknown = ignore_unknown
        # One template serves both the name and the alias collision checks,
        # so build it once instead of on every loop iteration.
        exists = "A context named/aliased {!r} is already in this parser!"
        for context in contexts:
            debug("Adding {}".format(context))
            if not context.name:
                raise ValueError("Non-initial contexts must have names.")
            if context.name in self.contexts:
                raise ValueError(exists.format(context.name))
            self.contexts[context.name] = context
            for alias in context.aliases:
                if alias in self.contexts:
                    raise ValueError(exists.format(alias))
                self.contexts.alias(alias, to=context.name)

    def parse_argv(self, argv: List[str]) -> ParseResult:
        """
        Parse an argv-style token list ``argv``.

        Returns a list (actually a subclass, `.ParseResult`) of
        `.ParserContext` objects matching the order they were found in the
        ``argv`` and containing `.Argument` objects with updated values based
        on any flags given.

        Assumes any program name has already been stripped out. Good::

            Parser(...).parse_argv(['--core-opt', 'task', '--task-opt'])

        Bad::

            Parser(...).parse_argv(['invoke', '--core-opt', ...])

        ``argv`` itself is never modified; token splitting happens on a
        sliced copy.

        :param argv: List of argument string tokens.
        :returns:
            A `.ParseResult` (a ``list`` subclass containing some number of
            `.ParserContext` objects).

        .. versionadded:: 1.0
        """
        machine = ParseMachine(
            # FIXME: initial should not be none
            initial=self.initial,  # type: ignore[arg-type]
            contexts=self.contexts,
            ignore_unknown=self.ignore_unknown,
        )
        # Split argv around the double-dash remainder sentinel: everything
        # before it is parsed, everything after it is passed through verbatim.
        debug("Starting argv: {!r}".format(argv))
        try:
            ddash = argv.index("--")
        except ValueError:
            ddash = len(argv)  # No remainder == body gets all
        body = argv[:ddash]
        remainder = argv[ddash + 1 :]  # + 1 to skip the sentinel itself
        if remainder:
            debug(
                "Remainder: argv[{!r}:][1:] => {!r}".format(ddash, remainder)
            )
        # NOTE: ``body`` is deliberately extended while being iterated; tokens
        # inserted just after the current position are visited by later
        # iterations of this same loop.
        for index, token in enumerate(body):
            # Handle non-space-delimited forms, if not currently expecting a
            # flag value and still in valid parsing territory (i.e. not in
            # "unknown" state which implies store-only)
            # NOTE: we do this in a few steps so we can
            # split-then-check-validity; necessary for things like when the
            # previously seen flag optionally takes a value.
            mutations = []
            orig = token
            if is_flag(token) and not machine.result.unparsed:
                # Equals-sign-delimited flags, eg --foo=bar or -f=bar
                if "=" in token:
                    token, _, value = token.partition("=")
                    msg = "Splitting x=y expr {!r} into tokens {!r} and {!r}"
                    debug(msg.format(orig, token, value))
                    mutations.append((index + 1, value))
                # Contiguous boolean short flags, e.g. -qv
                elif not is_long_flag(token) and len(token) > 2:
                    rest, token = token[2:], token[:2]
                    err = "Splitting {!r} into token {!r} and rest {!r}"
                    debug(err.format(orig, token, rest))
                    # Handle boolean flag block vs short-flag + value. Make
                    # sure not to test the token as a context flag if we've
                    # passed into 'storing unknown stuff' territory (e.g. on a
                    # core-args pass, handling what are going to be task args)
                    # The machine has no context at all when this parser was
                    # built without ``initial`` and no task context has been
                    # seen yet; treat that as "not a known flag" so the token
                    # reaches the machine's normal unknown-token handling
                    # instead of failing on ``None.flags``.
                    have_flag = (
                        machine.context is not None
                        and token in machine.context.flags
                        and machine.current_state != "unknown"
                    )
                    if have_flag and machine.context.flags[token].takes_value:
                        msg = "{!r} is a flag for current context & it takes a value, giving it {!r}"  # noqa
                        debug(msg.format(token, rest))
                        mutations.append((index + 1, rest))
                    else:
                        _rest = ["-{}".format(x) for x in rest]
                        msg = "Splitting multi-flag glob {!r} into {!r} and {!r}"  # noqa
                        debug(msg.format(orig, token, _rest))
                        # Every insert targets the same slot, so feed them in
                        # reverse to end up in left-to-right order.
                        mutations.extend(
                            (index + 1, item) for item in reversed(_rest)
                        )
            # Here, we've got some possible mutations queued up, and 'token'
            # may have been overwritten as well. Whether we apply those and
            # continue as-is, or roll it back, depends:
            # - If the parser wasn't waiting for a flag value, we're already on
            #   the right track, so apply mutations and move along to the
            #   handle() step.
            # - If we ARE waiting for a value, and the flag expecting it ALWAYS
            #   wants a value (it's not optional), we go back to using the
            #   original token. (TODO: could reorganize this to avoid the
            #   sub-parsing in this case, but optimizing for human-facing
            #   execution isn't critical.)
            # - Finally, if we are waiting for a value AND it's optional, we
            #   inspect the first sub-token/mutation to see if it would
            #   otherwise have been a valid flag, and let that determine what
            #   we do (if valid, we apply the mutations; if invalid, we
            #   reinstate the original token.)
            if machine.waiting_for_flag_value:
                optional = machine.flag and machine.flag.optional
                subtoken_is_valid_flag = token in machine.context.flags
                if not (optional and subtoken_is_valid_flag):
                    token = orig
                    mutations = []
            for position, value in mutations:
                body.insert(position, value)
            machine.handle(token)
        machine.finish()
        result = machine.result
        result.remainder = " ".join(remainder)
        return result

203 

204 

class ParseMachine(StateMachine):
    """
    Token-at-a-time state machine driven by `.Parser.parse_argv`.

    Tracks the context currently being filled in (``.context``), the flag
    most recently seen (``.flag``) and the accumulated `.ParseResult`
    (``.result``). Three states are declared: ``context`` (normal parsing),
    ``unknown`` (store tokens without interpreting them) and ``end``.

    :param initial:
        Context holding "core" flags; deep-copied so parsing never touches
        the caller's object.
    :param contexts:
        Lexicon of known contexts, also deep-copied.
    :param ignore_unknown:
        When true, unrecognized tokens are stored in ``result.unparsed``
        instead of raising `.ParseError`.
    """

    initial_state = "context"

    # Entering any state first ties off the in-progress flag and context.
    state("context", enter=["complete_flag", "complete_context"])
    state("unknown", enter=["complete_flag", "complete_context"])
    state("end", enter=["complete_flag", "complete_context"])

    # Events declared here (finish / see_context / see_unknown) are invoked
    # as methods elsewhere in this class, e.g. ``self.see_unknown(token)``.
    transition(from_=("context", "unknown"), event="finish", to="end")
    transition(
        from_="context",
        event="see_context",
        action="switch_to_context",
        to="context",
    )
    transition(
        from_=("context", "unknown"),
        event="see_unknown",
        action="store_only",
        to="unknown",
    )

    def changing_state(self, from_: str, to: str) -> None:
        """Log a state change from ``from_`` to ``to``."""
        debug("ParseMachine: {!r} => {!r}".format(from_, to))

    def __init__(
        self,
        initial: "ParserContext",
        contexts: Lexicon,
        ignore_unknown: bool,
    ) -> None:
        # Initialize
        self.ignore_unknown = ignore_unknown
        # The initial context doubles as the first "current" context.
        self.initial = self.context = copy.deepcopy(initial)
        debug("Initialized with context: {!r}".format(self.context))
        self.flag = None
        self.flag_got_value = False
        self.result = ParseResult()
        self.contexts = copy.deepcopy(contexts)
        debug("Available contexts: {!r}".format(self.contexts))
        # In case StateMachine does anything in __init__
        super().__init__()

    @property
    def waiting_for_flag_value(self) -> bool:
        """
        Whether the current flag (if any) still expects a value token.
        """
        # Do we have a current flag, and does it expect a value (vs being a
        # bool/toggle)?
        takes_value = self.flag and self.flag.takes_value
        if not takes_value:
            return False
        # OK, this flag is one that takes values.
        # Is it a list type (which has only just been switched to)? Then it'll
        # always accept more values.
        # TODO: how to handle somebody wanting it to be some other iterable
        # like tuple or custom class? Or do we just say unsupported?
        if self.flag.kind is list and not self.flag_got_value:
            return True
        # Not a list, okay. Does it already have a value?
        has_value = self.flag.raw_value is not None
        # If it doesn't have one, we're waiting for one (which tells the parser
        # how to proceed and typically to store the next token.)
        # TODO: in the negative case here, we should do something else instead:
        # - Except, "hey you screwed up, you already gave that flag!"
        # - Overwrite, "oh you changed your mind?" - which requires more work
        # elsewhere too, unfortunately. (Perhaps additional properties on
        # Argument that can be queried, e.g. "arg.is_iterable"?)
        return not has_value

    def handle(self, token: str) -> None:
        """
        Dispatch a single ``token`` according to the current parse state.

        The checks run in a fixed priority order: unknown-state passthrough,
        flag of the current context, inverse flag, pending flag value,
        missing positional argument, new context name, initial-context flag,
        and finally "unknown" (stored or raised per ``ignore_unknown``).
        """
        debug("Handling token: {!r}".format(token))
        # Handle unknown state at the top: we don't care about even
        # possibly-valid input if we've encountered unknown input.
        if self.current_state == "unknown":
            debug("Top-of-handle() see_unknown({!r})".format(token))
            self.see_unknown(token)
            return
        # Flag
        if self.context and token in self.context.flags:
            debug("Saw flag {!r}".format(token))
            self.switch_to_flag(token)
        elif self.context and token in self.context.inverse_flags:
            debug("Saw inverse flag {!r}".format(token))
            self.switch_to_flag(token, inverse=True)
        # Value for current flag
        elif self.waiting_for_flag_value:
            debug(
                "We're waiting for a flag value so {!r} must be it?".format(
                    token
                )
            )  # noqa
            self.see_value(token)
        # Positional args (must come above context-name check in case we still
        # need a posarg and the user legitimately wants to give it a value that
        # just happens to be a valid context name.)
        elif self.context and self.context.missing_positional_args:
            msg = "Context {!r} requires positional args, eating {!r}"
            debug(msg.format(self.context, token))
            self.see_positional_arg(token)
        # New context
        elif token in self.contexts:
            self.see_context(token)
        # Initial-context flag being given as per-task flag (e.g. --help)
        elif self.initial and token in self.initial.flags:
            debug("Saw (initial-context) flag {!r}".format(token))
            flag = self.initial.flags[token]
            # Special-case for core --help flag: context name is used as value.
            if flag.name == "help":
                flag.value = self.context.name
                msg = "Saw --help in a per-task context, setting task name ({!r}) as its value"  # noqa
                debug(msg.format(flag.value))
            # All others: just enter the 'switch to flag' parser state
            else:
                # TODO: handle inverse core flags too? There are none at the
                # moment (e.g. --no-dedupe is actually 'no_dedupe', not a
                # default-False 'dedupe') and it's up to us whether we actually
                # put any in place.
                self.switch_to_flag(token)
        # Unknown
        else:
            if not self.ignore_unknown:
                debug("Can't find context named {!r}, erroring".format(token))
                self.error("No idea what {!r} is!".format(token))
            else:
                debug("Bottom-of-handle() see_unknown({!r})".format(token))
                self.see_unknown(token)

    def store_only(self, token: str) -> None:
        """Append ``token`` to ``result.unparsed`` without interpreting it."""
        # Start off the unparsed list
        debug("Storing unknown token {!r}".format(token))
        self.result.unparsed.append(token)

    def complete_context(self) -> None:
        """
        Finish the current context and record it in ``result``.

        :raises ParseError:
            If the context still has unfilled positional arguments.
        """
        debug(
            "Wrapping up context {!r}".format(
                self.context.name if self.context else self.context
            )
        )
        # Ensure all of context's positional args have been given.
        if self.context and self.context.missing_positional_args:
            err = "'{}' did not receive required positional arguments: {}"
            names = ", ".join(
                "'{}'".format(x.name)
                for x in self.context.missing_positional_args
            )
            self.error(err.format(self.context.name, names))
        # Avoid recording the same context twice.
        if self.context and self.context not in self.result:
            self.result.append(self.context)

    def switch_to_context(self, name: str) -> None:
        """Make a fresh deep copy of the context called ``name`` current."""
        self.context = copy.deepcopy(self.contexts[name])
        debug("Moving to context {!r}".format(name))
        debug("Context args: {!r}".format(self.context.args))
        debug("Context flags: {!r}".format(self.context.flags))
        debug("Context inverse_flags: {!r}".format(self.context.inverse_flags))

    def complete_flag(self) -> None:
        """
        Tie off the current flag, if there is one.

        :raises ParseError:
            If the flag requires a value and never received one.
        """
        # Nothing to tie off; every check below needs a current flag.
        if not self.flag:
            return
        msg = "Completing current flag {} before moving on"
        debug(msg.format(self.flag))
        # Barf if we needed a value and didn't get one
        if (
            self.flag.takes_value
            and self.flag.raw_value is None
            and not self.flag.optional
        ):
            err = "Flag {!r} needed value and was not given one!"
            self.error(err.format(self.flag))
        # Handle optional-value flags; at this point they were not given an
        # explicit value, but they were seen, ergo they should get treated like
        # bools.
        if self.flag.raw_value is None and self.flag.optional:
            msg = "Saw optional flag {!r} go by w/ no value; setting to True"
            debug(msg.format(self.flag.name))
            # Skip casting so the bool gets preserved
            self.flag.set_value(True, cast=False)

    def check_ambiguity(self, value: Any) -> bool:
        """
        Guard against ambiguity when current flag takes an optional value.

        :param value: The token about to be consumed.
        :returns:
            ``False`` whenever no ambiguity was detected (the ambiguous case
            never returns; it raises).
        :raises ParseError:
            If the current flag takes an optional value, has none yet, and
            ``value`` could equally be a positional argument or a context
            name.

        .. versionadded:: 1.0
        """
        # No flag is currently being examined, or one is but it doesn't take an
        # optional value? Ambiguity isn't possible.
        if not (self.flag and self.flag.optional):
            return False
        # We *are* dealing with an optional-value flag, but it's already
        # received a value? There can't be ambiguity here either.
        if self.flag.raw_value is not None:
            return False
        # Otherwise, there *may* be ambiguity if 1 or more of the below tests
        # fail.
        tests = []
        # Unfilled posargs still exist?
        tests.append(self.context and self.context.missing_positional_args)
        # Value matches another valid task/context name?
        tests.append(value in self.contexts)
        if any(tests):
            msg = "{!r} is ambiguous when given after an optional-value flag"
            raise ParseError(msg.format(value))
        # Honor the declared ``bool`` return type instead of falling off the
        # end and implicitly returning ``None``.
        return False

    def switch_to_flag(self, flag: str, inverse: bool = False) -> None:
        """
        Make ``flag`` the current flag, completing any previous one first.

        :param flag: Flag token as seen on the command line.
        :param inverse:
            When true, ``flag`` is looked up in the context's inverse flags
            and a boolean flag is set to ``False`` rather than ``True``.
        :raises KeyError:
            If the flag is found in neither the current nor the initial
            context.
        """
        # Sanity check for ambiguity w/ prior optional-value flag
        self.check_ambiguity(flag)
        # Also tie it off, in case prior had optional value or etc. Seems to be
        # harmless for other kinds of flags. (TODO: this is a serious indicator
        # that we need to move some of this flag-by-flag bookkeeping into the
        # state machine bits, if possible - as-is it was REAL confusing re: why
        # this was manually required!)
        self.complete_flag()
        # Set flag/arg obj
        flag = self.context.inverse_flags[flag] if inverse else flag
        # Update state
        try:
            self.flag = self.context.flags[flag]
        except KeyError as e:
            # Try fallback to initial/core flag
            try:
                self.flag = self.initial.flags[flag]
            except KeyError:
                # If it wasn't in either, raise the original context's
                # exception, as that's more useful / correct.
                raise e
        debug("Moving to flag {!r}".format(self.flag))
        # Bookkeeping for iterable-type flags (where the typical 'value
        # non-empty/nondefault -> clearly it got its value already' test is
        # insufficient)
        self.flag_got_value = False
        # Handle boolean flags (which can immediately be updated)
        if self.flag and not self.flag.takes_value:
            val = not inverse
            debug("Marking seen flag {!r} as {}".format(self.flag, val))
            self.flag.value = val

    def see_value(self, value: Any) -> None:
        """
        Assign ``value`` to the current flag.

        :raises ParseError:
            If ``value`` is ambiguous (see `check_ambiguity`) or the current
            flag does not take a value.
        """
        self.check_ambiguity(value)
        if self.flag and self.flag.takes_value:
            debug("Setting flag {!r} to value {!r}".format(self.flag, value))
            self.flag.value = value
            self.flag_got_value = True
        else:
            self.error("Flag {!r} doesn't take any value!".format(self.flag))

    def see_positional_arg(self, value: Any) -> None:
        """Give ``value`` to the first positional arg whose value is unset."""
        for arg in self.context.positional_args:
            if arg.value is None:
                arg.value = value
                break

    def error(self, msg: str) -> None:
        """Raise `.ParseError` carrying ``msg`` and the current context."""
        raise ParseError(msg, self.context)