1"""A base class for objects that are configurable.""" 
    2 
    3# Copyright (c) IPython Development Team. 
    4# Distributed under the terms of the Modified BSD License. 
    5from __future__ import annotations 
    6 
    7import logging 
    8import typing as t 
    9from copy import deepcopy 
    10from textwrap import dedent 
    11 
    12from traitlets.traitlets import ( 
    13    Any, 
    14    Container, 
    15    Dict, 
    16    HasTraits, 
    17    Instance, 
    18    TraitType, 
    19    default, 
    20    observe, 
    21    observe_compat, 
    22    validate, 
    23) 
    24from traitlets.utils import warnings 
    25from traitlets.utils.bunch import Bunch 
    26from traitlets.utils.text import indent, wrap_paragraphs 
    27 
    28from .loader import Config, DeferredConfig, LazyConfigValue, _is_section_key 
    29 
    30# ----------------------------------------------------------------------------- 
    31# Helper classes for Configurables 
    32# ----------------------------------------------------------------------------- 
    33 
# ``LoggerType`` is the annotation used by the ``log`` trait's validator and
# default further down in this module.  The precise union is only built when
# ``t.TYPE_CHECKING`` is true (i.e. for static type checkers); at runtime the
# alias is plain ``t.Any``, so the subscripted ``logging.LoggerAdapter[t.Any]``
# expression is never evaluated by the interpreter.
if t.TYPE_CHECKING:
    LoggerType = t.Union[logging.Logger, logging.LoggerAdapter[t.Any]]
else:
    LoggerType = t.Any
    38 
    39 
    40class ConfigurableError(Exception): 
    41    pass 
    42 
    43 
    44class MultipleInstanceError(ConfigurableError): 
    45    pass 
    46 
    47 
    48# ----------------------------------------------------------------------------- 
    49# Configurable implementation 
    50# ----------------------------------------------------------------------------- 
    51 
    52 
