Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/traitlets/config/loader.py: 26%

570 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1"""A simple configuration system.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import argparse 

7import copy 

8import functools 

9import json 

10import os 

11import re 

12import sys 

13import typing as t 

14 

15from traitlets.traitlets import Any, Container, Dict, HasTraits, List, Undefined 

16 

17from ..utils import cast_unicode, filefind, warnings 

18 

19# ----------------------------------------------------------------------------- 

20# Exceptions 

21# ----------------------------------------------------------------------------- 

22 

23 

24class ConfigError(Exception): 

25 pass 

26 

27 

28class ConfigLoaderError(ConfigError): 

29 pass 

30 

31 

32class ConfigFileNotFound(ConfigError): # noqa 

33 pass 

34 

35 

36class ArgumentError(ConfigLoaderError): 

37 pass 

38 

39 

40# ----------------------------------------------------------------------------- 

41# Argparse fix 

42# ----------------------------------------------------------------------------- 

43 

44# Unfortunately argparse by default prints help messages to stderr instead of 

45# stdout. This makes it annoying to capture long help screens at the command 

46# line, since one must know how to pipe stderr, which many users don't know how 

47# to do. So we override the print_help method with one that defaults to 

48# stdout and use our class instead. 

49 

50 

51class _Sentinel: 

52 def __repr__(self): 

53 return "<Sentinel deprecated>" 

54 

55 def __str__(self): 

56 return "<deprecated>" 

57 

58 

59_deprecated = _Sentinel() 

60 

61 

62class ArgumentParser(argparse.ArgumentParser): 

63 """Simple argparse subclass that prints help to stdout by default.""" 

64 

65 def print_help(self, file=None): 

66 if file is None: 

67 file = sys.stdout 

68 return super().print_help(file) 

69 

70 print_help.__doc__ = argparse.ArgumentParser.print_help.__doc__ 

71 

72 

73# ----------------------------------------------------------------------------- 

74# Config class for holding config information 

75# ----------------------------------------------------------------------------- 

76 

77 

78def execfile(fname, glob): 

79 with open(fname, "rb") as f: 

80 exec(compile(f.read(), fname, "exec"), glob, glob) # noqa 

81 

82 

83class LazyConfigValue(HasTraits): 

84 """Proxy object for exposing methods on configurable containers 

85 

86 These methods allow appending/extending/updating 

87 to add to non-empty defaults instead of clobbering them. 

88 

89 Exposes: 

90 

91 - append, extend, insert on lists 

92 - update on dicts 

93 - update, add on sets 

94 """ 

95 

96 _value = None 

97 

98 # list methods 

99 _extend: List = List() 

100 _prepend: List = List() 

101 _inserts: List = List() 

102 

103 def append(self, obj): 

104 """Append an item to a List""" 

105 self._extend.append(obj) 

106 

107 def extend(self, other): 

108 """Extend a list""" 

109 self._extend.extend(other) 

110 

111 def prepend(self, other): 

112 """like list.extend, but for the front""" 

113 self._prepend[:0] = other 

114 

115 def merge_into(self, other): 

116 """ 

117 Merge with another earlier LazyConfigValue or an earlier container. 

118 This is useful when having global system-wide configuration files. 

119 

120 Self is expected to have higher precedence. 

121 

122 Parameters 

123 ---------- 

124 other : LazyConfigValue or container 

125 

126 Returns 

127 ------- 

128 LazyConfigValue 

129 if ``other`` is also lazy, a reified container otherwise. 

130 """ 

131 if isinstance(other, LazyConfigValue): 

132 other._extend.extend(self._extend) 

133 self._extend = other._extend 

134 

135 self._prepend.extend(other._prepend) 

136 

137 other._inserts.extend(self._inserts) 

138 self._inserts = other._inserts 

139 

140 if self._update: 

141 other.update(self._update) 

142 self._update = other._update 

143 return self 

144 else: 

145 # other is a container, reify now. 

146 return self.get_value(other) 

147 

148 def insert(self, index, other): 

149 if not isinstance(index, int): 

150 raise TypeError("An integer is required") 

151 self._inserts.append((index, other)) 

152 

153 # dict methods 

154 # update is used for both dict and set 

155 _update = Any() 

156 

157 def update(self, other): 

158 """Update either a set or dict""" 

159 if self._update is None: 

160 if isinstance(other, dict): 

161 self._update = {} 

162 else: 

163 self._update = set() 

164 self._update.update(other) 

165 

166 # set methods 

167 def add(self, obj): 

168 """Add an item to a set""" 

169 self.update({obj}) 

170 

171 def get_value(self, initial): 

172 """construct the value from the initial one 

173 

174 after applying any insert / extend / update changes 

