Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/traitlets/config/loader.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

568 statements  

1"""A simple configuration system.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import argparse 

8import copy 

9import functools 

10import json 

11import os 

12import re 

13import sys 

14import typing as t 

15from logging import Logger 

16 

17from traitlets.traitlets import Any, Container, Dict, HasTraits, List, TraitType, Undefined 

18 

19from ..utils import cast_unicode, filefind, warnings 

20 

21# ----------------------------------------------------------------------------- 

22# Exceptions 

23# ----------------------------------------------------------------------------- 

24 

25 

26class ConfigError(Exception): 

27 pass 

28 

29 

30class ConfigLoaderError(ConfigError): 

31 pass 

32 

33 

34class ConfigFileNotFound(ConfigError): 

35 pass 

36 

37 

38class ArgumentError(ConfigLoaderError): 

39 pass 

40 

41 

42# ----------------------------------------------------------------------------- 

43# Argparse fix 

44# ----------------------------------------------------------------------------- 

45 

46# Unfortunately argparse by default prints help messages to stderr instead of 

47# stdout. This makes it annoying to capture long help screens at the command 

48# line, since one must know how to pipe stderr, which many users don't know how 

49# to do. So we override the print_help method with one that defaults to 

50# stdout and use our class instead. 

51 

52 

53class _Sentinel: 

54 def __repr__(self) -> str: 

55 return "<Sentinel deprecated>" 

56 

57 def __str__(self) -> str: 

58 return "<deprecated>" 

59 

60 

61_deprecated = _Sentinel() 

62 

63 

64class ArgumentParser(argparse.ArgumentParser): 

65 """Simple argparse subclass that prints help to stdout by default.""" 

66 

67 def print_help(self, file: t.Any = None) -> None: 

68 if file is None: 

69 file = sys.stdout 

70 return super().print_help(file) 

71 

72 print_help.__doc__ = argparse.ArgumentParser.print_help.__doc__ 

73 

74 

75# ----------------------------------------------------------------------------- 

76# Config class for holding config information 

77# ----------------------------------------------------------------------------- 

78 

79 

80def execfile(fname: str, glob: dict[str, Any]) -> None: 

81 with open(fname, "rb") as f: 

82 exec(compile(f.read(), fname, "exec"), glob, glob) # noqa: S102 

83 

84 

85class LazyConfigValue(HasTraits): 

86 """Proxy object for exposing methods on configurable containers 

87 

88 These methods allow appending/extending/updating 

89 to add to non-empty defaults instead of clobbering them. 

90 

91 Exposes: 

92 

93 - append, extend, insert on lists 

94 - update on dicts 

95 - update, add on sets 

96 """ 

97 

98 _value = None 

99 

100 # list methods 

101 _extend: List[t.Any] = List() 

102 _prepend: List[t.Any] = List() 

103 _inserts: List[t.Any] = List() 

104 

105 def append(self, obj: t.Any) -> None: 

106 """Append an item to a List""" 

107 self._extend.append(obj) 

108 

109 def extend(self, other: t.Any) -> None: 

110 """Extend a list""" 

111 self._extend.extend(other) 

112 

113 def prepend(self, other: t.Any) -> None: 

114 """like list.extend, but for the front""" 

115 self._prepend[:0] = other 

116 

117 def merge_into(self, other: t.Any) -> t.Any: 

118 """ 

119 Merge with another earlier LazyConfigValue or an earlier container. 

120 This is useful when having global system-wide configuration files. 

121 

122 Self is expected to have higher precedence. 

123 

124 Parameters 

125 ---------- 

126 other : LazyConfigValue or container 

127 

128 Returns 

129 ------- 

130 LazyConfigValue 

131 if ``other`` is also lazy, a reified container otherwise. 

132 """ 

133 if isinstance(other, LazyConfigValue): 

134 other._extend.extend(self._extend) 

135 self._extend = other._extend 

136 

137 self._prepend.extend(other._prepend) 

138 

139 other._inserts.extend(self._inserts) 

140 self._inserts = other._inserts 

141 

142 if self._update: 

143 other.update(self._update) 

144 self._update = other._update 

145 return self 

146 else: 

147 # other is a container, reify now. 

148 return self.get_value(other) 

149 

150 def insert(self, index: int, other: t.Any) -> None: 

151 if not isinstance(index, int): 

152 raise TypeError("An integer is required") 

153 self._inserts.append((index, other)) 

154 

155 # dict methods 

156 # update is used for both dict and set 

157 _update = Any() 

158 

159 def update(self, other: t.Any) -> None: 

160 """Update either a set or dict""" 

161 if self._update is None: 

162 if isinstance(other, dict): 

163 self._update = {} 

164 else: 

165 self._update = set() 

166 self._update.update(other) 

167 

168 # set methods 

169 def add(self, obj: t.Any) -> None: 

170 """Add an item to a set""" 

171 self.update({obj}) 

172 

173 def get_value(self, initial: t.Any) -> t.Any: 

174 """construct the value from the initial one 

175 

176 after applying any insert / extend / update changes 

