Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/traitlets/config/loader.py: 24%

571 statements  

« prev     ^ index     » next       coverage.py v7.3.3, created at 2023-12-15 06:13 +0000

1"""A simple configuration system.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import argparse 

8import copy 

9import functools 

10import json 

11import os 

12import re 

13import sys 

14import typing as t 

15from logging import Logger 

16 

17from traitlets.traitlets import Any, Container, Dict, HasTraits, List, TraitType, Undefined 

18 

19from ..utils import cast_unicode, filefind, warnings 

20 

21# ----------------------------------------------------------------------------- 

22# Exceptions 

23# ----------------------------------------------------------------------------- 

24 

25 

26class ConfigError(Exception): 

27 pass 

28 

29 

30class ConfigLoaderError(ConfigError): 

31 pass 

32 

33 

34class ConfigFileNotFound(ConfigError): # noqa 

35 pass 

36 

37 

38class ArgumentError(ConfigLoaderError): 

39 pass 

40 

41 

42# ----------------------------------------------------------------------------- 

43# Argparse fix 

44# ----------------------------------------------------------------------------- 

45 

46# Unfortunately argparse by default prints help messages to stderr instead of 

47# stdout. This makes it annoying to capture long help screens at the command 

48# line, since one must know how to pipe stderr, which many users don't know how 

49# to do. So we override the print_help method with one that defaults to 

50# stdout and use our class instead. 

51 

52 

53class _Sentinel: 

54 def __repr__(self) -> str: 

55 return "<Sentinel deprecated>" 

56 

57 def __str__(self) -> str: 

58 return "<deprecated>" 

59 

60 

61_deprecated = _Sentinel() 

62 

63 

64class ArgumentParser(argparse.ArgumentParser): 

65 """Simple argparse subclass that prints help to stdout by default.""" 

66 

67 def print_help(self, file: t.Any = None) -> None: 

68 if file is None: 

69 file = sys.stdout 

70 return super().print_help(file) 

71 

72 print_help.__doc__ = argparse.ArgumentParser.print_help.__doc__ 

73 

74 

75# ----------------------------------------------------------------------------- 

76# Config class for holding config information 

77# ----------------------------------------------------------------------------- 

78 

79 

80def execfile(fname: str, glob: dict[str, Any]) -> None: 

81 with open(fname, "rb") as f: 

82 exec(compile(f.read(), fname, "exec"), glob, glob) # noqa 

83 

84 

85class LazyConfigValue(HasTraits): 

86 """Proxy object for exposing methods on configurable containers 

87 

88 These methods allow appending/extending/updating 

89 to add to non-empty defaults instead of clobbering them. 

90 

91 Exposes: 

92 

93 - append, extend, insert on lists 

94 - update on dicts 

95 - update, add on sets 

96 """ 

97 

98 _value = None 

99 

100 # list methods 

101 _extend: List[t.Any] = List() 

102 _prepend: List[t.Any] = List() 

103 _inserts: List[t.Any] = List() 

104 

105 def append(self, obj: t.Any) -> None: 

106 """Append an item to a List""" 

107 self._extend.append(obj) 

108 

109 def extend(self, other: t.Any) -> None: 

110 """Extend a list""" 

111 self._extend.extend(other) 

112 

113 def prepend(self, other: t.Any) -> None: 

114 """like list.extend, but for the front""" 

115 self._prepend[:0] = other 

116 

117 def merge_into(self, other: t.Any) -> t.Any: 

118 """ 

119 Merge with another earlier LazyConfigValue or an earlier container. 

120 This is useful when having global system-wide configuration files. 

121 

122 Self is expected to have higher precedence. 

123 

124 Parameters 

125 ---------- 

126 other : LazyConfigValue or container 

127 

128 Returns 

129 ------- 

130 LazyConfigValue 

131 if ``other`` is also lazy, a reified container otherwise. 

132 """ 

133 if isinstance(other, LazyConfigValue): 

134 other._extend.extend(self._extend) 

135 self._extend = other._extend 

136 

137 self._prepend.extend(other._prepend) 

138 

139 other._inserts.extend(self._inserts) 

140 self._inserts = other._inserts 

141 

142 if self._update: 

143 other.update(self._update) 

144 self._update = other._update 

145 return self 

146 else: 

147 # other is a container, reify now. 

148 return self.get_value(other) 

149 

150 def insert(self, index: int, other: t.Any) -> None: 

151 if not isinstance(index, int): 

152 raise TypeError("An integer is required") 

153 self._inserts.append((index, other)) 

154 

155 # dict methods 

156 # update is used for both dict and set 

157 _update = Any() 

158 

159 def update(self, other: t.Any) -> None: 

160 """Update either a set or dict""" 

161 if self._update is None: 

162 if isinstance(other, dict): 

163 self._update = {} 

164 else: 

165 self._update = set() 

166 self._update.update(other) 

167 

168 # set methods 

169 def add(self, obj: t.Any) -> None: 

170 """Add an item to a set""" 

171 self.update({obj}) 

172 

173 def get_value(self, initial: t.Any) -> t.Any: 

174 """construct the value from the initial one 