175 """ 

176 if self._value is not None: 

177 return self._value 

178 value = copy.deepcopy(initial) 

179 if isinstance(value, list): 

180 for idx, obj in self._inserts: 

181 value.insert(idx, obj) 

182 value[:0] = self._prepend 

183 value.extend(self._extend) 

184 

185 elif isinstance(value, dict): 

186 if self._update: 

187 value.update(self._update) 

188 elif isinstance(value, set): 

189 if self._update: 

190 value.update(self._update) 

191 self._value = value 

192 return value 

193 

194 def to_dict(self): 

195 """return JSONable dict form of my data 

196 

197 Currently update as dict or set, extend, prepend as lists, and inserts as list of tuples. 

198 """ 

199 d = {} 

200 if self._update: 

201 d["update"] = self._update 

202 if self._extend: 

203 d["extend"] = self._extend 

204 if self._prepend: 

205 d["prepend"] = self._prepend 

206 elif self._inserts: 

207 d["inserts"] = self._inserts 

208 return d 

209 

210 def __repr__(self): 

211 if self._value is not None: 

212 return f"<{self.__class__.__name__} value={self._value!r}>" 

213 else: 

214 return f"<{self.__class__.__name__} {self.to_dict()!r}>" 

215 

216 

217def _is_section_key(key): 

218 """Is a Config key a section name (does it start with a capital)?""" 

219 if key and key[0].upper() == key[0] and not key.startswith("_"): 

220 return True 

221 else: 

222 return False 

223 

224 

225class Config(dict): # type:ignore[type-arg] 

226 """An attribute-based dict that can do smart merges. 

227 

228 Accessing a field on a config object for the first time populates the key 

229 with either a nested Config object for keys starting with capitals 

230 or :class:`.LazyConfigValue` for lowercase keys, 

231 allowing quick assignments such as:: 

232 

233 c = Config() 

234 c.Class.int_trait = 5 

235 c.Class.list_trait.append("x") 

236 

237 """ 

238 

239 def __init__(self, *args, **kwds): 

240 dict.__init__(self, *args, **kwds) 

241 self._ensure_subconfig() 

242 

243 def _ensure_subconfig(self): 

244 """ensure that sub-dicts that should be Config objects are 

245 

246 casts dicts that are under section keys to Config objects, 

247 which is necessary for constructing Config objects from dict literals. 

248 """ 

249 for key in self: 

250 obj = self[key] 

251 if _is_section_key(key) and isinstance(obj, dict) and not isinstance(obj, Config): 

252 setattr(self, key, Config(obj)) 

253 

254 def _merge(self, other): 

255 """deprecated alias, use Config.merge()""" 

256 self.merge(other) 

257 

258 def merge(self, other): 

259 """merge another config object into this one""" 

260 to_update = {} 

261 for k, v in other.items(): 

262 if k not in self: 

263 to_update[k] = v 

264 else: # I have this key 

265 if isinstance(v, Config) and isinstance(self[k], Config): 

266 # Recursively merge common sub Configs 

267 self[k].merge(v) 

268 elif isinstance(v, LazyConfigValue): 

269 self[k] = v.merge_into(self[k]) 

270 else: 

271 # Plain updates for non-Configs 

272 to_update[k] = v 

273 

274 self.update(to_update) 

275 

276 def collisions(self, other: "Config") -> t.Dict[str, t.Any]: 

277 """Check for collisions between two config objects. 

278 

