Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/traitlets/config/loader.py: 24%
571 statements
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
1"""A simple configuration system."""
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7import argparse
8import copy
9import functools
10import json
11import os
12import re
13import sys
14import typing as t
15from logging import Logger
17from traitlets.traitlets import Any, Container, Dict, HasTraits, List, TraitType, Undefined
19from ..utils import cast_unicode, filefind, warnings
21# -----------------------------------------------------------------------------
22# Exceptions
23# -----------------------------------------------------------------------------
26class ConfigError(Exception):
27 pass
30class ConfigLoaderError(ConfigError):
31 pass
34class ConfigFileNotFound(ConfigError): # noqa
35 pass
38class ArgumentError(ConfigLoaderError):
39 pass
42# -----------------------------------------------------------------------------
43# Argparse fix
44# -----------------------------------------------------------------------------
46# Unfortunately argparse by default prints help messages to stderr instead of
47# stdout. This makes it annoying to capture long help screens at the command
48# line, since one must know how to pipe stderr, which many users don't know how
49# to do. So we override the print_help method with one that defaults to
50# stdout and use our class instead.
53class _Sentinel:
54 def __repr__(self) -> str:
55 return "<Sentinel deprecated>"
57 def __str__(self) -> str:
58 return "<deprecated>"
61_deprecated = _Sentinel()
64class ArgumentParser(argparse.ArgumentParser):
65 """Simple argparse subclass that prints help to stdout by default."""
67 def print_help(self, file: t.Any = None) -> None:
68 if file is None:
69 file = sys.stdout
70 return super().print_help(file)
72 print_help.__doc__ = argparse.ArgumentParser.print_help.__doc__
75# -----------------------------------------------------------------------------
76# Config class for holding config information
77# -----------------------------------------------------------------------------
80def execfile(fname: str, glob: dict[str, Any]) -> None:
81 with open(fname, "rb") as f:
82 exec(compile(f.read(), fname, "exec"), glob, glob) # noqa
85class LazyConfigValue(HasTraits):
86 """Proxy object for exposing methods on configurable containers
88 These methods allow appending/extending/updating
89 to add to non-empty defaults instead of clobbering them.
91 Exposes:
93 - append, extend, insert on lists
94 - update on dicts
95 - update, add on sets
96 """
98 _value = None
100 # list methods
101 _extend: List[t.Any] = List()
102 _prepend: List[t.Any] = List()
103 _inserts: List[t.Any] = List()
105 def append(self, obj: t.Any) -> None:
106 """Append an item to a List"""
107 self._extend.append(obj)
109 def extend(self, other: t.Any) -> None:
110 """Extend a list"""
111 self._extend.extend(other)
113 def prepend(self, other: t.Any) -> None:
114 """like list.extend, but for the front"""
115 self._prepend[:0] = other
117 def merge_into(self, other: t.Any) -> t.Any:
118 """
119 Merge with another earlier LazyConfigValue or an earlier container.
120 This is useful when having global system-wide configuration files.
122 Self is expected to have higher precedence.
124 Parameters
125 ----------
126 other : LazyConfigValue or container
128 Returns
129 -------
130 LazyConfigValue
131 if ``other`` is also lazy, a reified container otherwise.
132 """
133 if isinstance(other, LazyConfigValue):
134 other._extend.extend(self._extend)
135 self._extend = other._extend
137 self._prepend.extend(other._prepend)
139 other._inserts.extend(self._inserts)
140 self._inserts = other._inserts
142 if self._update:
143 other.update(self._update)
144 self._update = other._update
145 return self
146 else:
147 # other is a container, reify now.
148 return self.get_value(other)
150 def insert(self, index: int, other: t.Any) -> None:
151 if not isinstance(index, int):
152 raise TypeError("An integer is required")
153 self._inserts.append((index, other))
155 # dict methods
156 # update is used for both dict and set
157 _update = Any()
159 def update(self, other: t.Any) -> None:
160 """Update either a set or dict"""
161 if self._update is None:
162 if isinstance(other, dict):
163 self._update = {}
164 else:
165 self._update = set()
166 self._update.update(other)
168 # set methods
169 def add(self, obj: t.Any) -> None:
170 """Add an item to a set"""
171 self.update({obj})
173 def get_value(self, initial: t.Any) -> t.Any:
174 """construct the value from the initial one
176 after applying any insert / extend / update changes
177 """
178 if self._value is not None:
179 return self._value # type:ignore[unreachable]
180 value = copy.deepcopy(initial)
181 if isinstance(value, list):
182 for idx, obj in self._inserts:
183 value.insert(idx, obj)
184 value[:0] = self._prepend
185 value.extend(self._extend)
187 elif isinstance(value, dict):
188 if self._update:
189 value.update(self._update)
190 elif isinstance(value, set):
191 if self._update:
192 value.update(self._update)
193 self._value = value
194 return value
196 def to_dict(self) -> dict[str, t.Any]:
197 """return JSONable dict form of my data
199 Currently update as dict or set, extend, prepend as lists, and inserts as list of tuples.
200 """
201 d = {}
202 if self._update:
203 d["update"] = self._update
204 if self._extend:
205 d["extend"] = self._extend
206 if self._prepend:
207 d["prepend"] = self._prepend
208 elif self._inserts:
209 d["inserts"] = self._inserts
210 return d
212 def __repr__(self) -> str:
213 if self._value is not None:
214 return f"<{self.__class__.__name__} value={self._value!r}>"
215 else:
216 return f"<{self.__class__.__name__} {self.to_dict()!r}>"
219def _is_section_key(key: str) -> bool:
220 """Is a Config key a section name (does it start with a capital)?"""
221 if key and key[0].upper() == key[0] and not key.startswith("_"):
222 return True
223 else:
224 return False
227class Config(dict): # type:ignore[type-arg]
228 """An attribute-based dict that can do smart merges.
230 Accessing a field on a config object for the first time populates the key
231 with either a nested Config object for keys starting with capitals
232 or :class:`.LazyConfigValue` for lowercase keys,
233 allowing quick assignments such as::
235 c = Config()
236 c.Class.int_trait = 5
237 c.Class.list_trait.append("x")
239 """
241 def __init__(self, *args: t.Any, **kwds: t.Any) -> None:
242 dict.__init__(self, *args, **kwds)
243 self._ensure_subconfig()
245 def _ensure_subconfig(self) -> None:
246 """ensure that sub-dicts that should be Config objects are
248 casts dicts that are under section keys to Config objects,
249 which is necessary for constructing Config objects from dict literals.
250 """
251 for key in self:
252 obj = self[key]
253 if _is_section_key(key) and isinstance(obj, dict) and not isinstance(obj, Config):
254 setattr(self, key, Config(obj))
256 def _merge(self, other: t.Any) -> None:
257 """deprecated alias, use Config.merge()"""
258 self.merge(other)
260 def merge(self, other: t.Any) -> None:
261 """merge another config object into this one"""
262 to_update = {}
263 for k, v in other.items():
264 if k not in self:
265 to_update[k] = v
266 else: # I have this key
267 if isinstance(v, Config) and isinstance(self[k], Config):
268 # Recursively merge common sub Configs
269 self[k].merge(v)
270 elif isinstance(v, LazyConfigValue):
271 self[k] = v.merge_into(self[k])
272 else:
273 # Plain updates for non-Configs
274 to_update[k] = v
276 self.update(to_update)
278 def collisions(self, other: Config) -> dict[str, t.Any]:
279 """Check for collisions between two config objects.
281 Returns a dict of the form {"Class": {"trait": "collision message"}}`,
282 indicating which values have been ignored.
284 An empty dict indicates no collisions.
285 """
286 collisions: dict[str, t.Any] = {}
287 for section in self:
288 if section not in other:
289 continue
290 mine = self[section]
291 theirs = other[section]
292 for key in mine:
293 if key in theirs and mine[key] != theirs[key]:
294 collisions.setdefault(section, {})
295 collisions[section][key] = f"{mine[key]!r} ignored, using {theirs[key]!r}"
296 return collisions
298 def __contains__(self, key: t.Any) -> bool:
299 # allow nested contains of the form `"Section.key" in config`
300 if "." in key:
301 first, remainder = key.split(".", 1)
302 if first not in self:
303 return False
304 return remainder in self[first]
306 return super().__contains__(key)
308 # .has_key is deprecated for dictionaries.
309 has_key = __contains__
311 def _has_section(self, key: str) -> bool:
312 return _is_section_key(key) and key in self
314 def copy(self) -> dict[str, t.Any]:
315 return type(self)(dict.copy(self))
317 def __copy__(self) -> dict[str, t.Any]:
318 return self.copy()
320 def __deepcopy__(self, memo: t.Any) -> Config:
321 new_config = type(self)()
322 for key, value in self.items():
323 if isinstance(value, (Config, LazyConfigValue)):
324 # deep copy config objects
325 value = copy.deepcopy(value, memo)
326 elif type(value) in {dict, list, set, tuple}:
327 # shallow copy plain container traits
328 value = copy.copy(value)
329 new_config[key] = value
330 return new_config
332 def __getitem__(self, key: str) -> t.Any:
333 try:
334 return dict.__getitem__(self, key)
335 except KeyError:
336 if _is_section_key(key):
337 c = Config()
338 dict.__setitem__(self, key, c)
339 return c
340 elif not key.startswith("_"):
341 # undefined, create lazy value, used for container methods
342 v = LazyConfigValue()
343 dict.__setitem__(self, key, v)
344 return v
345 else:
346 raise
348 def __setitem__(self, key: str, value: t.Any) -> None:
349 if _is_section_key(key):
350 if not isinstance(value, Config):
351 raise ValueError(
352 "values whose keys begin with an uppercase "
353 f"char must be Config instances: {key!r}, {value!r}"
354 )
355 dict.__setitem__(self, key, value)
357 def __getattr__(self, key: str) -> t.Any:
358 if key.startswith("__"):
359 return dict.__getattr__(self, key) # type:ignore[attr-defined]
360 try:
361 return self.__getitem__(key)
362 except KeyError as e:
363 raise AttributeError(e) from e
365 def __setattr__(self, key: str, value: t.Any) -> None:
366 if key.startswith("__"):
367 return dict.__setattr__(self, key, value)
368 try:
369 self.__setitem__(key, value)
370 except KeyError as e:
371 raise AttributeError(e) from e
373 def __delattr__(self, key: str) -> None:
374 if key.startswith("__"):
375 return dict.__delattr__(self, key)
376 try:
377 dict.__delitem__(self, key)
378 except KeyError as e:
379 raise AttributeError(e) from e
382class DeferredConfig:
383 """Class for deferred-evaluation of config from CLI"""
385 pass
387 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
388 raise NotImplementedError("Implement in subclasses")
390 def _super_repr(self) -> str:
391 # explicitly call super on direct parent
392 return super(self.__class__, self).__repr__()
395class DeferredConfigString(str, DeferredConfig):
396 """Config value for loading config from a string
398 Interpretation is deferred until it is loaded into the trait.
400 Subclass of str for backward compatibility.
402 This class is only used for values that are not listed
403 in the configurable classes.
405 When config is loaded, `trait.from_string` will be used.
407 If an error is raised in `.from_string`,
408 the original string is returned.
410 .. versionadded:: 5.0
411 """
413 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
414 """Get the value stored in this string"""
415 s = str(self)
416 try:
417 return trait.from_string(s)
418 except Exception:
419 # exception casting from string,
420 # let the original string lie.
421 # this will raise a more informative error when config is loaded.
422 return s
424 def __repr__(self) -> str:
425 return f"{self.__class__.__name__}({self._super_repr()})"
428class DeferredConfigList(t.List[t.Any], DeferredConfig):
429 """Config value for loading config from a list of strings
431 Interpretation is deferred until it is loaded into the trait.
433 This class is only used for values that are not listed
434 in the configurable classes.
436 When config is loaded, `trait.from_string_list` will be used.
438 If an error is raised in `.from_string_list`,
439 the original string list is returned.
441 .. versionadded:: 5.0
442 """
444 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
445 """Get the value stored in this string"""
446 if hasattr(trait, "from_string_list"):
447 src = list(self)
448 cast = trait.from_string_list
449 else:
450 # only allow one item
451 if len(self) > 1:
452 raise ValueError(
453 f"{trait.name} only accepts one value, got {len(self)}: {list(self)}"
454 )
455 src = self[0]
456 cast = trait.from_string
458 try:
459 return cast(src)
460 except Exception:
461 # exception casting from string,
462 # let the original value lie.
463 # this will raise a more informative error when config is loaded.
464 return src
466 def __repr__(self) -> str:
467 return f"{self.__class__.__name__}({self._super_repr()})"
470# -----------------------------------------------------------------------------
471# Config loading classes
472# -----------------------------------------------------------------------------
475class ConfigLoader:
476 """A object for loading configurations from just about anywhere.
478 The resulting configuration is packaged as a :class:`Config`.
480 Notes
481 -----
482 A :class:`ConfigLoader` does one thing: load a config from a source
483 (file, command line arguments) and returns the data as a :class:`Config` object.
484 There are lots of things that :class:`ConfigLoader` does not do. It does
485 not implement complex logic for finding config files. It does not handle
486 default values or merge multiple configs. These things need to be
487 handled elsewhere.
488 """
490 def _log_default(self) -> Logger:
491 from traitlets.log import get_logger
493 return t.cast(Logger, get_logger())
495 def __init__(self, log: Logger | None = None) -> None:
496 """A base class for config loaders.
498 log : instance of :class:`logging.Logger` to use.
499 By default logger of :meth:`traitlets.config.application.Application.instance()`
500 will be used
502 Examples
503 --------
504 >>> cl = ConfigLoader()
505 >>> config = cl.load_config()
506 >>> config
507 {}
508 """
509 self.clear()
510 if log is None:
511 self.log = self._log_default()
512 self.log.debug("Using default logger")
513 else:
514 self.log = log
516 def clear(self) -> None:
517 self.config = Config()
519 def load_config(self) -> Config:
520 """Load a config from somewhere, return a :class:`Config` instance.
522 Usually, this will cause self.config to be set and then returned.
523 However, in most cases, :meth:`ConfigLoader.clear` should be called
524 to erase any previous state.
525 """
526 self.clear()
527 return self.config
530class FileConfigLoader(ConfigLoader):
531 """A base class for file based configurations.
533 As we add more file based config loaders, the common logic should go
534 here.
535 """
537 def __init__(self, filename: str, path: str | None = None, **kw: t.Any) -> None:
538 """Build a config loader for a filename and path.
540 Parameters
541 ----------
542 filename : str
543 The file name of the config file.
544 path : str, list, tuple
545 The path to search for the config file on, or a sequence of
546 paths to try in order.
547 """
548 super().__init__(**kw)
549 self.filename = filename
550 self.path = path
551 self.full_filename = ""
553 def _find_file(self) -> None:
554 """Try to find the file by searching the paths."""
555 self.full_filename = filefind(self.filename, self.path)
558class JSONFileConfigLoader(FileConfigLoader):
559 """A JSON file loader for config
561 Can also act as a context manager that rewrite the configuration file to disk on exit.
563 Example::
565 with JSONFileConfigLoader('myapp.json','/home/jupyter/configurations/') as c:
566 c.MyNewConfigurable.new_value = 'Updated'
568 """
570 def load_config(self) -> Config:
571 """Load the config from a file and return it as a Config object."""
572 self.clear()
573 try:
574 self._find_file()
575 except OSError as e:
576 raise ConfigFileNotFound(str(e)) from e
577 dct = self._read_file_as_dict()
578 self.config = self._convert_to_config(dct)
579 return self.config
581 def _read_file_as_dict(self) -> dict[str, t.Any]:
582 with open(self.full_filename) as f:
583 return t.cast("dict[str, t.Any]", json.load(f))
585 def _convert_to_config(self, dictionary: dict[str, t.Any]) -> Config:
586 if "version" in dictionary:
587 version = dictionary.pop("version")
588 else:
589 version = 1
591 if version == 1:
592 return Config(dictionary)
593 else:
594 raise ValueError(f"Unknown version of JSON config file: {version}")
596 def __enter__(self) -> Config:
597 self.load_config()
598 return self.config
600 def __exit__(self, exc_type: t.Any, exc_value: t.Any, traceback: t.Any) -> None:
601 """
602 Exit the context manager but do not handle any errors.
604 In case of any error, we do not want to write the potentially broken
605 configuration to disk.
606 """
607 self.config.version = 1
608 json_config = json.dumps(self.config, indent=2)
609 with open(self.full_filename, "w") as f:
610 f.write(json_config)
613class PyFileConfigLoader(FileConfigLoader):
614 """A config loader for pure python files.
616 This is responsible for locating a Python config file by filename and
617 path, then executing it to construct a Config object.
618 """
620 def load_config(self) -> Config:
621 """Load the config from a file and return it as a Config object."""
622 self.clear()
623 try:
624 self._find_file()
625 except OSError as e:
626 raise ConfigFileNotFound(str(e)) from e
627 self._read_file_as_dict()
628 return self.config
630 def load_subconfig(self, fname: str, path: str | None = None) -> None:
631 """Injected into config file namespace as load_subconfig"""
632 if path is None:
633 path = self.path
635 loader = self.__class__(fname, path)
636 try:
637 sub_config = loader.load_config()
638 except ConfigFileNotFound:
639 # Pass silently if the sub config is not there,
640 # treat it as an empty config file.
641 pass
642 else:
643 self.config.merge(sub_config)
645 def _read_file_as_dict(self) -> None:
646 """Load the config file into self.config, with recursive loading."""
648 def get_config() -> Config:
649 """Unnecessary now, but a deprecation warning is more trouble than it's worth."""
650 return self.config
652 namespace = dict(
653 c=self.config,
654 load_subconfig=self.load_subconfig,
655 get_config=get_config,
656 __file__=self.full_filename,
657 )
658 conf_filename = self.full_filename
659 with open(conf_filename, "rb") as f:
660 exec(compile(f.read(), conf_filename, "exec"), namespace, namespace) # noqa
663class CommandLineConfigLoader(ConfigLoader):
664 """A config loader for command line arguments.
666 As we add more command line based loaders, the common logic should go
667 here.
668 """
670 def _exec_config_str(
671 self, lhs: t.Any, rhs: t.Any, trait: TraitType[t.Any, t.Any] | None = None
672 ) -> None:
673 """execute self.config.<lhs> = <rhs>
675 * expands ~ with expanduser
676 * interprets value with trait if available
677 """
678 value = rhs
679 if isinstance(value, DeferredConfig):
680 if trait:
681 # trait available, reify config immediately
682 value = value.get_value(trait)
683 elif isinstance(rhs, DeferredConfigList) and len(rhs) == 1:
684 # single item, make it a deferred str
685 value = DeferredConfigString(os.path.expanduser(rhs[0]))
686 else:
687 if trait:
688 value = trait.from_string(value)
689 else:
690 value = DeferredConfigString(value)
692 *path, key = lhs.split(".")
693 section = self.config
694 for part in path:
695 section = section[part]
696 section[key] = value
697 return
699 def _load_flag(self, cfg: t.Any) -> None:
700 """update self.config from a flag, which can be a dict or Config"""
701 if isinstance(cfg, (dict, Config)):
702 # don't clobber whole config sections, update
703 # each section from config:
704 for sec, c in cfg.items():
705 self.config[sec].update(c)
706 else:
707 raise TypeError("Invalid flag: %r" % cfg)
710# match --Class.trait keys for argparse
711# matches:
712# --Class.trait
713# --x
714# -x
716class_trait_opt_pattern = re.compile(r"^\-?\-[A-Za-z][\w]*(\.[\w]+)*$")
718_DOT_REPLACEMENT = "__DOT__"
719_DASH_REPLACEMENT = "__DASH__"
722class _KVAction(argparse.Action):
723 """Custom argparse action for handling --Class.trait=x
725 Always
726 """
728 def __call__( # type:ignore[override]
729 self,
730 parser: argparse.ArgumentParser,
731 namespace: dict[str, t.Any],
732 values: t.Sequence[t.Any],
733 option_string: str | None = None,
734 ) -> None:
735 if isinstance(values, str):
736 values = [values]
737 values = ["-" if v is _DASH_REPLACEMENT else v for v in values]
738 items = getattr(namespace, self.dest, None)
739 if items is None:
740 items = DeferredConfigList()
741 else:
742 items = DeferredConfigList(items)
743 items.extend(values)
744 setattr(namespace, self.dest, items)
747class _DefaultOptionDict(dict): # type:ignore[type-arg]
748 """Like the default options dict
750 but acts as if all --Class.trait options are predefined
751 """
753 def _add_kv_action(self, key: str) -> None:
754 self[key] = _KVAction(
755 option_strings=[key],
756 dest=key.lstrip("-").replace(".", _DOT_REPLACEMENT),
757 # use metavar for display purposes
758 metavar=key.lstrip("-"),
759 )
761 def __contains__(self, key: t.Any) -> bool:
762 if "=" in key:
763 return False
764 if super().__contains__(key):
765 return True
767 if key.startswith("-") and class_trait_opt_pattern.match(key):
768 self._add_kv_action(key)
769 return True
770 return False
772 def __getitem__(self, key: str) -> t.Any:
773 if key in self:
774 return super().__getitem__(key)
775 else:
776 raise KeyError(key)
778 def get(self, key: str, default: t.Any = None) -> t.Any:
779 try:
780 return self[key]
781 except KeyError:
782 return default
785class _KVArgParser(argparse.ArgumentParser):
786 """subclass of ArgumentParser where any --Class.trait option is implicitly defined"""
788 def parse_known_args( # type:ignore[override]
789 self, args: t.Sequence[str] | None = None, namespace: argparse.Namespace | None = None
790 ) -> tuple[argparse.Namespace | None, list[str]]:
791 # must be done immediately prior to parsing because if we do it in init,
792 # registration of explicit actions via parser.add_option will fail during setup
793 for container in (self, self._optionals):
794 container._option_string_actions = _DefaultOptionDict(container._option_string_actions)
795 return super().parse_known_args(args, namespace)
798# type aliases
799SubcommandsDict = t.Dict[str, t.Any]
802class ArgParseConfigLoader(CommandLineConfigLoader):
803 """A loader that uses the argparse module to load from the command line."""
805 parser_class = ArgumentParser
807 def __init__(
808 self,
809 argv: list[str] | None = None,
810 aliases: dict[str, str] | None = None,
811 flags: dict[str, str] | None = None,
812 log: t.Any = None,
813 classes: list[type[t.Any]] | None = None,
814 subcommands: SubcommandsDict | None = None,
815 *parser_args: t.Any,
816 **parser_kw: t.Any,
817 ) -> None:
818 """Create a config loader for use with argparse.
820 Parameters
821 ----------
822 classes : optional, list
823 The classes to scan for *container* config-traits and decide
824 for their "multiplicity" when adding them as *argparse* arguments.
825 argv : optional, list
826 If given, used to read command-line arguments from, otherwise
827 sys.argv[1:] is used.
828 *parser_args : tuple
829 A tuple of positional arguments that will be passed to the
830 constructor of :class:`argparse.ArgumentParser`.
831 **parser_kw : dict
832 A tuple of keyword arguments that will be passed to the
833 constructor of :class:`argparse.ArgumentParser`.
834 aliases : dict of str to str
835 Dict of aliases to full traitlets names for CLI parsing
836 flags : dict of str to str
837 Dict of flags to full traitlets names for CLI parsing
838 log
839 Passed to `ConfigLoader`
841 Returns
842 -------
843 config : Config
844 The resulting Config object.
845 """
846 classes = classes or []
847 super(CommandLineConfigLoader, self).__init__(log=log)
848 self.clear()
849 if argv is None:
850 argv = sys.argv[1:]
851 self.argv = argv
852 self.aliases = aliases or {}
853 self.flags = flags or {}
854 self.classes = classes
855 self.subcommands = subcommands # only used for argcomplete currently
857 self.parser_args = parser_args
858 self.version = parser_kw.pop("version", None)
859 kwargs = dict(argument_default=argparse.SUPPRESS)
860 kwargs.update(parser_kw)
861 self.parser_kw = kwargs
863 def load_config(
864 self,
865 argv: list[str] | None = None,
866 aliases: t.Any = None,
867 flags: t.Any = _deprecated,
868 classes: t.Any = None,
869 ) -> Config:
870 """Parse command line arguments and return as a Config object.
872 Parameters
873 ----------
874 argv : optional, list
875 If given, a list with the structure of sys.argv[1:] to parse
876 arguments from. If not given, the instance's self.argv attribute
877 (given at construction time) is used.
878 flags
879 Deprecated in traitlets 5.0, instantiate the config loader with the flags.
881 """
883 if flags is not _deprecated:
884 warnings.warn(
885 "The `flag` argument to load_config is deprecated since Traitlets "
886 f"5.0 and will be ignored, pass flags the `{type(self)}` constructor.",
887 DeprecationWarning,
888 stacklevel=2,
889 )
891 self.clear()
892 if argv is None:
893 argv = self.argv
894 if aliases is not None:
895 self.aliases = aliases
896 if classes is not None:
897 self.classes = classes
898 self._create_parser()
899 self._argcomplete(self.classes, self.subcommands)
900 self._parse_args(argv)
901 self._convert_to_config()
902 return self.config
904 def get_extra_args(self) -> list[str]:
905 if hasattr(self, "extra_args"):
906 return self.extra_args
907 else:
908 return []
910 def _create_parser(self) -> None:
911 self.parser = self.parser_class(
912 *self.parser_args,
913 **self.parser_kw, # type:ignore[arg-type]
914 )
915 self._add_arguments(self.aliases, self.flags, self.classes)
917 def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None:
918 raise NotImplementedError("subclasses must implement _add_arguments")
920 def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None:
921 """If argcomplete is enabled, allow triggering command-line autocompletion"""
922 pass
924 def _parse_args(self, args: t.Any) -> t.Any:
925 """self.parser->self.parsed_data"""
926 uargs = [cast_unicode(a) for a in args]
928 unpacked_aliases: dict[str, str] = {}
929 if self.aliases:
930 unpacked_aliases = {}
931 for alias, alias_target in self.aliases.items():
932 if alias in self.flags:
933 continue
934 if not isinstance(alias, tuple): # type:ignore[unreachable]
935 alias = (alias,) # type:ignore[assignment]
936 for al in alias:
937 if len(al) == 1:
938 unpacked_aliases["-" + al] = "--" + alias_target
939 unpacked_aliases["--" + al] = "--" + alias_target
941 def _replace(arg: str) -> str:
942 if arg == "-":
943 return _DASH_REPLACEMENT
944 for k, v in unpacked_aliases.items():
945 if arg == k:
946 return v
947 if arg.startswith(k + "="):
948 return v + "=" + arg[len(k) + 1 :]
949 return arg
951 if "--" in uargs:
952 idx = uargs.index("--")
953 extra_args = uargs[idx + 1 :]
954 to_parse = uargs[:idx]
955 else:
956 extra_args = []
957 to_parse = uargs
958 to_parse = [_replace(a) for a in to_parse]
960 self.parsed_data = self.parser.parse_args(to_parse)
961 self.extra_args = extra_args
963 def _convert_to_config(self) -> None:
964 """self.parsed_data->self.config"""
965 for k, v in vars(self.parsed_data).items():
966 *path, key = k.split(".")
967 section = self.config
968 for p in path:
969 section = section[p]
970 setattr(section, key, v)
973class _FlagAction(argparse.Action):
974 """ArgParse action to handle a flag"""
976 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
977 self.flag = kwargs.pop("flag")
978 self.alias = kwargs.pop("alias", None)
979 kwargs["const"] = Undefined
980 if not self.alias:
981 kwargs["nargs"] = 0
982 super().__init__(*args, **kwargs)
984 def __call__(
985 self, parser: t.Any, namespace: t.Any, values: t.Any, option_string: str | None = None
986 ) -> None:
987 if self.nargs == 0 or values is Undefined:
988 if not hasattr(namespace, "_flags"):
989 namespace._flags = []
990 namespace._flags.append(self.flag)
991 else:
992 setattr(namespace, self.alias, values)
995class KVArgParseConfigLoader(ArgParseConfigLoader):
996 """A config loader that loads aliases and flags with argparse,
998 as well as arbitrary --Class.trait value
999 """
1001 parser_class = _KVArgParser # type:ignore[assignment]
1003 def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None:
1004 alias_flags: dict[str, t.Any] = {}
1005 argparse_kwds: dict[str, t.Any]
1006 argparse_traits: dict[str, t.Any]
1007 paa = self.parser.add_argument
1008 self.parser.set_defaults(_flags=[])
1009 paa("extra_args", nargs="*")
1011 # An index of all container traits collected::
1012 #
1013 # { <traitname>: (<trait>, <argparse-kwds>) }
1014 #
1015 # Used to add the correct type into the `config` tree.
1016 # Used also for aliases, not to re-collect them.
1017 self.argparse_traits = argparse_traits = {}
1018 for cls in classes:
1019 for traitname, trait in cls.class_traits(config=True).items():
1020 argname = f"{cls.__name__}.{traitname}"
1021 argparse_kwds = {"type": str}
1022 if isinstance(trait, (Container, Dict)):
1023 multiplicity = trait.metadata.get("multiplicity", "append")
1024 if multiplicity == "append":
1025 argparse_kwds["action"] = multiplicity
1026 else:
1027 argparse_kwds["nargs"] = multiplicity
1028 argparse_traits[argname] = (trait, argparse_kwds)
1030 for keys, (value, fhelp) in flags.items():
1031 if not isinstance(keys, tuple):
1032 keys = (keys,)
1033 for key in keys:
1034 if key in aliases:
1035 alias_flags[aliases[key]] = value
1036 continue
1037 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,)
1038 paa(*keys, action=_FlagAction, flag=value, help=fhelp)
1040 for keys, traitname in aliases.items():
1041 if not isinstance(keys, tuple):
1042 keys = (keys,)
1044 for key in keys:
1045 argparse_kwds = {
1046 "type": str,
1047 "dest": traitname.replace(".", _DOT_REPLACEMENT),
1048 "metavar": traitname,
1049 }
1050 argcompleter = None
1051 if traitname in argparse_traits:
1052 trait, kwds = argparse_traits[traitname]
1053 argparse_kwds.update(kwds)
1054 if "action" in argparse_kwds and traitname in alias_flags:
1055 # flag sets 'action', so can't have flag & alias with custom action
1056 # on the same name
1057 raise ArgumentError(
1058 f"The alias `{key}` for the 'append' sequence "
1059 f"config-trait `{traitname}` cannot be also a flag!'"
1060 )
1061 # For argcomplete, check if any either an argcompleter metadata tag or method
1062 # is available. If so, it should be a callable which takes the command-line key
1063 # string as an argument and other kwargs passed by argcomplete,
1064 # and returns the a list of string completions.
1065 argcompleter = trait.metadata.get("argcompleter") or getattr(
1066 trait, "argcompleter", None
1067 )
1068 if traitname in alias_flags:
1069 # alias and flag.
1070 # when called with 0 args: flag
1071 # when called with >= 1: alias
1072 argparse_kwds.setdefault("nargs", "?")
1073 argparse_kwds["action"] = _FlagAction
1074 argparse_kwds["flag"] = alias_flags[traitname]
1075 argparse_kwds["alias"] = traitname
1076 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,)
1077 action = paa(*keys, **argparse_kwds)
1078 if argcompleter is not None:
1079 # argcomplete's completers are callables returning list of completion strings
1080 action.completer = functools.partial( # type:ignore[attr-defined]
1081 argcompleter, key=key
1082 )
1084 def _convert_to_config(self) -> None:
1085 """self.parsed_data->self.config, parse unrecognized extra args via KVLoader."""
1086 extra_args = self.extra_args
1088 for lhs, rhs in vars(self.parsed_data).items():
1089 if lhs == "extra_args":
1090 self.extra_args = ["-" if a == _DASH_REPLACEMENT else a for a in rhs] + extra_args
1091 continue
1092 elif lhs == "_flags":
1093 # _flags will be handled later
1094 continue
1096 lhs = lhs.replace(_DOT_REPLACEMENT, ".")
1097 if "." not in lhs:
1098 self._handle_unrecognized_alias(lhs)
1099 trait = None
1101 if isinstance(rhs, list):
1102 rhs = DeferredConfigList(rhs)
1103 elif isinstance(rhs, str):
1104 rhs = DeferredConfigString(rhs)
1106 trait = self.argparse_traits.get(lhs)
1107 if trait:
1108 trait = trait[0]
1110 # eval the KV assignment
1111 try:
1112 self._exec_config_str(lhs, rhs, trait)
1113 except Exception as e:
1114 # cast deferred to nicer repr for the error
1115 # DeferredList->list, etc
1116 if isinstance(rhs, DeferredConfig):
1117 rhs = rhs._super_repr()
1118 raise ArgumentError(f"Error loading argument {lhs}={rhs}, {e}") from e
1120 for subc in self.parsed_data._flags:
1121 self._load_flag(subc)
1123 def _handle_unrecognized_alias(self, arg: str) -> None:
1124 """Handling for unrecognized alias arguments
1126 Probably a mistyped alias. By default just log a warning,
1127 but users can override this to raise an error instead, e.g.
1128 self.parser.error("Unrecognized alias: '%s'" % arg)
1129 """
1130 self.log.warning("Unrecognized alias: '%s', it will have no effect.", arg)
1132 def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None:
1133 """If argcomplete is enabled, allow triggering command-line autocompletion"""
1134 try:
1135 import argcomplete # noqa
1136 except ImportError:
1137 return
1139 from . import argcomplete_config
1141 finder = argcomplete_config.ExtendedCompletionFinder() # type:ignore[no-untyped-call]
1142 finder.config_classes = classes
1143 finder.subcommands = list(subcommands or [])
1144 # for ease of testing, pass through self._argcomplete_kwargs if set
1145 finder(self.parser, **getattr(self, "_argcomplete_kwargs", {}))
1148class KeyValueConfigLoader(KVArgParseConfigLoader):
1149 """Deprecated in traitlets 5.0
1151 Use KVArgParseConfigLoader
1152 """
1154 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
1155 warnings.warn(
1156 "KeyValueConfigLoader is deprecated since Traitlets 5.0."
1157 " Use KVArgParseConfigLoader instead.",
1158 DeprecationWarning,
1159 stacklevel=2,
1160 )
1161 super().__init__(*args, **kwargs)
1164def load_pyconfig_files(config_files: list[str], path: str) -> Config:
1165 """Load multiple Python config files, merging each of them in turn.
1167 Parameters
1168 ----------
1169 config_files : list of str
1170 List of config files names to load and merge into the config.
1171 path : unicode
1172 The full path to the location of the config files.
1173 """
1174 config = Config()
1175 for cf in config_files:
1176 loader = PyFileConfigLoader(cf, path=path)
1177 try:
1178 next_config = loader.load_config()
1179 except ConfigFileNotFound:
1180 pass
1181 except Exception:
1182 raise
1183 else:
1184 config.merge(next_config)
1185 return config