1"""A simple configuration system."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
6
7import argparse
8import copy
9import functools
10import json
11import os
12import re
13import sys
14import typing as t
15from logging import Logger
16
17from traitlets.traitlets import Any, Container, Dict, HasTraits, List, TraitType, Undefined
18
19from ..utils import cast_unicode, filefind, warnings
20
21# -----------------------------------------------------------------------------
22# Exceptions
23# -----------------------------------------------------------------------------
24
25
class ConfigError(Exception):
    """Root of the exception hierarchy defined by this module."""
28
29
class ConfigLoaderError(ConfigError):
    """Error category for config loaders; :class:`ArgumentError` derives from it."""
32
33
class ConfigFileNotFound(ConfigError):
    """Raised by the file loaders when the config file cannot be located on the search path."""
36
37
class ArgumentError(ConfigLoaderError):
    """Raised when a command-line argument cannot be registered or loaded into the config."""
40
41
42# -----------------------------------------------------------------------------
43# Argparse fix
44# -----------------------------------------------------------------------------
45
46# Unfortunately argparse by default prints help messages to stderr instead of
47# stdout. This makes it annoying to capture long help screens at the command
48# line, since one must know how to pipe stderr, which many users don't know how
49# to do. So we override the print_help method with one that defaults to
50# stdout and use our class instead.
51
52
class _Sentinel:
    """Marker type; its single module-level instance means "argument not passed"."""

    def __str__(self) -> str:
        return "<deprecated>"

    def __repr__(self) -> str:
        return "<Sentinel deprecated>"


# Default value for deprecated keyword arguments; callers are detected by
# checking ``value is not _deprecated``.
_deprecated = _Sentinel()
62
63
class ArgumentParser(argparse.ArgumentParser):
    """argparse.ArgumentParser variant whose help goes to stdout unless a file is given."""

    def print_help(self, file: t.Any = None) -> None:
        # sys.stdout is looked up at call time so later redirections are honoured
        target = sys.stdout if file is None else file
        return super().print_help(target)

    # Reuse the stdlib docstring for the overridden method.
    print_help.__doc__ = argparse.ArgumentParser.print_help.__doc__
73
74
75# -----------------------------------------------------------------------------
76# Config class for holding config information
77# -----------------------------------------------------------------------------
78
79
def execfile(fname: str, glob: dict[str, Any]) -> None:
    """Compile the file ``fname`` and execute it using ``glob`` as its namespace.

    The file is read as bytes and compiled under its own name.
    ``glob`` serves as both the globals and the locals of the executed code,
    so every name the file defines ends up in ``glob``.
    """
    with open(fname, "rb") as source_file:
        source = source_file.read()
    code = compile(source, fname, "exec")
    exec(code, glob, glob)  # noqa: S102
83
84
class LazyConfigValue(HasTraits):
    """Proxy object for exposing methods on configurable containers

    These methods allow appending/extending/updating
    to add to non-empty defaults instead of clobbering them.

    The recorded operations are only applied when :meth:`get_value` is
    called with the initial container.

    Exposes:

    - append, extend, prepend, insert on lists
    - update on dicts
    - update, add on sets
    """

    # Cached result of get_value(); stays None until the value is reified.
    _value = None

    # list methods
    _extend: List[t.Any] = List()
    _prepend: List[t.Any] = List()
    _inserts: List[t.Any] = List()

    def append(self, obj: t.Any) -> None:
        """Append an item to a List"""
        self._extend.append(obj)

    def extend(self, other: t.Any) -> None:
        """Extend a list"""
        self._extend.extend(other)

    def prepend(self, other: t.Any) -> None:
        """like list.extend, but for the front"""
        self._prepend[:0] = other

    def merge_into(self, other: t.Any) -> t.Any:
        """
        Merge with another earlier LazyConfigValue or an earlier container.
        This is useful when having global system-wide configuration files.

        Self is expected to have higher precedence.

        When ``other`` is lazy, it is modified in place as well: it shares
        its extend/insert/update records with ``self`` afterwards.

        Parameters
        ----------
        other : LazyConfigValue or container

        Returns
        -------
        LazyConfigValue
            if ``other`` is also lazy, a reified container otherwise.
        """
        if isinstance(other, LazyConfigValue):
            # earlier extends come first, ours are applied after them
            other._extend.extend(self._extend)
            self._extend = other._extend

            # our prepends stay in front of the earlier ones
            self._prepend.extend(other._prepend)

            other._inserts.extend(self._inserts)
            self._inserts = other._inserts

            if self._update:
                other.update(self._update)
            if other._update is not None:
                # Adopt the merged updates even when self recorded none;
                # otherwise the earlier value's updates would be lost.
                self._update = other._update
            return self
        else:
            # other is a container, reify now.
            return self.get_value(other)

    def insert(self, index: int, other: t.Any) -> None:
        """Record an insertion of ``other`` at ``index``.

        Raises
        ------
        TypeError
            If ``index`` is not an int.
        """
        if not isinstance(index, int):
            raise TypeError("An integer is required")
        self._inserts.append((index, other))

    # dict methods
    # update is used for both dict and set
    _update = Any()

    def update(self, other: t.Any) -> None:
        """Update either a set or dict"""
        if self._update is None:
            # the first update decides whether we accumulate a dict or a set
            if isinstance(other, dict):
                self._update = {}
            else:
                self._update = set()
        self._update.update(other)

    # set methods
    def add(self, obj: t.Any) -> None:
        """Add an item to a set"""
        self.update({obj})

    def get_value(self, initial: t.Any) -> t.Any:
        """construct the value from the initial one

        after applying any insert / extend / update changes

        ``initial`` is deep-copied, never modified.  For lists, inserts are
        applied first, then prepends, then extends.  The result is cached:
        later calls return the first computed value regardless of ``initial``.
        """
        if self._value is not None:
            return self._value  # type:ignore[unreachable]
        value = copy.deepcopy(initial)
        if isinstance(value, list):
            for idx, obj in self._inserts:
                value.insert(idx, obj)
            value[:0] = self._prepend
            value.extend(self._extend)

        elif isinstance(value, dict):
            if self._update:
                value.update(self._update)
        elif isinstance(value, set):
            if self._update:
                value.update(self._update)
        self._value = value
        return value

    def to_dict(self) -> dict[str, t.Any]:
        """return JSONable dict form of my data

        Currently update as dict or set, extend, prepend as lists, and inserts as list of tuples.
        Only non-empty records are included.
        """
        d = {}
        if self._update:
            d["update"] = self._update
        if self._extend:
            d["extend"] = self._extend
        if self._prepend:
            d["prepend"] = self._prepend
        # independent of prepend: both may be recorded on the same value
        if self._inserts:
            d["inserts"] = self._inserts
        return d

    def __repr__(self) -> str:
        if self._value is not None:
            return f"<{self.__class__.__name__} value={self._value!r}>"
        else:
            return f"<{self.__class__.__name__} {self.to_dict()!r}>"
217
218
def _is_section_key(key: str) -> bool:
    """Is a Config key a section name?

    True when the key is non-empty, does not start with an underscore and
    its first character is unchanged by ``str.upper()``.  That covers
    capital letters, and also characters without case such as digits.
    """
    if not key:
        return False
    first = key[0]
    return first != "_" and first == first.upper()
222
223
class Config(dict):  # type:ignore[type-arg]
    """An attribute-based dict that can do smart merges.

    Accessing a field on a config object for the first time populates the key
    with either a nested Config object for keys starting with capitals
    or :class:`.LazyConfigValue` for lowercase keys,
    allowing quick assignments such as::

        c = Config()
        c.Class.int_trait = 5
        c.Class.list_trait.append("x")

    """

    def __init__(self, *args: t.Any, **kwds: t.Any) -> None:
        dict.__init__(self, *args, **kwds)
        self._ensure_subconfig()

    def _ensure_subconfig(self) -> None:
        """ensure that sub-dicts that should be Config objects are

        casts dicts that are under section keys to Config objects,
        which is necessary for constructing Config objects from dict literals.
        """
        # only values of existing keys are replaced, so the dict size is
        # stable while iterating
        for key in self:
            obj = self[key]
            if _is_section_key(key) and isinstance(obj, dict) and not isinstance(obj, Config):
                setattr(self, key, Config(obj))

    def _merge(self, other: t.Any) -> None:
        """deprecated alias, use Config.merge()"""
        self.merge(other)

    def merge(self, other: t.Any) -> None:
        """merge another config object into this one

        ``other`` takes precedence: sub-Configs present on both sides are
        merged recursively, a LazyConfigValue in ``other`` is merged into
        the existing value, and anything else overwrites it.
        """
        to_update = {}
        for k, v in other.items():
            if k not in self:
                to_update[k] = v
            else:  # I have this key
                if isinstance(v, Config) and isinstance(self[k], Config):
                    # Recursively merge common sub Configs
                    self[k].merge(v)
                elif isinstance(v, LazyConfigValue):
                    self[k] = v.merge_into(self[k])
                else:
                    # Plain updates for non-Configs
                    to_update[k] = v

        self.update(to_update)

    def collisions(self, other: Config) -> dict[str, t.Any]:
        """Check for collisions between two config objects.

        Returns a dict of the form ``{"Class": {"trait": "collision message"}}``,
        indicating which values have been ignored.

        An empty dict indicates no collisions.
        """
        collisions: dict[str, t.Any] = {}
        for section in self:
            if section not in other:
                continue
            mine = self[section]
            theirs = other[section]
            for key in mine:
                if key in theirs and mine[key] != theirs[key]:
                    collisions.setdefault(section, {})
                    collisions[section][key] = f"{mine[key]!r} ignored, using {theirs[key]!r}"
        return collisions

    def __contains__(self, key: t.Any) -> bool:
        """Dict membership, extended with dotted lookups.

        ``"Section.key" in config`` checks ``"key" in config["Section"]``.
        Non-string keys fall through to the plain dict test, so they behave
        as they would on any dict.
        """
        # allow nested contains of the form `"Section.key" in config`
        if isinstance(key, str) and "." in key:
            first, remainder = key.split(".", 1)
            if first not in self:
                return False
            return remainder in self[first]

        return super().__contains__(key)

    # .has_key is deprecated for dictionaries.
    has_key = __contains__

    def _has_section(self, key: str) -> bool:
        return _is_section_key(key) and key in self

    def copy(self) -> dict[str, t.Any]:
        """Return a shallow copy of the same type as ``self``."""
        return type(self)(dict.copy(self))

    def __copy__(self) -> dict[str, t.Any]:
        return self.copy()

    def __deepcopy__(self, memo: t.Any) -> Config:
        """Deep-copy nested config objects; shallow-copy plain containers."""
        new_config = type(self)()
        for key, value in self.items():
            if isinstance(value, (Config, LazyConfigValue)):
                # deep copy config objects
                value = copy.deepcopy(value, memo)
            elif type(value) in {dict, list, set, tuple}:
                # shallow copy plain container traits
                value = copy.copy(value)
            new_config[key] = value
        return new_config

    def __getitem__(self, key: str) -> t.Any:
        """Return ``self[key]``, creating missing entries on first access.

        A missing section key gets a new empty Config, any other missing key
        not starting with ``_`` gets a new LazyConfigValue.  Missing keys
        starting with ``_`` raise KeyError.
        """
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            if _is_section_key(key):
                c = Config()
                dict.__setitem__(self, key, c)
                return c
            elif not key.startswith("_"):
                # undefined, create lazy value, used for container methods
                v = LazyConfigValue()
                dict.__setitem__(self, key, v)
                return v
            else:
                raise

    def __setitem__(self, key: str, value: t.Any) -> None:
        """Store ``value``; section keys only accept Config instances.

        Raises
        ------
        ValueError
            If ``key`` is a section key and ``value`` is not a Config.
        """
        if _is_section_key(key):
            if not isinstance(value, Config):
                raise ValueError(
                    "values whose keys begin with an uppercase "
                    f"char must be Config instances: {key!r}, {value!r}"
                )
        dict.__setitem__(self, key, value)

    def __getattr__(self, key: str) -> t.Any:
        # dunder lookups must not auto-create entries; dict defines no
        # __getattr__, so this line ends in AttributeError
        if key.startswith("__"):
            return dict.__getattr__(self, key)  # type:ignore[attr-defined]
        try:
            return self.__getitem__(key)
        except KeyError as e:
            raise AttributeError(e) from e

    def __setattr__(self, key: str, value: t.Any) -> None:
        if key.startswith("__"):
            return dict.__setattr__(self, key, value)
        try:
            self.__setitem__(key, value)
        except KeyError as e:
            raise AttributeError(e) from e

    def __delattr__(self, key: str) -> None:
        if key.startswith("__"):
            return dict.__delattr__(self, key)
        try:
            dict.__delitem__(self, key)
        except KeyError as e:
            raise AttributeError(e) from e
377
378
class DeferredConfig:
    """Class for deferred-evaluation of config from CLI"""

    def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
        """Return the value for ``trait``; must be provided by subclasses."""
        raise NotImplementedError("Implement in subclasses")

    def _super_repr(self) -> str:
        """Return the repr of the underlying plain value.

        Uses the ``__repr__`` of the first class in the MRO that is not
        part of the DeferredConfig family.  Walking the MRO, rather than
        calling ``super(self.__class__, self)``, keeps this working when a
        concrete deferred class is subclassed: the old form re-entered the
        subclass's inherited ``__repr__`` and recursed without end.
        """
        for klass in type(self).__mro__:
            if not issubclass(klass, DeferredConfig):
                return klass.__repr__(self)
        # not reachable: ``object`` always ends the MRO
        return object.__repr__(self)
388
389
class DeferredConfigString(str, DeferredConfig):
    """Config value for loading config from a string

    Interpretation is deferred until it is loaded into the trait.

    Subclass of str for backward compatibility.

    This class is only used for values that are not listed
    in the configurable classes.

    When config is loaded, `trait.from_string` will be used.

    If an error is raised in `.from_string`,
    the original string is returned.

    .. versionadded:: 5.0
    """

    def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
        """Get the value stored in this string"""
        text = str(self)
        try:
            parsed = trait.from_string(text)
        except Exception:
            # The cast failed: hand back the plain string, so that loading
            # the config raises a more informative error later.
            return text
        return parsed

    def __repr__(self) -> str:
        inner = self._super_repr()
        return f"{self.__class__.__name__}({inner})"
421
422
class DeferredConfigList(t.List[t.Any], DeferredConfig):
    """Config value for loading config from a list of strings

    Interpretation is deferred until it is loaded into the trait.

    This class is only used for values that are not listed
    in the configurable classes.

    When config is loaded, `trait.from_string_list` will be used.

    If an error is raised in `.from_string_list`,
    the original string list is returned.

    .. versionadded:: 5.0
    """

    def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any:
        """Get the value stored in this list.

        Traits without ``from_string_list`` accept exactly one item, which
        is passed to ``trait.from_string``; more than one item raises
        ValueError.
        """
        if not hasattr(trait, "from_string_list"):
            # only allow one item
            count = len(self)
            if count > 1:
                raise ValueError(
                    f"{trait.name} only accepts one value, got {count}: {list(self)}"
                )
            raw = self[0]
            convert = trait.from_string
        else:
            raw = list(self)
            convert = trait.from_string_list

        try:
            return convert(raw)
        except Exception:
            # The cast failed: hand back the uncast value, so that loading
            # the config raises a more informative error later.
            return raw

    def __repr__(self) -> str:
        inner = self._super_repr()
        return f"{self.__class__.__name__}({inner})"
463
464
465# -----------------------------------------------------------------------------
466# Config loading classes
467# -----------------------------------------------------------------------------
468
469
class ConfigLoader:
    """An object for loading configurations from just about anywhere.

    The resulting configuration is packaged as a :class:`Config`.

    Notes
    -----
    A :class:`ConfigLoader` does one thing: load a config from a source
    (file, command line arguments) and returns the data as a :class:`Config` object.
    There are lots of things that :class:`ConfigLoader` does not do.  It does
    not implement complex logic for finding config files.  It does not handle
    default values or merge multiple configs.  These things need to be
    handled elsewhere.
    """

    def _log_default(self) -> Logger:
        # imported lazily, at the point of use
        from traitlets.log import get_logger

        logger = get_logger()
        return t.cast(Logger, logger)

    def __init__(self, log: Logger | None = None) -> None:
        """A base class for config loaders.

        log : instance of :class:`logging.Logger` to use.
              By default logger of :meth:`traitlets.config.application.Application.instance()`
              will be used

        Examples
        --------
        >>> cl = ConfigLoader()
        >>> config = cl.load_config()
        >>> config
        {}
        """
        self.clear()
        if log is not None:
            self.log = log
            return
        self.log = self._log_default()
        self.log.debug("Using default logger")

    def clear(self) -> None:
        """Replace ``self.config`` with a new empty :class:`Config`."""
        self.config = Config()

    def load_config(self) -> Config:
        """Load a config from somewhere, return a :class:`Config` instance.

        Usually, this will cause self.config to be set and then returned.
        However, in most cases, :meth:`ConfigLoader.clear` should be called
        to erase any previous state.
        """
        self.clear()
        return self.config
523
524
class FileConfigLoader(ConfigLoader):
    """A base class for file based configurations.

    As we add more file based config loaders, the common logic should go
    here.
    """

    def __init__(self, filename: str, path: str | None = None, **kw: t.Any) -> None:
        """Build a config loader for a filename and path.

        Parameters
        ----------
        filename : str
            The file name of the config file.
        path : str, list, tuple
            The path to search for the config file on, or a sequence of
            paths to try in order.
        **kw
            Forwarded to :class:`ConfigLoader`.
        """
        super().__init__(**kw)
        # stays empty until _find_file() has resolved the file
        self.full_filename = ""
        self.filename = filename
        self.path = path

    def _find_file(self) -> None:
        """Try to find the file by searching the paths."""
        located = filefind(self.filename, self.path)
        self.full_filename = located
551
552
class JSONFileConfigLoader(FileConfigLoader):
    """A JSON file loader for config

    Can also act as a context manager that rewrite the configuration file to disk on exit.

    Example::

        with JSONFileConfigLoader('myapp.json','/home/jupyter/configurations/') as c:
            c.MyNewConfigurable.new_value = 'Updated'

    """

    def load_config(self) -> Config:
        """Load the config from a file and return it as a Config object.

        Raises
        ------
        ConfigFileNotFound
            If locating the file raises OSError.
        """
        self.clear()
        try:
            self._find_file()
        except OSError as e:
            raise ConfigFileNotFound(str(e)) from e
        dct = self._read_file_as_dict()
        self.config = self._convert_to_config(dct)
        return self.config

    def _read_file_as_dict(self) -> dict[str, t.Any]:
        """Parse the located file as JSON."""
        # JSON text is UTF-8; do not depend on the locale's default encoding
        with open(self.full_filename, encoding="utf-8") as f:
            return t.cast("dict[str, t.Any]", json.load(f))

    def _convert_to_config(self, dictionary: dict[str, t.Any]) -> Config:
        """Build a Config from the parsed JSON data.

        A ``"version"`` key is removed from ``dictionary``; a missing one
        means version 1.

        Raises
        ------
        ValueError
            If the version is not 1.
        """
        version = dictionary.pop("version", 1)

        if version == 1:
            return Config(dictionary)
        else:
            raise ValueError(f"Unknown version of JSON config file: {version}")

    def __enter__(self) -> Config:
        self.load_config()
        return self.config

    def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
        """
        Exit the context manager but do not handle any errors.

        In case of any error, we do not want to write the potentially broken
        configuration to disk.
        """
        if exc_type is not None:
            # The with-body failed: leave the file untouched.  Returning
            # None lets the exception propagate.
            return
        self.config.version = 1
        # serialize before opening, so a serialization error cannot
        # truncate the existing file
        json_config = json.dumps(self.config, indent=2)
        with open(self.full_filename, "w", encoding="utf-8") as f:
            f.write(json_config)
606
607
class PyFileConfigLoader(FileConfigLoader):
    """A config loader for pure python files.

    This is responsible for locating a Python config file by filename and
    path, then executing it to construct a Config object.
    """

    def load_config(self) -> Config:
        """Load the config from a file and return it as a Config object."""
        self.clear()
        try:
            self._find_file()
        except OSError as err:
            raise ConfigFileNotFound(str(err)) from err
        self._read_file_as_dict()
        return self.config

    def load_subconfig(self, fname: str, path: str | None = None) -> None:
        """Injected into config file namespace as load_subconfig"""
        search_path = self.path if path is None else path

        sub_loader = self.__class__(fname, search_path)
        try:
            sub_config = sub_loader.load_config()
        except ConfigFileNotFound:
            # A missing sub config is treated as an empty config file.
            return
        self.config.merge(sub_config)

    def _read_file_as_dict(self) -> None:
        """Load the config file into self.config, with recursive loading."""

        def get_config() -> Config:
            """Unnecessary now, but a deprecation warning is more trouble than it's worth."""
            return self.config

        conf_filename = self.full_filename
        # names made available to the executed config file
        namespace = {
            "c": self.config,
            "load_subconfig": self.load_subconfig,
            "get_config": get_config,
            "__file__": conf_filename,
        }
        with open(conf_filename, "rb") as f:
            source = f.read()
            exec(compile(source, conf_filename, "exec"), namespace, namespace)  # noqa: S102