175 

176 after applying any insert / extend / update changes 

177 """ 

178 if self._value is not None: 

179 return self._value # type:ignore[unreachable] 

180 value = copy.deepcopy(initial) 

181 if isinstance(value, list): 

182 for idx, obj in self._inserts: 

183 value.insert(idx, obj) 

184 value[:0] = self._prepend 

185 value.extend(self._extend) 

186 

187 elif isinstance(value, dict): 

188 if self._update: 

189 value.update(self._update) 

190 elif isinstance(value, set): 

191 if self._update: 

192 value.update(self._update) 

193 self._value = value 

194 return value 

195 

196 def to_dict(self) -> dict[str, t.Any]: 

197 """return JSONable dict form of my data 

198 

199 Currently update as dict or set, extend, prepend as lists, and inserts as list of tuples. 

200 """ 

201 d = {} 

202 if self._update: 

203 d["update"] = self._update 

204 if self._extend: 

205 d["extend"] = self._extend 

206 if self._prepend: 

207 d["prepend"] = self._prepend 

208 elif self._inserts: 

209 d["inserts"] = self._inserts 

210 return d 

211 

212 def __repr__(self) -> str: 

213 if self._value is not None: 

214 return f"<{self.__class__.__name__} value={self._value!r}>" 

215 else: 

216 return f"<{self.__class__.__name__} {self.to_dict()!r}>" 

217 

218 

219def _is_section_key(key: str) -> bool: 

220 """Is a Config key a section name (does it start with a capital)?""" 

221 if key and key[0].upper() == key[0] and not key.startswith("_"): 

222 return True 

223 else: 

224 return False 

225 

226 

227class Config(dict): # type:ignore[type-arg] 

228 """An attribute-based dict that can do smart merges. 

229 

230 Accessing a field on a config object for the first time populates the key 

231 with either a nested Config object for keys starting with capitals 

232 or :class:`.LazyConfigValue` for lowercase keys, 

233 allowing quick assignments such as:: 

234 

235 c = Config() 

236 c.Class.int_trait = 5 

237 c.Class.list_trait.append("x") 

238 

239 """ 

240 

241 def __init__(self, *args: t.Any, **kwds: t.Any) -> None: 

242 dict.__init__(self, *args, **kwds) 

243 self._ensure_subconfig() 

244 

245 def _ensure_subconfig(self) -> None: 

246 """ensure that sub-dicts that should be Config objects are 

247 

248 casts dicts that are under section keys to Config objects, 

249 which is necessary for constructing Config objects from dict literals. 

250 """ 

251 for key in self: 

252 obj = self[key] 

253 if _is_section_key(key) and isinstance(obj, dict) and not isinstance(obj, Config): 

254 setattr(self, key, Config(obj)) 

255 

256 def _merge(self, other: t.Any) -> None: 

257 """deprecated alias, use Config.merge()""" 

258 self.merge(other) 

259 

260 def merge(self, other: t.Any) -> None: 

261 """merge another config object into this one""" 

262 to_update = {} 

263 for k, v in other.items(): 

264 if k not in self: 

265 to_update[k] = v 

266 else: # I have this key 

267 if isinstance(v, Config) and isinstance(self[k], Config): 

268 # Recursively merge common sub Configs 

269 self[k].merge(v) 

270 elif isinstance(v, LazyConfigValue): 

271 self[k] = v.merge_into(self[k]) 

272 else: 

273 # Plain updates for non-Configs 

274 to_update[k] = v 

275 

276 self.update(to_update) 

277 

278 def collisions(self, other: Config) -> dict[str, t.Any]: 

279 """Check for collisions between two config objects. 

280 

