Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/traitlets/config/loader.py: 23%
571 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""A simple configuration system."""
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
6import argparse
7import copy
8import functools
9import json
10import os
11import re
12import sys
13import typing as t
14import warnings
16from traitlets.traitlets import Any, Container, Dict, HasTraits, List, Undefined
18from ..utils import cast_unicode, filefind
20# -----------------------------------------------------------------------------
21# Exceptions
22# -----------------------------------------------------------------------------
class ConfigError(Exception):
    """Root of the exception hierarchy defined in this module."""


class ConfigLoaderError(ConfigError):
    """Base class for errors that originate in a config loader."""


class ConfigFileNotFound(ConfigError):  # noqa
    """Raised by the file loaders when the config file cannot be located."""


class ArgumentError(ConfigLoaderError):
    """Raised when a command-line argument cannot be turned into config."""
41# -----------------------------------------------------------------------------
42# Argparse fix
43# -----------------------------------------------------------------------------
45# Unfortunately argparse by default prints help messages to stderr instead of
46# stdout. This makes it annoying to capture long help screens at the command
47# line, since one must know how to pipe stderr, which many users don't know how
48# to do. So we override the print_help method with one that defaults to
49# stdout and use our class instead.
class _Sentinel:
    """Marker type whose single instance stands for "argument not supplied"."""

    def __str__(self):
        return "<deprecated>"

    def __repr__(self):
        return "<Sentinel deprecated>"


# Used as the default of deprecated keyword arguments so that an explicit
# value (including None) can be told apart from "not passed at all".
_deprecated = _Sentinel()
class ArgumentParser(argparse.ArgumentParser):
    """``argparse.ArgumentParser`` variant whose help goes to stdout unless a file is given."""

    def print_help(self, file=None):
        # Look up sys.stdout at call time so a redirected stdout is honoured.
        destination = sys.stdout if file is None else file
        return super().print_help(destination)

    # The override keeps the documentation of the method it replaces.
    print_help.__doc__ = argparse.ArgumentParser.print_help.__doc__
74# -----------------------------------------------------------------------------
75# Config class for holding config information
76# -----------------------------------------------------------------------------
def execfile(fname, glob):
    """Compile the file *fname* and execute it with *glob* as both globals and locals.

    The file is read in binary mode, so ``compile`` handles source-encoding
    detection itself.
    """
    with open(fname, "rb") as source_file:
        source = source_file.read()
    code = compile(source, fname, "exec")
    exec(code, glob, glob)  # noqa
class LazyConfigValue(HasTraits):
    """Proxy object for exposing methods on configurable containers

    These methods allow appending/extending/updating
    to add to non-empty defaults instead of clobbering them.

    Exposes:

    - append, extend, prepend, insert on lists
    - update on dicts
    - update, add on sets

    The operations are only recorded here; :meth:`get_value` applies them
    to a copy of an initial container.
    """

    # Result of the first successful get_value() call; None until then.
    _value = None

    # list methods
    _extend = List()
    _prepend = List()
    _inserts = List()

    def append(self, obj):
        """Append an item to a List"""
        self._extend.append(obj)

    def extend(self, other):
        """Extend a list"""
        self._extend.extend(other)

    def prepend(self, other):
        """like list.extend, but for the front"""
        self._prepend[:0] = other

    def merge_into(self, other):
        """
        Merge with another earlier LazyConfigValue or an earlier container.
        This is useful when having global system-wide configuration files.

        Self is expected to have higher precedence.

        Parameters
        ----------
        other : LazyConfigValue or container

        Returns
        -------
        LazyConfigValue
            if ``other`` is also lazy, a reified container otherwise.
        """
        if isinstance(other, LazyConfigValue):
            # Earlier extensions come first; afterwards both objects share
            # the combined list.
            other._extend.extend(self._extend)
            self._extend = other._extend

            self._prepend.extend(other._prepend)

            other._inserts.extend(self._inserts)
            self._inserts = other._inserts

            if self._update:
                other.update(self._update)
                self._update = other._update
            return self
        else:
            # other is a container, reify now.
            return self.get_value(other)

    def insert(self, index, other):
        """Record an insertion of ``other`` at ``index``.

        Raises
        ------
        TypeError
            If ``index`` is not an int.
        """
        if not isinstance(index, int):
            raise TypeError("An integer is required")
        self._inserts.append((index, other))

    # dict methods
    # update is used for both dict and set
    _update = Any()

    def update(self, other):
        """Update either a set or dict"""
        if self._update is None:
            # The first call decides whether updates accumulate in a dict or a set.
            if isinstance(other, dict):
                self._update = {}
            else:
                self._update = set()
        self._update.update(other)

    # set methods
    def add(self, obj):
        """Add an item to a set"""
        self.update({obj})

    def get_value(self, initial):
        """construct the value from the initial one

        after applying any insert / extend / update changes

        The result is cached: once a non-None value has been computed,
        later calls return it and ignore ``initial``.
        """
        if self._value is not None:
            return self._value
        value = copy.deepcopy(initial)
        if isinstance(value, list):
            # Order matters: inserts first, then prepend, then extend.
            for idx, obj in self._inserts:
                value.insert(idx, obj)
            value[:0] = self._prepend
            value.extend(self._extend)

        elif isinstance(value, dict):
            if self._update:
                value.update(self._update)
        elif isinstance(value, set):
            if self._update:
                value.update(self._update)
        self._value = value
        return value

    def to_dict(self):
        """return JSONable dict form of my data

        Currently update as dict or set, extend, prepend as lists, and inserts as list of tuples.
        Only non-empty entries are included.
        """
        d = {}
        if self._update:
            d["update"] = self._update
        if self._extend:
            d["extend"] = self._extend
        if self._prepend:
            d["prepend"] = self._prepend
        # Reported independently of prepend: an ``elif`` here used to hide
        # recorded inserts whenever a prepend was also present.
        if self._inserts:
            d["inserts"] = self._inserts
        return d

    def __repr__(self):
        if self._value is not None:
            return f"<{self.__class__.__name__} value={self._value!r}>"
        else:
            return f"<{self.__class__.__name__} {self.to_dict()!r}>"
def _is_section_key(key):
    """Tell whether a Config key names a section.

    A key is a section key when it is non-empty, its first character is
    unchanged by ``upper()`` and it does not start with an underscore.
    Besides capital letters this also accepts characters without case,
    such as digits.
    """
    if not key:
        return False
    first = key[0]
    return first.upper() == first and not key.startswith("_")
class Config(dict):  # type:ignore[type-arg]
    """An attribute-based dict that can do smart merges.

    Accessing a field on a config object for the first time populates the key
    with either a nested Config object for keys starting with capitals
    or :class:`.LazyConfigValue` for lowercase keys,
    allowing quick assignments such as::

        c = Config()
        c.Class.int_trait = 5
        c.Class.list_trait.append("x")

    """

    def __init__(self, *args, **kwds):
        dict.__init__(self, *args, **kwds)
        self._ensure_subconfig()

    def _ensure_subconfig(self):
        """ensure that sub-dicts that should be Config objects are

        casts dicts that are under section keys to Config objects,
        which is necessary for constructing Config objects from dict literals.
        """
        for key in self:
            obj = self[key]
            if _is_section_key(key) and isinstance(obj, dict) and not isinstance(obj, Config):
                setattr(self, key, Config(obj))

    def _merge(self, other):
        """deprecated alias, use Config.merge()"""
        self.merge(other)

    def merge(self, other):
        """merge another config object into this one

        Values from ``other`` win, except that common sub-Configs are merged
        recursively and a LazyConfigValue in ``other`` is merged into the
        value already stored here.
        """
        to_update = {}
        for k, v in other.items():
            if k not in self:
                to_update[k] = v
            else:  # I have this key
                if isinstance(v, Config) and isinstance(self[k], Config):
                    # Recursively merge common sub Configs
                    self[k].merge(v)
                elif isinstance(v, LazyConfigValue):
                    self[k] = v.merge_into(self[k])
                else:
                    # Plain updates for non-Configs
                    to_update[k] = v

        self.update(to_update)

    def collisions(self, other: "Config") -> t.Dict[str, t.Any]:
        """Check for collisions between two config objects.

        Returns a dict of the form {"Class": {"trait": "collision message"}}`,
        indicating which values have been ignored.

        An empty dict indicates no collisions.
        """
        collisions: t.Dict[str, t.Any] = {}
        for section in self:
            if section not in other:
                continue
            mine = self[section]
            theirs = other[section]
            for key in mine:
                if key in theirs and mine[key] != theirs[key]:
                    collisions.setdefault(section, {})
                    collisions[section][key] = f"{mine[key]!r} ignored, using {theirs[key]!r}"
        return collisions

    def __contains__(self, key):
        # allow nested contains of the form `"Section.key" in config`
        if "." in key:
            first, remainder = key.split(".", 1)
            if first not in self:
                return False
            return remainder in self[first]

        return super().__contains__(key)

    # .has_key is deprecated for dictionaries.
    has_key = __contains__

    def _has_section(self, key):
        return _is_section_key(key) and key in self

    def copy(self):
        """Return a shallow copy of the same type."""
        return type(self)(dict.copy(self))

    def __copy__(self):
        return self.copy()

    def __deepcopy__(self, memo):
        new_config = type(self)()
        for key, value in self.items():
            if isinstance(value, (Config, LazyConfigValue)):
                # deep copy config objects
                value = copy.deepcopy(value, memo)
            elif type(value) in {dict, list, set, tuple}:
                # shallow copy plain container traits
                value = copy.copy(value)
            new_config[key] = value
        return new_config

    def __getitem__(self, key):
        """Return the stored value, creating a default for unknown keys.

        Missing section keys get a new, stored ``Config``; other missing keys
        that do not start with an underscore get a new, stored
        ``LazyConfigValue``.

        Raises
        ------
        KeyError
            For a missing key that starts with an underscore.
        """
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            if _is_section_key(key):
                c = Config()
                dict.__setitem__(self, key, c)
                return c
            elif not key.startswith("_"):
                # undefined, create lazy value, used for container methods
                v = LazyConfigValue()
                dict.__setitem__(self, key, v)
                return v
            else:
                # Name the key so the error (and the AttributeError built
                # from it in __getattr__) carries a useful message.
                raise KeyError(key)

    def __setitem__(self, key, value):
        """Store ``value``; section keys only accept ``Config`` instances.

        Raises
        ------
        ValueError
            If ``key`` is a section key and ``value`` is not a ``Config``.
        """
        if _is_section_key(key):
            if not isinstance(value, Config):
                raise ValueError(
                    "values whose keys begin with an uppercase "
                    "char must be Config instances: %r, %r" % (key, value)
                )
        dict.__setitem__(self, key, value)

    def __getattr__(self, key):
        if key.startswith("__"):
            # dict has no __getattr__; go through normal attribute lookup,
            # which raises an AttributeError that names this object and key.
            return dict.__getattribute__(self, key)
        try:
            return self.__getitem__(key)
        except KeyError as e:
            raise AttributeError(e) from e

    def __setattr__(self, key, value):
        if key.startswith("__"):
            return dict.__setattr__(self, key, value)
        try:
            self.__setitem__(key, value)
        except KeyError as e:
            raise AttributeError(e) from e

    def __delattr__(self, key):
        if key.startswith("__"):
            return dict.__delattr__(self, key)
        try:
            dict.__delitem__(self, key)
        except KeyError as e:
            raise AttributeError(e) from e
class DeferredConfig:
    """Class for deferred-evaluation of config from CLI

    Meant to be combined with a builtin type such as ``str`` or ``list``;
    subclasses implement :meth:`get_value`.
    """

    def get_value(self, trait):
        """Return the value to load into ``trait``.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError("Implement in subclasses")

    def _super_repr(self):
        """Return the repr of the underlying non-deferred type (str, list, ...).

        The MRO is walked explicitly instead of using
        ``super(self.__class__, self)``: the latter resolves back to an
        inherited ``__repr__`` that calls this method again, recursing
        forever for subclasses of the concrete deferred classes.
        """
        for klass in type(self).__mro__:
            if issubclass(klass, DeferredConfig):
                continue
            # Only classes that define __repr__ themselves count, so plain
            # mixins without one are skipped just as super() would skip them.
            if "__repr__" in vars(klass):
                return klass.__repr__(self)
        # object always defines __repr__, so the loop returns; kept as a guard.
        return object.__repr__(self)
class DeferredConfigString(str, DeferredConfig):
    """A config value given as a string whose parsing is postponed.

    The string is only interpreted once the trait it belongs to is known:
    ``trait.from_string`` is then applied to it. If ``from_string`` raises,
    the original string is returned unchanged.

    Subclasses ``str`` for backward compatibility.

    This class is only used for values that are not listed
    in the configurable classes.

    .. versionadded:: 5.0
    """

    def get_value(self, trait):
        """Return ``trait.from_string`` applied to this string, or the plain string on failure."""
        raw = str(self)
        try:
            parsed = trait.from_string(raw)
        except Exception:
            # Conversion failed: hand back the raw string so that loading
            # the config reports a more informative error later.
            return raw
        return parsed

    def __repr__(self):
        name = self.__class__.__name__
        return f"{name}({self._super_repr()})"
class DeferredConfigList(list, DeferredConfig):  # type:ignore[type-arg]
    """A config value given as a list of strings whose parsing is postponed.

    Once the trait is known, ``trait.from_string_list`` is applied to the
    whole list when the trait offers it; otherwise the single item is passed
    to ``trait.from_string``. If the conversion raises, the unconverted
    input is returned.

    This class is only used for values that are not listed
    in the configurable classes.

    .. versionadded:: 5.0
    """

    def get_value(self, trait):
        """Convert the stored strings with ``trait``.

        Raises
        ------
        ValueError
            If ``trait`` has no ``from_string_list`` and more than one item
            is stored.
        """
        if not hasattr(trait, "from_string_list"):
            # Scalar trait: only allow one item.
            if len(self) > 1:
                raise ValueError(
                    f"{trait.name} only accepts one value, got {len(self)}: {list(self)}"
                )
            raw = self[0]
            convert = trait.from_string
        else:
            raw = list(self)
            convert = trait.from_string_list

        try:
            return convert(raw)
        except Exception:
            # Conversion failed: keep the original value so that loading
            # the config reports a more informative error later.
            return raw

    def __repr__(self):
        name = self.__class__.__name__
        return f"{name}({self._super_repr()})"
469# -----------------------------------------------------------------------------
470# Config loading classes
471# -----------------------------------------------------------------------------
class ConfigLoader:
    """Base object for loading a configuration from some source.

    The loaded configuration is held as a :class:`Config` in ``self.config``.

    Notes
    -----
    A :class:`ConfigLoader` does one thing: load a config from a source
    (file, command line arguments) and return the data as a :class:`Config`
    object. It does not implement logic for finding config files, does not
    handle default values and does not merge multiple configs; those things
    need to be handled elsewhere.
    """

    def _log_default(self):
        # Imported lazily, at the point of use.
        from traitlets.log import get_logger

        return get_logger()

    def __init__(self, log=None):
        """Set up an empty config and a logger.

        log : instance of :class:`logging.Logger` to use.
              When omitted, the logger returned by ``traitlets.log.get_logger``
              is used.

        Examples
        --------
        >>> cl = ConfigLoader()
        >>> config = cl.load_config()
        >>> config
        {}
        """
        self.clear()
        if log is not None:
            self.log = log
            return
        self.log = self._log_default()
        self.log.debug("Using default logger")

    def clear(self):
        """Replace ``self.config`` with a fresh, empty :class:`Config`."""
        self.config = Config()

    def load_config(self):
        """Load a config from somewhere, return a :class:`Config` instance.

        This base implementation resets ``self.config`` via :meth:`clear`
        and returns the resulting empty config.
        """
        self.clear()
        return self.config
class FileConfigLoader(ConfigLoader):
    """Common base for loaders that read their config from a file."""

    def __init__(self, filename, path=None, **kw):
        """Remember which file to load and where to look for it.

        Parameters
        ----------
        filename : str
            The file name of the config file.
        path : str, list, tuple
            The path to search for the config file on, or a sequence of
            paths to try in order.
        **kw
            Forwarded to :class:`ConfigLoader`.
        """
        super().__init__(**kw)
        # Filled in by _find_file() once the file has been located.
        self.full_filename = ""
        self.filename = filename
        self.path = path

    def _find_file(self):
        """Locate the file with ``filefind`` and store the result in ``self.full_filename``."""
        located = filefind(self.filename, self.path)
        self.full_filename = located
class JSONFileConfigLoader(FileConfigLoader):
    """A JSON file loader for config

    Can also act as a context manager that rewrite the configuration file to disk on exit.

    Example::

        with JSONFileConfigLoader('myapp.json','/home/jupyter/configurations/') as c:
            c.MyNewConfigurable.new_value = 'Updated'

    """

    def load_config(self):
        """Load the config from a file and return it as a Config object.

        Raises
        ------
        ConfigFileNotFound
            If locating the file raises ``OSError``.
        """
        self.clear()
        try:
            self._find_file()
        except OSError as e:
            raise ConfigFileNotFound(str(e)) from e
        dct = self._read_file_as_dict()
        self.config = self._convert_to_config(dct)
        return self.config

    def _read_file_as_dict(self):
        """Parse the located file as JSON and return the decoded object."""
        # JSON text is UTF-8 by specification; do not depend on the locale.
        with open(self.full_filename, encoding="utf-8") as f:
            return json.load(f)

    def _convert_to_config(self, dictionary):
        """Build a Config from ``dictionary``.

        A ``"version"`` entry, if present, is removed from ``dictionary``;
        a missing one is treated as version 1.

        Raises
        ------
        ValueError
            If the version is anything other than 1.
        """
        if "version" in dictionary:
            version = dictionary.pop("version")
        else:
            version = 1

        if version == 1:
            return Config(dictionary)
        else:
            raise ValueError(f"Unknown version of JSON config file: {version}")

    def __enter__(self):
        self.load_config()
        return self.config

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Exit the context manager but do not handle any errors.

        In case of any error, we do not want to write the potentially broken
        configuration to disk.
        """
        if exc_type is not None:
            # The with-body failed: leave the file untouched. Returning None
            # lets the exception propagate to the caller.
            return
        self.config.version = 1
        json_config = json.dumps(self.config, indent=2)
        with open(self.full_filename, "w", encoding="utf-8") as f:
            f.write(json_config)
class PyFileConfigLoader(FileConfigLoader):
    """A config loader for pure python files.

    Locates a Python config file by filename and path, then executes it so
    that the file populates the loader's Config object.
    """

    def load_config(self):
        """Load the config from a file and return it as a Config object.

        Raises
        ------
        ConfigFileNotFound
            If locating the file raises ``OSError``.
        """
        self.clear()
        try:
            self._find_file()
        except OSError as err:
            raise ConfigFileNotFound(str(err)) from err
        self._read_file_as_dict()
        return self.config

    def load_subconfig(self, fname, path=None):
        """Injected into config file namespace as load_subconfig

        Loads ``fname`` with a new loader of the same class and merges the
        result into ``self.config``. ``path`` defaults to this loader's path.
        """
        search_path = self.path if path is None else path
        sub_loader = self.__class__(fname, search_path)
        try:
            sub_config = sub_loader.load_config()
        except ConfigFileNotFound:
            # A missing sub config is treated as an empty config file.
            return
        self.config.merge(sub_config)

    def _read_file_as_dict(self):
        """Load the config file into self.config, with recursive loading."""

        def get_config():
            """Unnecessary now, but a deprecation warning is more trouble than it's worth."""
            return self.config

        conf_filename = self.full_filename
        # Names made available to the executed config file.
        namespace = {
            "c": self.config,
            "load_subconfig": self.load_subconfig,
            "get_config": get_config,
            "__file__": conf_filename,
        }
        with open(conf_filename, "rb") as conf_file:
            source = conf_file.read()
            exec(compile(source, conf_filename, "exec"), namespace, namespace)  # noqa
class CommandLineConfigLoader(ConfigLoader):
    """Common base for loaders that take their config from command-line arguments."""

    def _exec_config_str(self, lhs, rhs, trait=None):
        """execute self.config.<lhs> = <rhs>

        * a deferred value is resolved with ``trait`` right away when a trait
          is given
        * without a trait, a one-item deferred list becomes a deferred string
          with ``~`` expanded via expanduser; other deferred values are stored
          as they are
        * a non-deferred value is parsed with ``trait.from_string`` when a
          trait is given and wrapped in a deferred string otherwise
        """
        value = rhs
        if not isinstance(value, DeferredConfig):
            value = trait.from_string(value) if trait else DeferredConfigString(value)
        elif trait:
            # trait available, reify config immediately
            value = value.get_value(trait)
        elif isinstance(rhs, DeferredConfigList) and len(rhs) == 1:
            # single item, make it a deferred str
            value = DeferredConfigString(os.path.expanduser(rhs[0]))

        # Walk (and thereby create) the dotted sections, then assign the leaf.
        *sections, leaf = lhs.split(".")
        target = self.config
        for name in sections:
            target = target[name]
        target[leaf] = value
        return

    def _load_flag(self, cfg):
        """update self.config from a flag, which can be a dict or Config

        Raises
        ------
        TypeError
            If ``cfg`` is neither a dict nor a Config.
        """
        if not isinstance(cfg, (dict, Config)):
            raise TypeError("Invalid flag: %r" % cfg)
        # Update section by section so whole config sections are not clobbered.
        for section_name, section_values in cfg.items():
            self.config[section_name].update(section_values)
# Pattern for option names that argparse should treat as implicitly defined.
# It matches, for example:
#   --Class.trait
#   --x
#   -x
# i.e. one or two dashes, a letter, word characters, then optional dotted parts.
class_trait_opt_pattern = re.compile(r"^\-?\-[A-Za-z][\w]*(\.[\w]+)*$")

# Placeholders substituted for "." (in argparse dest names) and for a bare
# "-" argument while parsing.
_DOT_REPLACEMENT = "__DOT__"
_DASH_REPLACEMENT = "__DASH__"
class _KVAction(argparse.Action):
    """Custom argparse action for handling --Class.trait=x

    Every occurrence adds its value(s) to a ``DeferredConfigList`` stored on
    the namespace under ``self.dest``.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        incoming = [values] if isinstance(values, str) else values
        # NOTE(review): identity (not equality) comparison with the
        # placeholder; presumably so that only the object substituted during
        # pre-processing is mapped back to "-" - confirm before changing.
        restored = ["-" if item is _DASH_REPLACEMENT else item for item in incoming]
        previous = getattr(namespace, self.dest, None)
        # Always build a new list rather than mutating what is already stored.
        collected = DeferredConfigList() if previous is None else DeferredConfigList(previous)
        collected.extend(restored)
        setattr(namespace, self.dest, collected)
class _DefaultOptionDict(dict):  # type:ignore[type-arg]
    """Option-string mapping that behaves as if all --Class.trait options exist.

    A membership test for an unknown key that matches
    ``class_trait_opt_pattern`` registers a ``_KVAction`` for it on the fly.
    """

    def _add_kv_action(self, key):
        bare_name = key.lstrip("-")
        self[key] = _KVAction(
            option_strings=[key],
            dest=bare_name.replace(".", _DOT_REPLACEMENT),
            # use metavar for display purposes
            metavar=bare_name,
        )

    def __contains__(self, key):
        # Keys containing "=" are never reported as present.
        if "=" in key:
            return False
        if super().__contains__(key):
            return True
        if not (key.startswith("-") and class_trait_opt_pattern.match(key)):
            return False
        self._add_kv_action(key)
        return True

    def __getitem__(self, key):
        # Go through __contains__ so implicit options get created first.
        if key not in self:
            raise KeyError(key)
        return super().__getitem__(key)

    def get(self, key, default=None):
        try:
            found = self[key]
        except KeyError:
            return default
        return found
class _KVArgParser(argparse.ArgumentParser):
    """ArgumentParser for which any --Class.trait option is implicitly defined."""

    def parse_known_args(self, args=None, namespace=None):
        # The option tables are wrapped immediately before parsing: doing it
        # in __init__ would make registration of explicit actions fail
        # during setup.
        self._option_string_actions = _DefaultOptionDict(self._option_string_actions)
        optionals = self._optionals
        optionals._option_string_actions = _DefaultOptionDict(optionals._option_string_actions)
        return super().parse_known_args(args, namespace)
# Type aliases used in the loader signatures below.
# A flag/alias name: a single string or a tuple of alternative strings.
Flags = t.Union[str, t.Tuple[str, ...]]
# Mapping keyed by subcommand name.
SubcommandsDict = t.Dict[str, t.Any]
class ArgParseConfigLoader(CommandLineConfigLoader):
    """A loader that uses the argparse module to load from the command line."""

    parser_class = ArgumentParser

    def __init__(
        self,
        argv: t.Optional[t.List[str]] = None,
        aliases: t.Optional[t.Dict[Flags, str]] = None,
        flags: t.Optional[t.Dict[Flags, str]] = None,
        log: t.Any = None,
        classes: t.Optional[t.List[t.Type[t.Any]]] = None,
        subcommands: t.Optional[SubcommandsDict] = None,
        *parser_args: t.Any,
        **parser_kw: t.Any,
    ) -> None:
        """Create a config loader for use with argparse.

        Parameters
        ----------
        classes : optional, list
            The classes to scan for *container* config-traits and decide
            for their "multiplicity" when adding them as *argparse* arguments.
        argv : optional, list
            If given, used to read command-line arguments from, otherwise
            sys.argv[1:] is used.
        *parser_args : tuple
            Positional arguments that will be passed to the constructor of
            the parser class.
        **parser_kw : dict
            Keyword arguments that will be passed to the constructor of the
            parser class. A ``version`` entry is removed and kept on
            ``self.version``; ``argument_default`` defaults to
            ``argparse.SUPPRESS``.
        aliases : dict of str to str
            Dict of aliases to full traitlets names for CLI parsing
        flags : dict of str to str
            Dict of flags to full traitlets names for CLI parsing
        log
            Passed to `ConfigLoader`
        subcommands : optional, dict
            Only used for argcomplete currently.
        """
        super(CommandLineConfigLoader, self).__init__(log=log)
        self.clear()
        self.argv = sys.argv[1:] if argv is None else argv
        self.aliases = aliases or {}
        self.flags = flags or {}
        self.classes = classes or []
        self.subcommands = subcommands  # only used for argcomplete currently

        self.parser_args = parser_args
        self.version = parser_kw.pop("version", None)
        # Caller-supplied keywords override the default.
        merged_kw = dict(argument_default=argparse.SUPPRESS)
        merged_kw.update(parser_kw)
        self.parser_kw = merged_kw

    def load_config(self, argv=None, aliases=None, flags=_deprecated, classes=None):
        """Parse command line arguments and return as a Config object.

        Parameters
        ----------
        argv : optional, list
            If given, a list with the structure of sys.argv[1:] to parse
            arguments from. If not given, the instance's self.argv attribute
            (given at construction time) is used.
        aliases : optional, dict
            If given, replaces ``self.aliases``.
        classes : optional, list
            If given, replaces ``self.classes``.
        flags
            Deprecated in traitlets 5.0 and ignored; pass flags to the
            constructor instead.
        """
        if flags is not _deprecated:
            warnings.warn(
                "The `flag` argument to load_config is deprecated since Traitlets "
                f"5.0 and will be ignored, pass flags the `{type(self)}` constructor.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.clear()
        args_to_parse = self.argv if argv is None else argv
        if aliases is not None:
            self.aliases = aliases
        if classes is not None:
            self.classes = classes
        self._create_parser()
        self._argcomplete(self.classes, self.subcommands)
        self._parse_args(args_to_parse)
        self._convert_to_config()
        return self.config

    def get_extra_args(self):
        """Return ``self.extra_args`` if it has been set, else an empty list."""
        return getattr(self, "extra_args", [])

    def _create_parser(self):
        """Instantiate ``parser_class`` and register the arguments on it."""
        self.parser = self.parser_class(
            *self.parser_args, **self.parser_kw  # type:ignore[arg-type]
        )
        self._add_arguments(self.aliases, self.flags, self.classes)

    def _add_arguments(self, aliases, flags, classes):
        raise NotImplementedError("subclasses must implement _add_arguments")

    def _argcomplete(
        self, classes: t.List[t.Any], subcommands: t.Optional[SubcommandsDict]
    ) -> None:
        """If argcomplete is enabled, allow triggering command-line autocompletion

        The base implementation does nothing.
        """

    def _parse_args(self, args):
        """self.parser->self.parsed_data"""
        uargs = [cast_unicode(arg) for arg in args]

        # Map every alias spelling ("-a" for one-letter aliases, and "--a")
        # to the full "--target" option.
        unpacked_aliases: t.Dict[str, str] = {}
        if self.aliases:
            for alias, alias_target in self.aliases.items():
                if alias in self.flags:
                    continue
                names = alias if isinstance(alias, tuple) else (alias,)
                full_option = "--" + alias_target
                for name in names:
                    if len(name) == 1:
                        unpacked_aliases["-" + name] = full_option
                    unpacked_aliases["--" + name] = full_option

        def _replace(arg):
            # A bare "-" is swapped for a placeholder before parsing.
            if arg == "-":
                return _DASH_REPLACEMENT
            for short, full in unpacked_aliases.items():
                if arg == short:
                    return full
                if arg.startswith(short + "="):
                    return full + "=" + arg[len(short) + 1 :]
            return arg

        # Everything after the first "--" is kept verbatim as extra args.
        try:
            split_at = uargs.index("--")
        except ValueError:
            extra_args = []
            to_parse = uargs
        else:
            extra_args = uargs[split_at + 1 :]
            to_parse = uargs[:split_at]
        to_parse = [_replace(arg) for arg in to_parse]

        self.parsed_data = self.parser.parse_args(to_parse)
        self.extra_args = extra_args

    def _convert_to_config(self):
        """self.parsed_data->self.config"""
        for dotted_name, value in vars(self.parsed_data).items():
            *sections, attr = dotted_name.split(".")
            target = self.config
            for name in sections:
                target = target[name]
            setattr(target, attr, value)
class _FlagAction(argparse.Action):
    """ArgParse action to handle a flag

    When constructed with an ``alias``, the same option also acts as that
    alias if it is given a value.
    """

    def __init__(self, *args, **kwargs):
        self.flag = kwargs.pop("flag")
        self.alias = kwargs.pop("alias", None)
        # ``Undefined`` as const lets __call__ recognise "no value given".
        kwargs["const"] = Undefined
        if not self.alias:
            # A pure flag never consumes an argument.
            kwargs["nargs"] = 0
        super().__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        used_as_alias = not (self.nargs == 0 or values is Undefined)
        if used_as_alias:
            setattr(namespace, self.alias, values)
            return
        if not hasattr(namespace, "_flags"):
            namespace._flags = []
        namespace._flags.append(self.flag)
class KVArgParseConfigLoader(ArgParseConfigLoader):
    """A config loader that loads aliases and flags with argparse,

    as well as arbitrary --Class.trait value
    """

    parser_class = _KVArgParser  # type:ignore[assignment]

    def _add_arguments(self, aliases, flags, classes):
        """Register the positional extra args, the flags and the aliases on the parser."""
        alias_flags: t.Dict[str, t.Any] = {}
        argparse_kwds: t.Dict[str, t.Any]
        add_argument = self.parser.add_argument
        self.parser.set_defaults(_flags=[])
        add_argument("extra_args", nargs="*")

        # Index of the configurable traits of ``classes``::
        #
        #     { <Class.traitname>: (<trait>, <argparse-kwds>) }
        #
        # Used to add the correct type into the `config` tree.
        # Used also for aliases, not to re-collect them.
        self.argparse_traits = argparse_traits = {}
        for cls in classes:
            for traitname, trait in cls.class_traits(config=True).items():
                argname = f"{cls.__name__}.{traitname}"
                argparse_kwds = {"type": str}
                if isinstance(trait, (Container, Dict)):
                    multiplicity = trait.metadata.get("multiplicity", "append")
                    if multiplicity == "append":
                        argparse_kwds["action"] = multiplicity
                    else:
                        argparse_kwds["nargs"] = multiplicity
                argparse_traits[argname] = (trait, argparse_kwds)

        for flag_keys, (value, fhelp) in flags.items():
            if not isinstance(flag_keys, tuple):
                flag_keys = (flag_keys,)
            for key in flag_keys:
                if key in aliases:
                    # Also an alias: registered in the alias loop below.
                    alias_flags[aliases[key]] = value
                    continue
                option_strings = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,)
                add_argument(*option_strings, action=_FlagAction, flag=value, help=fhelp)

        for alias_keys, traitname in aliases.items():
            if not isinstance(alias_keys, tuple):
                alias_keys = (alias_keys,)

            for key in alias_keys:
                argparse_kwds = {
                    "type": str,
                    "dest": traitname.replace(".", _DOT_REPLACEMENT),
                    "metavar": traitname,
                }
                argcompleter = None
                if traitname in argparse_traits:
                    trait, kwds = argparse_traits[traitname]
                    argparse_kwds.update(kwds)
                    if "action" in argparse_kwds and traitname in alias_flags:
                        # flag sets 'action', so can't have flag & alias with custom action
                        # on the same name
                        raise ArgumentError(
                            "The alias `%s` for the 'append' sequence "
                            "config-trait `%s` cannot be also a flag!'" % (key, traitname)
                        )
                    # For argcomplete: use an ``argcompleter`` metadata tag or
                    # attribute of the trait if there is one. It is bound with
                    # ``key=`` below and installed as the action's completer.
                    argcompleter = trait.metadata.get("argcompleter") or getattr(
                        trait, "argcompleter", None
                    )
                if traitname in alias_flags:
                    # alias and flag.
                    # when called with 0 args: flag
                    # when called with >= 1: alias
                    argparse_kwds.setdefault("nargs", "?")
                    argparse_kwds["action"] = _FlagAction
                    argparse_kwds["flag"] = alias_flags[traitname]
                    argparse_kwds["alias"] = traitname
                option_strings = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,)
                action = add_argument(*option_strings, **argparse_kwds)
                if argcompleter is not None:
                    # argcomplete's completers are callables returning list of completion strings
                    action.completer = functools.partial(argcompleter, key=key)  # type: ignore

    def _convert_to_config(self):
        """self.parsed_data->self.config, parse unrecognized extra args via KVLoader.

        Raises
        ------
        ArgumentError
            If assigning any parsed argument to the config fails.
        """
        trailing_args = self.extra_args

        for lhs, rhs in vars(self.parsed_data).items():
            if lhs == "extra_args":
                # Restore bare "-" arguments, then append what followed "--".
                restored = ["-" if a == _DASH_REPLACEMENT else a for a in rhs]
                self.extra_args = restored + trailing_args
                continue
            if lhs == "_flags":
                # _flags will be handled later
                continue

            lhs = lhs.replace(_DOT_REPLACEMENT, ".")
            if "." not in lhs:
                self._handle_unrecognized_alias(lhs)

            if isinstance(rhs, list):
                rhs = DeferredConfigList(rhs)
            elif isinstance(rhs, str):
                rhs = DeferredConfigString(rhs)

            entry = self.argparse_traits.get(lhs)
            trait = entry[0] if entry else entry

            # eval the KV assignment
            try:
                self._exec_config_str(lhs, rhs, trait)
            except Exception as e:
                # cast deferred to nicer repr for the error
                # DeferredList->list, etc
                if isinstance(rhs, DeferredConfig):
                    rhs = rhs._super_repr()
                raise ArgumentError(f"Error loading argument {lhs}={rhs}, {e}") from e

        for flag_config in self.parsed_data._flags:
            self._load_flag(flag_config)

    def _handle_unrecognized_alias(self, arg: str) -> None:
        """Handling for unrecognized alias arguments

        Probably a mistyped alias. By default just log a warning,
        but users can override this to raise an error instead, e.g.
        self.parser.error("Unrecognized alias: '%s'" % arg)
        """
        self.log.warning("Unrecognized alias: '%s', it will have no effect.", arg)

    def _argcomplete(
        self, classes: t.List[t.Any], subcommands: t.Optional[SubcommandsDict]
    ) -> None:
        """If argcomplete is enabled, allow triggering command-line autocompletion

        Does nothing when the ``argcomplete`` package cannot be imported.
        """
        try:
            import argcomplete  # type: ignore[import]  # noqa
        except ImportError:
            return

        from . import argcomplete_config

        finder = argcomplete_config.ExtendedCompletionFinder()
        finder.config_classes = classes
        finder.subcommands = list(subcommands or [])
        # for ease of testing, pass through self._argcomplete_kwargs if set
        extra_kwargs = getattr(self, "_argcomplete_kwargs", {})
        finder(self.parser, **extra_kwargs)
class KeyValueConfigLoader(KVArgParseConfigLoader):
    """Deprecated in traitlets 5.0

    Use KVArgParseConfigLoader
    """

    def __init__(self, *args, **kwargs):
        # Warn at the caller's location, then behave exactly like the parent.
        message = (
            "KeyValueConfigLoader is deprecated since Traitlets 5.0."
            " Use KVArgParseConfigLoader instead."
        )
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        super().__init__(*args, **kwargs)
def load_pyconfig_files(config_files, path):
    """Load multiple Python config files, merging each of them in turn.

    Files that cannot be found are skipped; any other error propagates.

    Parameters
    ----------
    config_files : list of str
        List of config files names to load and merge into the config.
    path : unicode
        The full path to the location of the config files.

    Returns
    -------
    Config
        The merged configuration; later files are merged over earlier ones.
    """
    merged = Config()
    for name in config_files:
        loader = PyFileConfigLoader(name, path=path)
        try:
            loaded = loader.load_config()
        except ConfigFileNotFound:
            continue
        merged.merge(loaded)
    return merged