656
657
class CommandLineConfigLoader(ConfigLoader):
    """A config loader for command line arguments.

    As we add more command line based loaders, the common logic should go
    here.
    """

    def _exec_config_str(
        self, lhs: t.Any, rhs: t.Any, trait: TraitType[t.Any, t.Any] | None = None
    ) -> None:
        """execute self.config.<lhs> = <rhs>

        * interprets value with trait if available
        * without a trait, a single-item DeferredConfigList becomes a
          DeferredConfigString with ~ expanded via expanduser
        * a non-deferred value without a trait is wrapped in DeferredConfigString

        Parameters
        ----------
        lhs : str
            Dotted name of the config entry, e.g. ``"Class.trait"``.
        rhs
            The value to assign.
        trait : TraitType, optional
            Trait used to interpret ``rhs``.
        """
        value = rhs
        if isinstance(value, DeferredConfig):
            if trait:
                # trait available, reify config immediately
                value = value.get_value(trait)
            elif isinstance(rhs, DeferredConfigList) and len(rhs) == 1:
                # single item, make it a deferred str
                value = DeferredConfigString(os.path.expanduser(rhs[0]))
        else:
            if trait:
                value = trait.from_string(value)
            else:
                value = DeferredConfigString(value)

        # walk down to the section that holds the final key
        *path, key = lhs.split(".")
        section = self.config
        for part in path:
            section = section[part]
        section[key] = value

    def _load_flag(self, cfg: t.Any) -> None:
        """update self.config from a flag, which can be a dict or Config

        Raises
        ------
        TypeError
            If ``cfg`` is not a dict (Config is a dict subclass).
        """
        if isinstance(cfg, dict):
            # don't clobber whole config sections, update
            # each section from config:
            for sec, c in cfg.items():
                self.config[sec].update(c)
        else:
            # an f-string, because "%r" % cfg misformats (or fails on)
            # tuple values
            raise TypeError(f"Invalid flag: {cfg!r}")