class Configurable(HasTraits):
    # The Config object from which values for ``config=True`` traits are
    # loaded.  Assigning to it triggers ``_config_changed``, which reloads
    # those traits.
    config = Instance(Config, (), {})
    # Optional parent Configurable.  When a parent is passed to ``__init__``
    # without an explicit config, the parent's config is used, and the
    # parent's sections take part in ``_find_my_config``.
    parent = Instance("traitlets.config.configurable.Configurable", allow_none=True)
    56 
    def __init__(self, **kwargs: t.Any) -> None:
        """Create a configurable, optionally given a config and a parent.

        Parameters
        ----------
        config : Config
            If this is empty, default values are used. If config is a
            :class:`Config` instance, it will be used to configure the
            instance.
        parent : Configurable instance, optional
            The parent Configurable instance of this object.  When a parent
            is given and ``config`` is missing or None, the parent's config
            is used.
        **kwargs
            Every other keyword argument is forwarded to the superclass
            ``__init__`` (the original comment describes them as "kwarg
            traits").  A keyword argument that is also set by the config is
            applied again after the config has been loaded, so the keyword
            argument wins.

        Notes
        -----
        Subclasses of Configurable must call the :meth:`__init__` method of
        :class:`Configurable` *before* doing anything else and using
        :func:`super`::

            class MyConfigurable(Configurable):
                def __init__(self, config=None):
                    super(MyConfigurable, self).__init__(config=config)
                    # Then any other code you need to finish initialization.

        This ensures that instances will be configured properly.
        """
        # ``parent`` and ``config`` are taken out of kwargs so that only the
        # remaining keyword arguments reach the superclass ``__init__``.
        parent = kwargs.pop("parent", None)
        if parent is not None:
            # config is implied from parent
            if kwargs.get("config", None) is None:
                kwargs["config"] = parent.config
            # parent is assigned before config is loaded, because
            # _find_my_config consults self.parent
            self.parent = parent

        config = kwargs.pop("config", None)

        # load kwarg traits, other than config
        super().__init__(**kwargs)

        # record traits set by config
        config_override_names = set()

        def notice_config_override(change: Bunch) -> None:
            """Record traits set by both config and kwargs.

            They will need to be overridden again after loading config.
            """
            if change.name in kwargs:
                config_override_names.add(change.name)

        # The observer is registered only for the duration of the config
        # load below and removed right afterwards.
        self.observe(notice_config_override)

        # load config
        if config is not None:
            # We used to deepcopy, but for now we are trying to just save
            # by reference.  This *could* have side effects as all components
            # will share config. In fact, I did find such a side effect in
            # _config_changed below. If a config attribute value was a mutable type
            # all instances of a component were getting the same copy, effectively
            # making that a class attribute.
            # self.config = deepcopy(config)
            # Assigning the trait fires _config_changed, which loads the values.
            self.config = config
        else:
            # allow _config_default to return something
            self._load_config(self.config)
        self.unobserve(notice_config_override)

        # Re-apply the keyword arguments that the config load overwrote, so
        # that explicit keyword arguments take priority over config values.
        for name in config_override_names:
            setattr(self, name, kwargs[name])
    124 
    125    # ------------------------------------------------------------------------- 
    126    # Static trait notifications 
    127    # ------------------------------------------------------------------------- 
    128 
    @classmethod
    def section_names(cls) -> list[str]:
        """Return the names of the config sections that apply to this class.

        The result holds the ``__name__`` of every class in the MRO of
        ``cls`` that is a Configurable subclass, walking the MRO in reverse
        so that ``cls`` itself comes last.
        """
        names = []
        for klass in reversed(cls.__mro__):
            if not issubclass(klass, Configurable):
                continue
            if not issubclass(cls, klass):
                continue
            names.append(klass.__name__)
        return names
    137 
    def _find_my_config(self, cfg: Config) -> t.Any:
        """Build a Config holding only the values that apply to this object.

        The sections are selected from ``cfg`` by the names in
        :meth:`section_names`, and additionally from the config that the
        parent (if any) extracts for itself from ``cfg``.

        If I am Bar and my parent is Foo, and their parent is Tim,
        this merges the following config sections, in this order::

            [Bar, Foo.Bar, Tim.Foo.Bar]

        With the last item being the highest priority.
        """
        sources = [cfg]
        if self.parent:
            # the parent's own view of cfg supplies the nested sections
            sources.append(self.parent._find_my_config(cfg))

        merged = Config()
        for source in sources:
            for section in self.section_names():
                # Ask _has_section first: a blind lookup would make the
                # config dynamically create a section named Class.__name__.
                if not source._has_section(section):
                    continue
                merged.merge(source[section])
        return merged
    162 
    def _load_config(
        self,
        cfg: Config,
        section_names: list[str] | None = None,
        traits: dict[str, TraitType[t.Any, t.Any]] | None = None,
    ) -> None:
        """Set this object's traits from the values found in a Config.

        Parameters
        ----------
        cfg : Config
            The config to read; the relevant part is extracted with
            :meth:`_find_my_config`.
        section_names : list of str, optional
            Defaults to :meth:`section_names`.
        traits : dict, optional
            The traits that may be set, keyed by name.  Defaults to the
            traits tagged ``config=True``.

        Keys that are neither a known trait, a section key, nor a nested
        Config produce a warning that suggests close matches.
        """
        traits = self.traits(config=True) if traits is None else traits
        section_names = self.section_names() if section_names is None else section_names

        my_config = self._find_my_config(cfg)

        # notifications are held back until every value has been assigned
        with self.hold_trait_notifications():
            for name, config_value in my_config.items():
                if name in traits:
                    if isinstance(config_value, LazyConfigValue):
                        # LazyConfigValue wraps append / update operations on
                        # containers; it is resolved against the current value
                        current = getattr(self, name)
                        config_value = config_value.get_value(current)
                    elif isinstance(config_value, DeferredConfig):
                        # DeferredConfig tends to come from CLI/environment
                        # variables; it is resolved against the trait
                        config_value = config_value.get_value(traits[name])
                    # The whole config is not deep-copied, so the value is
                    # copied here. Otherwise a mutable value would be shared
                    # by all instances, effectively a class attribute.
                    setattr(self, name, deepcopy(config_value))
                    continue

                if _is_section_key(name) or isinstance(config_value, Config):
                    # section keys and nested Config objects are not reported
                    continue

                from difflib import get_close_matches

                if isinstance(self, LoggingConfigurable):
                    assert self.log is not None
                    warn = self.log.warning
                else:

                    def warn(msg: t.Any) -> None:
                        return warnings.warn(msg, UserWarning, stacklevel=9)

                candidates = get_close_matches(name, traits)
                msg = f"Config option `{name}` not recognized by `{self.__class__.__name__}`."

                if len(candidates) == 1:
                    msg += f"  Did you mean `{candidates[0]}`?"
                elif candidates:
                    msg += "  Did you mean one of: `{matches}`?".format(
                        matches=", ".join(sorted(candidates))
                    )
                warn(msg)
    215 
    @observe("config")
    @observe_compat
    def _config_changed(self, change: Bunch) -> None:
        """Reload the configurable traits when ``config`` is replaced.

        Every trait whose metadata has ``config=True`` is updated with the
        value of the corresponding entry in the new config.  The sections
        consulted are those of this class and of its Configurable parent
        classes, as listed by :meth:`section_names`.
        """
        self._load_config(
            change.new,
            traits=self.traits(config=True),
            section_names=self.section_names(),
        )
    233 
    def update_config(self, config: Config) -> None:
        """Load the values of ``config`` and merge it into ``self.config``."""
        # Backward compatibility: traitlets prior to 4.2 copied self.config to
        # trigger change events, and some projects (IPython < 5) relied on the
        # side effect that the previous self.config was not modified in-place.
        # So self.config is replaced by a fresh copy before anything is
        # merged into it.  Config consumers should not rely on this behavior.
        replacement = deepcopy(self.config)
        self.config = replacement
        # apply the new values to the traits
        self._load_config(config)
        # then record them in self.config
        self.config.merge(config)
        # TODO: trigger change event if/when dict-update change events take place
        # DO NOT trigger full trait-change
    249 
    @classmethod
    def class_get_help(cls, inst: HasTraits | None = None) -> str:
        """Return the help text for this class in ReST format.

        The text is a title line, an underline of the same length, and the
        help of each ``config=True`` trait in name order.  If `inst` is
        given, its current trait values are shown instead of the class
        defaults.
        """
        assert inst is None or isinstance(inst, cls)
        bases = ", ".join(base.__name__ for base in cls.__bases__)
        title = f"{cls.__name__}({bases}) options"
        sections = [title, "-" * len(title)]
        sections.extend(
            cls.class_get_trait_help(trait, inst)
            for _, trait in sorted(cls.class_traits(config=True).items())
        )
        return "\n".join(sections)
    266 
    @classmethod
    def class_get_trait_help(
        cls,
        trait: TraitType[t.Any, t.Any],
        inst: HasTraits | None = None,
        helptext: str | None = None,
    ) -> str:
        """Return the help text for one trait.

        :param trait:
            The trait to describe.
        :param inst:
            If given, the trait's current value on it is shown instead of
            the class default.
        :param helptext:
            If not given, the `help` attribute of the trait is used.
        """
        assert inst is None or isinstance(inst, cls)
        type_name = trait.__class__.__name__
        header = f"--{cls.__name__}.{trait.name}"
        if not isinstance(trait, (Container, Dict)):
            header = f"{header}=<{type_name}>"
        else:
            multiplicity = trait.metadata.get("multiplicity", "append")
            if isinstance(trait, Dict):
                sample_value = "<key-1>=<value-1>"
            else:
                sample_value = "<%s-item-1>" % type_name.lower()
            # "append" traits are written with ``=``, the others with a space
            joiner = "=" if multiplicity == "append" else " "
            header = f"{header}{joiner}{sample_value}..."
        lines = [header]

        if helptext is None:
            helptext = trait.help
        if helptext != "":
            wrapped = "\n".join(wrap_paragraphs(helptext, 76))
            lines.append(indent(wrapped))

        if "Enum" in type_name:
            # Enum-like traits also list their choices
            lines.append(indent("Choices: %s" % trait.info()))

        if inst is not None:
            lines.append(indent(f"Current: {getattr(inst, trait.name or '')!r}"))
            return "\n".join(lines)

        try:
            dvr = trait.default_value_repr()
        except Exception:
            dvr = None  # ignore defaults we can't construct
        if dvr is not None:
            if len(dvr) > 64:
                # keep long defaults to 64 characters including the ellipsis
                dvr = dvr[:61] + "..."
            lines.append(indent("Default: %s" % dvr))

        return "\n".join(lines)
    323 
    @classmethod
    def class_print_help(cls, inst: HasTraits | None = None) -> None:
        """Print the help text returned by :meth:`class_get_help`.

        `inst` is passed through to :meth:`class_get_help` unchanged.
        """
        help_text = cls.class_get_help(inst)
        print(help_text)  # noqa: T201
    328 
    @classmethod
    def _defining_class(
        cls, trait: TraitType[t.Any, t.Any], classes: t.Sequence[type[HasTraits]]
    ) -> type[Configurable]:
        """Find the class that defines a trait.

        Used to reduce redundant help output in config files.
        The current class is returned if:
        - the trait is defined on this class, or
        - the class where it is defined would not be in the config file

        Parameters
        ----------
        trait : Trait
            The trait to look for
        classes : list
            The list of other classes to consider for redundancy.
            `cls` is returned even if the trait is not defined on `cls`
            when the defining class is not in `classes`.
        """
        assert trait.name is not None
        owners = [
            klass
            for klass in cls.mro()
            if issubclass(klass, Configurable)
            and klass in classes
            and klass.class_own_traits(config=True).get(trait.name, None) is trait
        ]
        # when several classes qualify, the last one in MRO order is used
        return owners[-1] if owners else cls
    359 
    @classmethod
    def class_config_section(cls, classes: t.Sequence[type[HasTraits]] | None = None) -> str:
        """Return the config-file section for this class as commented text.

        The section holds a header, the class description (the default of a
        ``description`` trait if there is one, else the class ``__doc__``)
        and, for every ``config=True`` trait, its help followed by a
        commented-out assignment of its default value.

        Parameters
        ----------
        classes : list, optional
            The list of other classes in the config file.
            Used to reduce redundant information.
        """

        def commented(text: str) -> str:
            """Wrap ``text`` and turn it into a comment block."""
            wrapped = "\n\n".join(wrap_paragraphs(text, 78))
            return "## " + wrapped.replace("\n", "\n#  ")

        # section header
        rule = "#" + "-" * 78
        bases = ", ".join(
            base.__name__ for base in cls.__bases__ if issubclass(base, Configurable)
        )
        lines = [rule, f"# {cls.__name__}({bases}) configuration", rule]

        # the description trait is preferred over the class docstring
        desc = cls.class_traits().get("description")
        if desc:
            desc = desc.default_value
        if not desc:
            desc = getattr(cls, "__doc__", "")  # type:ignore[arg-type]
        if desc:
            lines.append(commented(desc))  # type:ignore[arg-type]
            lines.append("")

        for name, trait in sorted(cls.class_traits(config=True).items()):
            default_repr = trait.default_value_repr()
            owner = cls._defining_class(trait, classes) if classes else cls

            if owner is not cls:
                # The trait is shown under another class as well: only the
                # first line of its help is kept, plus a pointer to that class.
                if trait.help:
                    lines.append(commented(trait.help.split("\n", 1)[0]))
                lines.append(f"#  See also: {owner.__name__}.{name}")
            else:
                # cls owns the trait: full help, choices and default
                if trait.help:
                    lines.append(commented(trait.help))
                if "Enum" in type(trait).__name__:
                    lines.append("#  Choices: %s" % trait.info())
                lines.append("#  Default: %s" % default_repr)

            lines.append(f"# c.{cls.__name__}.{name} = {default_repr}")
            lines.append("")
        return "\n".join(lines)
    419 
    @classmethod
    def class_config_rst_doc(cls) -> str:
        """Generate rST documentation for this class' config options.

        Every trait returned by ``cls.class_traits(config=True)`` gets an
        entry, in name order; traits without a name are skipped.  An entry
        consists of a ``Class.trait : type`` line (the choices instead of
        the type for Enum-like traits), the default value when it can be
        constructed, and the trait help.
        """
        out = []
        for _, trait in sorted(cls.class_traits(config=True).items()):
            type_name = trait.__class__.__name__

            if not trait.name:
                continue

            # Choices or type
            if "Enum" in type_name:
                kind = trait.info_rst()  # type:ignore[attr-defined]
            else:
                kind = type_name
            out.append(cls.__name__ + "." + trait.name + " : " + kind)

            # Default value
            try:
                dvr = trait.default_value_repr()
            except Exception:
                dvr = None  # ignore defaults we can't construct
            if dvr is not None:
                shown = dvr[:61] + "..." if len(dvr) > 64 else dvr
                # Double up backslashes, so they get to the rendered docs
                shown = shown.replace("\\n", "\\\\n")
                out.append(indent("Default: ``%s``" % shown))
                out.append("")

            out.append(indent(dedent(trait.help or "No description")))

            # Blank line between entries
            out.append("")

        return "\n".join(out)
    463 
    464 
    465class LoggingConfigurable(Configurable): 
    466    """A parent class for Configurables that log. 
    467 
    468    Subclasses have a log trait, and the default behavior 
    469    is to get the logger from the currently running Application. 
    470    """ 
    471 
    472    log = Any(help="Logger or LoggerAdapter instance", allow_none=False) 
    473 
    474    @validate("log") 
    475    def _validate_log(self, proposal: Bunch) -> LoggerType: 
    476        if not isinstance(proposal.value, (logging.Logger, logging.LoggerAdapter)): 
    477            # warn about unsupported type, but be lenient to allow for duck typing 
    478            warnings.warn( 
    479                f"{self.__class__.__name__}.log should be a Logger or LoggerAdapter," 
    480                f" got {proposal.value}.", 
    481                UserWarning, 
    482                stacklevel=2, 
    483            ) 
    484        return t.cast(LoggerType, proposal.value) 
    485 
    486    @default("log") 
    487    def _log_default(self) -> LoggerType: 
    488        if isinstance(self.parent, LoggingConfigurable): 
    489            assert self.parent is not None 
    490            return t.cast(logging.Logger, self.parent.log) 
    491        from traitlets import log 
    492 
    493        return log.get_logger() 
    494 
    495    def _get_log_handler(self) -> logging.Handler | None: 
    496        """Return the default Handler 
    497 
    498        Returns None if none can be found 
    499 
    500        Deprecated, this now returns the first log handler which may or may 
    501        not be the default one. 
    502        """ 
    503        if not self.log: 
    504            return None 
    505        logger: logging.Logger = ( 
    506            self.log if isinstance(self.log, logging.Logger) else self.log.logger 
    507        ) 
    508        if not getattr(logger, "handlers", None): 
    509            # no handlers attribute or empty handlers list 
    510            return None 
    511        return logger.handlers[0] 
    512 
    513 