281 Returns a dict of the form {"Class": {"trait": "collision message"}}`, 

282 indicating which values have been ignored. 

283 

284 An empty dict indicates no collisions. 

285 """ 

286 collisions: dict[str, t.Any] = {} 

287 for section in self: 

288 if section not in other: 

289 continue 

290 mine = self[section] 

291 theirs = other[section] 

292 for key in mine: 

293 if key in theirs and mine[key] != theirs[key]: 

294 collisions.setdefault(section, {}) 

295 collisions[section][key] = f"{mine[key]!r} ignored, using {theirs[key]!r}" 

296 return collisions 

297 

298 def __contains__(self, key: t.Any) -> bool: 

299 # allow nested contains of the form `"Section.key" in config` 

300 if "." in key: 

301 first, remainder = key.split(".", 1) 

302 if first not in self: 

303 return False 

304 return remainder in self[first] 

305 

306 return super().__contains__(key) 

307 

308 # .has_key is deprecated for dictionaries. 

309 has_key = __contains__ 

310 

311 def _has_section(self, key: str) -> bool: 

312 return _is_section_key(key) and key in self 

313 

314 def copy(self) -> dict[str, t.Any]: 

315 return type(self)(dict.copy(self)) 

316 

317 def __copy__(self) -> dict[str, t.Any]: 

318 return self.copy() 

319 

320 def __deepcopy__(self, memo: t.Any) -> Config: 

321 new_config = type(self)() 

322 for key, value in self.items(): 

323 if isinstance(value, (Config, LazyConfigValue)): 

324 # deep copy config objects 

325 value = copy.deepcopy(value, memo) 

326 elif type(value) in {dict, list, set, tuple}: 

327 # shallow copy plain container traits 

328 value = copy.copy(value) 

329 new_config[key] = value 

330 return new_config 

331 

332 def __getitem__(self, key: str) -> t.Any: 

333 try: 

334 return dict.__getitem__(self, key) 

335 except KeyError: 

336 if _is_section_key(key): 

337 c = Config() 

338 dict.__setitem__(self, key, c) 

339 return c 

340 elif not key.startswith("_"): 

341 # undefined, create lazy value, used for container methods 

342 v = LazyConfigValue() 

343 dict.__setitem__(self, key, v) 

344 return v 

345 else: 

346 raise 

347 

348 def __setitem__(self, key: str, value: t.Any) -> None: 

349 if _is_section_key(key): 

350 if not isinstance(value, Config): 

351 raise ValueError( 

352 "values whose keys begin with an uppercase " 

353 f"char must be Config instances: {key!r}, {value!r}" 

354 ) 

355 dict.__setitem__(self, key, value) 

356 

357 def __getattr__(self, key: str) -> t.Any: 

358 if key.startswith("__"): 

359 return dict.__getattr__(self, key) # type:ignore[attr-defined] 

360 try: 

361 return self.__getitem__(key) 

362 except KeyError as e: 

363 raise AttributeError(e) from e 

364 

365 def __setattr__(self, key: str, value: t.Any) -> None: 

366 if key.startswith("__"): 

367 return dict.__setattr__(self, key, value) 

368 try: 

369 self.__setitem__(key, value) 

370 except KeyError as e: 

371 raise AttributeError(e) from e 

372 

373 def __delattr__(self, key: str) -> None: 

374 if key.startswith("__"): 

375 return dict.__delattr__(self, key) 

376 try: 

377 dict.__delitem__(self, key) 

378 except KeyError as e: 

379 raise AttributeError(e) from e 

380 

381 

382class DeferredConfig: 

383 """Class for deferred-evaluation of config from CLI""" 

384 

385 pass 

386 

387 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any: 

388 raise NotImplementedError("Implement in subclasses") 

389 

390 def _super_repr(self) -> str: 

391 # explicitly call super on direct parent 

392 return super(self.__class__, self).__repr__() 

393 

394 

395class DeferredConfigString(str, DeferredConfig): 

396 """Config value for loading config from a string 

397 

398 Interpretation is deferred until it is loaded into the trait. 

399 

400 Subclass of str for backward compatibility. 

401 

402 This class is only used for values that are not listed 

403 in the configurable classes. 

404 

405 When config is loaded, `trait.from_string` will be used. 

406 

407 If an error is raised in `.from_string`, 

408 the original string is returned. 

409 

410 .. versionadded:: 5.0 

411 """ 

412 

413 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any: 

414 """Get the value stored in this string""" 

415 s = str(self) 

416 try: 

417 return trait.from_string(s) 

418 except Exception: 

419 # exception casting from string, 

420 # let the original string lie. 

421 # this will raise a more informative error when config is loaded. 

422 return s 

423 

424 def __repr__(self) -> str: 

425 return f"{self.__class__.__name__}({self._super_repr()})" 

426 

427 

428class DeferredConfigList(t.List[t.Any], DeferredConfig): 

429 """Config value for loading config from a list of strings 

430 

431 Interpretation is deferred until it is loaded into the trait. 

432 

433 This class is only used for values that are not listed 

434 in the configurable classes. 

435 

436 When config is loaded, `trait.from_string_list` will be used. 

437 

438 If an error is raised in `.from_string_list`, 

439 the original string list is returned. 

440 

441 .. versionadded:: 5.0 

442 """ 

443 

444 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any: 

445 """Get the value stored in this string""" 

446 if hasattr(trait, "from_string_list"): 

447 src = list(self) 

448 cast = trait.from_string_list 

449 else: 

450 # only allow one item 

451 if len(self) > 1: 

452 raise ValueError( 

453 f"{trait.name} only accepts one value, got {len(self)}: {list(self)}" 

454 ) 

455 src = self[0] 

456 cast = trait.from_string 

457 

458 try: 

459 return cast(src) 

460 except Exception: 

461 # exception casting from string, 

462 # let the original value lie. 

463 # this will raise a more informative error when config is loaded. 

464 return src 

465 

466 def __repr__(self) -> str: 

467 return f"{self.__class__.__name__}({self._super_repr()})" 

468 

469 

470# ----------------------------------------------------------------------------- 

471# Config loading classes 

472# ----------------------------------------------------------------------------- 

473 

474 

475class ConfigLoader: 

476 """A object for loading configurations from just about anywhere. 

477 

478 The resulting configuration is packaged as a :class:`Config`. 

479 

480 Notes 

481 ----- 

482 A :class:`ConfigLoader` does one thing: load a config from a source 

483 (file, command line arguments) and returns the data as a :class:`Config` object. 

484 There are lots of things that :class:`ConfigLoader` does not do. It does 

485 not implement complex logic for finding config files. It does not handle 

486 default values or merge multiple configs. These things need to be 

487 handled elsewhere. 

488 """ 

489 

490 def _log_default(self) -> Logger: 

491 from traitlets.log import get_logger 

492 

493 return t.cast(Logger, get_logger()) 

494 

495 def __init__(self, log: Logger | None = None) -> None: 

496 """A base class for config loaders. 

497 

498 log : instance of :class:`logging.Logger` to use. 

499 By default logger of :meth:`traitlets.config.application.Application.instance()` 

500 will be used 

501 

502 Examples 

503 -------- 

504 >>> cl = ConfigLoader() 

505 >>> config = cl.load_config() 

506 >>> config 

507 {} 

508 """ 

509 self.clear() 

510 if log is None: 

511 self.log = self._log_default() 

512 self.log.debug("Using default logger") 

513 else: 

514 self.log = log 

515 

516 def clear(self) -> None: 

517 self.config = Config() 

518 

519 def load_config(self) -> Config: 

520 """Load a config from somewhere, return a :class:`Config` instance. 

521 

522 Usually, this will cause self.config to be set and then returned. 

523 However, in most cases, :meth:`ConfigLoader.clear` should be called 

524 to erase any previous state. 

525 """ 

526 self.clear() 

527 return self.config 

528 

529 

530class FileConfigLoader(ConfigLoader): 

531 """A base class for file based configurations. 

532 

533 As we add more file based config loaders, the common logic should go 

534 here. 

535 """ 

536 

537 def __init__(self, filename: str, path: str | None = None, **kw: t.Any) -> None: 

538 """Build a config loader for a filename and path. 

539 

540 Parameters 

541 ---------- 

542 filename : str 

543 The file name of the config file. 

544 path : str, list, tuple 

545 The path to search for the config file on, or a sequence of 

546 paths to try in order. 

547 """ 

548 super().__init__(**kw) 

549 self.filename = filename 

550 self.path = path 

551 self.full_filename = "" 

552 

553 def _find_file(self) -> None: 

554 """Try to find the file by searching the paths.""" 

555 self.full_filename = filefind(self.filename, self.path) 

556 

557 

558class JSONFileConfigLoader(FileConfigLoader): 

559 """A JSON file loader for config 

560 

561 Can also act as a context manager that rewrite the configuration file to disk on exit. 

562 

563 Example:: 

564 

565 with JSONFileConfigLoader('myapp.json','/home/jupyter/configurations/') as c: 

566 c.MyNewConfigurable.new_value = 'Updated' 

567 

568 """ 

569 

570 def load_config(self) -> Config: 

571 """Load the config from a file and return it as a Config object.""" 

572 self.clear() 

573 try: 

574 self._find_file() 

575 except OSError as e: 

576 raise ConfigFileNotFound(str(e)) from e 

577 dct = self._read_file_as_dict() 

578 self.config = self._convert_to_config(dct) 

579 return self.config 

580 

581 def _read_file_as_dict(self) -> dict[str, t.Any]: 

582 with open(self.full_filename) as f: 

583 return t.cast("dict[str, t.Any]", json.load(f)) 

584 

585 def _convert_to_config(self, dictionary: dict[str, t.Any]) -> Config: 

586 if "version" in dictionary: 

587 version = dictionary.pop("version") 

588 else: 

589 version = 1 

590 

591 if version == 1: 

592 return Config(dictionary) 

593 else: 

594 raise ValueError(f"Unknown version of JSON config file: {version}") 

595 

596 def __enter__(self) -> Config: 

597 self.load_config() 

598 return self.config 

599 

600 def __exit__(self, exc_type: t.Any, exc_value: t.Any, traceback: t.Any) -> None: 

601 """ 

602 Exit the context manager but do not handle any errors. 

603 

604 In case of any error, we do not want to write the potentially broken 

605 configuration to disk. 

606 """ 

607 self.config.version = 1 

608 json_config = json.dumps(self.config, indent=2) 

609 with open(self.full_filename, "w") as f: 

610 f.write(json_config) 

611 

612 

613class PyFileConfigLoader(FileConfigLoader): 

614 """A config loader for pure python files. 

615 

616 This is responsible for locating a Python config file by filename and 

617 path, then executing it to construct a Config object. 

618 """ 

619 

620 def load_config(self) -> Config: 

621 """Load the config from a file and return it as a Config object.""" 

622 self.clear() 

623 try: 

624 self._find_file() 

625 except OSError as e: 

626 raise ConfigFileNotFound(str(e)) from e 

627 self._read_file_as_dict() 

628 return self.config 

629 

630 def load_subconfig(self, fname: str, path: str | None = None) -> None: 

631 """Injected into config file namespace as load_subconfig""" 

632 if path is None: 

633 path = self.path 

634 

635 loader = self.__class__(fname, path) 

636 try: 

637 sub_config = loader.load_config() 

638 except ConfigFileNotFound: 

639 # Pass silently if the sub config is not there, 

640 # treat it as an empty config file. 

641 pass 

642 else: 

643 self.config.merge(sub_config) 

644 

645 def _read_file_as_dict(self) -> None: 

646 """Load the config file into self.config, with recursive loading.""" 

647 

648 def get_config() -> Config: 

649 """Unnecessary now, but a deprecation warning is more trouble than it's worth.""" 

650 return self.config 

651 

652 namespace = dict( 

653 c=self.config, 

654 load_subconfig=self.load_subconfig, 

655 get_config=get_config, 

656 __file__=self.full_filename, 

657 ) 

658 conf_filename = self.full_filename 

659 with open(conf_filename, "rb") as f: 

660 exec(compile(f.read(), conf_filename, "exec"), namespace, namespace) # noqa 

661 

662 

663class CommandLineConfigLoader(ConfigLoader): 

664 """A config loader for command line arguments. 

665 

666 As we add more command line based loaders, the common logic should go 

667 here. 

668 """ 

669 

670 def _exec_config_str( 

671 self, lhs: t.Any, rhs: t.Any, trait: TraitType[t.Any, t.Any] | None = None 

672 ) -> None: 

673 """execute self.config.<lhs> = <rhs> 

674 

675 * expands ~ with expanduser 

676 * interprets value with trait if available 

677 """ 

678 value = rhs 

679 if isinstance(value, DeferredConfig): 

680 if trait: 

681 # trait available, reify config immediately 

682 value = value.get_value(trait) 

683 elif isinstance(rhs, DeferredConfigList) and len(rhs) == 1: 

684 # single item, make it a deferred str 

685 value = DeferredConfigString(os.path.expanduser(rhs[0])) 

686 else: 

687 if trait: 

688 value = trait.from_string(value) 

689 else: 

690 value = DeferredConfigString(value) 

691 

692 *path, key = lhs.split(".") 

693 section = self.config 

694 for part in path: 

695 section = section[part] 

696 section[key] = value 

697 return 

698 

699 def _load_flag(self, cfg: t.Any) -> None: 

700 """update self.config from a flag, which can be a dict or Config""" 

701 if isinstance(cfg, (dict, Config)): 

702 # don't clobber whole config sections, update 

703 # each section from config: 

704 for sec, c in cfg.items(): 

705 self.config[sec].update(c) 

706 else: 

707 raise TypeError("Invalid flag: %r" % cfg) 

708 

709 

710# match --Class.trait keys for argparse 

711# matches: 

712# --Class.trait 

713# --x 

714# -x 

715 

716class_trait_opt_pattern = re.compile(r"^\-?\-[A-Za-z][\w]*(\.[\w]+)*$") 

717 

718_DOT_REPLACEMENT = "__DOT__" 

719_DASH_REPLACEMENT = "__DASH__" 

720 

721 

722class _KVAction(argparse.Action): 

723 """Custom argparse action for handling --Class.trait=x 

724 

725 Always 

726 """ 

727 

728 def __call__( # type:ignore[override] 

729 self, 

730 parser: argparse.ArgumentParser, 

731 namespace: dict[str, t.Any], 

732 values: t.Sequence[t.Any], 

733 option_string: str | None = None, 

734 ) -> None: 

735 if isinstance(values, str): 

736 values = [values] 

737 values = ["-" if v is _DASH_REPLACEMENT else v for v in values] 

738 items = getattr(namespace, self.dest, None) 

739 if items is None: 

740 items = DeferredConfigList() 

741 else: 

742 items = DeferredConfigList(items) 

743 items.extend(values) 

744 setattr(namespace, self.dest, items) 

745 

746 

747class _DefaultOptionDict(dict): # type:ignore[type-arg] 

748 """Like the default options dict 

749 

750 but acts as if all --Class.trait options are predefined 

751 """ 

752 

753 def _add_kv_action(self, key: str) -> None: 

754 self[key] = _KVAction( 

755 option_strings=[key], 

756 dest=key.lstrip("-").replace(".", _DOT_REPLACEMENT), 

757 # use metavar for display purposes 

758 metavar=key.lstrip("-"), 

759 ) 

760 

761 def __contains__(self, key: t.Any) -> bool: 

762 if "=" in key: 

763 return False 

764 if super().__contains__(key): 

765 return True 

766 

767 if key.startswith("-") and class_trait_opt_pattern.match(key): 

768 self._add_kv_action(key) 

769 return True 

770 return False 

771 

772 def __getitem__(self, key: str) -> t.Any: 

773 if key in self: 

774 return super().__getitem__(key) 

775 else: 

776 raise KeyError(key) 

777 

778 def get(self, key: str, default: t.Any = None) -> t.Any: 

779 try: 

780 return self[key] 

781 except KeyError: 

782 return default 

783 

784 

785class _KVArgParser(argparse.ArgumentParser): 

786 """subclass of ArgumentParser where any --Class.trait option is implicitly defined""" 

787 

788 def parse_known_args( # type:ignore[override] 

789 self, args: t.Sequence[str] | None = None, namespace: argparse.Namespace | None = None 

790 ) -> tuple[argparse.Namespace | None, list[str]]: 

791 # must be done immediately prior to parsing because if we do it in init, 

792 # registration of explicit actions via parser.add_option will fail during setup 

793 for container in (self, self._optionals): 

794 container._option_string_actions = _DefaultOptionDict(container._option_string_actions) 

795 return super().parse_known_args(args, namespace) 

796 

797 

798# type aliases 

799SubcommandsDict = t.Dict[str, t.Any] 

800 

801 

802class ArgParseConfigLoader(CommandLineConfigLoader): 

803 """A loader that uses the argparse module to load from the command line.""" 

804 

805 parser_class = ArgumentParser 

806 

807 def __init__( 

808 self, 

809 argv: list[str] | None = None, 

810 aliases: dict[str, str] | None = None, 

811 flags: dict[str, str] | None = None, 

812 log: t.Any = None, 

813 classes: list[type[t.Any]] | None = None, 

814 subcommands: SubcommandsDict | None = None, 

815 *parser_args: t.Any, 

816 **parser_kw: t.Any, 

817 ) -> None: 

818 """Create a config loader for use with argparse. 

819 

820 Parameters 

821 ---------- 

822 classes : optional, list 

823 The classes to scan for *container* config-traits and decide 

824 for their "multiplicity" when adding them as *argparse* arguments. 

825 argv : optional, list 

826 If given, used to read command-line arguments from, otherwise 

827 sys.argv[1:] is used. 

828 *parser_args : tuple 

829 A tuple of positional arguments that will be passed to the 

830 constructor of :class:`argparse.ArgumentParser`. 

831 **parser_kw : dict 

832 A tuple of keyword arguments that will be passed to the 

833 constructor of :class:`argparse.ArgumentParser`. 

834 aliases : dict of str to str 

835 Dict of aliases to full traitlets names for CLI parsing 

836 flags : dict of str to str 

837 Dict of flags to full traitlets names for CLI parsing 

838 log 

839 Passed to `ConfigLoader` 

840 

841 Returns 

842 ------- 

843 config : Config 

844 The resulting Config object. 

845 """ 

846 classes = classes or [] 

847 super(CommandLineConfigLoader, self).__init__(log=log) 

848 self.clear() 

849 if argv is None: 

850 argv = sys.argv[1:] 

851 self.argv = argv 

852 self.aliases = aliases or {} 

853 self.flags = flags or {} 

854 self.classes = classes 

855 self.subcommands = subcommands # only used for argcomplete currently 

856 

857 self.parser_args = parser_args 

858 self.version = parser_kw.pop("version", None) 

859 kwargs = dict(argument_default=argparse.SUPPRESS) 

860 kwargs.update(parser_kw) 

861 self.parser_kw = kwargs 

862 

863 def load_config( 

864 self, 

865 argv: list[str] | None = None, 

866 aliases: t.Any = None, 

867 flags: t.Any = _deprecated, 

868 classes: t.Any = None, 

869 ) -> Config: 

870 """Parse command line arguments and return as a Config object. 

871 

872 Parameters 

873 ---------- 

874 argv : optional, list 

875 If given, a list with the structure of sys.argv[1:] to parse 

876 arguments from. If not given, the instance's self.argv attribute 

877 (given at construction time) is used. 

878 flags 

879 Deprecated in traitlets 5.0, instantiate the config loader with the flags. 

880 

881 """ 

882 

883 if flags is not _deprecated: 

884 warnings.warn( 

885 "The `flag` argument to load_config is deprecated since Traitlets " 

886 f"5.0 and will be ignored, pass flags the `{type(self)}` constructor.", 

887 DeprecationWarning, 

888 stacklevel=2, 

889 ) 

890 

891 self.clear() 

892 if argv is None: 

893 argv = self.argv 

894 if aliases is not None: 

895 self.aliases = aliases 

896 if classes is not None: 

897 self.classes = classes 

898 self._create_parser() 

899 self._argcomplete(self.classes, self.subcommands) 

900 self._parse_args(argv) 

901 self._convert_to_config() 

902 return self.config 

903 

904 def get_extra_args(self) -> list[str]: 

905 if hasattr(self, "extra_args"): 

906 return self.extra_args 

907 else: 

908 return [] 

909 

910 def _create_parser(self) -> None: 

911 self.parser = self.parser_class( 

912 *self.parser_args, 

913 **self.parser_kw, # type:ignore[arg-type] 

914 ) 

915 self._add_arguments(self.aliases, self.flags, self.classes) 

916 

917 def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None: 

918 raise NotImplementedError("subclasses must implement _add_arguments") 

919 

920 def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None: 

921 """If argcomplete is enabled, allow triggering command-line autocompletion""" 

922 pass 

923 

924 def _parse_args(self, args: t.Any) -> t.Any: 

925 """self.parser->self.parsed_data""" 

926 uargs = [cast_unicode(a) for a in args] 

927 

928 unpacked_aliases: dict[str, str] = {} 

929 if self.aliases: 

930 unpacked_aliases = {} 

931 for alias, alias_target in self.aliases.items(): 

932 if alias in self.flags: 

933 continue 

934 if not isinstance(alias, tuple): # type:ignore[unreachable] 

935 alias = (alias,) # type:ignore[assignment] 

936 for al in alias: 

937 if len(al) == 1: 

938 unpacked_aliases["-" + al] = "--" + alias_target 

939 unpacked_aliases["--" + al] = "--" + alias_target 

940 

941 def _replace(arg: str) -> str: 

942 if arg == "-": 

943 return _DASH_REPLACEMENT 

944 for k, v in unpacked_aliases.items(): 

945 if arg == k: 

946 return v 

947 if arg.startswith(k + "="): 

948 return v + "=" + arg[len(k) + 1 :] 

949 return arg 

950 

951 if "--" in uargs: 

952 idx = uargs.index("--") 

953 extra_args = uargs[idx + 1 :] 

954 to_parse = uargs[:idx] 

955 else: 

956 extra_args = [] 

957 to_parse = uargs 

958 to_parse = [_replace(a) for a in to_parse] 

959 

960 self.parsed_data = self.parser.parse_args(to_parse) 

961 self.extra_args = extra_args 

962 

963 def _convert_to_config(self) -> None: 

964 """self.parsed_data->self.config""" 

965 for k, v in vars(self.parsed_data).items(): 

966 *path, key = k.split(".") 

967 section = self.config 

968 for p in path: 

969 section = section[p] 

970 setattr(section, key, v) 

971 

972 

973class _FlagAction(argparse.Action): 

974 """ArgParse action to handle a flag""" 

975 

976 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None: 

977 self.flag = kwargs.pop("flag") 

978 self.alias = kwargs.pop("alias", None) 

979 kwargs["const"] = Undefined 

980 if not self.alias: 

981 kwargs["nargs"] = 0 

982 super().__init__(*args, **kwargs) 

983 

984 def __call__( 

985 self, parser: t.Any, namespace: t.Any, values: t.Any, option_string: str | None = None 

986 ) -> None: 

987 if self.nargs == 0 or values is Undefined: 

988 if not hasattr(namespace, "_flags"): 

989 namespace._flags = [] 

990 namespace._flags.append(self.flag) 

991 else: 

992 setattr(namespace, self.alias, values) 

993 

994 

995class KVArgParseConfigLoader(ArgParseConfigLoader): 

996 """A config loader that loads aliases and flags with argparse, 

997 

998 as well as arbitrary --Class.trait value 

999 """ 

1000 

1001 parser_class = _KVArgParser # type:ignore[assignment] 

1002 

1003 def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None: 

1004 alias_flags: dict[str, t.Any] = {} 

1005 argparse_kwds: dict[str, t.Any] 

1006 argparse_traits: dict[str, t.Any] 

1007 paa = self.parser.add_argument 

1008 self.parser.set_defaults(_flags=[]) 

1009 paa("extra_args", nargs="*") 

1010 

1011 # An index of all container traits collected:: 

1012 # 

1013 # { <traitname>: (<trait>, <argparse-kwds>) } 

1014 # 

1015 # Used to add the correct type into the `config` tree. 

1016 # Used also for aliases, not to re-collect them. 

1017 self.argparse_traits = argparse_traits = {} 

1018 for cls in classes: 

1019 for traitname, trait in cls.class_traits(config=True).items(): 

1020 argname = f"{cls.__name__}.{traitname}" 

1021 argparse_kwds = {"type": str} 

1022 if isinstance(trait, (Container, Dict)): 

1023 multiplicity = trait.metadata.get("multiplicity", "append") 

1024 if multiplicity == "append": 

1025 argparse_kwds["action"] = multiplicity 

1026 else: 

1027 argparse_kwds["nargs"] = multiplicity 

1028 argparse_traits[argname] = (trait, argparse_kwds) 

1029 

1030 for keys, (value, fhelp) in flags.items(): 

1031 if not isinstance(keys, tuple): 

1032 keys = (keys,) 

1033 for key in keys: 

1034 if key in aliases: 

1035 alias_flags[aliases[key]] = value 

1036 continue 

1037 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,) 

1038 paa(*keys, action=_FlagAction, flag=value, help=fhelp) 

1039 

1040 for keys, traitname in aliases.items(): 

1041 if not isinstance(keys, tuple): 

1042 keys = (keys,) 

1043 

1044 for key in keys: 

1045 argparse_kwds = { 

1046 "type": str, 

1047 "dest": traitname.replace(".", _DOT_REPLACEMENT), 

1048 "metavar": traitname, 

1049 } 

1050 argcompleter = None 

1051 if traitname in argparse_traits: 

1052 trait, kwds = argparse_traits[traitname] 

1053 argparse_kwds.update(kwds) 

1054 if "action" in argparse_kwds and traitname in alias_flags: 

1055 # flag sets 'action', so can't have flag & alias with custom action 

1056 # on the same name 

1057 raise ArgumentError( 

1058 f"The alias `{key}` for the 'append' sequence " 

1059 f"config-trait `{traitname}` cannot be also a flag!'" 

1060 ) 

1061 # For argcomplete, check if any either an argcompleter metadata tag or method 

1062 # is available. If so, it should be a callable which takes the command-line key 

1063 # string as an argument and other kwargs passed by argcomplete, 

1064 # and returns the a list of string completions. 

1065 argcompleter = trait.metadata.get("argcompleter") or getattr( 

1066 trait, "argcompleter", None 

1067 ) 

1068 if traitname in alias_flags: 

1069 # alias and flag. 

1070 # when called with 0 args: flag 

1071 # when called with >= 1: alias 

1072 argparse_kwds.setdefault("nargs", "?") 

1073 argparse_kwds["action"] = _FlagAction 

1074 argparse_kwds["flag"] = alias_flags[traitname] 

1075 argparse_kwds["alias"] = traitname 

1076 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,) 

1077 action = paa(*keys, **argparse_kwds) 

1078 if argcompleter is not None: 

1079 # argcomplete's completers are callables returning list of completion strings 

1080 action.completer = functools.partial( # type:ignore[attr-defined] 

1081 argcompleter, key=key 

1082 ) 

1083 

1084 def _convert_to_config(self) -> None: 

1085 """self.parsed_data->self.config, parse unrecognized extra args via KVLoader.""" 

1086 extra_args = self.extra_args 

1087 

1088 for lhs, rhs in vars(self.parsed_data).items(): 

1089 if lhs == "extra_args": 

1090 self.extra_args = ["-" if a == _DASH_REPLACEMENT else a for a in rhs] + extra_args 

1091 continue 

1092 elif lhs == "_flags": 

1093 # _flags will be handled later 

1094 continue 

1095 

1096 lhs = lhs.replace(_DOT_REPLACEMENT, ".") 

1097 if "." not in lhs: 

1098 self._handle_unrecognized_alias(lhs) 

1099 trait = None 

1100 

1101 if isinstance(rhs, list): 

1102 rhs = DeferredConfigList(rhs) 

1103 elif isinstance(rhs, str): 

1104 rhs = DeferredConfigString(rhs) 

1105 

1106 trait = self.argparse_traits.get(lhs) 

1107 if trait: 

1108 trait = trait[0] 

1109 

1110 # eval the KV assignment 

1111 try: 

1112 self._exec_config_str(lhs, rhs, trait) 

1113 except Exception as e: 

1114 # cast deferred to nicer repr for the error 

1115 # DeferredList->list, etc 

1116 if isinstance(rhs, DeferredConfig): 

1117 rhs = rhs._super_repr() 

1118 raise ArgumentError(f"Error loading argument {lhs}={rhs}, {e}") from e 

1119 

1120 for subc in self.parsed_data._flags: 

1121 self._load_flag(subc) 

1122 

1123 def _handle_unrecognized_alias(self, arg: str) -> None: 

1124 """Handling for unrecognized alias arguments 

1125 

1126 Probably a mistyped alias. By default just log a warning, 

1127 but users can override this to raise an error instead, e.g. 

1128 self.parser.error("Unrecognized alias: '%s'" % arg) 

1129 """ 

1130 self.log.warning("Unrecognized alias: '%s', it will have no effect.", arg) 

1131 

1132 def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None: 

1133 """If argcomplete is enabled, allow triggering command-line autocompletion""" 

1134 try: 

1135 import argcomplete # noqa 

1136 except ImportError: 

1137 return 

1138 

1139 from . import argcomplete_config 

1140 

1141 finder = argcomplete_config.ExtendedCompletionFinder() # type:ignore[no-untyped-call] 

1142 finder.config_classes = classes 

1143 finder.subcommands = list(subcommands or []) 

1144 # for ease of testing, pass through self._argcomplete_kwargs if set 

1145 finder(self.parser, **getattr(self, "_argcomplete_kwargs", {})) 

1146 

1147 

1148class KeyValueConfigLoader(KVArgParseConfigLoader): 

1149 """Deprecated in traitlets 5.0 

1150 

1151 Use KVArgParseConfigLoader 

1152 """ 

1153 

1154 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None: 

1155 warnings.warn( 

1156 "KeyValueConfigLoader is deprecated since Traitlets 5.0." 

1157 " Use KVArgParseConfigLoader instead.", 

1158 DeprecationWarning, 

1159 stacklevel=2, 

1160 ) 

1161 super().__init__(*args, **kwargs) 

1162 

1163 

1164def load_pyconfig_files(config_files: list[str], path: str) -> Config: 

1165 """Load multiple Python config files, merging each of them in turn. 

1166 

1167 Parameters 

1168 ---------- 

1169 config_files : list of str 

1170 List of config files names to load and merge into the config. 

1171 path : unicode 

1172 The full path to the location of the config files. 

1173 """ 

1174 config = Config() 

1175 for cf in config_files: 

1176 loader = PyFileConfigLoader(cf, path=path) 

1177 try: 

1178 next_config = loader.load_config() 

1179 except ConfigFileNotFound: 

1180 pass 

1181 except Exception: 

1182 raise 

1183 else: 

1184 config.merge(next_config) 

1185 return config