703
704
# match --Class.trait keys for argparse
# matches:
#   --Class.trait
#   --x
#   -x
# i.e. one or two leading dashes, a name starting with an ASCII letter,
# then any number of dot-separated parts.

class_trait_opt_pattern = re.compile(r"^\-?\-[A-Za-z][\w]*(\.[\w]+)*$")

# Placeholders used while arguments pass through argparse: "." in a
# destination name is stored as _DOT_REPLACEMENT, and a bare "-" argument is
# passed as _DASH_REPLACEMENT.  Both are mapped back once parsing is done.
_DOT_REPLACEMENT = "__DOT__"
_DASH_REPLACEMENT = "__DASH__"
715
716
class _KVAction(argparse.Action):
    """Custom argparse action for handling --Class.trait=x

    Always accumulates: each occurrence of the option extends the
    DeferredConfigList stored on the namespace under ``self.dest``.
    """

    def __call__(  # type:ignore[override]
        self,
        parser: argparse.ArgumentParser,
        namespace: dict[str, t.Any],
        values: t.Sequence[t.Any],
        option_string: str | None = None,
    ) -> None:
        if isinstance(values, str):
            values = [values]
        # Restore "-" arguments that were swapped for the placeholder before
        # parsing.  Compared by value: identity of the string object is not
        # guaranteed to survive argparse.
        values = ["-" if v == _DASH_REPLACEMENT else v for v in values]
        items = getattr(namespace, self.dest, None)
        if items is None:
            items = DeferredConfigList()
        else:
            # copy, so the list already on the namespace is not mutated
            items = DeferredConfigList(items)
        items.extend(values)
        setattr(namespace, self.dest, items)
