Source code for redshred.configuration

import json
import os
from dataclasses import asdict, dataclass, field
from pathlib import Path
from textwrap import dedent
from typing import Any, Dict, List, Literal, Optional, Union

import yaml

from .util import _obfuscate_token


def _get_env_chain(chain, default=None):
    if not chain:
        raise ValueError("`chain` must be a non-empty iterable of strings, or a string")
    for env in (chain,) if isinstance(chain, str) else chain:
        if value := os.getenv(env):
            break
    return value or default


[docs]class ConfigurationFileError(Exception): pass
[docs]class ConfigurationError(Exception): pass
[docs]class Configuration: """ RedShred Configuration, here is the order that various configured sources are looked up REDSHRED_TOKEN + REDSHRED_HOST REDSHRED_CONTEXT + REDSHRED_CONFIG REDSHRED_CONTEXT (uses default config path) REDSHRED_CONFIG (defaults to currentContext) None default config path with currentContext """ context: str user: Optional[str] host: str token: str verify: bool options: Dict[str, Any] source: str def __init__(self, token=None, host=None, host_verify=None, config_path=None, context_override=None): try: token = token or _get_env_chain(("REDSHRED_TOKEN", "REDSHRED_AUTH_TOKEN")) host = host or _get_env_chain("REDSHRED_HOST") host_verify = ( host_verify if host_verify is not None else _get_env_chain("REDSHRED_HOST_VERIFY", default="true") == "true" ) config_path = config_path or _get_env_chain("REDSHRED_CONFIG") context_override = context_override or _get_env_chain("REDSHRED_CONTEXT") if token: self.token = token self.host = host or "https://api.redshred.com" self.user = "" self.verify = host_verify self.context = "" self.options = {} self.source = ("REDSHRED_TOKEN and REDSHRED_HOST" if host else "REDSHRED_TOKEN") + " env variable" else: possible_config_files = [Path(config_path).expanduser()] if config_path else [] possible_config_files.extend( [Path("~/.config/redshred/config").expanduser(), Path("~/.rsconfig").expanduser()] ) rs_config_file = next( (config_file_path for config_file_path in possible_config_files if config_file_path.exists()), None, ) if not rs_config_file: raise ConfigurationError( "Unable to configure RedShred credentials, please set REDSHRED_TOKEN environmental variable or " "use a RedShred config file" ) try: rs_config = self._load_config_file(rs_config_file) except (LookupError, AttributeError, yaml.YAMLError) as e: raise ConfigurationFileError( "Improperly formatted RedShred configuration file, refer to documentation for standard format" ) from e context = rs_config.get_context(context_override, errors="strict") or rs_config.get_context( rs_config.currentContext ) server = rs_config.get_server(context.server, errors="strict") self.token = context.token self.host = server.url self.user = context.username self.verify = server.verify self.context = context.name self.options = rs_config.options sources = [] if context_override: sources.append("REDSHRED_CONTEXT") if config_path: sources.append("REDSHRED_CONFIG") self.source = " and ".join(sources) + " env variable" if sources else "~/.rsconfig file" except (ConfigurationError, ConfigurationFileError): raise except Exception as e: raise ConfigurationError("There was a error with your RedShred configuration file/arguments") from e def __repr__(self): attr_reprs = [] for attr in ("context", "user", "host", "token"): value = _obfuscate_token(getattr(self, attr)) if attr == "token" else getattr(self, attr) attr_reprs.append(f"{attr!r}={value!r}") return f"{self.__class__.__name__}({', '.join(attr_reprs)})"
[docs] def info(self): """multiline string representation of the configuration details""" return dedent( f""" Context: {self.context} Host: {self.host} Token: {_obfuscate_token(self.token)} User: {self.user} Source: {self.source} """ ).strip()
@staticmethod def _load_config_file(rs_config_file: Union[Path, str]): if isinstance(rs_config_file, Path): config_file = rs_config_file.absolute() elif isinstance(rs_config_file, str): config_file = os.path.abspath(os.path.expanduser(rs_config_file)) else: raise TypeError(f"`str` or `Path` object expected, {type(rs_config_file)!r} given") return _RedShredConfigFile(**yaml.safe_load(open(config_file).read()))
@dataclass class _RedShredConfigFile: @dataclass class Server: name: str url: str verify: bool = True @dataclass class Context: name: str server: str token: str username: str = "" @dataclass class Options: verbosity: int = 0 apiVersion: str = field(default="redshred/v2") kind: Literal["Config"] = field(default="Config") currentContext: Optional[str] = field(default="default") options: Dict[str, Any] = field(default_factory=dict) servers: List[Server] = field(default_factory=list) contexts: List[Context] = field(default_factory=list) def __post_init__(self): self.servers = [self.Server(**server) for server in self.servers] self.contexts = [self.Context(**ctx) for ctx in self.contexts] self.Options = self.Options(**self.options) def dict(self): return asdict(self) def json(self, indent=2): return json.dumps(self.dict(), indent=indent) def yaml(self, indent=2): return yaml.dump(self.dict(), Dumper=yaml.Dumper, indent=indent) def get_context(self, context, errors="ignore"): for ctx in self.contexts: if ctx.name == context: return ctx if errors == "strict" and context: raise ValueError(f"Context {context!r} not found/") def get_server(self, server, errors="ignore"): for svr in self.servers: if svr.name == server: return svr if errors == "strict" and server: raise ValueError(f"Server {server!r} not found/") def set_context(self, context): if self.get_context(context): self.currentContext = context else: valid_contexts = {ctx.name for ctx in self.contexts} raise ValueError(f"{context!r} context does not exist, options are {valid_contexts}")