177 """ 

178 if self._value is not None: 

179 return self._value # type:ignore[unreachable] 

180 value = copy.deepcopy(initial) 

181 if isinstance(value, list): 

182 for idx, obj in self._inserts: 

183 value.insert(idx, obj) 

184 value[:0] = self._prepend 

185 value.extend(self._extend) 

186 

187 elif isinstance(value, dict): 

188 if self._update: 

189 value.update(self._update) 

190 elif isinstance(value, set): 

191 if self._update: 

192 value.update(self._update) 

193 self._value = value 

194 return value 

195 

196 def to_dict(self) -> dict[str, t.Any]: 

197 """return JSONable dict form of my data 

198 

199 Currently update as dict or set, extend, prepend as lists, and inserts as list of tuples. 

200 """ 

201 d = {} 

202 if self._update: 

203 d["update"] = self._update 

204 if self._extend: 

205 d["extend"] = self._extend 

206 if self._prepend: 

207 d["prepend"] = self._prepend 

208 elif self._inserts: 

209 d["inserts"] = self._inserts 

210 return d 

211 

212 def __repr__(self) -> str: 

213 if self._value is not None: 

214 return f"<{self.__class__.__name__} value={self._value!r}>" 

215 else: 

216 return f"<{self.__class__.__name__} {self.to_dict()!r}>" 

217 

218 

219def _is_section_key(key: str) -> bool: 

220 """Is a Config key a section name (does it start with a capital)?""" 

221 return bool(key and key[0].upper() == key[0] and not key.startswith("_")) 

222 

223 

224class Config(dict): # type:ignore[type-arg] 

225 """An attribute-based dict that can do smart merges. 

226 

227 Accessing a field on a config object for the first time populates the key 

228 with either a nested Config object for keys starting with capitals 

229 or :class:`.LazyConfigValue` for lowercase keys, 

230 allowing quick assignments such as:: 

231 

232 c = Config() 

233 c.Class.int_trait = 5 

234 c.Class.list_trait.append("x") 

235 

236 """ 

237 

238 def __init__(self, *args: t.Any, **kwds: t.Any) -> None: 

239 dict.__init__(self, *args, **kwds) 

240 self._ensure_subconfig() 

241 

242 def _ensure_subconfig(self) -> None: 

243 """ensure that sub-dicts that should be Config objects are 

244 

245 casts dicts that are under section keys to Config objects, 

246 which is necessary for constructing Config objects from dict literals. 

247 """ 

248 for key in self: 

249 obj = self[key] 

250 if _is_section_key(key) and isinstance(obj, dict) and not isinstance(obj, Config): 

251 setattr(self, key, Config(obj)) 

252 

253 def _merge(self, other: t.Any) -> None: 

254 """deprecated alias, use Config.merge()""" 

255 self.merge(other) 

256 

257 def merge(self, other: t.Any) -> None: 

258 """merge another config object into this one""" 

259 to_update = {} 

260 for k, v in other.items(): 

261 if k not in self: 

262 to_update[k] = v 

263 else: # I have this key 

264 if isinstance(v, Config) and isinstance(self[k], Config): 

265 # Recursively merge common sub Configs 

266 self[k].merge(v) 

267 elif isinstance(v, LazyConfigValue): 

268 self[k] = v.merge_into(self[k]) 

269 else: 

270 # Plain updates for non-Configs 

271 to_update[k] = v 

272 

273 self.update(to_update) 

274 

275 def collisions(self, other: Config) -> dict[str, t.Any]: 

276 """Check for collisions between two config objects. 

277 