740
741
class _DefaultOptionDict(dict):  # type:ignore[type-arg]
    """Like the default options dict

    but acts as if all --Class.trait options are predefined
    """

    def _add_kv_action(self, key: str) -> None:
        """Register a :class:`_KVAction` for the option string ``key``."""
        bare = key.lstrip("-")
        self[key] = _KVAction(
            option_strings=[key],
            dest=bare.replace(".", _DOT_REPLACEMENT),
            # use metavar for display purposes
            metavar=bare,
        )

    def __contains__(self, key: t.Any) -> bool:
        """Membership test that defines matching options on the fly.

        Keys containing ``=`` are never members.  An unknown key that starts
        with ``-`` and matches ``class_trait_opt_pattern`` gets a key/value
        action registered and is then reported as present.
        """
        if "=" in key:
            return False
        known = super().__contains__(key)
        if not known and key.startswith("-") and class_trait_opt_pattern.match(key):
            self._add_kv_action(key)
            known = True
        return known

    def __getitem__(self, key: str) -> t.Any:
        # go through __contains__ so implicit options get created first
        if key not in self:
            raise KeyError(key)
        return super().__getitem__(key)

    def get(self, key: str, default: t.Any = None) -> t.Any:
        try:
            found = self[key]
        except KeyError:
            return default
        return found
