import json
import os
from dataclasses import asdict, dataclass, field
from pathlib import Path
from textwrap import dedent
from typing import Any, Dict, List, Literal, Optional, Union
import yaml
from .util import _obfuscate_token
def _get_env_chain(chain, default=None):
if not chain:
raise ValueError("`chain` must be a non-empty iterable of strings, or a string")
for env in (chain,) if isinstance(chain, str) else chain:
if value := os.getenv(env):
break
return value or default
[docs]class ConfigurationFileError(Exception):
pass
[docs]class ConfigurationError(Exception):
pass
[docs]class Configuration:
"""
RedShred Configuration, here is the order that various configured sources are looked up
REDSHRED_TOKEN + REDSHRED_HOST
REDSHRED_CONTEXT + REDSHRED_CONFIG
REDSHRED_CONTEXT (uses default config path)
REDSHRED_CONFIG (defaults to currentContext)
None default config path with currentContext
"""
context: str
user: Optional[str]
host: str
token: str
verify: bool
options: Dict[str, Any]
source: str
def __init__(self, token=None, host=None, host_verify=None, config_path=None, context_override=None):
try:
token = token or _get_env_chain(("REDSHRED_TOKEN", "REDSHRED_AUTH_TOKEN"))
host = host or _get_env_chain("REDSHRED_HOST")
host_verify = (
host_verify
if host_verify is not None
else _get_env_chain("REDSHRED_HOST_VERIFY", default="true") == "true"
)
config_path = config_path or _get_env_chain("REDSHRED_CONFIG")
context_override = context_override or _get_env_chain("REDSHRED_CONTEXT")
if token:
self.token = token
self.host = host or "https://api.redshred.com"
self.user = ""
self.verify = host_verify
self.context = ""
self.options = {}
self.source = ("REDSHRED_TOKEN and REDSHRED_HOST" if host else "REDSHRED_TOKEN") + " env variable"
else:
possible_config_files = [Path(config_path).expanduser()] if config_path else []
possible_config_files.extend(
[Path("~/.config/redshred/config").expanduser(), Path("~/.rsconfig").expanduser()]
)
rs_config_file = next(
(config_file_path for config_file_path in possible_config_files if config_file_path.exists()),
None,
)
if not rs_config_file:
raise ConfigurationError(
"Unable to configure RedShred credentials, please set REDSHRED_TOKEN environmental variable or "
"use a RedShred config file"
)
try:
rs_config = self._load_config_file(rs_config_file)
except (LookupError, AttributeError, yaml.YAMLError) as e:
raise ConfigurationFileError(
"Improperly formatted RedShred configuration file, refer to documentation for standard format"
) from e
context = rs_config.get_context(context_override, errors="strict") or rs_config.get_context(
rs_config.currentContext
)
server = rs_config.get_server(context.server, errors="strict")
self.token = context.token
self.host = server.url
self.user = context.username
self.verify = server.verify
self.context = context.name
self.options = rs_config.options
sources = []
if context_override:
sources.append("REDSHRED_CONTEXT")
if config_path:
sources.append("REDSHRED_CONFIG")
self.source = " and ".join(sources) + " env variable" if sources else "~/.rsconfig file"
except (ConfigurationError, ConfigurationFileError):
raise
except Exception as e:
raise ConfigurationError("There was a error with your RedShred configuration file/arguments") from e
def __repr__(self):
attr_reprs = []
for attr in ("context", "user", "host", "token"):
value = _obfuscate_token(getattr(self, attr)) if attr == "token" else getattr(self, attr)
attr_reprs.append(f"{attr!r}={value!r}")
return f"{self.__class__.__name__}({', '.join(attr_reprs)})"
[docs] def info(self):
"""multiline string representation of the configuration details"""
return dedent(
f"""
Context: {self.context}
Host: {self.host}
Token: {_obfuscate_token(self.token)}
User: {self.user}
Source: {self.source}
"""
).strip()
@staticmethod
def _load_config_file(rs_config_file: Union[Path, str]):
if isinstance(rs_config_file, Path):
config_file = rs_config_file.absolute()
elif isinstance(rs_config_file, str):
config_file = os.path.abspath(os.path.expanduser(rs_config_file))
else:
raise TypeError(f"`str` or `Path` object expected, {type(rs_config_file)!r} given")
return _RedShredConfigFile(**yaml.safe_load(open(config_file).read()))
@dataclass
class _RedShredConfigFile:
@dataclass
class Server:
name: str
url: str
verify: bool = True
@dataclass
class Context:
name: str
server: str
token: str
username: str = ""
@dataclass
class Options:
verbosity: int = 0
apiVersion: str = field(default="redshred/v2")
kind: Literal["Config"] = field(default="Config")
currentContext: Optional[str] = field(default="default")
options: Dict[str, Any] = field(default_factory=dict)
servers: List[Server] = field(default_factory=list)
contexts: List[Context] = field(default_factory=list)
def __post_init__(self):
self.servers = [self.Server(**server) for server in self.servers]
self.contexts = [self.Context(**ctx) for ctx in self.contexts]
self.Options = self.Options(**self.options)
def dict(self):
return asdict(self)
def json(self, indent=2):
return json.dumps(self.dict(), indent=indent)
def yaml(self, indent=2):
return yaml.dump(self.dict(), Dumper=yaml.Dumper, indent=indent)
def get_context(self, context, errors="ignore"):
for ctx in self.contexts:
if ctx.name == context:
return ctx
if errors == "strict" and context:
raise ValueError(f"Context {context!r} not found/")
def get_server(self, server, errors="ignore"):
for svr in self.servers:
if svr.name == server:
return svr
if errors == "strict" and server:
raise ValueError(f"Server {server!r} not found/")
def set_context(self, context):
if self.get_context(context):
self.currentContext = context
else:
valid_contexts = {ctx.name for ctx in self.contexts}
raise ValueError(f"{context!r} context does not exist, options are {valid_contexts}")