279 Returns a dict of the form {"Class": {"trait": "collision message"}}`, 

280 indicating which values have been ignored. 

281 

282 An empty dict indicates no collisions. 

283 """ 

284 collisions: t.Dict[str, t.Any] = {} 

285 for section in self: 

286 if section not in other: 

287 continue 

288 mine = self[section] 

289 theirs = other[section] 

290 for key in mine: 

291 if key in theirs and mine[key] != theirs[key]: 

292 collisions.setdefault(section, {}) 

293 collisions[section][key] = f"{mine[key]!r} ignored, using {theirs[key]!r}" 

294 return collisions 

295 

296 def __contains__(self, key): 

297 # allow nested contains of the form `"Section.key" in config` 

298 if "." in key: 

299 first, remainder = key.split(".", 1) 

300 if first not in self: 

301 return False 

302 return remainder in self[first] 

303 

304 return super().__contains__(key) 

305 

306 # .has_key is deprecated for dictionaries. 

307 has_key = __contains__ 

308 

309 def _has_section(self, key): 

310 return _is_section_key(key) and key in self 

311 

312 def copy(self): 

313 return type(self)(dict.copy(self)) 

314 

315 def __copy__(self): 

316 return self.copy() 

317 

318 def __deepcopy__(self, memo): 

319 new_config = type(self)() 

320 for key, value in self.items(): 

321 if isinstance(value, (Config, LazyConfigValue)): 

322 # deep copy config objects 

323 value = copy.deepcopy(value, memo) 

324 elif type(value) in {dict, list, set, tuple}: 

325 # shallow copy plain container traits 

326 value = copy.copy(value) 

327 new_config[key] = value 

328 return new_config 

329 

330 def __getitem__(self, key): 

331 try: 

332 return dict.__getitem__(self, key) 

333 except KeyError: 

334 if _is_section_key(key): 

335 c = Config() 

336 dict.__setitem__(self, key, c) 

337 return c 

338 elif not key.startswith("_"): 

339 # undefined, create lazy value, used for container methods 

340 v = LazyConfigValue() 

341 dict.__setitem__(self, key, v) 

342 return v 

343 else: 

344 raise 

345 

346 def __setitem__(self, key, value): 

347 if _is_section_key(key): 

348 if not isinstance(value, Config): 

349 raise ValueError( 

350 "values whose keys begin with an uppercase " 

351 f"char must be Config instances: {key!r}, {value!r}" 

352 ) 

353 dict.__setitem__(self, key, value) 

354 

355 def __getattr__(self, key): 

356 if key.startswith("__"): 

357 return dict.__getattr__(self, key) # type:ignore[attr-defined] 

358 try: 

359 return self.__getitem__(key) 

360 except KeyError as e: 

361 raise AttributeError(e) from e 

362 

363 def __setattr__(self, key, value): 

364 if key.startswith("__"): 

365 return dict.__setattr__(self, key, value) 

366 try: 

367 self.__setitem__(key, value) 

368 except KeyError as e: 

369 raise AttributeError(e) from e 

370 

371 def __delattr__(self, key): 

372 if key.startswith("__"): 

373 return dict.__delattr__(self, key) 

374 try: 

375 dict.__delitem__(self, key) 

376 except KeyError as e: 

377 raise AttributeError(e) from e 

378 

379 

380class DeferredConfig: 

381 """Class for deferred-evaluation of config from CLI""" 

382 

383 pass 

384 

385 def get_value(self, trait): 

386 raise NotImplementedError("Implement in subclasses") 

387 

388 def _super_repr(self): 

389 # explicitly call super on direct parent 

390 return super(self.__class__, self).__repr__() 

391 

392 

393class DeferredConfigString(str, DeferredConfig): 

394 """Config value for loading config from a string 

395 

396 Interpretation is deferred until it is loaded into the trait. 

397 

398 Subclass of str for backward compatibility. 

399 

400 This class is only used for values that are not listed 

401 in the configurable classes. 

402 

403 When config is loaded, `trait.from_string` will be used. 

404 

405 If an error is raised in `.from_string`, 

406 the original string is returned. 

407 

408 .. versionadded:: 5.0 

409 """ 

410 

411 def get_value(self, trait): 

412 """Get the value stored in this string""" 

413 s = str(self) 

414 try: 

415 return trait.from_string(s) 

416 except Exception: 

417 # exception casting from string, 

418 # let the original string lie. 

419 # this will raise a more informative error when config is loaded. 

420 return s 

421 

422 def __repr__(self): 

423 return f"{self.__class__.__name__}({self._super_repr()})" 

424 

425 

426class DeferredConfigList(list, DeferredConfig): # type:ignore[type-arg] 

427 """Config value for loading config from a list of strings 

428 

429 Interpretation is deferred until it is loaded into the trait. 

430 

431 This class is only used for values that are not listed 

432 in the configurable classes. 

433 

434 When config is loaded, `trait.from_string_list` will be used. 

435 

436 If an error is raised in `.from_string_list`, 

437 the original string list is returned. 

438 

439 .. versionadded:: 5.0 

440 """ 

441 

442 def get_value(self, trait): 

443 """Get the value stored in this string""" 

444 if hasattr(trait, "from_string_list"): 

445 src = list(self) 

446 cast = trait.from_string_list 

447 else: 

448 # only allow one item 

449 if len(self) > 1: 

450 raise ValueError( 

451 f"{trait.name} only accepts one value, got {len(self)}: {list(self)}" 

452 ) 

453 src = self[0] 

454 cast = trait.from_string 

455 

456 try: 

457 return cast(src) 

458 except Exception: 

459 # exception casting from string, 

460 # let the original value lie. 

461 # this will raise a more informative error when config is loaded. 

462 return src 

463 

464 def __repr__(self): 

465 return f"{self.__class__.__name__}({self._super_repr()})" 

466 

467 

468# ----------------------------------------------------------------------------- 

469# Config loading classes 

470# ----------------------------------------------------------------------------- 

471 

472 

473class ConfigLoader: 

474 """A object for loading configurations from just about anywhere. 

475 

476 The resulting configuration is packaged as a :class:`Config`. 

477 

478 Notes 

479 ----- 

480 A :class:`ConfigLoader` does one thing: load a config from a source 

481 (file, command line arguments) and returns the data as a :class:`Config` object. 

482 There are lots of things that :class:`ConfigLoader` does not do. It does 

483 not implement complex logic for finding config files. It does not handle 

484 default values or merge multiple configs. These things need to be 

485 handled elsewhere. 

486 """ 

487 

488 def _log_default(self): 

489 from traitlets.log import get_logger 

490 

491 return get_logger() 

492 

493 def __init__(self, log=None): 

494 """A base class for config loaders. 

495 

496 log : instance of :class:`logging.Logger` to use. 

497 By default logger of :meth:`traitlets.config.application.Application.instance()` 

498 will be used 

499 

500 Examples 

501 -------- 

502 >>> cl = ConfigLoader() 

503 >>> config = cl.load_config() 

504 >>> config 

505 {} 

506 """ 

507 self.clear() 

508 if log is None: 

509 self.log = self._log_default() 

510 self.log.debug("Using default logger") 

511 else: 

512 self.log = log 

513 

514 def clear(self): 

515 self.config = Config() 

516 

517 def load_config(self): 

518 """Load a config from somewhere, return a :class:`Config` instance. 

519 

520 Usually, this will cause self.config to be set and then returned. 

521 However, in most cases, :meth:`ConfigLoader.clear` should be called 

522 to erase any previous state. 

523 """ 

524 self.clear() 

525 return self.config 

526 

527 

528class FileConfigLoader(ConfigLoader): 

529 """A base class for file based configurations. 

530 

531 As we add more file based config loaders, the common logic should go 

532 here. 

533 """ 

534 

535 def __init__(self, filename, path=None, **kw): 

536 """Build a config loader for a filename and path. 

537 

538 Parameters 

539 ---------- 

540 filename : str 

541 The file name of the config file. 

542 path : str, list, tuple 

543 The path to search for the config file on, or a sequence of 

544 paths to try in order. 

545 """ 

546 super().__init__(**kw) 

547 self.filename = filename 

548 self.path = path 

549 self.full_filename = "" 

550 

551 def _find_file(self): 

552 """Try to find the file by searching the paths.""" 

553 self.full_filename = filefind(self.filename, self.path) 

554 

555 

556class JSONFileConfigLoader(FileConfigLoader): 

557 """A JSON file loader for config 

558 

559 Can also act as a context manager that rewrite the configuration file to disk on exit. 

560 

561 Example:: 

562 

563 with JSONFileConfigLoader('myapp.json','/home/jupyter/configurations/') as c: 

564 c.MyNewConfigurable.new_value = 'Updated' 

565 

566 """ 

567 

568 def load_config(self): 

569 """Load the config from a file and return it as a Config object.""" 

570 self.clear() 

571 try: 

572 self._find_file() 

573 except OSError as e: 

574 raise ConfigFileNotFound(str(e)) from e 

575 dct = self._read_file_as_dict() 

576 self.config = self._convert_to_config(dct) 

577 return self.config 

578 

579 def _read_file_as_dict(self): 

580 with open(self.full_filename) as f: 

581 return json.load(f) 

582 

583 def _convert_to_config(self, dictionary): 

584 if "version" in dictionary: 

585 version = dictionary.pop("version") 

586 else: 

587 version = 1 

588 

589 if version == 1: 

590 return Config(dictionary) 

591 else: 

592 raise ValueError(f"Unknown version of JSON config file: {version}") 

593 

594 def __enter__(self): 

595 self.load_config() 

596 return self.config 

597 

598 def __exit__(self, exc_type, exc_value, traceback): 

599 """ 

600 Exit the context manager but do not handle any errors. 

601 

602 In case of any error, we do not want to write the potentially broken 

603 configuration to disk. 

604 """ 

605 self.config.version = 1 

606 json_config = json.dumps(self.config, indent=2) 

607 with open(self.full_filename, "w") as f: 

608 f.write(json_config) 

609 

610 

611class PyFileConfigLoader(FileConfigLoader): 

612 """A config loader for pure python files. 

613 

614 This is responsible for locating a Python config file by filename and 

615 path, then executing it to construct a Config object. 

616 """ 

617 

618 def load_config(self): 

619 """Load the config from a file and return it as a Config object.""" 

620 self.clear() 

621 try: 

622 self._find_file() 

623 except OSError as e: 

624 raise ConfigFileNotFound(str(e)) from e 

625 self._read_file_as_dict() 

626 return self.config 

627 

628 def load_subconfig(self, fname, path=None): 

629 """Injected into config file namespace as load_subconfig""" 

630 if path is None: 

631 path = self.path 

632 

633 loader = self.__class__(fname, path) 

634 try: 

635 sub_config = loader.load_config() 

636 except ConfigFileNotFound: 

637 # Pass silently if the sub config is not there, 

638 # treat it as an empty config file. 

639 pass 

640 else: 

641 self.config.merge(sub_config) 

642 

643 def _read_file_as_dict(self): 

644 """Load the config file into self.config, with recursive loading.""" 

645 

646 def get_config(): 

647 """Unnecessary now, but a deprecation warning is more trouble than it's worth.""" 

648 return self.config 

649 

650 namespace = dict( 

651 c=self.config, 

652 load_subconfig=self.load_subconfig, 

653 get_config=get_config, 

654 __file__=self.full_filename, 

655 ) 

656 conf_filename = self.full_filename 

657 with open(conf_filename, "rb") as f: 

658 exec(compile(f.read(), conf_filename, "exec"), namespace, namespace) # noqa 

659 

660 

661class CommandLineConfigLoader(ConfigLoader): 

662 """A config loader for command line arguments. 

663 

664 As we add more command line based loaders, the common logic should go 

665 here. 

666 """ 

667 

668 def _exec_config_str(self, lhs, rhs, trait=None): 

669 """execute self.config.<lhs> = <rhs> 

670 

671 * expands ~ with expanduser 

672 * interprets value with trait if available 

673 """ 

674 value = rhs 

675 if isinstance(value, DeferredConfig): 

676 if trait: 

677 # trait available, reify config immediately 

678 value = value.get_value(trait) 

679 elif isinstance(rhs, DeferredConfigList) and len(rhs) == 1: 

680 # single item, make it a deferred str 

681 value = DeferredConfigString(os.path.expanduser(rhs[0])) 

682 else: 

683 if trait: 

684 value = trait.from_string(value) 

685 else: 

686 value = DeferredConfigString(value) 

687 

688 *path, key = lhs.split(".") 

689 section = self.config 

690 for part in path: 

691 section = section[part] 

692 section[key] = value 

693 return 

694 

695 def _load_flag(self, cfg): 

696 """update self.config from a flag, which can be a dict or Config""" 

697 if isinstance(cfg, (dict, Config)): 

698 # don't clobber whole config sections, update 

699 # each section from config: 

700 for sec, c in cfg.items(): 

701 self.config[sec].update(c) 

702 else: 

703 raise TypeError("Invalid flag: %r" % cfg) 

704 

705 

706# match --Class.trait keys for argparse 

707# matches: 

708# --Class.trait 

709# --x 

710# -x 

711 

712class_trait_opt_pattern = re.compile(r"^\-?\-[A-Za-z][\w]*(\.[\w]+)*$") 

713 

714_DOT_REPLACEMENT = "__DOT__" 

715_DASH_REPLACEMENT = "__DASH__" 

716 

717 

718class _KVAction(argparse.Action): 

719 """Custom argparse action for handling --Class.trait=x 

720 

721 Always 

722 """ 

723 

724 def __call__(self, parser, namespace, values, option_string=None): 

725 if isinstance(values, str): 

726 values = [values] 

727 values = ["-" if v is _DASH_REPLACEMENT else v for v in values] 

728 items = getattr(namespace, self.dest, None) 

729 if items is None: 

730 items = DeferredConfigList() 

731 else: 

732 items = DeferredConfigList(items) 

733 items.extend(values) 

734 setattr(namespace, self.dest, items) 

735 

736 

737class _DefaultOptionDict(dict): # type:ignore[type-arg] 

738 """Like the default options dict 

739 

740 but acts as if all --Class.trait options are predefined 

741 """ 

742 

743 def _add_kv_action(self, key): 

744 self[key] = _KVAction( 

745 option_strings=[key], 

746 dest=key.lstrip("-").replace(".", _DOT_REPLACEMENT), 

747 # use metavar for display purposes 

748 metavar=key.lstrip("-"), 

749 ) 

750 

751 def __contains__(self, key): 

752 if "=" in key: 

753 return False 

754 if super().__contains__(key): 

755 return True 

756 

757 if key.startswith("-") and class_trait_opt_pattern.match(key): 

758 self._add_kv_action(key) 

759 return True 

760 return False 

761 

762 def __getitem__(self, key): 

763 if key in self: 

764 return super().__getitem__(key) 

765 else: 

766 raise KeyError(key) 

767 

768 def get(self, key, default=None): 

769 try: 

770 return self[key] 

771 except KeyError: 

772 return default 

773 

774 

775class _KVArgParser(argparse.ArgumentParser): 

776 """subclass of ArgumentParser where any --Class.trait option is implicitly defined""" 

777 

778 def parse_known_args(self, args=None, namespace=None): 

779 # must be done immediately prior to parsing because if we do it in init, 

780 # registration of explicit actions via parser.add_option will fail during setup 

781 for container in (self, self._optionals): 

782 container._option_string_actions = _DefaultOptionDict(container._option_string_actions) 

783 return super().parse_known_args(args, namespace) 

784 

785 

786# type aliases 

787Flags = t.Union[str, t.Tuple[str, ...]] 

788SubcommandsDict = t.Dict[str, t.Any] 

789 

790 

791class ArgParseConfigLoader(CommandLineConfigLoader): 

792 """A loader that uses the argparse module to load from the command line.""" 

793 

794 parser_class = ArgumentParser 

795 

796 def __init__( 

797 self, 

798 argv: t.Optional[t.List[str]] = None, 

799 aliases: t.Optional[t.Dict[Flags, str]] = None, 

800 flags: t.Optional[t.Dict[Flags, str]] = None, 

801 log: t.Any = None, 

802 classes: t.Optional[t.List[t.Type[t.Any]]] = None, 

803 subcommands: t.Optional[SubcommandsDict] = None, 

804 *parser_args: t.Any, 

805 **parser_kw: t.Any, 

806 ) -> None: 

807 """Create a config loader for use with argparse. 

808 

809 Parameters 

810 ---------- 

811 classes : optional, list 

812 The classes to scan for *container* config-traits and decide 

813 for their "multiplicity" when adding them as *argparse* arguments. 

814 argv : optional, list 

815 If given, used to read command-line arguments from, otherwise 

816 sys.argv[1:] is used. 

817 *parser_args : tuple 

818 A tuple of positional arguments that will be passed to the 

819 constructor of :class:`argparse.ArgumentParser`. 

820 **parser_kw : dict 

821 A tuple of keyword arguments that will be passed to the 

822 constructor of :class:`argparse.ArgumentParser`. 

823 aliases : dict of str to str 

824 Dict of aliases to full traitlets names for CLI parsing 

825 flags : dict of str to str 

826 Dict of flags to full traitlets names for CLI parsing 

827 log 

828 Passed to `ConfigLoader` 

829 

830 Returns 

831 ------- 

832 config : Config 

833 The resulting Config object. 

834 """ 

835 classes = classes or [] 

836 super(CommandLineConfigLoader, self).__init__(log=log) 

837 self.clear() 

838 if argv is None: 

839 argv = sys.argv[1:] 

840 self.argv = argv 

841 self.aliases = aliases or {} 

842 self.flags = flags or {} 

843 self.classes = classes 

844 self.subcommands = subcommands # only used for argcomplete currently 

845 

846 self.parser_args = parser_args 

847 self.version = parser_kw.pop("version", None) 

848 kwargs = dict(argument_default=argparse.SUPPRESS) 

849 kwargs.update(parser_kw) 

850 self.parser_kw = kwargs 

851 

852 def load_config(self, argv=None, aliases=None, flags=_deprecated, classes=None): 

853 """Parse command line arguments and return as a Config object. 

854 

855 Parameters 

856 ---------- 

857 argv : optional, list 

858 If given, a list with the structure of sys.argv[1:] to parse 

859 arguments from. If not given, the instance's self.argv attribute 

860 (given at construction time) is used. 

861 flags 

862 Deprecated in traitlets 5.0, instanciate the config loader with the flags. 

863 

864 """ 

865 

866 if flags is not _deprecated: 

867 warnings.warn( 

868 "The `flag` argument to load_config is deprecated since Traitlets " 

869 f"5.0 and will be ignored, pass flags the `{type(self)}` constructor.", 

870 DeprecationWarning, 

871 stacklevel=2, 

872 ) 

873 

874 self.clear() 

875 if argv is None: 

876 argv = self.argv 

877 if aliases is not None: 

878 self.aliases = aliases 

879 if classes is not None: 

880 self.classes = classes 

881 self._create_parser() 

882 self._argcomplete(self.classes, self.subcommands) 

883 self._parse_args(argv) 

884 self._convert_to_config() 

885 return self.config 

886 

887 def get_extra_args(self): 

888 if hasattr(self, "extra_args"): 

889 return self.extra_args 

890 else: 

891 return [] 

892 

893 def _create_parser(self): 

894 self.parser = self.parser_class( 

895 *self.parser_args, **self.parser_kw # type:ignore[arg-type] 

896 ) 

897 self._add_arguments(self.aliases, self.flags, self.classes) 

898 

899 def _add_arguments(self, aliases, flags, classes): 

900 raise NotImplementedError("subclasses must implement _add_arguments") 

901 

902 def _argcomplete( 

903 self, classes: t.List[t.Any], subcommands: t.Optional[SubcommandsDict] 

904 ) -> None: 

905 """If argcomplete is enabled, allow triggering command-line autocompletion""" 

906 pass 

907 

908 def _parse_args(self, args): 

909 """self.parser->self.parsed_data""" 

910 uargs = [cast_unicode(a) for a in args] 

911 

912 unpacked_aliases: t.Dict[str, str] = {} 

913 if self.aliases: 

914 unpacked_aliases = {} 

915 for alias, alias_target in self.aliases.items(): 

916 if alias in self.flags: 

917 continue 

918 if not isinstance(alias, tuple): 

919 alias = (alias,) 

920 for al in alias: 

921 if len(al) == 1: 

922 unpacked_aliases["-" + al] = "--" + alias_target 

923 unpacked_aliases["--" + al] = "--" + alias_target 

924 

925 def _replace(arg): 

926 if arg == "-": 

927 return _DASH_REPLACEMENT 

928 for k, v in unpacked_aliases.items(): 

929 if arg == k: 

930 return v 

931 if arg.startswith(k + "="): 

932 return v + "=" + arg[len(k) + 1 :] 

933 return arg 

934 

935 if "--" in uargs: 

936 idx = uargs.index("--") 

937 extra_args = uargs[idx + 1 :] 

938 to_parse = uargs[:idx] 

939 else: 

940 extra_args = [] 

941 to_parse = uargs 

942 to_parse = [_replace(a) for a in to_parse] 

943 

944 self.parsed_data = self.parser.parse_args(to_parse) 

945 self.extra_args = extra_args 

946 

947 def _convert_to_config(self): 

948 """self.parsed_data->self.config""" 

949 for k, v in vars(self.parsed_data).items(): 

950 *path, key = k.split(".") 

951 section = self.config 

952 for p in path: 

953 section = section[p] 

954 setattr(section, key, v) 

955 

956 

957class _FlagAction(argparse.Action): 

958 """ArgParse action to handle a flag""" 

959 

960 def __init__(self, *args, **kwargs): 

961 self.flag = kwargs.pop("flag") 

962 self.alias = kwargs.pop("alias", None) 

963 kwargs["const"] = Undefined 

964 if not self.alias: 

965 kwargs["nargs"] = 0 

966 super().__init__(*args, **kwargs) 

967 

968 def __call__(self, parser, namespace, values, option_string=None): 

969 if self.nargs == 0 or values is Undefined: 

970 if not hasattr(namespace, "_flags"): 

971 namespace._flags = [] 

972 namespace._flags.append(self.flag) 

973 else: 

974 setattr(namespace, self.alias, values) 

975 

976 

977class KVArgParseConfigLoader(ArgParseConfigLoader): 

978 """A config loader that loads aliases and flags with argparse, 

979 

980 as well as arbitrary --Class.trait value 

981 """ 

982 

983 parser_class = _KVArgParser # type:ignore[assignment] 

984 

985 def _add_arguments(self, aliases, flags, classes): 

986 alias_flags: t.Dict[str, t.Any] = {} 

987 argparse_kwds: t.Dict[str, t.Any] 

988 paa = self.parser.add_argument 

989 self.parser.set_defaults(_flags=[]) 

990 paa("extra_args", nargs="*") 

991 

992 # An index of all container traits collected:: 

993 # 

994 # { <traitname>: (<trait>, <argparse-kwds>) } 

995 # 

996 # Used to add the correct type into the `config` tree. 

997 # Used also for aliases, not to re-collect them. 

998 self.argparse_traits = argparse_traits = {} 

999 for cls in classes: 

1000 for traitname, trait in cls.class_traits(config=True).items(): 

1001 argname = f"{cls.__name__}.{traitname}" 

1002 argparse_kwds = {"type": str} 

1003 if isinstance(trait, (Container, Dict)): 

1004 multiplicity = trait.metadata.get("multiplicity", "append") 

1005 if multiplicity == "append": 

1006 argparse_kwds["action"] = multiplicity 

1007 else: 

1008 argparse_kwds["nargs"] = multiplicity 

1009 argparse_traits[argname] = (trait, argparse_kwds) 

1010 

1011 for keys, (value, fhelp) in flags.items(): 

1012 if not isinstance(keys, tuple): 

1013 keys = (keys,) 

1014 for key in keys: 

1015 if key in aliases: 

1016 alias_flags[aliases[key]] = value 

1017 continue 

1018 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,) 

1019 paa(*keys, action=_FlagAction, flag=value, help=fhelp) 

1020 

1021 for keys, traitname in aliases.items(): 

1022 if not isinstance(keys, tuple): 

1023 keys = (keys,) 

1024 

1025 for key in keys: 

1026 argparse_kwds = { 

1027 "type": str, 

1028 "dest": traitname.replace(".", _DOT_REPLACEMENT), 

1029 "metavar": traitname, 

1030 } 

1031 argcompleter = None 

1032 if traitname in argparse_traits: 

1033 trait, kwds = argparse_traits[traitname] 

1034 argparse_kwds.update(kwds) 

1035 if "action" in argparse_kwds and traitname in alias_flags: 

1036 # flag sets 'action', so can't have flag & alias with custom action 

1037 # on the same name 

1038 raise ArgumentError( 

1039 f"The alias `{key}` for the 'append' sequence " 

1040 f"config-trait `{traitname}` cannot be also a flag!'" 

1041 ) 

1042 # For argcomplete, check if any either an argcompleter metadata tag or method 

1043 # is available. If so, it should be a callable which takes the command-line key 

1044 # string as an argument and other kwargs passed by argcomplete, 

1045 # and returns the a list of string completions. 

1046 argcompleter = trait.metadata.get("argcompleter") or getattr( 

1047 trait, "argcompleter", None 

1048 ) 

1049 if traitname in alias_flags: 

1050 # alias and flag. 

1051 # when called with 0 args: flag 

1052 # when called with >= 1: alias 

1053 argparse_kwds.setdefault("nargs", "?") 

1054 argparse_kwds["action"] = _FlagAction 

1055 argparse_kwds["flag"] = alias_flags[traitname] 

1056 argparse_kwds["alias"] = traitname 

1057 keys = ("-" + key, "--" + key) if len(key) == 1 else ("--" + key,) 

1058 action = paa(*keys, **argparse_kwds) 

1059 if argcompleter is not None: 

1060 # argcomplete's completers are callables returning list of completion strings 

1061 action.completer = functools.partial(argcompleter, key=key) # type: ignore 

1062 

1063 def _convert_to_config(self): 

1064 """self.parsed_data->self.config, parse unrecognized extra args via KVLoader.""" 

1065 extra_args = self.extra_args 

1066 

1067 for lhs, rhs in vars(self.parsed_data).items(): 

1068 if lhs == "extra_args": 

1069 self.extra_args = ["-" if a == _DASH_REPLACEMENT else a for a in rhs] + extra_args 

1070 continue 

1071 elif lhs == "_flags": 

1072 # _flags will be handled later 

1073 continue 

1074 

1075 lhs = lhs.replace(_DOT_REPLACEMENT, ".") 

1076 if "." not in lhs: 

1077 self._handle_unrecognized_alias(lhs) 

1078 trait = None 

1079 

1080 if isinstance(rhs, list): 

1081 rhs = DeferredConfigList(rhs) 

1082 elif isinstance(rhs, str): 

1083 rhs = DeferredConfigString(rhs) 

1084 

1085 trait = self.argparse_traits.get(lhs) 

1086 if trait: 

1087 trait = trait[0] 

1088 

1089 # eval the KV assignment 

1090 try: 

1091 self._exec_config_str(lhs, rhs, trait) 

1092 except Exception as e: 

1093 # cast deferred to nicer repr for the error 

1094 # DeferredList->list, etc 

1095 if isinstance(rhs, DeferredConfig): 

1096 rhs = rhs._super_repr() 

1097 raise ArgumentError(f"Error loading argument {lhs}={rhs}, {e}") from e 

1098 

1099 for subc in self.parsed_data._flags: 

1100 self._load_flag(subc) 

1101 

1102 def _handle_unrecognized_alias(self, arg: str) -> None: 

1103 """Handling for unrecognized alias arguments 

1104 

1105 Probably a mistyped alias. By default just log a warning, 

1106 but users can override this to raise an error instead, e.g. 

1107 self.parser.error("Unrecognized alias: '%s'" % arg) 

1108 """ 

1109 self.log.warning("Unrecognized alias: '%s', it will have no effect.", arg) 

1110 

1111 def _argcomplete( 

1112 self, classes: t.List[t.Any], subcommands: t.Optional[SubcommandsDict] 

1113 ) -> None: 

1114 """If argcomplete is enabled, allow triggering command-line autocompletion""" 

1115 try: 

1116 import argcomplete # noqa 

1117 except ImportError: 

1118 return 

1119 

1120 from . import argcomplete_config 

1121 

1122 finder = argcomplete_config.ExtendedCompletionFinder() 

1123 finder.config_classes = classes 

1124 finder.subcommands = list(subcommands or []) 

1125 # for ease of testing, pass through self._argcomplete_kwargs if set 

1126 finder(self.parser, **getattr(self, "_argcomplete_kwargs", {})) 

1127 

1128 

1129class KeyValueConfigLoader(KVArgParseConfigLoader): 

1130 """Deprecated in traitlets 5.0 

1131 

1132 Use KVArgParseConfigLoader 

1133 """ 

1134 

1135 def __init__(self, *args, **kwargs): 

1136 warnings.warn( 

1137 "KeyValueConfigLoader is deprecated since Traitlets 5.0." 

1138 " Use KVArgParseConfigLoader instead.", 

1139 DeprecationWarning, 

1140 stacklevel=2, 

1141 ) 

1142 super().__init__(*args, **kwargs) 

1143 

1144 

1145def load_pyconfig_files(config_files, path): 

1146 """Load multiple Python config files, merging each of them in turn. 

1147 

1148 Parameters 

1149 ---------- 

1150 config_files : list of str 

1151 List of config files names to load and merge into the config. 

1152 path : unicode 

1153 The full path to the location of the config files. 

1154 """ 

1155 config = Config() 

1156 for cf in config_files: 

1157 loader = PyFileConfigLoader(cf, path=path) 

1158 try: 

1159 next_config = loader.load_config() 

1160 except ConfigFileNotFound: 

1161 pass 

1162 except Exception: 

1163 raise 

1164 else: 

1165 config.merge(next_config) 

1166 return config