1import copy
2from typing import TYPE_CHECKING, Any, Iterable, List, Optional
3
4try:
5 from ..vendor.lexicon import Lexicon
6 from ..vendor.fluidity import StateMachine, state, transition
7except ImportError:
8 from lexicon import Lexicon # type: ignore[no-redef]
9 from fluidity import ( # type: ignore[no-redef]
10 StateMachine,
11 state,
12 transition,
13 )
14
15from ..exceptions import ParseError
16from ..util import debug
17
18if TYPE_CHECKING:
19 from .context import ParserContext
20
21
def is_flag(value: str) -> bool:
    """Return ``True`` when ``value`` begins with a dash (short or long flag)."""
    # Slicing (rather than indexing) keeps the empty string safe.
    return value[:1] == "-"
24
25
def is_long_flag(value: str) -> bool:
    """Return ``True`` when ``value`` begins with a double dash."""
    # Slicing (rather than indexing) keeps short/empty strings safe.
    return value[:2] == "--"
28
29
class ParseResult(List["ParserContext"]):
    """
    A ``list`` of parsed contexts carrying two extra parse-related attributes.

    - ``.remainder``: a string holding whatever followed a ``--`` in the
      parsed argv list (empty when nothing did).
    - ``.unparsed``: a list of tokens that were unable to be parsed.

    .. versionadded:: 1.0
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Let the list machinery consume the regular constructor arguments.
        super().__init__(*args, **kwargs)
        # Both extras start out empty; the parser fills them in afterwards.
        self.unparsed: List[str] = []
        self.remainder: str = ""
45
46
class Parser:
    """
    Create parser conscious of ``contexts`` and optional ``initial`` context.

    ``contexts`` should be an iterable of ``Context`` instances which will be
    searched when new context names are encountered during a parse. These
    Contexts determine what flags may follow them, as well as whether given
    flags take values.

    ``initial`` is optional and will be used to determine validity of "core"
    options/flags at the start of the parse run, if any are encountered.

    ``ignore_unknown`` determines what to do when contexts are found which do
    not map to any members of ``contexts``. By default it is ``False``, meaning
    any unknown contexts result in a parse error exception. If ``True``,
    encountering an unknown context halts parsing and populates the return
    value's ``.unparsed`` attribute with the remaining parse tokens.

    .. versionadded:: 1.0
    """

    def __init__(
        self,
        contexts: Iterable["ParserContext"] = (),
        initial: Optional["ParserContext"] = None,
        ignore_unknown: bool = False,
    ) -> None:
        """
        Register ``contexts`` (and their aliases) under their names.

        :raises ValueError:
            If a context has no name, or if a context name or alias collides
            with one that was already registered in this parser.
        """
        self.initial = initial
        self.contexts = Lexicon()
        self.ignore_unknown = ignore_unknown
        exists = "A context named/aliased {!r} is already in this parser!"
        for context in contexts:
            debug("Adding {}".format(context))
            if not context.name:
                raise ValueError("Non-initial contexts must have names.")
            if context.name in self.contexts:
                raise ValueError(exists.format(context.name))
            self.contexts[context.name] = context
            for alias in context.aliases:
                if alias in self.contexts:
                    raise ValueError(exists.format(alias))
                self.contexts.alias(alias, to=context.name)

    def parse_argv(self, argv: List[str]) -> ParseResult:
        """
        Parse an argv-style token list ``argv``.

        Returns a list (actually a subclass, `.ParseResult`) of
        `.ParserContext` objects matching the order they were found in the
        ``argv`` and containing `.Argument` objects with updated values based
        on any flags given.

        Assumes any program name has already been stripped out. Good::

            Parser(...).parse_argv(['--core-opt', 'task', '--task-opt'])

        Bad::

            Parser(...).parse_argv(['invoke', '--core-opt', ...])

        ``argv`` itself is not modified; tokens are split/expanded on a
        private copy of the part preceding any ``--`` sentinel.

        :param argv: List of argument string tokens.
        :returns:
            A `.ParseResult` (a ``list`` subclass containing some number of
            `.ParserContext` objects).

        .. versionadded:: 1.0
        """
        machine = ParseMachine(
            # FIXME: initial should not be none
            initial=self.initial,  # type: ignore[arg-type]
            contexts=self.contexts,
            ignore_unknown=self.ignore_unknown,
        )
        # FIXME: Why isn't there str.partition for lists? There must be a
        # better way to do this. Split argv around the double-dash remainder
        # sentinel.
        debug("Starting argv: {!r}".format(argv))
        try:
            ddash = argv.index("--")
        except ValueError:
            ddash = len(argv)  # No remainder == body gets all
        body = argv[:ddash]
        remainder = argv[ddash:][1:]  # [1:] to strip off remainder itself
        if remainder:
            debug(
                "Remainder: argv[{!r}:][1:] => {!r}".format(ddash, remainder)
            )
        # NOTE: ``body`` is deliberately extended while being enumerated: any
        # sub-tokens produced by splitting are inserted right after the
        # current position so the following iterations pick them up.
        for index, token in enumerate(body):
            # Handle non-space-delimited forms, if not currently expecting a
            # flag value and still in valid parsing territory (i.e. not in
            # "unknown" state which implies store-only)
            # NOTE: we do this in a few steps so we can
            # split-then-check-validity; necessary for things like when the
            # previously seen flag optionally takes a value.
            mutations = []
            orig = token
            if is_flag(token) and not machine.result.unparsed:
                # Equals-sign-delimited flags, eg --foo=bar or -f=bar
                if "=" in token:
                    token, _, value = token.partition("=")
                    msg = "Splitting x=y expr {!r} into tokens {!r} and {!r}"
                    debug(msg.format(orig, token, value))
                    mutations.append((index + 1, value))
                # Contiguous boolean short flags, e.g. -qv
                elif not is_long_flag(token) and len(token) > 2:
                    full_token = token[:]
                    rest, token = token[2:], token[:2]
                    err = "Splitting {!r} into token {!r} and rest {!r}"
                    debug(err.format(full_token, token, rest))
                    # Handle boolean flag block vs short-flag + value. Make
                    # sure not to test the token as a context flag if we've
                    # passed into 'storing unknown stuff' territory (e.g. on a
                    # core-args pass, handling what are going to be task args)
                    # The context may be None when no initial context was
                    # given and no task context has been seen yet; in that
                    # case there is nothing the token could be a flag of.
                    have_flag = (
                        machine.context is not None
                        and token in machine.context.flags
                        and machine.current_state != "unknown"
                    )
                    if have_flag and machine.context.flags[token].takes_value:
                        msg = "{!r} is a flag for current context & it takes a value, giving it {!r}"  # noqa
                        debug(msg.format(token, rest))
                        mutations.append((index + 1, rest))
                    else:
                        _rest = ["-{}".format(x) for x in rest]
                        msg = "Splitting multi-flag glob {!r} into {!r} and {!r}"  # noqa
                        debug(msg.format(orig, token, _rest))
                        # Every item is inserted at the same position, so
                        # walk them backwards to preserve their order.
                        for item in reversed(_rest):
                            mutations.append((index + 1, item))
            # Here, we've got some possible mutations queued up, and 'token'
            # may have been overwritten as well. Whether we apply those and
            # continue as-is, or roll it back, depends:
            # - If the parser wasn't waiting for a flag value, we're already on
            #   the right track, so apply mutations and move along to the
            #   handle() step.
            # - If we ARE waiting for a value, and the flag expecting it ALWAYS
            #   wants a value (it's not optional), we go back to using the
            #   original token. (TODO: could reorganize this to avoid the
            #   sub-parsing in this case, but optimizing for human-facing
            #   execution isn't critical.)
            # - Finally, if we are waiting for a value AND it's optional, we
            #   inspect the first sub-token/mutation to see if it would
            #   otherwise have been a valid flag, and let that determine what
            #   we do (if valid, we apply the mutations; if invalid, we
            #   reinstate the original token.)
            if machine.waiting_for_flag_value:
                optional = machine.flag and machine.flag.optional
                subtoken_is_valid_flag = (
                    machine.context is not None
                    and token in machine.context.flags
                )
                if not (optional and subtoken_is_valid_flag):
                    token = orig
                    mutations = []
            # Distinct names here so the enumerate() index is not rebound.
            for position, inserted in mutations:
                body.insert(position, inserted)
            machine.handle(token)
        machine.finish()
        result = machine.result
        result.remainder = " ".join(remainder)
        return result
203
204
class ParseMachine(StateMachine):
    """
    State machine that consumes argv tokens one at a time via `handle`.

    It tracks the context currently being filled in (``self.context``), the
    flag most recently seen (``self.flag``) and accumulates finished contexts
    in ``self.result`` (a `ParseResult`).

    The ``state``/``transition`` declarations below name methods of this
    class by string; presumably the ``StateMachine`` base turns each
    ``event`` into a callable method (``finish``, ``see_context``,
    ``see_unknown``) -- confirm against the fluidity documentation.
    """

    initial_state = "context"

    # Entering any state first ties off the current flag, then the context.
    state("context", enter=["complete_flag", "complete_context"])
    state("unknown", enter=["complete_flag", "complete_context"])
    state("end", enter=["complete_flag", "complete_context"])

    transition(from_=("context", "unknown"), event="finish", to="end")
    transition(
        from_="context",
        event="see_context",
        action="switch_to_context",
        to="context",
    )
    transition(
        from_=("context", "unknown"),
        event="see_unknown",
        action="store_only",
        to="unknown",
    )

    def changing_state(self, from_: str, to: str) -> None:
        """Log a state change (presumably a hook called by the base class)."""
        debug("ParseMachine: {!r} => {!r}".format(from_, to))

    def __init__(
        self,
        initial: "ParserContext",
        contexts: Lexicon,
        ignore_unknown: bool,
    ) -> None:
        """
        Set up parse state from deep copies of ``initial`` and ``contexts``.

        Deep copies are used so that values stored during the parse do not
        leak back into the objects the caller handed in.
        """
        # Initialize
        self.ignore_unknown = ignore_unknown
        self.initial = self.context = copy.deepcopy(initial)
        debug("Initialized with context: {!r}".format(self.context))
        # Flag currently being processed, if any.
        self.flag = None
        # Whether the current flag has been handed a value since it was
        # switched to (needed for list-type flags; see
        # waiting_for_flag_value).
        self.flag_got_value = False
        self.result = ParseResult()
        self.contexts = copy.deepcopy(contexts)
        debug("Available contexts: {!r}".format(self.contexts))
        # In case StateMachine does anything in __init__
        super().__init__()

    @property
    def waiting_for_flag_value(self) -> bool:
        """Whether the current flag (if any) still expects a value token."""
        # Do we have a current flag, and does it expect a value (vs being a
        # bool/toggle)?
        takes_value = self.flag and self.flag.takes_value
        if not takes_value:
            return False
        # OK, this flag is one that takes values.
        # Is it a list type (which has only just been switched to)? Then it'll
        # always accept more values.
        # TODO: how to handle somebody wanting it to be some other iterable
        # like tuple or custom class? Or do we just say unsupported?
        if self.flag.kind is list and not self.flag_got_value:
            return True
        # Not a list, okay. Does it already have a value?
        has_value = self.flag.raw_value is not None
        # If it doesn't have one, we're waiting for one (which tells the parser
        # how to proceed and typically to store the next token.)
        # TODO: in the negative case here, we should do something else instead:
        # - Except, "hey you screwed up, you already gave that flag!"
        # - Overwrite, "oh you changed your mind?" - which requires more work
        #   elsewhere too, unfortunately. (Perhaps additional properties on
        #   Argument that can be queried, e.g. "arg.is_iterable"?)
        return not has_value

    def handle(self, token: str) -> None:
        """
        Dispatch a single ``token`` based on the current parse state.

        Checks are made in priority order: already-unknown state, flag of the
        current context, inverse flag, pending flag value, missing positional
        argument, new context name, initial-context flag, and finally unknown
        input (an error unless ``ignore_unknown`` is set).
        """
        debug("Handling token: {!r}".format(token))
        # Handle unknown state at the top: we don't care about even
        # possibly-valid input if we've encountered unknown input.
        if self.current_state == "unknown":
            debug("Top-of-handle() see_unknown({!r})".format(token))
            self.see_unknown(token)
            return
        # Flag
        if self.context and token in self.context.flags:
            debug("Saw flag {!r}".format(token))
            self.switch_to_flag(token)
        elif self.context and token in self.context.inverse_flags:
            debug("Saw inverse flag {!r}".format(token))
            self.switch_to_flag(token, inverse=True)
        # Value for current flag
        elif self.waiting_for_flag_value:
            debug(
                "We're waiting for a flag value so {!r} must be it?".format(
                    token
                )
            )  # noqa
            self.see_value(token)
        # Positional args (must come above context-name check in case we still
        # need a posarg and the user legitimately wants to give it a value that
        # just happens to be a valid context name.)
        elif self.context and self.context.missing_positional_args:
            msg = "Context {!r} requires positional args, eating {!r}"
            debug(msg.format(self.context, token))
            self.see_positional_arg(token)
        # New context
        elif token in self.contexts:
            self.see_context(token)
        # Initial-context flag being given as per-task flag (e.g. --help)
        elif self.initial and token in self.initial.flags:
            debug("Saw (initial-context) flag {!r}".format(token))
            flag = self.initial.flags[token]
            # Special-case for core --help flag: context name is used as value.
            if flag.name == "help":
                flag.value = self.context.name
                msg = "Saw --help in a per-task context, setting task name ({!r}) as its value"  # noqa
                debug(msg.format(flag.value))
            # All others: just enter the 'switch to flag' parser state
            else:
                # TODO: handle inverse core flags too? There are none at the
                # moment (e.g. --no-dedupe is actually 'no_dedupe', not a
                # default-False 'dedupe') and it's up to us whether we actually
                # put any in place.
                self.switch_to_flag(token)
        # Unknown
        else:
            if not self.ignore_unknown:
                debug("Can't find context named {!r}, erroring".format(token))
                self.error("No idea what {!r} is!".format(token))
            else:
                debug("Bottom-of-handle() see_unknown({!r})".format(token))
                self.see_unknown(token)

    def store_only(self, token: str) -> None:
        """Append ``token`` to the result's ``unparsed`` list."""
        # Start off the unparsed list
        debug("Storing unknown token {!r}".format(token))
        self.result.unparsed.append(token)

    def complete_context(self) -> None:
        """
        Finish the current context and record it in the result.

        Raises `ParseError` (via `error`) if the context still has missing
        positional arguments; otherwise appends the context to
        ``self.result`` unless it is already in there.
        """
        debug(
            "Wrapping up context {!r}".format(
                self.context.name if self.context else self.context
            )
        )
        # Ensure all of context's positional args have been given.
        if self.context and self.context.missing_positional_args:
            err = "'{}' did not receive required positional arguments: {}"
            names = ", ".join(
                "'{}'".format(x.name)
                for x in self.context.missing_positional_args
            )
            self.error(err.format(self.context.name, names))
        if self.context and self.context not in self.result:
            self.result.append(self.context)

    def switch_to_context(self, name: str) -> None:
        """Make a deep copy of the context registered as ``name`` current."""
        self.context = copy.deepcopy(self.contexts[name])
        debug("Moving to context {!r}".format(name))
        debug("Context args: {!r}".format(self.context.args))
        debug("Context flags: {!r}".format(self.context.flags))
        debug("Context inverse_flags: {!r}".format(self.context.inverse_flags))

    def complete_flag(self) -> None:
        """
        Tie off the current flag, if any.

        Raises `ParseError` (via `error`) when a value-taking, non-optional
        flag never received a value; sets optional-value flags that received
        no value to ``True``.
        """
        if self.flag:
            msg = "Completing current flag {} before moving on"
            debug(msg.format(self.flag))
        # Barf if we needed a value and didn't get one
        if (
            self.flag
            and self.flag.takes_value
            and self.flag.raw_value is None
            and not self.flag.optional
        ):
            err = "Flag {!r} needed value and was not given one!"
            self.error(err.format(self.flag))
        # Handle optional-value flags; at this point they were not given an
        # explicit value, but they were seen, ergo they should get treated like
        # bools.
        if self.flag and self.flag.raw_value is None and self.flag.optional:
            msg = "Saw optional flag {!r} go by w/ no value; setting to True"
            debug(msg.format(self.flag.name))
            # Skip casting so the bool gets preserved
            self.flag.set_value(True, cast=False)

    def check_ambiguity(self, value: Any) -> bool:
        """
        Guard against ambiguity when current flag takes an optional value.

        :returns: ``False`` whenever ``value`` is not ambiguous.
        :raises ParseError:
            If the current flag takes an optional value, has not received one
            yet, and either the current context still has missing positional
            arguments or ``value`` is a known context name.

        .. versionadded:: 1.0
        """
        # No flag is currently being examined, or one is but it doesn't take an
        # optional value? Ambiguity isn't possible.
        if not (self.flag and self.flag.optional):
            return False
        # We *are* dealing with an optional-value flag, but it's already
        # received a value? There can't be ambiguity here either.
        if self.flag.raw_value is not None:
            return False
        # Otherwise, there *may* be ambiguity if 1 or more of the below tests
        # fail.
        tests = []
        # Unfilled posargs still exist?
        tests.append(self.context and self.context.missing_positional_args)
        # Value matches another valid task/context name?
        tests.append(value in self.contexts)
        if any(tests):
            msg = "{!r} is ambiguous when given after an optional-value flag"
            raise ParseError(msg.format(value))
        # Explicit result so the declared ``bool`` return type is honored
        # (previously this path fell through and returned ``None``).
        return False

    def switch_to_flag(self, flag: str, inverse: bool = False) -> None:
        """
        Make the flag named ``flag`` the current one.

        When ``inverse`` is true, ``flag`` is first mapped through the current
        context's ``inverse_flags``. The flag is looked up in the current
        context, falling back to the initial context. Flags that do not take
        a value are set immediately (to ``not inverse``).

        :raises KeyError: If neither context knows the flag.
        """
        # Sanity check for ambiguity w/ prior optional-value flag
        self.check_ambiguity(flag)
        # Also tie it off, in case prior had optional value or etc. Seems to be
        # harmless for other kinds of flags. (TODO: this is a serious indicator
        # that we need to move some of this flag-by-flag bookkeeping into the
        # state machine bits, if possible - as-is it was REAL confusing re: why
        # this was manually required!)
        self.complete_flag()
        # Set flag/arg obj
        flag = self.context.inverse_flags[flag] if inverse else flag
        # Update state
        try:
            self.flag = self.context.flags[flag]
        except KeyError as e:
            # Try fallback to initial/core flag
            try:
                self.flag = self.initial.flags[flag]
            except KeyError:
                # If it wasn't in either, raise the original context's
                # exception, as that's more useful / correct.
                raise e
        debug("Moving to flag {!r}".format(self.flag))
        # Bookkeeping for iterable-type flags (where the typical 'value
        # non-empty/nondefault -> clearly it got its value already' test is
        # insufficient)
        self.flag_got_value = False
        # Handle boolean flags (which can immediately be updated)
        if self.flag and not self.flag.takes_value:
            val = not inverse
            debug("Marking seen flag {!r} as {}".format(self.flag, val))
            self.flag.value = val

    def see_value(self, value: Any) -> None:
        """
        Assign ``value`` to the current flag.

        Raises `ParseError` if the value is ambiguous (see `check_ambiguity`)
        or, via `error`, if there is no current value-taking flag.
        """
        self.check_ambiguity(value)
        if self.flag and self.flag.takes_value:
            debug("Setting flag {!r} to value {!r}".format(self.flag, value))
            self.flag.value = value
            self.flag_got_value = True
        else:
            self.error("Flag {!r} doesn't take any value!".format(self.flag))

    def see_positional_arg(self, value: Any) -> None:
        """Give ``value`` to the first positional arg whose value is None."""
        for arg in self.context.positional_args:
            if arg.value is None:
                arg.value = value
                break

    def error(self, msg: str) -> None:
        """Raise `ParseError` carrying ``msg`` and the current context."""
        raise ParseError(msg, self.context)