778
779
class _KVArgParser(argparse.ArgumentParser):
    """subclass of ArgumentParser where any --Class.trait option is implicitly defined"""

    def parse_known_args(  # type:ignore[override]
        self, args: t.Sequence[str] | None = None, namespace: argparse.Namespace | None = None
    ) -> tuple[argparse.Namespace | None, list[str]]:
        # The option tables are wrapped right before parsing: doing it in
        # __init__ would make registration of explicit actions fail during
        # setup.
        self._option_string_actions = _DefaultOptionDict(self._option_string_actions)
        optionals = self._optionals
        optionals._option_string_actions = _DefaultOptionDict(optionals._option_string_actions)
        return super().parse_known_args(args, namespace)
791
792
# type aliases
# String-keyed mapping with arbitrary values; annotates the ``subcommands``
# argument of ArgParseConfigLoader.
SubcommandsDict = t.Dict[str, t.Any]
795
796
class ArgParseConfigLoader(CommandLineConfigLoader):
    """A loader that uses the argparse module to load from the command line."""

    parser_class = ArgumentParser

    def __init__(
        self,
        argv: list[str] | None = None,
        aliases: dict[str, str] | None = None,
        flags: dict[str, str] | None = None,
        log: t.Any = None,
        classes: list[type[t.Any]] | None = None,
        subcommands: SubcommandsDict | None = None,
        *parser_args: t.Any,
        **parser_kw: t.Any,
    ) -> None:
        """Create a config loader for use with argparse.

        Parameters
        ----------
        classes : optional, list
            The classes to scan for *container* config-traits and decide
            for their "multiplicity" when adding them as *argparse* arguments.
        argv : optional, list
            If given, used to read command-line arguments from, otherwise
            sys.argv[1:] is used.
        *parser_args : tuple
            A tuple of positional arguments that will be passed to the
            constructor of :class:`argparse.ArgumentParser`.
        **parser_kw : dict
            Keyword arguments that will be passed to the
            constructor of :class:`argparse.ArgumentParser`.
            A ``version`` entry is removed and kept as ``self.version``.
        aliases : dict of str to str
            Dict of aliases to full traitlets names for CLI parsing
        flags : dict of str to str
            Dict of flags to full traitlets names for CLI parsing
        log
            Passed to `ConfigLoader`
        subcommands : dict, optional
            Only used for argcomplete currently.
        """
        super(CommandLineConfigLoader, self).__init__(log=log)
        self.clear()
        self.argv = sys.argv[1:] if argv is None else argv
        self.aliases = aliases or {}
        self.flags = flags or {}
        self.classes = classes or []
        self.subcommands = subcommands  # only used for argcomplete currently

        self.parser_args = parser_args
        self.version = parser_kw.pop("version", None)
        # explicit parser keywords override the SUPPRESS default
        self.parser_kw = {"argument_default": argparse.SUPPRESS, **parser_kw}

    def load_config(
        self,
        argv: list[str] | None = None,
        aliases: t.Any = None,
        flags: t.Any = _deprecated,
        classes: t.Any = None,
    ) -> Config:
        """Parse command line arguments and return as a Config object.

        Parameters
        ----------
        argv : optional, list
            If given, a list with the structure of sys.argv[1:] to parse
            arguments from. If not given, the instance's self.argv attribute
            (given at construction time) is used.
        aliases : optional
            If given, replaces the instance's aliases.
        flags
            Deprecated in traitlets 5.0, instantiate the config loader with the flags.
        classes : optional
            If given, replaces the instance's classes.

        """
        if flags is not _deprecated:
            warnings.warn(
                "The `flag` argument to load_config is deprecated since Traitlets "
                f"5.0 and will be ignored, pass flags the `{type(self)}` constructor.",
                DeprecationWarning,
                stacklevel=2,
            )

        self.clear()
        to_parse = self.argv if argv is None else argv
        if aliases is not None:
            self.aliases = aliases
        if classes is not None:
            self.classes = classes
        self._create_parser()
        self._argcomplete(self.classes, self.subcommands)
        self._parse_args(to_parse)
        self._convert_to_config()
        return self.config

    def get_extra_args(self) -> list[str]:
        """Return the arguments found after ``--``, or an empty list before any parse."""
        return getattr(self, "extra_args", [])

    def _create_parser(self) -> None:
        """Instantiate ``parser_class`` and register the arguments on it."""
        self.parser = self.parser_class(
            *self.parser_args,
            **self.parser_kw,  # type:ignore[arg-type]
        )
        self._add_arguments(self.aliases, self.flags, self.classes)

    def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None:
        raise NotImplementedError("subclasses must implement _add_arguments")

    def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None:
        """If argcomplete is enabled, allow triggering command-line autocompletion"""

    def _parse_args(self, args: t.Any) -> t.Any:
        """self.parser->self.parsed_data"""
        uargs = [cast_unicode(a) for a in args]

        # option string as typed on the command line -> "--<alias target>"
        unpacked_aliases: dict[str, str] = {}
        if self.aliases:
            for alias, alias_target in self.aliases.items():
                if alias in self.flags:
                    continue
                names = alias if isinstance(alias, tuple) else (alias,)
                for name in names:
                    if len(name) == 1:
                        unpacked_aliases["-" + name] = "--" + alias_target
                    unpacked_aliases["--" + name] = "--" + alias_target

        def _replace(arg: str) -> str:
            # a bare dash is passed through argparse as a placeholder
            if arg == "-":
                return _DASH_REPLACEMENT
            for short, full in unpacked_aliases.items():
                if arg == short:
                    return full
                prefix = short + "="
                if arg.startswith(prefix):
                    return full + "=" + arg[len(prefix) :]
            return arg

        # everything after the first "--" is kept verbatim, not parsed
        try:
            split_at = uargs.index("--")
        except ValueError:
            to_parse, extra_args = uargs, []
        else:
            to_parse, extra_args = uargs[:split_at], uargs[split_at + 1 :]
        to_parse = [_replace(a) for a in to_parse]

        self.parsed_data = self.parser.parse_args(to_parse)
        self.extra_args = extra_args

    def _convert_to_config(self) -> None:
        """self.parsed_data->self.config"""
        for dotted, value in vars(self.parsed_data).items():
            *parents, leaf = dotted.split(".")
            target = self.config
            for name in parents:
                target = target[name]
            setattr(target, leaf, value)
