Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/traitlets/config/loader.py: 26%
568 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""A simple configuration system."""
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7import argparse
8import copy
9import functools
10import json
11import os
12import re
13import sys
14import typing as t
15from logging import Logger
17from traitlets.traitlets import Any, Container, Dict, HasTraits, List, TraitType, Undefined
19from ..utils import cast_unicode, filefind, warnings
21# -----------------------------------------------------------------------------
22# Exceptions
23# -----------------------------------------------------------------------------
class ConfigError(Exception):
    """Root of the exception hierarchy defined in this module."""


class ConfigLoaderError(ConfigError):
    """Base class for errors coming from a config loader."""


class ConfigFileNotFound(ConfigError):
    """Raised by the file loaders when locating the config file fails with ``OSError``."""


class ArgumentError(ConfigLoaderError):
    """Raised by the command-line loaders when an argument cannot be turned into config."""
42# -----------------------------------------------------------------------------
43# Argparse fix
44# -----------------------------------------------------------------------------
46# Unfortunately argparse by default prints help messages to stderr instead of
47# stdout. This makes it annoying to capture long help screens at the command
48# line, since one must know how to pipe stderr, which many users don't know how
49# to do. So we override the print_help method with one that defaults to
50# stdout and use our class instead.
class _Sentinel:
    """Type of the ``_deprecated`` marker object created right after this class."""

    def __str__(self) -> str:
        return "<deprecated>"

    def __repr__(self) -> str:
        return "<Sentinel deprecated>"


# Unique default value: ArgParseConfigLoader.load_config compares its ``flags``
# argument against this object by identity to detect whether it was passed.
_deprecated = _Sentinel()
class ArgumentParser(argparse.ArgumentParser):
    """``argparse.ArgumentParser`` whose ``print_help`` writes to stdout unless a file is given."""

    def print_help(self, file: t.Any = None) -> None:
        # sys.stdout is looked up at call time, so a stream installed later is honoured.
        target = sys.stdout if file is None else file
        return super().print_help(target)

    # Expose the stock docstring on the override.
    print_help.__doc__ = argparse.ArgumentParser.print_help.__doc__
75# -----------------------------------------------------------------------------
76# Config class for holding config information
77# -----------------------------------------------------------------------------
def execfile(fname: str, glob: dict[str, t.Any]) -> None:
    """Compile and execute the Python file ``fname`` with ``glob`` as its namespace.

    Parameters
    ----------
    fname : str
        Path of the file to run; also handed to ``compile`` as the filename.
    glob : dict
        Mapping used as both the globals and the locals of the executed code,
        so names defined by the file end up in it.

    Notes
    -----
    The file's content is executed as-is: only call this on trusted files.
    """
    # The file is opened in binary mode and its raw bytes are handed to ``compile``.
    with open(fname, "rb") as f:
        source = f.read()
    exec(compile(source, fname, "exec"), glob, glob)  # noqa: S102
class LazyConfigValue(HasTraits):
    """Proxy object for exposing methods on configurable containers

    These methods allow appending/extending/updating
    to add to non-empty defaults instead of clobbering them.

    Exposes:

    - append, extend, prepend, insert on lists
    - update on dicts
    - update, add on sets

    The calls are only recorded; :meth:`get_value` replays them on a copy of
    the container it is given.
    """

    # Result of the first get_value() call; later calls return it unchanged.
    _value = None

    # list methods
    _extend: List[t.Any] = List()
    _prepend: List[t.Any] = List()
    _inserts: List[t.Any] = List()

    def append(self, obj: t.Any) -> None:
        """Append an item to a List"""
        self._extend.append(obj)

    def extend(self, other: t.Any) -> None:
        """Extend a list"""
        self._extend.extend(other)

    def prepend(self, other: t.Any) -> None:
        """like list.extend, but for the front"""
        self._prepend[:0] = other

    def merge_into(self, other: t.Any) -> t.Any:
        """
        Merge with another earlier LazyConfigValue or an earlier container.
        This is useful when having global system-wide configuration files.

        Self is expected to have higher precedence.

        Parameters
        ----------
        other : LazyConfigValue or container

        Returns
        -------
        LazyConfigValue
            if ``other`` is also lazy, a reified container otherwise.

        Notes
        -----
        When ``other`` is lazy its recorded lists/updates are mutated in place
        and then shared with ``self``.
        """
        if isinstance(other, LazyConfigValue):
            # Earlier extensions stay in front, ours are appended after them.
            other._extend.extend(self._extend)
            self._extend = other._extend

            # Our prepends stay in front of the earlier ones.
            self._prepend.extend(other._prepend)

            other._inserts.extend(self._inserts)
            self._inserts = other._inserts

            if self._update:
                other.update(self._update)
            # Always adopt the merged dict/set so that updates recorded only on
            # ``other`` are not lost when we have none of our own.
            self._update = other._update
            return self
        else:
            # other is a container, reify now.
            return self.get_value(other)

    def insert(self, index: int, other: t.Any) -> None:
        """Record ``other`` for insertion at position ``index`` of a list.

        Raises
        ------
        TypeError
            If ``index`` is not an int.
        """
        if not isinstance(index, int):
            raise TypeError("An integer is required")
        self._inserts.append((index, other))

    # dict methods
    # update is used for both dict and set
    _update = Any()

    def update(self, other: t.Any) -> None:
        """Update either a set or dict"""
        if self._update is None:
            # The first call decides the accumulator type from its argument.
            if isinstance(other, dict):
                self._update = {}
            else:
                self._update = set()
        self._update.update(other)

    # set methods
    def add(self, obj: t.Any) -> None:
        """Add an item to a set"""
        self.update({obj})

    def get_value(self, initial: t.Any) -> t.Any:
        """construct the value from the initial one

        after applying any insert / extend / update changes

        ``initial`` is deep-copied, never modified.  The result is cached, so
        subsequent calls return the first result whatever ``initial`` is.
        """
        if self._value is not None:
            return self._value  # type:ignore[unreachable]
        value = copy.deepcopy(initial)
        if isinstance(value, list):
            # Order matters: inserts first, then prepends, then extensions.
            for idx, obj in self._inserts:
                value.insert(idx, obj)
            value[:0] = self._prepend
            value.extend(self._extend)

        elif isinstance(value, dict):
            if self._update:
                value.update(self._update)
        elif isinstance(value, set):
            if self._update:
                value.update(self._update)
        self._value = value
        return value

    def to_dict(self) -> dict[str, t.Any]:
        """return JSONable dict form of my data

        Currently update as dict or set, extend, prepend as lists, and inserts as list of tuples.
        Only non-empty entries are included.
        """
        d = {}
        if self._update:
            d["update"] = self._update
        if self._extend:
            d["extend"] = self._extend
        if self._prepend:
            d["prepend"] = self._prepend
        # Reported independently of ``prepend``: both can be recorded at once.
        if self._inserts:
            d["inserts"] = self._inserts
        return d

    def __repr__(self) -> str:
        if self._value is not None:
            return f"<{self.__class__.__name__} value={self._value!r}>"
        else:
            return f"<{self.__class__.__name__} {self.to_dict()!r}>"
def _is_section_key(key: str) -> bool:
    """Tell whether ``key`` names a Config section.

    True when ``key`` is non-empty, its first character equals its own
    upper-cased form, and it does not start with an underscore.
    """
    if not key:
        return False
    head = key[0]
    if head.upper() != head:
        return False
    return not key.startswith("_")
class Config(dict):  # type:ignore[type-arg]
    """An attribute-based dict that can do smart merges.

    Accessing a field on a config object for the first time populates the key
    with either a nested Config object for keys starting with capitals
    or :class:`.LazyConfigValue` for lowercase keys,
    allowing quick assignments such as::

        c = Config()
        c.Class.int_trait = 5
        c.Class.list_trait.append("x")

    """

    def __init__(self, *args: t.Any, **kwds: t.Any) -> None:
        """Build the dict as ``dict(*args, **kwds)`` then convert nested section dicts."""
        dict.__init__(self, *args, **kwds)
        self._ensure_subconfig()

    def _ensure_subconfig(self) -> None:
        """ensure that sub-dicts that should be Config objects are

        casts dicts that are under section keys to Config objects,
        which is necessary for constructing Config objects from dict literals.
        """
        for key in self:
            obj = self[key]
            if _is_section_key(key) and isinstance(obj, dict) and not isinstance(obj, Config):
                # Only the value of an existing key is replaced, so the dict
                # keeps its size while being iterated.
                setattr(self, key, Config(obj))

    def _merge(self, other: t.Any) -> None:
        """deprecated alias, use Config.merge()"""
        self.merge(other)

    def merge(self, other: t.Any) -> None:
        """merge another config object into this one

        Keys only present in ``other`` are adopted.  For shared keys: two
        Config values are merged recursively, an incoming
        :class:`LazyConfigValue` is merged into the current value, and anything
        else replaces the current value.
        """
        to_update = {}
        for k, v in other.items():
            if k not in self:
                to_update[k] = v
            else:  # I have this key
                if isinstance(v, Config) and isinstance(self[k], Config):
                    # Recursively merge common sub Configs
                    self[k].merge(v)
                elif isinstance(v, LazyConfigValue):
                    self[k] = v.merge_into(self[k])
                else:
                    # Plain updates for non-Configs
                    to_update[k] = v

        self.update(to_update)

    def collisions(self, other: Config) -> dict[str, t.Any]:
        """Check for collisions between two config objects.

        Returns a dict of the form {"Class": {"trait": "collision message"}}`,
        indicating which values have been ignored.

        An empty dict indicates no collisions.
        """
        collisions: dict[str, t.Any] = {}
        for section in self:
            if section not in other:
                continue
            mine = self[section]
            theirs = other[section]
            for key in mine:
                if key in theirs and mine[key] != theirs[key]:
                    collisions.setdefault(section, {})
                    collisions[section][key] = f"{mine[key]!r} ignored, using {theirs[key]!r}"
        return collisions

    def __contains__(self, key: t.Any) -> bool:
        """Membership test that also understands dotted ``"Section.key"`` strings.

        Non-string keys are looked up like in a plain dict instead of being
        probed for a dot, so they never raise here.
        """
        # allow nested contains of the form `"Section.key" in config`
        if isinstance(key, str) and "." in key:
            first, remainder = key.split(".", 1)
            if first not in self:
                return False
            return remainder in self[first]

        return super().__contains__(key)

    # .has_key is deprecated for dictionaries.
    has_key = __contains__

    def _has_section(self, key: str) -> bool:
        """True when ``key`` is a section key that is present in this config."""
        return _is_section_key(key) and key in self

    def copy(self) -> Config:
        """Return a new instance of the same class built from a shallow copy of the items."""
        return type(self)(dict.copy(self))

    def __copy__(self) -> Config:
        return self.copy()

    def __deepcopy__(self, memo: t.Any) -> Config:
        """Deep-copy nested config objects; plain containers are only shallow-copied."""
        new_config = type(self)()
        for key, value in self.items():
            if isinstance(value, (Config, LazyConfigValue)):
                # deep copy config objects
                value = copy.deepcopy(value, memo)
            elif type(value) in {dict, list, set, tuple}:
                # shallow copy plain container traits
                value = copy.copy(value)
            new_config[key] = value
        return new_config

    def __getitem__(self, key: str) -> t.Any:
        """Return the stored value, creating and storing a default for unknown keys.

        Unknown section keys get an empty :class:`Config`; other unknown keys
        not starting with an underscore get a :class:`LazyConfigValue`.

        Raises
        ------
        KeyError
            For unknown keys that start with an underscore.
        """
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            if _is_section_key(key):
                c = Config()
                dict.__setitem__(self, key, c)
                return c
            elif not key.startswith("_"):
                # undefined, create lazy value, used for container methods
                v = LazyConfigValue()
                dict.__setitem__(self, key, v)
                return v
            else:
                raise

    def __setitem__(self, key: str, value: t.Any) -> None:
        """Store ``value``; section keys only accept :class:`Config` instances.

        Raises
        ------
        ValueError
            If ``key`` is a section key and ``value`` is not a Config.
        """
        if _is_section_key(key):
            if not isinstance(value, Config):
                raise ValueError(
                    "values whose keys begin with an uppercase "
                    f"char must be Config instances: {key!r}, {value!r}"
                )
        dict.__setitem__(self, key, value)

    def __getattr__(self, key: str) -> t.Any:
        """Attribute access maps to item access, except for double-underscore names."""
        if key.startswith("__"):
            return dict.__getattr__(self, key)  # type:ignore[attr-defined]
        try:
            return self.__getitem__(key)
        except KeyError as e:
            raise AttributeError(e) from e

    def __setattr__(self, key: str, value: t.Any) -> None:
        """Attribute assignment maps to item assignment, except for double-underscore names."""
        if key.startswith("__"):
            return dict.__setattr__(self, key, value)
        try:
            self.__setitem__(key, value)
        except KeyError as e:
            raise AttributeError(e) from e

    def __delattr__(self, key: str) -> None:
        """Attribute deletion maps to item deletion, except for double-underscore names."""
        if key.startswith("__"):
            return dict.__delattr__(self, key)
        try:
            dict.__delitem__(self, key)
        except KeyError as e:
            raise AttributeError(e) from e
class DeferredConfig:
    """Class for deferred-evaluation of config from CLI"""

    def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
        """Return the value interpreted for ``trait``; subclasses must override this."""
        raise NotImplementedError("Implement in subclasses")

    def _super_repr(self) -> str:
        """Return the repr given by the first class in the MRO that is not a DeferredConfig.

        For the concrete classes in this module that is the builtin they wrap
        (``str`` or ``list``).  Walking the MRO instead of calling
        ``super(self.__class__, self)`` keeps this working, without infinite
        recursion, when a concrete deferred class is subclassed further.
        """
        for klass in type(self).__mro__:
            if not issubclass(klass, DeferredConfig):
                return klass.__repr__(self)
        # Not reachable in practice: ``object`` ends every MRO and qualifies.
        return object.__repr__(self)
class DeferredConfigString(str, DeferredConfig):
    """String config value whose interpretation waits for the target trait.

    It subclasses ``str`` for backward compatibility and is only used for
    values that are not listed in the configurable classes.

    Loading the config calls ``trait.from_string`` on the text; when that
    call raises, the plain string is handed back instead.

    .. versionadded:: 5.0
    """

    def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
        """Return ``trait.from_string`` applied to this string, or the plain string on failure."""
        raw = str(self)
        try:
            parsed = trait.from_string(raw)
        except Exception:
            # The cast failed: hand back the untouched text so that a more
            # informative error is raised when the config is loaded.
            return raw
        return parsed

    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}({self._super_repr()})"
class DeferredConfigList(t.List[t.Any], DeferredConfig):
    """List-of-strings config value whose interpretation waits for the target trait.

    It is only used for values that are not listed in the configurable classes.

    Loading the config calls ``trait.from_string_list`` on the items; when
    that call raises, the original string list is handed back instead.

    .. versionadded:: 5.0
    """

    def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
        """Return the items interpreted by ``trait``.

        Traits exposing ``from_string_list`` receive a plain list copy of the
        items.  Other traits receive the first item through ``from_string``.

        Raises
        ------
        ValueError
            If the trait has no ``from_string_list`` and more than one item is held.
        """
        if not hasattr(trait, "from_string_list"):
            # only allow one item
            count = len(self)
            if count > 1:
                raise ValueError(
                    f"{trait.name} only accepts one value, got {count}: {list(self)}"
                )
            raw = self[0]
            parse = trait.from_string
        else:
            raw = list(self)
            parse = trait.from_string_list

        try:
            parsed = parse(raw)
        except Exception:
            # The cast failed: hand back the untouched input so that a more
            # informative error is raised when the config is loaded.
            return raw
        return parsed

    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}({self._super_repr()})"
465# -----------------------------------------------------------------------------
466# Config loading classes
467# -----------------------------------------------------------------------------
class ConfigLoader:
    """An object for loading configurations from just about anywhere.

    The loaded configuration is handed back as a :class:`Config`.

    Notes
    -----
    A :class:`ConfigLoader` has a single job: read config from one source
    (file, command line arguments) and return it as a :class:`Config` object.
    Finding config files, handling default values and merging several
    configs are out of its scope and must be done elsewhere.
    """

    def _log_default(self) -> Logger:
        """Return the logger provided by ``traitlets.log.get_logger``."""
        # Imported here rather than at module level, as in the original code.
        from traitlets.log import get_logger

        return t.cast(Logger, get_logger())

    def __init__(self, log: Logger | None = None) -> None:
        """A base class for config loaders.

        log : instance of :class:`logging.Logger` to use.
              When omitted, the logger returned by ``traitlets.log.get_logger``
              is used.

        Examples
        --------
        >>> cl = ConfigLoader()
        >>> config = cl.load_config()
        >>> config
        {}
        """
        self.clear()
        if log is not None:
            self.log = log
            return
        self.log = self._log_default()
        self.log.debug("Using default logger")

    def clear(self) -> None:
        """Replace ``self.config`` with a fresh, empty :class:`Config`."""
        self.config = Config()

    def load_config(self) -> Config:
        """Load a config from somewhere, return a :class:`Config` instance.

        This base implementation resets ``self.config`` through
        :meth:`ConfigLoader.clear` and returns the resulting empty config.
        """
        self.clear()
        return self.config
class FileConfigLoader(ConfigLoader):
    """Shared base for loaders that read their config from a file.

    Logic common to the file based loaders belongs here.
    """

    def __init__(self, filename: str, path: str | None = None, **kw: t.Any) -> None:
        """Build a config loader for a filename and path.

        Parameters
        ----------
        filename : str
            The file name of the config file.
        path : str, list, tuple
            The path to search for the config file on, or a sequence of
            paths to try in order.
        **kw
            Forwarded to the parent constructor.
        """
        super().__init__(**kw)
        # Filled in by _find_file().
        self.full_filename = ""
        self.path = path
        self.filename = filename

    def _find_file(self) -> None:
        """Search ``self.path`` for ``self.filename`` and store the result in ``self.full_filename``."""
        located = filefind(self.filename, self.path)
        self.full_filename = located
class JSONFileConfigLoader(FileConfigLoader):
    """A JSON file loader for config

    Can also act as a context manager that rewrite the configuration file to disk on exit.

    Example::

        with JSONFileConfigLoader('myapp.json','/home/jupyter/configurations/') as c:
            c.MyNewConfigurable.new_value = 'Updated'

    """

    def load_config(self) -> Config:
        """Load the config from a file and return it as a Config object.

        Raises
        ------
        ConfigFileNotFound
            If locating the file raises ``OSError``.
        """
        self.clear()
        try:
            self._find_file()
        except OSError as e:
            raise ConfigFileNotFound(str(e)) from e
        dct = self._read_file_as_dict()
        self.config = self._convert_to_config(dct)
        return self.config

    def _read_file_as_dict(self) -> dict[str, t.Any]:
        """Parse the located file as JSON and return the decoded object."""
        with open(self.full_filename) as f:
            return t.cast("dict[str, t.Any]", json.load(f))

    def _convert_to_config(self, dictionary: dict[str, t.Any]) -> Config:
        """Turn the decoded JSON mapping into a :class:`Config`.

        An optional ``"version"`` entry is removed from ``dictionary``; a
        missing entry counts as version 1.

        Raises
        ------
        ValueError
            If the version is anything other than 1.
        """
        if "version" in dictionary:
            version = dictionary.pop("version")
        else:
            version = 1

        if version == 1:
            return Config(dictionary)
        else:
            raise ValueError(f"Unknown version of JSON config file: {version}")

    def __enter__(self) -> Config:
        """Load the file and return the config for modification inside the ``with`` body."""
        self.load_config()
        return self.config

    def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
        """
        Exit the context manager but do not handle any errors.

        In case of any error, we do not want to write the potentially broken
        configuration to disk.
        """
        if exc_type is not None:
            # The body raised: leave the file untouched.  Returning None lets
            # the exception propagate.
            return
        self.config.version = 1
        json_config = json.dumps(self.config, indent=2)
        with open(self.full_filename, "w") as f:
            f.write(json_config)
class PyFileConfigLoader(FileConfigLoader):
    """Loader for config written as a plain Python file.

    The file is located by filename and path, then executed to populate a
    Config object.
    """

    def load_config(self) -> Config:
        """Locate and execute the config file, then return the populated Config.

        Raises
        ------
        ConfigFileNotFound
            If locating the file raises ``OSError``.
        """
        self.clear()
        try:
            self._find_file()
        except OSError as err:
            raise ConfigFileNotFound(str(err)) from err
        self._read_file_as_dict()
        return self.config

    def load_subconfig(self, fname: str, path: str | None = None) -> None:
        """Injected into config file namespace as load_subconfig

        Loads ``fname`` with a new loader of the same class (searching
        ``self.path`` unless ``path`` is given) and merges the result into
        ``self.config``.  A sub config that cannot be found is ignored.
        """
        search_path = self.path if path is None else path

        sub_loader = self.__class__(fname, search_path)
        try:
            sub_config = sub_loader.load_config()
        except ConfigFileNotFound:
            # A missing sub config is treated like an empty config file.
            return
        self.config.merge(sub_config)

    def _read_file_as_dict(self) -> None:
        """Load the config file into self.config, with recursive loading."""

        def get_config() -> Config:
            """Unnecessary now, but a deprecation warning is more trouble than it's worth."""
            return self.config

        conf_filename = self.full_filename
        # Names made available to the executed config file.
        namespace = {
            "c": self.config,
            "load_subconfig": self.load_subconfig,
            "get_config": get_config,
            "__file__": self.full_filename,
        }
        with open(conf_filename, "rb") as f:
            code = compile(f.read(), conf_filename, "exec")
            exec(code, namespace, namespace)  # noqa: S102
class CommandLineConfigLoader(ConfigLoader):
    """A config loader for command line arguments.

    As we add more command line based loaders, the common logic should go
    here.
    """

    def _exec_config_str(
        self, lhs: t.Any, rhs: t.Any, trait: TraitType[t.Any, t.Any] | None = None
    ) -> None:
        """execute self.config.<lhs> = <rhs>

        * interprets value with trait if available
        * without a trait, a single-item deferred list becomes a deferred
          string with ``~`` expanded by ``os.path.expanduser``
        * values that are not deferred are parsed with ``trait.from_string``
          when a trait is given, and wrapped in a deferred string otherwise

        Parameters
        ----------
        lhs : str
            Dotted path of the value inside ``self.config``.
        rhs
            Value to assign.
        trait : TraitType, optional
            Trait used to interpret ``rhs``.
        """
        value = rhs
        if isinstance(value, DeferredConfig):
            if trait:
                # trait available, reify config immediately
                value = value.get_value(trait)
            elif isinstance(rhs, DeferredConfigList) and len(rhs) == 1:
                # single item, make it a deferred str
                value = DeferredConfigString(os.path.expanduser(rhs[0]))
        else:
            if trait:
                value = trait.from_string(value)
            else:
                value = DeferredConfigString(value)

        # Walk down to the section named by all but the last dotted component.
        *path, key = lhs.split(".")
        section = self.config
        for part in path:
            section = section[part]
        section[key] = value
        return

    def _load_flag(self, cfg: t.Any) -> None:
        """update self.config from a flag, which can be a dict or Config

        Raises
        ------
        TypeError
            If ``cfg`` is not a dict (``Config`` is a dict subclass).
        """
        if isinstance(cfg, dict):
            # don't clobber whole config sections, update
            # each section from config:
            for sec, c in cfg.items():
                self.config[sec].update(c)
        else:
            # An f-string formats tuples correctly, unlike ``"%r" % cfg``.
            raise TypeError(f"Invalid flag: {cfg!r}")
705# match --Class.trait keys for argparse
706# matches:
707# --Class.trait
708# --x
709# -x
class_trait_opt_pattern = re.compile(r"^\-?\-[A-Za-z][\w]*(\.[\w]+)*$")

# Stand-in for "." when a ``Class.trait`` key is turned into an argparse
# ``dest``; it is replaced by "." again when parsed data is converted to config.
_DOT_REPLACEMENT = "__DOT__"
# Stand-in substituted for a lone "-" argument before parsing; it is mapped
# back to "-" when the parsed values are collected.
_DASH_REPLACEMENT = "__DASH__"
class _KVAction(argparse.Action):
    """argparse action collecting the values given to ``--Class.trait=x`` options.

    The values are always accumulated in a :class:`DeferredConfigList` stored
    on the namespace under ``self.dest``.
    """

    def __call__(  # type:ignore[override]
        self,
        parser: argparse.ArgumentParser,
        namespace: dict[str, t.Any],
        values: t.Sequence[t.Any],
        option_string: str | None = None,
    ) -> None:
        incoming = [values] if isinstance(values, str) else values
        # Restore the lone "-" that was swapped for the placeholder object
        # (identity comparison, as in the original code).
        restored = ["-" if item is _DASH_REPLACEMENT else item for item in incoming]

        previous = getattr(namespace, self.dest, None)
        # A new list object is built each time; an existing one is copied.
        collected = DeferredConfigList() if previous is None else DeferredConfigList(previous)
        collected.extend(restored)
        setattr(namespace, self.dest, collected)
class _DefaultOptionDict(dict):  # type:ignore[type-arg]
    """Option-string mapping that behaves as if every ``--Class.trait`` option existed.

    Keys matching ``class_trait_opt_pattern`` are registered with a
    :class:`_KVAction` the first time they are looked up.
    """

    def _add_kv_action(self, key: str) -> None:
        """Store a :class:`_KVAction` for the option string ``key``."""
        bare = key.lstrip("-")
        self[key] = _KVAction(
            option_strings=[key],
            dest=bare.replace(".", _DOT_REPLACEMENT),
            # use metavar for display purposes
            metavar=bare,
        )

    def __contains__(self, key: t.Any) -> bool:
        """Membership test that registers matching option strings as a side effect."""
        if "=" in key:
            return False
        if super().__contains__(key):
            return True

        if not (key.startswith("-") and class_trait_opt_pattern.match(key)):
            return False
        self._add_kv_action(key)
        return True

    def __getitem__(self, key: str) -> t.Any:
        # Go through __contains__ first so that matching keys get registered.
        if key not in self:
            raise KeyError(key)
        return super().__getitem__(key)

    def get(self, key: str, default: t.Any = None) -> t.Any:
        """Like ``dict.get`` but routed through ``__getitem__``."""
        try:
            found = self[key]
        except KeyError:
            return default
        return found
class _KVArgParser(argparse.ArgumentParser):
    """ArgumentParser for which every ``--Class.trait`` option is implicitly defined."""

    def parse_known_args(  # type:ignore[override]
        self, args: t.Sequence[str] | None = None, namespace: argparse.Namespace | None = None
    ) -> tuple[argparse.Namespace | None, list[str]]:
        """Wrap the option tables in :class:`_DefaultOptionDict`, then parse as usual."""
        # The wrapping happens right before parsing: doing it in __init__ would
        # make registration of explicit actions fail during setup.
        for holder in (self, self._optionals):
            wrapped = _DefaultOptionDict(holder._option_string_actions)
            holder._option_string_actions = wrapped
        return super().parse_known_args(args, namespace)
793# type aliases
# Mapping passed around as ``subcommands``; presumably keyed by subcommand
# name (only its keys are read in this file) - TODO confirm value shape.
SubcommandsDict = t.Dict[str, t.Any]
class ArgParseConfigLoader(CommandLineConfigLoader):
    """Command-line config loader built on top of the argparse module."""

    parser_class = ArgumentParser

    def __init__(
        self,
        argv: list[str] | None = None,
        aliases: dict[str, str] | None = None,
        flags: dict[str, str] | None = None,
        log: t.Any = None,
        classes: list[type[t.Any]] | None = None,
        subcommands: SubcommandsDict | None = None,
        *parser_args: t.Any,
        **parser_kw: t.Any,
    ) -> None:
        """Create a config loader for use with argparse.

        Parameters
        ----------
        classes : optional, list
            The classes to scan for *container* config-traits and decide
            for their "multiplicity" when adding them as *argparse* arguments.
        argv : optional, list
            If given, used to read command-line arguments from, otherwise
            sys.argv[1:] is used.
        *parser_args : tuple
            Positional arguments passed on to the constructor of the parser class.
        **parser_kw : dict
            Keyword arguments passed on to the constructor of the parser
            class.  A ``version`` entry is removed and kept on ``self.version``.
        aliases : dict of str to str
            Dict of aliases to full traitlets names for CLI parsing
        flags : dict of str to str
            Dict of flags to full traitlets names for CLI parsing
        subcommands : dict, optional
            Only used for argcomplete currently.
        log
            Passed to `ConfigLoader`
        """
        super(CommandLineConfigLoader, self).__init__(log=log)
        self.clear()
        self.argv = sys.argv[1:] if argv is None else argv
        self.aliases = aliases or {}
        self.flags = flags or {}
        self.classes = classes or []
        self.subcommands = subcommands  # only used for argcomplete currently

        self.parser_args = parser_args
        self.version = parser_kw.pop("version", None)
        # Caller-supplied keywords override the default set here.
        self.parser_kw = {"argument_default": argparse.SUPPRESS, **parser_kw}

    def load_config(
        self,
        argv: list[str] | None = None,
        aliases: t.Any = None,
        flags: t.Any = _deprecated,
        classes: t.Any = None,
    ) -> Config:
        """Parse command line arguments and return as a Config object.

        Parameters
        ----------
        argv : optional, list
            If given, a list with the structure of sys.argv[1:] to parse
            arguments from.  If not given, the instance's self.argv attribute
            (given at construction time) is used.
        aliases : optional
            If given, replaces ``self.aliases``.
        flags
            Deprecated in traitlets 5.0, instantiate the config loader with the flags.
        classes : optional
            If given, replaces ``self.classes``.
        """
        flags_were_passed = flags is not _deprecated
        if flags_were_passed:
            warnings.warn(
                "The `flag` argument to load_config is deprecated since Traitlets "
                f"5.0 and will be ignored, pass flags the `{type(self)}` constructor.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.clear()
        args_to_parse = self.argv if argv is None else argv
        if aliases is not None:
            self.aliases = aliases
        if classes is not None:
            self.classes = classes
        self._create_parser()
        self._argcomplete(self.classes, self.subcommands)
        self._parse_args(args_to_parse)
        self._convert_to_config()
        return self.config

    def get_extra_args(self) -> list[str]:
        """Return ``self.extra_args`` if it has been set, otherwise a new empty list."""
        return getattr(self, "extra_args", [])

    def _create_parser(self) -> None:
        """Instantiate ``parser_class`` and register the arguments on it."""
        self.parser = self.parser_class(
            *self.parser_args,
            **self.parser_kw,  # type:ignore[arg-type]
        )
        self._add_arguments(self.aliases, self.flags, self.classes)

    def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None:
        """Hook registering arguments on ``self.parser``; must be overridden."""
        raise NotImplementedError("subclasses must implement _add_arguments")

    def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None:
        """If argcomplete is enabled, allow triggering command-line autocompletion"""

    def _parse_args(self, args: t.Any) -> t.Any:
        """self.parser->self.parsed_data

        Everything after a literal ``--`` is kept aside in ``self.extra_args``
        without being parsed.
        """
        uargs = [cast_unicode(a) for a in args]

        # Option string of every alias -> long option of its target.
        unpacked_aliases: dict[str, str] = {}
        if self.aliases:
            for alias, alias_target in self.aliases.items():
                if alias in self.flags:
                    continue
                names = alias if isinstance(alias, tuple) else (alias,)
                for al in names:
                    if len(al) == 1:
                        unpacked_aliases["-" + al] = "--" + alias_target
                    unpacked_aliases["--" + al] = "--" + alias_target

        def _replace(arg: str) -> str:
            # A lone "-" is swapped for a placeholder before parsing.
            if arg == "-":
                return _DASH_REPLACEMENT
            for option, target in unpacked_aliases.items():
                if arg == option:
                    return target
                if arg.startswith(option + "="):
                    return target + "=" + arg[len(option) + 1 :]
            return arg

        try:
            split_at = uargs.index("--")
        except ValueError:
            to_parse, extra_args = uargs, []
        else:
            to_parse, extra_args = uargs[:split_at], uargs[split_at + 1 :]
        to_parse = [_replace(a) for a in to_parse]

        self.parsed_data = self.parser.parse_args(to_parse)
        self.extra_args = extra_args

    def _convert_to_config(self) -> None:
        """self.parsed_data->self.config"""
        for dotted, parsed in vars(self.parsed_data).items():
            *parents, leaf = dotted.split(".")
            target = self.config
            for name in parents:
                target = target[name]
            setattr(target, leaf, parsed)
class _FlagAction(argparse.Action):
    """argparse action for a flag, optionally doubling as an alias."""

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Extract ``flag`` (required) and ``alias`` (optional) before building the action."""
        self.flag = kwargs.pop("flag")
        self.alias = kwargs.pop("alias", None)
        kwargs["const"] = Undefined
        if not self.alias:
            # A pure flag takes no value.
            kwargs["nargs"] = 0
        super().__init__(*args, **kwargs)

    def __call__(
        self, parser: t.Any, namespace: t.Any, values: t.Any, option_string: str | None = None
    ) -> None:
        acts_as_flag = self.nargs == 0 or values is Undefined
        if not acts_as_flag:
            # A value was given: behave as the alias.
            setattr(namespace, self.alias, values)
            return
        if not hasattr(namespace, "_flags"):
            namespace._flags = []
        namespace._flags.append(self.flag)
class KVArgParseConfigLoader(ArgParseConfigLoader):
    """A config loader that loads aliases and flags with argparse,

    as well as arbitrary --Class.trait value
    """

    parser_class = _KVArgParser  # type:ignore[assignment]

    def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None:
        """Register the flag and alias options on ``self.parser``.

        Also builds ``self.argparse_traits``, an index of the configurable
        traits of ``classes`` keyed by ``"ClassName.traitname"``.

        Raises
        ------
        ArgumentError
            If an alias targets a trait registered with an argparse ``action``
            while the same name is also used as a flag.
        """
        # Flag values for names that are both a flag and an alias, keyed by alias target.
        alias_flags: dict[str, t.Any] = {}
        argparse_kwds: dict[str, t.Any]
        argparse_traits: dict[str, t.Any]
        paa = self.parser.add_argument
        self.parser.set_defaults(_flags=[])
        paa("extra_args", nargs="*")

        # An index of all container traits collected::
        #
        #     { <traitname>: (<trait>, <argparse-kwds>) }
        #
        #  Used to add the correct type into the `config` tree.
        #  Used also for aliases, not to re-collect them.
        self.argparse_traits = argparse_traits = {}
        for cls in classes:
            for traitname, trait in cls.class_traits(config=True).items():
                argname = f"{cls.__name__}.{traitname}"
                argparse_kwds = {"type": str}
                if isinstance(trait, (Container, Dict)):
                    # "append" becomes the argparse action; any other
                    # multiplicity is passed to argparse as ``nargs``.
                    multiplicity = trait.metadata.get("multiplicity", "append")
                    if multiplicity == "append":
                        argparse_kwds["action"] = multiplicity
                    else:
                        argparse_kwds["nargs"] = multiplicity
                argparse_traits[argname] = (trait, argparse_kwds)

        for keys, (value, fhelp) in flags.items():
            if not isinstance(keys, tuple):
                keys = (keys,)
            for key in keys:
                if key in aliases:
                    # Also an alias: registered in the alias loop below.
                    alias_flags[aliases[key]] = value
                    continue
                # NOTE(review): ``keys`` is rebound here while the enclosing
                # loop iterates over the tuple it was first bound to.
                keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,)
                paa(*keys, action=_FlagAction, flag=value, help=fhelp)

        for keys, traitname in aliases.items():
            if not isinstance(keys, tuple):
                keys = (keys,)

            for key in keys:
                argparse_kwds = {
                    "type": str,
                    "dest": traitname.replace(".", _DOT_REPLACEMENT),
                    "metavar": traitname,
                }
                argcompleter = None
                if traitname in argparse_traits:
                    trait, kwds = argparse_traits[traitname]
                    argparse_kwds.update(kwds)
                    if "action" in argparse_kwds and traitname in alias_flags:
                        # flag sets 'action', so can't have flag & alias with custom action
                        # on the same name
                        raise ArgumentError(
                            f"The alias `{key}` for the 'append' sequence "
                            f"config-trait `{traitname}` cannot be also a flag!'"
                        )
                    # For argcomplete, check if any either an argcompleter metadata tag or method
                    # is available. If so, it should be a callable which takes the command-line key
                    # string as an argument and other kwargs passed by argcomplete,
                    # and returns the a list of string completions.
                    argcompleter = trait.metadata.get("argcompleter") or getattr(
                        trait, "argcompleter", None
                    )
                if traitname in alias_flags:
                    # alias and flag.
                    # when called with 0 args: flag
                    # when called with >= 1: alias
                    argparse_kwds.setdefault("nargs", "?")
                    argparse_kwds["action"] = _FlagAction
                    argparse_kwds["flag"] = alias_flags[traitname]
                    argparse_kwds["alias"] = traitname
                keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,)
                action = paa(*keys, **argparse_kwds)
                if argcompleter is not None:
                    # argcomplete's completers are callables returning list of completion strings
                    action.completer = functools.partial(  # type:ignore[attr-defined]
                        argcompleter, key=key
                    )

    def _convert_to_config(self) -> None:
        """self.parsed_data->self.config, parse unrecognized extra args via KVLoader.

        Raises
        ------
        ArgumentError
            If assigning a parsed value into the config raises any exception.
        """
        # Arguments found after "--" by _parse_args; kept behind the positional ones.
        extra_args = self.extra_args

        for lhs, rhs in vars(self.parsed_data).items():
            if lhs == "extra_args":
                self.extra_args = ["-" if a == _DASH_REPLACEMENT else a for a in rhs] + extra_args
                continue
            if lhs == "_flags":
                # _flags will be handled later
                continue

            lhs = lhs.replace(_DOT_REPLACEMENT, ".")
            if "." not in lhs:
                self._handle_unrecognized_alias(lhs)
                # NOTE(review): this value is overwritten by the lookup below
                # before it is read.
                trait = None

            if isinstance(rhs, list):
                rhs = DeferredConfigList(rhs)
            elif isinstance(rhs, str):
                rhs = DeferredConfigString(rhs)

            # Index entries are (trait, argparse-kwds) tuples; keep the trait only.
            trait = self.argparse_traits.get(lhs)
            if trait:
                trait = trait[0]

            # eval the KV assignment
            try:
                self._exec_config_str(lhs, rhs, trait)
            except Exception as e:
                # cast deferred to nicer repr for the error
                # DeferredList->list, etc
                if isinstance(rhs, DeferredConfig):
                    rhs = rhs._super_repr()
                raise ArgumentError(f"Error loading argument {lhs}={rhs}, {e}") from e

        for subc in self.parsed_data._flags:
            self._load_flag(subc)

    def _handle_unrecognized_alias(self, arg: str) -> None:
        """Handling for unrecognized alias arguments

        Probably a mistyped alias. By default just log a warning,
        but users can override this to raise an error instead, e.g.
        self.parser.error("Unrecognized alias: '%s'" % arg)
        """
        self.log.warning("Unrecognized alias: '%s', it will have no effect.", arg)

    def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None:
        """If argcomplete is enabled, allow triggering command-line autocompletion"""
        try:
            import argcomplete  # noqa: F401
        except ImportError:
            # argcomplete is not importable: nothing to do.
            return

        from . import argcomplete_config

        finder = argcomplete_config.ExtendedCompletionFinder()  # type:ignore[no-untyped-call]
        finder.config_classes = classes
        # Only the keys of ``subcommands`` are handed to the finder.
        finder.subcommands = list(subcommands or [])
        # for ease of testing, pass through self._argcomplete_kwargs if set
        finder(self.parser, **getattr(self, "_argcomplete_kwargs", {}))
class KeyValueConfigLoader(KVArgParseConfigLoader):
    """Deprecated in traitlets 5.0

    Use KVArgParseConfigLoader
    """

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Emit a ``DeprecationWarning``, then initialise like the parent class."""
        message = (
            "KeyValueConfigLoader is deprecated since Traitlets 5.0."
            " Use KVArgParseConfigLoader instead."
        )
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        super().__init__(*args, **kwargs)
def load_pyconfig_files(config_files: list[str], path: str) -> Config:
    """Load several Python config files and merge them, in order, into one Config.

    Files that cannot be found are skipped; any other error propagates.

    Parameters
    ----------
    config_files : list of str
        List of config files names to load and merge into the config.
    path : unicode
        The full path to the location of the config files.
    """
    merged = Config()
    for name in config_files:
        loader = PyFileConfigLoader(name, path=path)
        try:
            loaded = loader.load_config()
        except ConfigFileNotFound:
            continue
        merged.merge(loaded)
    return merged