# Type variable bound to SingletonConfigurable.  ``instance()`` annotates its
# ``cls`` argument as ``type[CT]`` and its return value as ``CT``, so the
# declared return type is the class the method was called on.
CT = t.TypeVar("CT", bound="SingletonConfigurable")
    515 
    516 
    517class SingletonConfigurable(LoggingConfigurable): 
    518    """A configurable that only allows one instance. 
    519 
    520    This class is for classes that should only have one instance of itself 
    521    or *any* subclass. To create and retrieve such a class use the 
    522    :meth:`SingletonConfigurable.instance` method. 
    523    """ 
    524 
    525    _instance = None 
    526 
    527    @classmethod 
    528    def _walk_mro(cls) -> t.Generator[type[SingletonConfigurable], None, None]: 
    529        """Walk the cls.mro() for parent classes that are also singletons 
    530 
    531        For use in instance() 
    532        """ 
    533 
    534        for subclass in cls.mro(): 
    535            if ( 
    536                issubclass(cls, subclass) 
    537                and issubclass(subclass, SingletonConfigurable) 
    538                and subclass != SingletonConfigurable 
    539            ): 
    540                yield subclass 
    541 
    542    @classmethod 
    543    def clear_instance(cls) -> None: 
    544        """unset _instance for this class and singleton parents.""" 
    545        if not cls.initialized(): 
    546            return 
    547        for subclass in cls._walk_mro(): 
    548            if isinstance(subclass._instance, cls): 
    549                # only clear instances that are instances 
    550                # of the calling class 
    551                subclass._instance = None  # type:ignore[unreachable] 
    552 
    553    @classmethod 
    554    def instance(cls: type[CT], *args: t.Any, **kwargs: t.Any) -> CT: 
    555        """Returns a global instance of this class. 
    556 
    557        This method create a new instance if none have previously been created 
    558        and returns a previously created instance is one already exists. 
    559 
    560        The arguments and keyword arguments passed to this method are passed 
    561        on to the :meth:`__init__` method of the class upon instantiation. 
    562 
    563        Examples 
    564        -------- 
    565        Create a singleton class using instance, and retrieve it:: 
    566 
    567            >>> from traitlets.config.configurable import SingletonConfigurable 
    568            >>> class Foo(SingletonConfigurable): pass 
    569            >>> foo = Foo.instance() 
    570            >>> foo == Foo.instance() 
    571            True 
    572 
    573        Create a subclass that is retrieved using the base class instance:: 
    574 
    575            >>> class Bar(SingletonConfigurable): pass 
    576            >>> class Bam(Bar): pass 
    577            >>> bam = Bam.instance() 
    578            >>> bam == Bar.instance() 
    579            True 
    580        """ 
    581        # Create and save the instance 
    582        if cls._instance is None: 
    583            inst = cls(*args, **kwargs) 
    584            # Now make sure that the instance will also be returned by 
    585            # parent classes' _instance attribute. 
    586            for subclass in cls._walk_mro(): 
    587                subclass._instance = inst 
    588 
    589        if isinstance(cls._instance, cls): 
    590            return cls._instance 
    591        else: 
    592            raise MultipleInstanceError( 
    593                f"An incompatible sibling of '{cls.__name__}' is already instantiated" 
    594                f" as singleton: {type(cls._instance).__name__}" 
    595            ) 
    596 
    597    @classmethod 
    598    def initialized(cls) -> bool: 
    599        """Has an instance been created?""" 
    600        return hasattr(cls, "_instance") and cls._instance is not None