1import os
2import warnings
3from functools import reduce
4from glob import iglob
5from pathlib import Path
6from typing import TYPE_CHECKING, Any, Literal, Optional
7
8from ...exceptions import SettingsError
9from ...utils import path_type_label
10from ..base import PydanticBaseSettingsSource
11from ..utils import parse_env_vars
12from .env import EnvSettingsSource
13from .secrets import SecretsSettingsSource
14
15if TYPE_CHECKING:
16 from ...main import BaseSettings
17 from ...sources import PathType
18
19
# Default upper bound, in bytes, on the summed size of all files found under a
# secrets directory. NestedSecretsSettingsSource falls back to this value when
# `secrets_dir_max_size` is given neither as an argument nor in `model_config`.
SECRETS_DIR_MAX_SIZE = 16 * 2**20  # 16 MiB seems to be a reasonable default
21
22
class NestedSecretsSettingsSource(EnvSettingsSource):
    """Settings source that reads secrets from files in one or more directories.

    Every regular file found recursively under a secrets directory becomes one
    entry: the key is the file path relative to that directory and the value is
    the stripped file content. The collected entries are passed through
    ``parse_env_vars`` and stored in ``self.env_vars``, so the rest of the
    lookup logic is inherited from ``EnvSettingsSource``.

    When several directories are configured, entries from later directories
    override entries with the same key from earlier ones.
    """

    def __init__(
        self,
        file_secret_settings: PydanticBaseSettingsSource | SecretsSettingsSource,
        secrets_dir: Optional['PathType'] = None,
        secrets_dir_missing: Literal['ok', 'warn', 'error'] | None = None,
        secrets_dir_max_size: int | None = None,
        secrets_case_sensitive: bool | None = None,
        secrets_prefix: str | None = None,
        secrets_nested_delimiter: str | None = None,
        secrets_nested_subdir: bool | None = None,
        # args for compatibility with SecretsSettingsSource, don't use directly
        case_sensitive: bool | None = None,
        env_prefix: str | None = None,
    ) -> None:
        """Resolve the options, validate the directories and load the secrets.

        Each option is resolved to the first value that is not ``None`` among
        the candidates listed below (in that order).

        Args:
            file_secret_settings: A settings source exposing ``settings_cls``
                (and optionally ``secrets_dir``), or the settings class itself.
            secrets_dir: One path or an iterable of paths. Order:
                ``file_secret_settings.secrets_dir``, this argument,
                ``model_config['secrets_dir']``.
            secrets_dir_missing: What to do when a directory does not exist.
                Order: this argument, ``model_config['secrets_dir_missing']``,
                ``'warn'``.
            secrets_dir_max_size: Maximum total size of a directory in bytes.
                Order: this argument, ``model_config['secrets_dir_max_size']``,
                ``SECRETS_DIR_MAX_SIZE``.
            secrets_case_sensitive: Order: this argument,
                ``model_config['secrets_case_sensitive']``, ``case_sensitive``,
                ``model_config['case_sensitive']``, ``False``.
            secrets_prefix: Order: this argument,
                ``model_config['secrets_prefix']``, ``env_prefix``,
                ``model_config['env_prefix']``, ``''``.
            secrets_nested_delimiter: Order: this argument,
                ``model_config['secrets_nested_delimiter']``,
                ``model_config['env_nested_delimiter']``.
            secrets_nested_subdir: When true, ``os.sep`` is used as the nested
                delimiter. Order: this argument,
                ``model_config['secrets_nested_subdir']``, ``False``.
            case_sensitive: Compatibility fallback for ``secrets_case_sensitive``.
            env_prefix: Compatibility fallback for ``secrets_prefix``.

        Raises:
            SettingsError: If ``secrets_dir_missing`` has an invalid value, if
                ``secrets_nested_subdir`` is combined with an explicit
                ``secrets_nested_delimiter``, or if a secrets directory fails
                validation (see ``validate_secrets_path``).
        """
        # We allow the first argument to be settings_cls like original
        # SecretsSettingsSource. However, it is recommended to pass
        # SecretsSettingsSource instance instead (as it is shown in usage examples),
        # otherwise `_secrets_dir` arg passed to Settings() constructor will be ignored.
        settings_cls: type[BaseSettings] = getattr(
            file_secret_settings,
            'settings_cls',
            file_secret_settings,  # type: ignore[arg-type]
        )
        # config options
        conf = settings_cls.model_config
        self.secrets_dir: PathType | None = first_not_none(
            getattr(file_secret_settings, 'secrets_dir', None),
            secrets_dir,
            conf.get('secrets_dir'),
        )
        self.secrets_dir_missing: Literal['ok', 'warn', 'error'] = first_not_none(
            secrets_dir_missing,
            conf.get('secrets_dir_missing'),
            'warn',
        )
        if self.secrets_dir_missing not in ('ok', 'warn', 'error'):
            raise SettingsError(f'invalid secrets_dir_missing value: {self.secrets_dir_missing}')
        self.secrets_dir_max_size: int = first_not_none(
            secrets_dir_max_size,
            conf.get('secrets_dir_max_size'),
            SECRETS_DIR_MAX_SIZE,
        )
        self.case_sensitive: bool = first_not_none(
            secrets_case_sensitive,
            conf.get('secrets_case_sensitive'),
            case_sensitive,
            conf.get('case_sensitive'),
            False,
        )
        self.secrets_prefix: str = first_not_none(
            secrets_prefix,
            conf.get('secrets_prefix'),
            env_prefix,
            conf.get('env_prefix'),
            '',
        )

        # nested options
        self.secrets_nested_delimiter: str | None = first_not_none(
            secrets_nested_delimiter,
            conf.get('secrets_nested_delimiter'),
            conf.get('env_nested_delimiter'),
        )
        self.secrets_nested_subdir: bool = first_not_none(
            secrets_nested_subdir,
            conf.get('secrets_nested_subdir'),
            False,
        )
        if self.secrets_nested_subdir:
            # Only an explicitly configured secrets delimiter conflicts with
            # subdir mode; a delimiter inherited from env_nested_delimiter is
            # silently replaced by the path separator.
            if secrets_nested_delimiter or conf.get('secrets_nested_delimiter'):
                raise SettingsError('Options secrets_nested_delimiter and secrets_nested_subdir are mutually exclusive')
            else:
                self.secrets_nested_delimiter = os.sep

        # ensure valid secrets_path
        if self.secrets_dir is None:
            paths = []
        elif isinstance(self.secrets_dir, (Path, str)):
            paths = [self.secrets_dir]
        else:
            paths = list(self.secrets_dir)
        self.secrets_paths: list[Path] = [Path(p).expanduser().resolve() for p in paths]
        for path in self.secrets_paths:
            self.validate_secrets_path(path)

        # construct parent
        super().__init__(
            settings_cls,
            case_sensitive=self.case_sensitive,
            env_prefix=self.secrets_prefix,
            env_nested_delimiter=self.secrets_nested_delimiter,
            env_ignore_empty=False,  # match SecretsSettingsSource behaviour
            env_parse_enums=True,  # we can pass everything here, it will still behave as "True"
            env_parse_none_str=None,  # match SecretsSettingsSource behaviour
        )
        self.env_parse_none_str = None  # update manually because of None

        # update parent members
        if not self.secrets_paths:
            self.env_vars = {}
        else:
            # Merge per-directory mappings left to right, so a later directory
            # wins when the same relative path exists in several directories.
            secrets: dict[str, str] = reduce(
                lambda merged, extra: {**merged, **extra},
                (self.load_secrets(p) for p in self.secrets_paths),
            )
            self.env_vars = parse_env_vars(
                secrets,
                self.case_sensitive,
                self.env_ignore_empty,
                self.env_parse_none_str,
            )

    def validate_secrets_path(self, path: Path) -> None:
        """Check that ``path`` is a usable secrets directory.

        A missing path is handled according to ``self.secrets_dir_missing``:
        ``'ok'`` ignores it, ``'warn'`` emits a warning and ``'error'`` raises.
        An existing path must be a directory, and the summed size of all files
        below it must not exceed ``self.secrets_dir_max_size`` bytes.

        Raises:
            SettingsError: If the path is missing and the policy is ``'error'``,
                if it is not a directory, or if it is larger than allowed.
        """
        if not path.exists():
            if self.secrets_dir_missing == 'ok':
                pass
            elif self.secrets_dir_missing == 'warn':
                warnings.warn(f'directory "{path}" does not exist', stacklevel=2)
            elif self.secrets_dir_missing == 'error':
                raise SettingsError(f'directory "{path}" does not exist')
            else:
                raise ValueError  # unreachable, checked before
        else:
            if not path.is_dir():
                raise SettingsError(f'secrets_dir must reference a directory, not a {path_type_label(path)}')
            secrets_dir_size = sum(f.stat().st_size for f in path.glob('**/*') if f.is_file())
            if secrets_dir_size > self.secrets_dir_max_size:
                raise SettingsError(f'secrets_dir size is above {self.secrets_dir_max_size} bytes')

    @staticmethod
    def load_secrets(path: Path) -> dict[str, str]:
        """Read every file below ``path`` into a ``{relative path: content}`` dict.

        Keys are file paths relative to ``path`` (as strings); values are the
        file contents with leading and trailing whitespace stripped. A path
        that does not exist yields an empty dict.

        Per the ``glob`` module documentation, wildcard patterns like the one
        used here do not match names starting with a dot, so dot-files and
        dot-directories are skipped.
        """
        # Local import: only ``iglob`` is imported at module level.
        from glob import escape

        # Escape the directory itself so glob metacharacters in its name
        # ('[', ']', '*', '?') are matched literally; otherwise such a
        # directory would be treated as a pattern and its secrets missed.
        pattern = f'{escape(str(path))}/**/*'
        return {
            str(p.relative_to(path)): p.read_text().strip()
            for p in map(Path, iglob(pattern, recursive=True))
            if p.is_file()
        }

    def __repr__(self) -> str:
        """Return a short representation showing the configured ``secrets_dir``."""
        return f'NestedSecretsSettingsSource(secrets_dir={self.secrets_dir!r})'
163
164
def first_not_none(*objs: Any) -> Any:
    """Return the first argument that is not ``None``.

    Falsy values such as ``0``, ``False`` or ``''`` count as present. ``None``
    is returned only when there are no arguments or all of them are ``None``.
    """
    for candidate in objs:
        if candidate is not None:
            return candidate
    return None