965
966
class _FlagAction(argparse.Action):
    """ArgParse action to handle a flag

    Takes the extra keyword ``flag``, and optionally ``alias``.  Without an
    alias the action takes no arguments.
    """

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        self.flag = kwargs.pop("flag")
        self.alias = kwargs.pop("alias", None)
        if not self.alias:
            kwargs["nargs"] = 0
        # Undefined as const marks "option given without a value"
        kwargs["const"] = Undefined
        super().__init__(*args, **kwargs)

    def __call__(
        self, parser: t.Any, namespace: t.Any, values: t.Any, option_string: str | None = None
    ) -> None:
        used_as_alias = self.nargs != 0 and values is not Undefined
        if used_as_alias:
            # a value was supplied: store it under the alias name
            setattr(namespace, self.alias, values)
            return
        if not hasattr(namespace, "_flags"):
            namespace._flags = []
        namespace._flags.append(self.flag)
987
988
989class KVArgParseConfigLoader(ArgParseConfigLoader):
990 """A config loader that loads aliases and flags with argparse,
991
992 as well as arbitrary --Class.trait value
993 """
994
995 parser_class = _KVArgParser # type:ignore[assignment]
996
997 def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None:
998 alias_flags: dict[str, t.Any] = {}
999 argparse_kwds: dict[str, t.Any]
1000 argparse_traits: dict[str, t.Any]
1001 paa = self.parser.add_argument
1002 self.parser.set_defaults(_flags=[])
1003 paa("extra_args", nargs="*")
1004
1005 # An index of all container traits collected::
1006 #
1007 # { <traitname>: (<trait>, <argparse-kwds>) }
1008 #
1009 # Used to add the correct type into the `config` tree.
1010 # Used also for aliases, not to re-collect them.
1011 self.argparse_traits = argparse_traits = {}
1012 for cls in classes:
1013 for traitname, trait in cls.class_traits(config=True).items():
1014 argname = f"{cls.__name__}.{traitname}"
1015 argparse_kwds = {"type": str}
1016 if isinstance(trait, (Container, Dict)):
1017 multiplicity = trait.metadata.get("multiplicity", "append")
1018 if multiplicity == "append":
1019 argparse_kwds["action"] = multiplicity
1020 else:
1021 argparse_kwds["nargs"] = multiplicity
1022 argparse_traits[argname] = (trait, argparse_kwds)
1023
1024 for keys, (value, fhelp) in flags.items():
1025 if not isinstance(keys, tuple):
1026 keys = (keys,)
1027 for key in keys:
1028 if key in aliases:
1029 alias_flags[aliases[key]] = value
1030 continue
1031 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,)
1032 paa(*keys, action=_FlagAction, flag=value, help=fhelp)
1033
1034 for keys, traitname in aliases.items():
1035 if not isinstance(keys, tuple):
1036 keys = (keys,)
1037
1038 for key in keys:
1039 argparse_kwds = {
1040 "type": str,
1041 "dest": traitname.replace(".", _DOT_REPLACEMENT),
1042 "metavar": traitname,
1043 }
1044 argcompleter = None
1045 if traitname in argparse_traits:
1046 trait, kwds = argparse_traits[traitname]
1047 argparse_kwds.update(kwds)
1048 if "action" in argparse_kwds and traitname in alias_flags:
1049 # flag sets 'action', so can't have flag & alias with custom action
1050 # on the same name
1051 raise ArgumentError(
1052 f"The alias `{key}` for the 'append' sequence "
1053 f"config-trait `{traitname}` cannot be also a flag!'"
1054 )
1055 # For argcomplete, check if any either an argcompleter metadata tag or method
1056 # is available. If so, it should be a callable which takes the command-line key
1057 # string as an argument and other kwargs passed by argcomplete,
1058 # and returns the a list of string completions.
1059 argcompleter = trait.metadata.get("argcompleter") or getattr(
1060 trait, "argcompleter", None
1061 )
1062 if traitname in alias_flags:
1063 # alias and flag.
1064 # when called with 0 args: flag
1065 # when called with >= 1: alias
1066 argparse_kwds.setdefault("nargs", "?")
1067 argparse_kwds["action"] = _FlagAction
1068 argparse_kwds["flag"] = alias_flags[traitname]
1069 argparse_kwds["alias"] = traitname
1070 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,)
1071 action = paa(*keys, **argparse_kwds)
1072 if argcompleter is not None:
1073 # argcomplete's completers are callables returning list of completion strings
1074 action.completer = functools.partial( # type:ignore[attr-defined]
1075 argcompleter, key=key
1076 )
1077
1078 def _convert_to_config(self) -> None:
1079 """self.parsed_data->self.config, parse unrecognized extra args via KVLoader."""
1080 extra_args = self.extra_args
1081
1082 for lhs, rhs in vars(self.parsed_data).items():
1083 if lhs == "extra_args":
1084 self.extra_args = ["-" if a == _DASH_REPLACEMENT else a for a in rhs] + extra_args
1085 continue
1086 if lhs == "_flags":
1087 # _flags will be handled later
1088 continue
1089
1090 lhs = lhs.replace(_DOT_REPLACEMENT, ".")
1091 if "." not in lhs:
1092 self._handle_unrecognized_alias(lhs)
1093 trait = None
1094
1095 if isinstance(rhs, list):
1096 rhs = DeferredConfigList(rhs)
1097 elif isinstance(rhs, str):
1098 rhs = DeferredConfigString(rhs)
1099
1100 trait = self.argparse_traits.get(lhs)
1101 if trait:
1102 trait = trait[0]
1103
1104 # eval the KV assignment
1105 try:
1106 self._exec_config_str(lhs, rhs, trait)
1107 except Exception as e:
1108 # cast deferred to nicer repr for the error
1109 # DeferredList->list, etc
1110 if isinstance(rhs, DeferredConfig):
1111 rhs = rhs._super_repr()
1112 raise ArgumentError(f"Error loading argument {lhs}={rhs}, {e}") from e
1113
1114 for subc in self.parsed_data._flags:
1115 self._load_flag(subc)
1116
1117 def _handle_unrecognized_alias(self, arg: str) -> None:
1118 """Handling for unrecognized alias arguments
1119
1120 Probably a mistyped alias. By default just log a warning,
1121 but users can override this to raise an error instead, e.g.
1122 self.parser.error("Unrecognized alias: '%s'" % arg)
1123 """
1124 self.log.warning("Unrecognized alias: '%s', it will have no effect.", arg)
1125
1126 def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None:
1127 """If argcomplete is enabled, allow triggering command-line autocompletion"""
1128 try:
1129 import argcomplete # noqa: F401
1130 except ImportError:
1131 return
1132
1133 from . import argcomplete_config
1134
1135 finder = argcomplete_config.ExtendedCompletionFinder() # type:ignore[no-untyped-call]
1136 finder.config_classes = classes
1137 finder.subcommands = list(subcommands or [])
1138 # for ease of testing, pass through self._argcomplete_kwargs if set
1139 finder(self.parser, **getattr(self, "_argcomplete_kwargs", {}))
1140
1141
class KeyValueConfigLoader(KVArgParseConfigLoader):
    """Deprecated subclass of KVArgParseConfigLoader.

    Deprecated in traitlets 5.0. It adds nothing to the parent class beyond
    a deprecation warning at construction time.

    Use KVArgParseConfigLoader
    """

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Emit a ``DeprecationWarning``, then initialise the parent loader.

        All positional and keyword arguments are forwarded unchanged to
        the parent ``__init__``.
        """
        message = (
            "KeyValueConfigLoader is deprecated since Traitlets 5.0. "
            "Use KVArgParseConfigLoader instead."
        )
        # NOTE(review): stacklevel=2 presumably attributes the warning to the
        # code that instantiates the loader - confirm against ``warnings.warn``.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        super().__init__(*args, **kwargs)
1156
1157
def load_pyconfig_files(config_files: list[str], path: str) -> Config:
    """Load several Python config files and merge them into one ``Config``.

    Parameters
    ----------
    config_files : list of str
        Names of the config files to load; they are processed in the
        order given.
    path : str
        Location of the config files, handed to each
        ``PyFileConfigLoader`` as its ``path`` argument.

    Returns
    -------
    Config
        A new config into which every successfully loaded file was merged.

    Notes
    -----
    A file whose loader raises ``ConfigFileNotFound`` is skipped silently;
    any other exception propagates to the caller.
    """
    merged = Config()
    for filename in config_files:
        file_loader = PyFileConfigLoader(filename, path=path)
        try:
            loaded = file_loader.load_config()
        except ConfigFileNotFound:
            # A missing file is not an error here: move on to the next name.
            continue
        merged.merge(loaded)
    return merged