278 Returns a dict of the form {"Class": {"trait": "collision message"}}`, 

279 indicating which values have been ignored. 

280 

281 An empty dict indicates no collisions. 

282 """ 

283 collisions: dict[str, t.Any] = {} 

284 for section in self: 

285 if section not in other: 

286 continue 

287 mine = self[section] 

288 theirs = other[section] 

289 for key in mine: 

290 if key in theirs and mine[key] != theirs[key]: 

291 collisions.setdefault(section, {}) 

292 collisions[section][key] = f"{mine[key]!r} ignored, using {theirs[key]!r}" 

293 return collisions 

294 

295 def __contains__(self, key: t.Any) -> bool: 

296 # allow nested contains of the form `"Section.key" in config` 

297 if "." in key: 

298 first, remainder = key.split(".", 1) 

299 if first not in self: 

300 return False 

301 return remainder in self[first] 

302 

303 return super().__contains__(key) 

304 

305 # .has_key is deprecated for dictionaries. 

306 has_key = __contains__ 

307 

308 def _has_section(self, key: str) -> bool: 

309 return _is_section_key(key) and key in self 

310 

311 def copy(self) -> dict[str, t.Any]: 

312 return type(self)(dict.copy(self)) 

313 

314 def __copy__(self) -> dict[str, t.Any]: 

315 return self.copy() 

316 

317 def __deepcopy__(self, memo: t.Any) -> Config: 

318 new_config = type(self)() 

319 for key, value in self.items(): 

320 if isinstance(value, (Config, LazyConfigValue)): 

321 # deep copy config objects 

322 value = copy.deepcopy(value, memo) 

323 elif type(value) in {dict, list, set, tuple}: 

324 # shallow copy plain container traits 

325 value = copy.copy(value) 

326 new_config[key] = value 

327 return new_config 

328 

329 def __getitem__(self, key: str) -> t.Any: 

330 try: 

331 return dict.__getitem__(self, key) 

332 except KeyError: 

333 if _is_section_key(key): 

334 c = Config() 

335 dict.__setitem__(self, key, c) 

336 return c 

337 elif not key.startswith("_"): 

338 # undefined, create lazy value, used for container methods 

339 v = LazyConfigValue() 

340 dict.__setitem__(self, key, v) 

341 return v 

342 else: 

343 raise 

344 

345 def __setitem__(self, key: str, value: t.Any) -> None: 

346 if _is_section_key(key): 

347 if not isinstance(value, Config): 

348 raise ValueError( 

349 "values whose keys begin with an uppercase " 

350 f"char must be Config instances: {key!r}, {value!r}" 

351 ) 

352 dict.__setitem__(self, key, value) 

353 

354 def __getattr__(self, key: str) -> t.Any: 

355 if key.startswith("__"): 

356 return dict.__getattr__(self, key) # type:ignore[attr-defined] 

357 try: 

358 return self.__getitem__(key) 

359 except KeyError as e: 

360 raise AttributeError(e) from e 

361 

362 def __setattr__(self, key: str, value: t.Any) -> None: 

363 if key.startswith("__"): 

364 return dict.__setattr__(self, key, value) 

365 try: 

366 self.__setitem__(key, value) 

367 except KeyError as e: 

368 raise AttributeError(e) from e 

369 

370 def __delattr__(self, key: str) -> None: 

371 if key.startswith("__"): 

372 return dict.__delattr__(self, key) 

373 try: 

374 dict.__delitem__(self, key) 

375 except KeyError as e: 

376 raise AttributeError(e) from e 

377 

378 

379class DeferredConfig: 

380 """Class for deferred-evaluation of config from CLI""" 

381 

382 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any: 

383 raise NotImplementedError("Implement in subclasses") 

384 

385 def _super_repr(self) -> str: 

386 # explicitly call super on direct parent 

387 return super(self.__class__, self).__repr__() 

388 

389 

390class DeferredConfigString(str, DeferredConfig): 

391 """Config value for loading config from a string 

392 

393 Interpretation is deferred until it is loaded into the trait. 

394 

395 Subclass of str for backward compatibility. 

396 

397 This class is only used for values that are not listed 

398 in the configurable classes. 

399 

400 When config is loaded, `trait.from_string` will be used. 

401 

402 If an error is raised in `.from_string`, 

403 the original string is returned. 

404 

405 .. versionadded:: 5.0 

406 """ 

407 

408 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any: 

409 """Get the value stored in this string""" 

410 s = str(self) 

411 try: 

412 return trait.from_string(s) 

413 except Exception: 

414 # exception casting from string, 

415 # let the original string lie. 

416 # this will raise a more informative error when config is loaded. 

417 return s 

418 

419 def __repr__(self) -> str: 

420 return f"{self.__class__.__name__}({self._super_repr()})" 

421 

422 

423class DeferredConfigList(t.List[t.Any], DeferredConfig): 

424 """Config value for loading config from a list of strings 

425 

426 Interpretation is deferred until it is loaded into the trait. 

427 

428 This class is only used for values that are not listed 

429 in the configurable classes. 

430 

431 When config is loaded, `trait.from_string_list` will be used. 

432 

433 If an error is raised in `.from_string_list`, 

434 the original string list is returned. 

435 

436 .. versionadded:: 5.0 

437 """ 

438 

439 def get_value(self, trait: TraitType[t.Any, t.Any]) -> t.Any: 

440 """Get the value stored in this string""" 

441 if hasattr(trait, "from_string_list"): 

442 src = list(self) 

443 cast = trait.from_string_list 

444 else: 

445 # only allow one item 

446 if len(self) > 1: 

447 raise ValueError( 

448 f"{trait.name} only accepts one value, got {len(self)}: {list(self)}" 

449 ) 

450 src = self[0] 

451 cast = trait.from_string 

452 

453 try: 

454 return cast(src) 

455 except Exception: 

456 # exception casting from string, 

457 # let the original value lie. 

458 # this will raise a more informative error when config is loaded. 

459 return src 

460 

461 def __repr__(self) -> str: 

462 return f"{self.__class__.__name__}({self._super_repr()})" 

463 

464 

465# ----------------------------------------------------------------------------- 

466# Config loading classes 

467# ----------------------------------------------------------------------------- 

468 

469 

470class ConfigLoader: 

471 """A object for loading configurations from just about anywhere. 

472 

473 The resulting configuration is packaged as a :class:`Config`. 

474 

475 Notes 

476 ----- 

477 A :class:`ConfigLoader` does one thing: load a config from a source 

478 (file, command line arguments) and returns the data as a :class:`Config` object. 

479 There are lots of things that :class:`ConfigLoader` does not do. It does 

480 not implement complex logic for finding config files. It does not handle 

481 default values or merge multiple configs. These things need to be 

482 handled elsewhere. 

483 """ 

484 

485 def _log_default(self) -> Logger: 

486 from traitlets.log import get_logger 

487 

488 return t.cast(Logger, get_logger()) 

489 

490 def __init__(self, log: Logger | None = None) -> None: 

491 """A base class for config loaders. 

492 

493 log : instance of :class:`logging.Logger` to use. 

494 By default logger of :meth:`traitlets.config.application.Application.instance()` 

495 will be used 

496 

497 Examples 

498 -------- 

499 >>> cl = ConfigLoader() 

500 >>> config = cl.load_config() 

501 >>> config 

502 {} 

503 """ 

504 self.clear() 

505 if log is None: 

506 self.log = self._log_default() 

507 self.log.debug("Using default logger") 

508 else: 

509 self.log = log 

510 

511 def clear(self) -> None: 

512 self.config = Config() 

513 

514 def load_config(self) -> Config: 

515 """Load a config from somewhere, return a :class:`Config` instance. 

516 

517 Usually, this will cause self.config to be set and then returned. 

518 However, in most cases, :meth:`ConfigLoader.clear` should be called 

519 to erase any previous state. 

520 """ 

521 self.clear() 

522 return self.config 

523 

524 

525class FileConfigLoader(ConfigLoader): 

526 """A base class for file based configurations. 

527 

528 As we add more file based config loaders, the common logic should go 

529 here. 

530 """ 

531 

532 def __init__(self, filename: str, path: str | None = None, **kw: t.Any) -> None: 

533 """Build a config loader for a filename and path. 

534 

535 Parameters 

536 ---------- 

537 filename : str 

538 The file name of the config file. 

539 path : str, list, tuple 

540 The path to search for the config file on, or a sequence of 

541 paths to try in order. 

542 """ 

543 super().__init__(**kw) 

544 self.filename = filename 

545 self.path = path 

546 self.full_filename = "" 

547 

548 def _find_file(self) -> None: 

549 """Try to find the file by searching the paths.""" 

550 self.full_filename = filefind(self.filename, self.path) 

551 

552 

553class JSONFileConfigLoader(FileConfigLoader): 

554 """A JSON file loader for config 

555 

556 Can also act as a context manager that rewrite the configuration file to disk on exit. 

557 

558 Example:: 

559 

560 with JSONFileConfigLoader('myapp.json','/home/jupyter/configurations/') as c: 

561 c.MyNewConfigurable.new_value = 'Updated' 

562 

563 """ 

564 

565 def load_config(self) -> Config: 

566 """Load the config from a file and return it as a Config object.""" 

567 self.clear() 

568 try: 

569 self._find_file() 

570 except OSError as e: 

571 raise ConfigFileNotFound(str(e)) from e 

572 dct = self._read_file_as_dict() 

573 self.config = self._convert_to_config(dct) 

574 return self.config 

575 

576 def _read_file_as_dict(self) -> dict[str, t.Any]: 

577 with open(self.full_filename) as f: 

578 return t.cast("dict[str, t.Any]", json.load(f)) 

579 

580 def _convert_to_config(self, dictionary: dict[str, t.Any]) -> Config: 

581 if "version" in dictionary: 

582 version = dictionary.pop("version") 

583 else: 

584 version = 1 

585 

586 if version == 1: 

587 return Config(dictionary) 

588 else: 

589 raise ValueError(f"Unknown version of JSON config file: {version}") 

590 

591 def __enter__(self) -> Config: 

592 self.load_config() 

593 return self.config 

594 

595 def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None: 

596 """ 

597 Exit the context manager but do not handle any errors. 

598 

599 In case of any error, we do not want to write the potentially broken 

600 configuration to disk. 

601 """ 

602 self.config.version = 1 

603 json_config = json.dumps(self.config, indent=2) 

604 with open(self.full_filename, "w") as f: 

605 f.write(json_config) 

606 

607 

608class PyFileConfigLoader(FileConfigLoader): 

609 """A config loader for pure python files. 

610 

611 This is responsible for locating a Python config file by filename and 

612 path, then executing it to construct a Config object. 

613 """ 

614 

615 def load_config(self) -> Config: 

616 """Load the config from a file and return it as a Config object.""" 

617 self.clear() 

618 try: 

619 self._find_file() 

620 except OSError as e: 

621 raise ConfigFileNotFound(str(e)) from e 

622 self._read_file_as_dict() 

623 return self.config 

624 

625 def load_subconfig(self, fname: str, path: str | None = None) -> None: 

626 """Injected into config file namespace as load_subconfig""" 

627 if path is None: 

628 path = self.path 

629 

630 loader = self.__class__(fname, path) 

631 try: 

632 sub_config = loader.load_config() 

633 except ConfigFileNotFound: 

634 # Pass silently if the sub config is not there, 

635 # treat it as an empty config file. 

636 pass 

637 else: 

638 self.config.merge(sub_config) 

639 

640 def _read_file_as_dict(self) -> None: 

641 """Load the config file into self.config, with recursive loading.""" 

642 

643 def get_config() -> Config: 

644 """Unnecessary now, but a deprecation warning is more trouble than it's worth.""" 

645 return self.config 

646 

647 namespace = dict( # noqa: C408 

648 c=self.config, 

649 load_subconfig=self.load_subconfig, 

650 get_config=get_config, 

651 __file__=self.full_filename, 

652 ) 

653 conf_filename = self.full_filename 

654 with open(conf_filename, "rb") as f: 

655 exec(compile(f.read(), conf_filename, "exec"), namespace, namespace) # noqa: S102 

656 

657 

658class CommandLineConfigLoader(ConfigLoader): 

659 """A config loader for command line arguments. 

660 

661 As we add more command line based loaders, the common logic should go 

662 here. 

663 """ 

664 

665 def _exec_config_str( 

666 self, lhs: t.Any, rhs: t.Any, trait: TraitType[t.Any, t.Any] | None = None 

667 ) -> None: 

668 """execute self.config.<lhs> = <rhs> 

669 

670 * expands ~ with expanduser 

671 * interprets value with trait if available 

672 """ 

673 value = rhs 

674 if isinstance(value, DeferredConfig): 

675 if trait: 

676 # trait available, reify config immediately 

677 value = value.get_value(trait) 

678 elif isinstance(rhs, DeferredConfigList) and len(rhs) == 1: 

679 # single item, make it a deferred str 

680 value = DeferredConfigString(os.path.expanduser(rhs[0])) 

681 else: 

682 if trait: 

683 value = trait.from_string(value) 

684 else: 

685 value = DeferredConfigString(value) 

686 

687 *path, key = lhs.split(".") 

688 section = self.config 

689 for part in path: 

690 section = section[part] 

691 section[key] = value 

692 return 

693 

694 def _load_flag(self, cfg: t.Any) -> None: 

695 """update self.config from a flag, which can be a dict or Config""" 

696 if isinstance(cfg, (dict, Config)): 

697 # don't clobber whole config sections, update 

698 # each section from config: 

699 for sec, c in cfg.items(): 

700 self.config[sec].update(c) 

701 else: 

702 raise TypeError("Invalid flag: %r" % cfg) 

703 

704 

705# match --Class.trait keys for argparse 

706# matches: 

707# --Class.trait 

708# --x 

709# -x 

710 

711class_trait_opt_pattern = re.compile(r"^\-?\-[A-Za-z][\w]*(\.[\w]+)*$") 

712 

713_DOT_REPLACEMENT = "__DOT__" 

714_DASH_REPLACEMENT = "__DASH__" 

715 

716 

717class _KVAction(argparse.Action): 

718 """Custom argparse action for handling --Class.trait=x 

719 

720 Always 

721 """ 

722 

723 def __call__( # type:ignore[override] 

724 self, 

725 parser: argparse.ArgumentParser, 

726 namespace: dict[str, t.Any], 

727 values: t.Sequence[t.Any], 

728 option_string: str | None = None, 

729 ) -> None: 

730 if isinstance(values, str): 

731 values = [values] 

732 values = ["-" if v is _DASH_REPLACEMENT else v for v in values] 

733 items = getattr(namespace, self.dest, None) 

734 if items is None: 

735 items = DeferredConfigList() 

736 else: 

737 items = DeferredConfigList(items) 

738 items.extend(values) 

739 setattr(namespace, self.dest, items) 

740 

741 

742class _DefaultOptionDict(dict): # type:ignore[type-arg] 

743 """Like the default options dict 

744 

745 but acts as if all --Class.trait options are predefined 

746 """ 

747 

748 def _add_kv_action(self, key: str) -> None: 

749 self[key] = _KVAction( 

750 option_strings=[key], 

751 dest=key.lstrip("-").replace(".", _DOT_REPLACEMENT), 

752 # use metavar for display purposes 

753 metavar=key.lstrip("-"), 

754 ) 

755 

756 def __contains__(self, key: t.Any) -> bool: 

757 if "=" in key: 

758 return False 

759 if super().__contains__(key): 

760 return True 

761 

762 if key.startswith("-") and class_trait_opt_pattern.match(key): 

763 self._add_kv_action(key) 

764 return True 

765 return False 

766 

767 def __getitem__(self, key: str) -> t.Any: 

768 if key in self: 

769 return super().__getitem__(key) 

770 else: 

771 raise KeyError(key) 

772 

773 def get(self, key: str, default: t.Any = None) -> t.Any: 

774 try: 

775 return self[key] 

776 except KeyError: 

777 return default 

778 

779 

780class _KVArgParser(argparse.ArgumentParser): 

781 """subclass of ArgumentParser where any --Class.trait option is implicitly defined""" 

782 

783 def parse_known_args( # type:ignore[override] 

784 self, args: t.Sequence[str] | None = None, namespace: argparse.Namespace | None = None 

785 ) -> tuple[argparse.Namespace | None, list[str]]: 

786 # must be done immediately prior to parsing because if we do it in init, 

787 # registration of explicit actions via parser.add_option will fail during setup 

788 for container in (self, self._optionals): 

789 container._option_string_actions = _DefaultOptionDict(container._option_string_actions) 

790 return super().parse_known_args(args, namespace) 

791 

792 

793# type aliases 

794SubcommandsDict = t.Dict[str, t.Any] 

795 

796 

797class ArgParseConfigLoader(CommandLineConfigLoader): 

798 """A loader that uses the argparse module to load from the command line.""" 

799 

800 parser_class = ArgumentParser 

801 

802 def __init__( 

803 self, 

804 argv: list[str] | None = None, 

805 aliases: dict[str, str] | None = None, 

806 flags: dict[str, str] | None = None, 

807 log: t.Any = None, 

808 classes: list[type[t.Any]] | None = None, 

809 subcommands: SubcommandsDict | None = None, 

810 *parser_args: t.Any, 

811 **parser_kw: t.Any, 

812 ) -> None: 

813 """Create a config loader for use with argparse. 

814 

815 Parameters 

816 ---------- 

817 classes : optional, list 

818 The classes to scan for *container* config-traits and decide 

819 for their "multiplicity" when adding them as *argparse* arguments. 

820 argv : optional, list 

821 If given, used to read command-line arguments from, otherwise 

822 sys.argv[1:] is used. 

823 *parser_args : tuple 

824 A tuple of positional arguments that will be passed to the 

825 constructor of :class:`argparse.ArgumentParser`. 

826 **parser_kw : dict 

827 A tuple of keyword arguments that will be passed to the 

828 constructor of :class:`argparse.ArgumentParser`. 

829 aliases : dict of str to str 

830 Dict of aliases to full traitlets names for CLI parsing 

831 flags : dict of str to str 

832 Dict of flags to full traitlets names for CLI parsing 

833 log 

834 Passed to `ConfigLoader` 

835 

836 Returns 

837 ------- 

838 config : Config 

839 The resulting Config object. 

840 """ 

841 classes = classes or [] 

842 super(CommandLineConfigLoader, self).__init__(log=log) 

843 self.clear() 

844 if argv is None: 

845 argv = sys.argv[1:] 

846 self.argv = argv 

847 self.aliases = aliases or {} 

848 self.flags = flags or {} 

849 self.classes = classes 

850 self.subcommands = subcommands # only used for argcomplete currently 

851 

852 self.parser_args = parser_args 

853 self.version = parser_kw.pop("version", None) 

854 kwargs = dict(argument_default=argparse.SUPPRESS) # noqa: C408 

855 kwargs.update(parser_kw) 

856 self.parser_kw = kwargs 

857 

858 def load_config( 

859 self, 

860 argv: list[str] | None = None, 

861 aliases: t.Any = None, 

862 flags: t.Any = _deprecated, 

863 classes: t.Any = None, 

864 ) -> Config: 

865 """Parse command line arguments and return as a Config object. 

866 

867 Parameters 

868 ---------- 

869 argv : optional, list 

870 If given, a list with the structure of sys.argv[1:] to parse 

871 arguments from. If not given, the instance's self.argv attribute 

872 (given at construction time) is used. 

873 flags 

874 Deprecated in traitlets 5.0, instantiate the config loader with the flags. 

875 

876 """ 

877 

878 if flags is not _deprecated: 

879 warnings.warn( 

880 "The `flag` argument to load_config is deprecated since Traitlets " 

881 f"5.0 and will be ignored, pass flags the `{type(self)}` constructor.", 

882 DeprecationWarning, 

883 stacklevel=2, 

884 ) 

885 

886 self.clear() 

887 if argv is None: 

888 argv = self.argv 

889 if aliases is not None: 

890 self.aliases = aliases 

891 if classes is not None: 

892 self.classes = classes 

893 self._create_parser() 

894 self._argcomplete(self.classes, self.subcommands) 

895 self._parse_args(argv) 

896 self._convert_to_config() 

897 return self.config 

898 

899 def get_extra_args(self) -> list[str]: 

900 if hasattr(self, "extra_args"): 

901 return self.extra_args 

902 else: 

903 return [] 

904 

905 def _create_parser(self) -> None: 

906 self.parser = self.parser_class( 

907 *self.parser_args, 

908 **self.parser_kw, # type:ignore[arg-type] 

909 ) 

910 self._add_arguments(self.aliases, self.flags, self.classes) 

911 

912 def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None: 

913 raise NotImplementedError("subclasses must implement _add_arguments") 

914 

915 def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None: 

916 """If argcomplete is enabled, allow triggering command-line autocompletion""" 

917 

918 def _parse_args(self, args: t.Any) -> t.Any: 

919 """self.parser->self.parsed_data""" 

920 uargs = [cast_unicode(a) for a in args] 

921 

922 unpacked_aliases: dict[str, str] = {} 

923 if self.aliases: 

924 unpacked_aliases = {} 

925 for alias, alias_target in self.aliases.items(): 

926 if alias in self.flags: 

927 continue 

928 if not isinstance(alias, tuple): # type:ignore[unreachable] 

929 alias = (alias,) # type:ignore[assignment] 

930 for al in alias: 

931 if len(al) == 1: 

932 unpacked_aliases["-" + al] = "--" + alias_target 

933 unpacked_aliases["--" + al] = "--" + alias_target 

934 

935 def _replace(arg: str) -> str: 

936 if arg == "-": 

937 return _DASH_REPLACEMENT 

938 for k, v in unpacked_aliases.items(): 

939 if arg == k: 

940 return v 

941 if arg.startswith(k + "="): 

942 return v + "=" + arg[len(k) + 1 :] 

943 return arg 

944 

945 if "--" in uargs: 

946 idx = uargs.index("--") 

947 extra_args = uargs[idx + 1 :] 

948 to_parse = uargs[:idx] 

949 else: 

950 extra_args = [] 

951 to_parse = uargs 

952 to_parse = [_replace(a) for a in to_parse] 

953 

954 self.parsed_data = self.parser.parse_args(to_parse) 

955 self.extra_args = extra_args 

956 

957 def _convert_to_config(self) -> None: 

958 """self.parsed_data->self.config""" 

959 for k, v in vars(self.parsed_data).items(): 

960 *path, key = k.split(".") 

961 section = self.config 

962 for p in path: 

963 section = section[p] 

964 setattr(section, key, v) 

965 

966 

967class _FlagAction(argparse.Action): 

968 """ArgParse action to handle a flag""" 

969 

970 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None: 

971 self.flag = kwargs.pop("flag") 

972 self.alias = kwargs.pop("alias", None) 

973 kwargs["const"] = Undefined 

974 if not self.alias: 

975 kwargs["nargs"] = 0 

976 super().__init__(*args, **kwargs) 

977 

978 def __call__( 

979 self, parser: t.Any, namespace: t.Any, values: t.Any, option_string: str | None = None 

980 ) -> None: 

981 if self.nargs == 0 or values is Undefined: 

982 if not hasattr(namespace, "_flags"): 

983 namespace._flags = [] 

984 namespace._flags.append(self.flag) 

985 else: 

986 setattr(namespace, self.alias, values) 

987 

988 

989class KVArgParseConfigLoader(ArgParseConfigLoader): 

990 """A config loader that loads aliases and flags with argparse, 

991 

992 as well as arbitrary --Class.trait value 

993 """ 

994 

995 parser_class = _KVArgParser # type:ignore[assignment] 

996 

997 def _add_arguments(self, aliases: t.Any, flags: t.Any, classes: t.Any) -> None: 

998 alias_flags: dict[str, t.Any] = {} 

999 argparse_kwds: dict[str, t.Any] 

1000 argparse_traits: dict[str, t.Any] 

1001 paa = self.parser.add_argument 

1002 self.parser.set_defaults(_flags=[]) 

1003 paa("extra_args", nargs="*") 

1004 

1005 # An index of all container traits collected:: 

1006 # 

1007 # { <traitname>: (<trait>, <argparse-kwds>) } 

1008 # 

1009 # Used to add the correct type into the `config` tree. 

1010 # Used also for aliases, not to re-collect them. 

1011 self.argparse_traits = argparse_traits = {} 

1012 for cls in classes: 

1013 for traitname, trait in cls.class_traits(config=True).items(): 

1014 argname = f"{cls.__name__}.{traitname}" 

1015 argparse_kwds = {"type": str} 

1016 if isinstance(trait, (Container, Dict)): 

1017 multiplicity = trait.metadata.get("multiplicity", "append") 

1018 if multiplicity == "append": 

1019 argparse_kwds["action"] = multiplicity 

1020 else: 

1021 argparse_kwds["nargs"] = multiplicity 

1022 argparse_traits[argname] = (trait, argparse_kwds) 

1023 

1024 for keys, (value, fhelp) in flags.items(): 

1025 if not isinstance(keys, tuple): 

1026 keys = (keys,) 

1027 for key in keys: 

1028 if key in aliases: 

1029 alias_flags[aliases[key]] = value 

1030 continue 

1031 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,) 

1032 paa(*keys, action=_FlagAction, flag=value, help=fhelp) 

1033 

1034 for keys, traitname in aliases.items(): 

1035 if not isinstance(keys, tuple): 

1036 keys = (keys,) 

1037 

1038 for key in keys: 

1039 argparse_kwds = { 

1040 "type": str, 

1041 "dest": traitname.replace(".", _DOT_REPLACEMENT), 

1042 "metavar": traitname, 

1043 } 

1044 argcompleter = None 

1045 if traitname in argparse_traits: 

1046 trait, kwds = argparse_traits[traitname] 

1047 argparse_kwds.update(kwds) 

1048 if "action" in argparse_kwds and traitname in alias_flags: 

1049 # flag sets 'action', so can't have flag & alias with custom action 

1050 # on the same name 

1051 raise ArgumentError( 

1052 f"The alias `{key}` for the 'append' sequence " 

1053 f"config-trait `{traitname}` cannot be also a flag!'" 

1054 ) 

1055 # For argcomplete, check if any either an argcompleter metadata tag or method 

1056 # is available. If so, it should be a callable which takes the command-line key 

1057 # string as an argument and other kwargs passed by argcomplete, 

1058 # and returns the a list of string completions. 

1059 argcompleter = trait.metadata.get("argcompleter") or getattr( 

1060 trait, "argcompleter", None 

1061 ) 

1062 if traitname in alias_flags: 

1063 # alias and flag. 

1064 # when called with 0 args: flag 

1065 # when called with >= 1: alias 

1066 argparse_kwds.setdefault("nargs", "?") 

1067 argparse_kwds["action"] = _FlagAction 

1068 argparse_kwds["flag"] = alias_flags[traitname] 

1069 argparse_kwds["alias"] = traitname 

1070 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,) 

1071 action = paa(*keys, **argparse_kwds) 

1072 if argcompleter is not None: 

1073 # argcomplete's completers are callables returning list of completion strings 

1074 action.completer = functools.partial( # type:ignore[attr-defined] 

1075 argcompleter, key=key 

1076 ) 

1077 

1078 def _convert_to_config(self) -> None: 

1079 """self.parsed_data->self.config, parse unrecognized extra args via KVLoader.""" 

1080 extra_args = self.extra_args 

1081 

1082 for lhs, rhs in vars(self.parsed_data).items(): 

1083 if lhs == "extra_args": 

1084 self.extra_args = ["-" if a == _DASH_REPLACEMENT else a for a in rhs] + extra_args 

1085 continue 

1086 if lhs == "_flags": 

1087 # _flags will be handled later 

1088 continue 

1089 

1090 lhs = lhs.replace(_DOT_REPLACEMENT, ".") 

1091 if "." not in lhs: 

1092 self._handle_unrecognized_alias(lhs) 

1093 trait = None 

1094 

1095 if isinstance(rhs, list): 

1096 rhs = DeferredConfigList(rhs) 

1097 elif isinstance(rhs, str): 

1098 rhs = DeferredConfigString(rhs) 

1099 

1100 trait = self.argparse_traits.get(lhs) 

1101 if trait: 

1102 trait = trait[0] 

1103 

1104 # eval the KV assignment 

1105 try: 

1106 self._exec_config_str(lhs, rhs, trait) 

1107 except Exception as e: 

1108 # cast deferred to nicer repr for the error 

1109 # DeferredList->list, etc 

1110 if isinstance(rhs, DeferredConfig): 

1111 rhs = rhs._super_repr() 

1112 raise ArgumentError(f"Error loading argument {lhs}={rhs}, {e}") from e 

1113 

1114 for subc in self.parsed_data._flags: 

1115 self._load_flag(subc) 

1116 

1117 def _handle_unrecognized_alias(self, arg: str) -> None: 

1118 """Handling for unrecognized alias arguments 

1119 

1120 Probably a mistyped alias. By default just log a warning, 

1121 but users can override this to raise an error instead, e.g. 

1122 self.parser.error("Unrecognized alias: '%s'" % arg) 

1123 """ 

1124 self.log.warning("Unrecognized alias: '%s', it will have no effect.", arg) 

1125 

1126 def _argcomplete(self, classes: list[t.Any], subcommands: SubcommandsDict | None) -> None: 

1127 """If argcomplete is enabled, allow triggering command-line autocompletion""" 

1128 try: 

1129 import argcomplete # noqa: F401 

1130 except ImportError: 

1131 return 

1132 

1133 from . import argcomplete_config 

1134 

1135 finder = argcomplete_config.ExtendedCompletionFinder() # type:ignore[no-untyped-call] 

1136 finder.config_classes = classes 

1137 finder.subcommands = list(subcommands or []) 

1138 # for ease of testing, pass through self._argcomplete_kwargs if set 

1139 finder(self.parser, **getattr(self, "_argcomplete_kwargs", {})) 

1140 

1141 

1142class KeyValueConfigLoader(KVArgParseConfigLoader): 

1143 """Deprecated in traitlets 5.0 

1144 

1145 Use KVArgParseConfigLoader 

1146 """ 

1147 

1148 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None: 

1149 warnings.warn( 

1150 "KeyValueConfigLoader is deprecated since Traitlets 5.0." 

1151 " Use KVArgParseConfigLoader instead.", 

1152 DeprecationWarning, 

1153 stacklevel=2, 

1154 ) 

1155 super().__init__(*args, **kwargs) 

1156 

1157 

1158def load_pyconfig_files(config_files: list[str], path: str) -> Config: 

1159 """Load multiple Python config files, merging each of them in turn. 

1160 

1161 Parameters 

1162 ---------- 

1163 config_files : list of str 

1164 List of config files names to load and merge into the config. 

1165 path : unicode 

1166 The full path to the location of the config files. 

1167 """ 

1168 config = Config() 

1169 for cf in config_files: 

1170 loader = PyFileConfigLoader(cf, path=path) 

1171 try: 

1172 next_config = loader.load_config() 

1173 except ConfigFileNotFound: 

1174 pass 

1175 except Exception: 

1176 raise 

1177 else: 

1178 config.merge(next_config) 

1179 return config