Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/config.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

269 statements  

1from __future__ import annotations 

2 

3import ast 

4import base64 

5import builtins # Explicitly use builtins.set as 'set' will be shadowed by a function 

6import json 

7import os 

8import pathlib 

9import site 

10import sys 

11import threading 

12import warnings 

13from collections.abc import Iterator, Mapping, Sequence 

14from typing import Any, Literal, overload 

15 

16import yaml 

17 

18from dask.typing import no_default 

19 

20 

21def _get_paths(): 

22 """Get locations to search for YAML configuration files. 

23 

24 This logic exists as a separate function for testing purposes. 

25 """ 

26 

27 paths = [ 

28 os.getenv("DASK_ROOT_CONFIG", "/etc/dask"), 

29 os.path.join(sys.prefix, "etc", "dask"), 

30 *[os.path.join(prefix, "etc", "dask") for prefix in site.PREFIXES], 

31 os.path.join(os.path.expanduser("~"), ".config", "dask"), 

32 ] 

33 if "DASK_CONFIG" in os.environ: 

34 paths.append(os.environ["DASK_CONFIG"]) 

35 

36 # Remove duplicate paths while preserving ordering 

37 paths = list(reversed(list(dict.fromkeys(reversed(paths))))) 

38 

39 return paths 

40 

41 

# Directories searched for configuration files, computed once at import time.
paths = _get_paths()

# Default destination for user configuration files: ``DASK_CONFIG`` when it is
# set in the environment, otherwise ``~/.config/dask``.
PATH = os.environ.get(
    "DASK_CONFIG", os.path.join(os.path.expanduser("~"), ".config", "dask")
)

# The global configuration dictionary; it is mutated in place by the helpers
# in this module.
config: dict = {}
global_config = config  # alias

# Default lock taken by ``set`` while it assigns values.
config_lock = threading.Lock()

# Default mappings registered through ``update_defaults``; replayed by
# ``refresh``.
defaults: list[Mapping] = []

58 

59 

def canonical_name(k: str, config: dict) -> str:
    """Return the spelling of ``k`` that ``config`` already uses.

    Users may write keys with either ``-`` or ``_``. If ``k`` is present in
    ``config`` it is returned unchanged; otherwise, if the alternate spelling
    is present, that one is returned. When neither spelling exists, or when
    ``config`` does not support membership tests, ``k`` is returned as given.
    """
    try:
        already_present = k in config
    except TypeError:
        # config is not a container we can look keys up in
        return k

    if already_present:
        return k

    # Underscores take precedence: a key containing "_" is converted to
    # hyphens, and only a key without any "_" is converted the other way.
    if "_" in k:
        alternate = k.replace("_", "-")
    else:
        alternate = k.replace("-", "_")

    return alternate if alternate in config else k

81 

82 

def update(
    old: dict,
    new: Mapping,
    priority: Literal["old", "new", "new-defaults"] = "new",
    defaults: Mapping | None = None,
) -> dict:
    """Update a nested dictionary with values from another

    This is like dict.update except that it smoothly merges nested values

    This operates in-place and modifies old

    Parameters
    ----------
    old: dict
        The dictionary to update in place. Keys of ``new`` are matched against
        it with :func:`canonical_name`, so ``-`` and ``_`` spellings are
        treated as the same key.
    new: Mapping
        The values to merge into ``old``.
    priority: string {'old', 'new', 'new-defaults'}
        If new (default) then the new dictionary has preference.
        Otherwise the old dictionary does.
        If 'new-defaults', a mapping should be given of the current defaults.
        Only if a value in ``old`` matches the current default, it will be
        updated with ``new``.
    defaults: Mapping, optional
        The current defaults, only consulted when ``priority`` is
        ``'new-defaults'``. Anything that is not a mapping (for example a
        scalar default found where ``new`` holds a nested mapping) is treated
        as "no defaults known at this level".

    Returns
    -------
    dict
        ``old``, after being modified in place.

    Examples
    --------
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'b': 3}}
    >>> update(a, b)  # doctest: +SKIP
    {'x': 2, 'y': {'a': 2, 'b': 3}}

    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'b': 3}}
    >>> update(a, b, priority='old')  # doctest: +SKIP
    {'x': 1, 'y': {'a': 2, 'b': 3}}

    >>> d = {'x': 0, 'y': {'a': 2}}
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'a': 3, 'b': 3}}
    >>> update(a, b, priority='new-defaults', defaults=d)  # doctest: +SKIP
    {'x': 1, 'y': {'a': 3, 'b': 3}}

    See Also
    --------
    dask.config.merge
    """
    # While recursing, ``defaults.get(k)`` may hand us a scalar (the default
    # at that path was not a mapping). Membership tests and ``.get`` on such a
    # value would raise, so treat it as having no defaults for this level.
    if not isinstance(defaults, Mapping):
        defaults = None

    for k, v in new.items():
        k = canonical_name(k, old)

        if isinstance(v, Mapping):
            # A nested mapping always replaces a missing or non-dict value so
            # that there is something to recurse into.
            if k not in old or not isinstance(old[k], dict):
                old[k] = {}
            update(
                old[k],
                v,
                priority=priority,
                defaults=defaults.get(k) if defaults else None,
            )
        elif (
            priority == "new"
            or k not in old
            or (
                priority == "new-defaults"
                and defaults
                and k in defaults
                and defaults[k] == old[k]
            )
        ):
            old[k] = v

    return old

151 

152 

def merge(*dicts: Mapping) -> dict:
    """Combine several nested dictionaries into a new one

    The inputs are folded left to right into a fresh dictionary, so values in
    later dictionaries take precedence over those in earlier ones.

    Examples
    --------
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'y': {'b': 3}}
    >>> merge(a, b)  # doctest: +SKIP
    {'x': 1, 'y': {'a': 2, 'b': 3}}

    See Also
    --------
    dask.config.update
    """
    merged: dict = {}
    for layer in dicts:
        # default priority ("new") lets each later layer override
        update(merged, layer)
    return merged

173 

174 

175def _load_config_file(path: str) -> dict | None: 

176 """A helper for loading a config file from a path, and erroring 

177 appropriately if the file is malformed.""" 

178 try: 

179 with open(path) as f: 

180 config = yaml.safe_load(f.read()) 

181 except OSError: 

182 # Ignore permission errors 

183 return None 

184 except Exception as exc: 

185 raise ValueError( 

186 f"A dask config file at {path!r} is malformed, original error " 

187 f"message:\n\n{exc}" 

188 ) from None 

189 if config is not None and not isinstance(config, dict): 

190 raise ValueError( 

191 f"A dask config file at {path!r} is malformed - config files must have " 

192 f"a dict as the top level object, got a {type(config).__name__} instead" 

193 ) 

194 return config 

195 

196 

@overload
def collect_yaml(
    paths: Sequence[str], *, return_paths: Literal[False] = False
) -> Iterator[dict]: ...


@overload
def collect_yaml(
    paths: Sequence[str], *, return_paths: Literal[True]
) -> Iterator[tuple[pathlib.Path, dict]]: ...


def collect_yaml(
    paths: Sequence[str], *, return_paths: bool = False
) -> Iterator[dict | tuple[pathlib.Path, dict]]:
    """Yield the configuration found in YAML/JSON files under ``paths``

    Each existing entry of ``paths`` is either a file, which is used as is, or
    a directory, whose ``.json``/``.yaml``/``.yml`` files (extension matched
    case-insensitively) are used in sorted order. Files that load as ``None``
    are skipped.

    Parameters
    ----------
    paths : Sequence[str]
        Files and/or directories to inspect; missing entries are ignored.
    return_paths : bool
        When true, yield ``(pathlib.Path, config)`` pairs instead of just the
        config dictionaries.
    """
    config_suffixes = (".json", ".yaml", ".yml")

    # First pass: resolve every location to concrete file names.
    discovered = []
    for location in paths:
        if not os.path.exists(location):
            continue
        if not os.path.isdir(location):
            discovered.append(location)
            continue
        try:
            matches = [
                os.path.join(location, entry)
                for entry in os.listdir(location)
                if os.path.splitext(entry)[1].lower() in config_suffixes
            ]
        except OSError:
            # Ignore permission errors
            continue
        matches.sort()
        discovered.extend(matches)

    # Second pass: parse each file and hand back what was loaded.
    for file_path in discovered:
        loaded = _load_config_file(file_path)
        if loaded is None:
            continue
        if return_paths:
            yield pathlib.Path(file_path), loaded
        else:
            yield loaded

245 

246 

def collect_env(env: Mapping[str, str] | None = None) -> dict:
    """Build a config dictionary from environment variables

    Variables of the form ``DASK_FOO__BAR_BAZ=123`` are picked up and turned
    into nested config entries. For each variable starting with ``DASK_``:

    - the prefix is dropped and the remaining name is lower-cased
    - ``__`` (double-underscore) is replaced by ``.`` to express nesting
    - the value is passed through ``interpret_value``

    If ``DASK_INTERNAL_INHERIT_CONFIG`` is present, its deserialized content
    is used as the starting point, and the individual variables are applied on
    top of it.

    Parameters
    ----------
    env : Mapping[str, str], optional
        The environment to read; defaults to ``os.environ``.
    """
    environment = os.environ if env is None else env

    if "DASK_INTERNAL_INHERIT_CONFIG" in environment:
        flat = deserialize(environment["DASK_INTERNAL_INHERIT_CONFIG"])
    else:
        flat = {}

    for name, raw in environment.items():
        if not name.startswith("DASK_"):
            continue
        dotted = name[5:].lower().replace("__", ".")
        flat[dotted] = interpret_value(raw)

    # ``set`` expands the dotted keys into nested dictionaries inside
    # ``nested`` rather than touching the global config.
    nested: dict = {}
    set(flat, config=nested)
    return nested

278 

279 

def interpret_value(value: str) -> Any:
    """Convert the text of an environment variable into a Python value.

    The text is first parsed with ``ast.literal_eval``. If it is not a valid
    Python literal, the YAML-style spellings ``none``/``null``/``true``/
    ``false`` (matched case-insensitively) are mapped to ``None``/``True``/
    ``False``; anything else is returned unchanged as a string.

    Parameters
    ----------
    value : str
        The raw text to interpret.

    Returns
    -------
    object
        The parsed literal, one of ``None``/``True``/``False``, or ``value``.
    """
    try:
        return ast.literal_eval(value)
    except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError):
        # ``literal_eval`` is documented to raise any of these on malformed
        # input (e.g. TypeError for "{[]: 1}", an unhashable dict key). None of
        # them should abort config loading; fall back to the raw text instead.
        pass

    # Avoid confusion of YAML vs. Python syntax
    hardcoded_map = {"none": None, "null": None, "false": False, "true": True}
    return hardcoded_map.get(value.lower(), value)

289 

290 

def paths_containing_key(
    key: str,
    paths: Sequence[str] = paths,
) -> Iterator[pathlib.Path]:
    """
    Yield the path of every config file in which ``key`` can be looked up.

    Each entry of ``paths`` is expanded with ``collect_yaml``; a file is
    reported when ``get(key, config=...)`` on its contents does not raise
    ``KeyError``.
    """
    for location in paths:
        for file_path, file_config in collect_yaml([location], return_paths=True):
            try:
                get(key, config=file_config)
            except KeyError:
                # this file does not define the key
                continue
            yield pathlib.Path(file_path)

307 

308 

def ensure_file(
    source: str, destination: str | None = None, comment: bool = True
) -> None:
    """
    Copy a default config file into place unless one is already there

    The file is written to ``<destination>/<basename of source>``. Nothing is
    done when ``destination`` is itself an existing file, or when the target
    file already exists. By default every non-blank line that is not already a
    comment is commented out in the copy. Any ``OSError`` raised along the way
    is swallowed, so this is a best-effort operation.

    This is to be used by downstream modules (like dask.distributed) that may
    have default configuration files that they wish to include in the default
    configuration path.

    Parameters
    ----------
    source : string, filename
        Source configuration file, typically within a source directory.
    destination : string, directory
        Destination directory. Configurable by ``DASK_CONFIG`` environment
        variable, falling back to ~/.config/dask.
    comment : bool, True by default
        Whether or not to comment out the config file when copying.
    """
    target_dir = PATH if destination is None else destination

    # destination is a file and already exists, never overwrite
    if os.path.isfile(target_dir):
        return

    # Otherwise interpret it as a directory and reuse the source's file name.
    target = os.path.join(target_dir, os.path.basename(source))

    try:
        if os.path.exists(target):
            return

        os.makedirs(target_dir, exist_ok=True)

        # Write to a per-process scratch file and rename it into place, so a
        # concurrent reader never observes a partially written config file.
        scratch = "%s.tmp.%d" % (target, os.getpid())

        with open(source) as reader:
            content = reader.readlines()

        if comment:
            commented = []
            for line in content:
                if line.strip() and not line.startswith("#"):
                    line = "# " + line
                commented.append(line)
            content = commented

        with open(scratch, "w") as writer:
            writer.write("".join(content))

        try:
            os.rename(scratch, target)
        except OSError:
            # Could not move it into place; do not leave the scratch file.
            os.remove(scratch)
    except OSError:
        pass

370 

371 

class set:
    """Temporarily set configuration values within a context manager

    The values are assigned as soon as the object is created. When it is used
    as a context manager, the recorded changes are undone on exit.

    Parameters
    ----------
    arg : mapping or None, optional
        A mapping of configuration key-value pairs to set.
    **kwargs :
        Additional key-value pairs to set. If ``arg`` is provided, values set
        in ``arg`` will be applied before those in ``kwargs``.
        Double-underscores (``__``) in keyword arguments will be replaced with
        ``.``, allowing nested values to be easily set.

    Examples
    --------
    >>> import dask

    Set ``'foo.bar'`` in a context, by providing a mapping.

    >>> with dask.config.set({'foo.bar': 123}):
    ...     pass

    Set ``'foo.bar'`` in a context, by providing a keyword argument.

    >>> with dask.config.set(foo__bar=123):
    ...     pass

    Set ``'foo.bar'`` globally.

    >>> dask.config.set(foo__bar=123)  # doctest: +SKIP

    See Also
    --------
    dask.config.get
    """

    # The dictionary being modified.
    config: dict
    # Undo log: [(op, path, value), ...]
    _record: list[tuple[Literal["insert", "replace"], tuple[str, ...], Any]]

    def __init__(
        self,
        arg: Mapping | None = None,
        config: dict | None = None,
        lock: threading.Lock = config_lock,
        **kwargs,
    ):
        # The default is resolved here rather than in the signature to keep
        # the Sphinx autofunction documentation clean.
        target = global_config if config is None else config

        with lock:
            self.config = target
            self._record = []

            if arg is not None:
                for dotted, value in arg.items():
                    dotted = check_deprecations(dotted)
                    self._assign(dotted.split("."), value, target)

            for name, value in kwargs.items():
                dotted = check_deprecations(name.replace("__", "."))
                self._assign(dotted.split("."), value, target)

    def __enter__(self):
        return self.config

    def __exit__(self, type, value, traceback):
        # Undo in reverse order so that the earliest recorded state wins.
        for action, trail, previous in self._record[::-1]:
            *parents, leaf = trail
            node = self.config

            if action == "replace":
                for name in parents:
                    node = node.setdefault(name, {})
                node[leaf] = previous
                continue

            # insert: drop the key again, unless one of its parents is
            # already gone.
            reachable = True
            for name in parents:
                try:
                    node = node[name]
                except KeyError:
                    reachable = False
                    break
            if reachable:
                node.pop(leaf, None)

    def _assign(
        self,
        keys: Sequence[str],
        value: Any,
        d: dict,
        path: tuple[str, ...] = (),
        record: bool = True,
    ) -> None:
        """Store ``value`` under the nested ``keys`` of ``d``

        Parameters
        ----------
        keys : Sequence[str]
            The remaining key path, outermost first.
        value : object
            The value to store at the end of the path.
        d : dict
            The level of the nested dictionary that ``keys[0]`` refers to.
        path : tuple[str], optional
            The canonical keys already traversed to reach ``d``.
        record : bool, optional
            Whether changes made from here on must be added to the undo log.
        """
        name = canonical_name(keys[0], d)
        trail = path + (name,)

        if len(keys) > 1:
            if name not in d:
                if record:
                    self._record.append(("insert", trail, None))
                d[name] = {}
                # Undoing this insert removes everything below it, so deeper
                # changes need no entries of their own.
                record = False
            self._assign(keys[1:], value, d[name], trail, record=record)
            return

        if record:
            if name in d:
                entry = ("replace", trail, d[name])
            else:
                entry = ("insert", trail, None)
            self._record.append(entry)
        d[name] = value

497 

498 

def collect(paths: list[str] = paths, env: Mapping[str, str] | None = None) -> dict:
    """
    Gather configuration from YAML files and environment variables

    The YAML configurations are merged in the order they are found, and the
    environment-derived configuration is merged last so that it takes
    precedence.

    Parameters
    ----------
    paths : list[str]
        A list of paths to search for yaml config files

    env : Mapping[str, str]
        The system environment variables; defaults to ``os.environ``

    Returns
    -------
    config: dict

    See Also
    --------
    dask.config.refresh: collect configuration and update into primary config
    """
    environment = os.environ if env is None else env
    return merge(*collect_yaml(paths=paths), collect_env(env=environment))

524 

525 

def refresh(
    config: dict | None = None, defaults: list[Mapping] = defaults, **kwargs
) -> None:
    """
    Rebuild the configuration from defaults, yaml files and env variables

    This mutates the global dask.config.config, or the config parameter if
    passed in. Extra keyword arguments are forwarded to ``collect``.

    The steps are:

    1. Clear out all old configuration
    2. Apply the stored defaults from downstream libraries
       (see update_defaults)
    3. Apply what ``collect`` finds in yaml files and environment variables
    4. Rename deprecated keys (with a warning)

    Note that some functionality only checks configuration once at startup and
    may not change behavior, even if configuration changes. It is recommended
    to restart your python process if convenient to ensure that new
    configuration changes take place.

    See Also
    --------
    dask.config.collect: for parameters
    dask.config.update_defaults
    """
    # The default is resolved here rather than in the signature to keep the
    # Sphinx autofunction documentation clean.
    target = global_config if config is None else config

    target.clear()

    # With priority "old", a key set by an earlier default is kept.
    for layer in defaults:
        update(target, layer, priority="old")

    update(target, collect(**kwargs))
    rename(deprecations, target)

563 

564 

def get(
    key: str,
    default: Any = no_default,
    config: dict | None = None,
    override_with: Any = None,
) -> Any:
    """
    Look up a value in the global config (or in ``config`` if given)

    If ``override_with`` is not None this value will be passed straight back.
    Useful for getting kwarg defaults from Dask config.

    Use '.' for nested access. If the lookup fails, ``default`` is returned
    when one was supplied; otherwise the lookup error is raised.

    Examples
    --------
    >>> from dask import config
    >>> config.get('foo')  # doctest: +SKIP
    {'x': 1, 'y': 2}

    >>> config.get('foo.x')  # doctest: +SKIP
    1

    >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
    123

    >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
    2

    >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
    3

    See Also
    --------
    dask.config.set
    """
    if override_with is not None:
        return override_with

    # The default is resolved here rather than in the signature to keep the
    # Sphinx autofunction documentation clean.
    node = global_config if config is None else config

    for part in key.split("."):
        # accept either the '-' or the '_' spelling at every level
        part = canonical_name(part, node)
        try:
            node = node[part]
        except (TypeError, IndexError, KeyError):
            if default is no_default:
                raise
            return default

    return node

619 

620 

def pop(key: str, default: Any = no_default, config: dict = config) -> Any:
    """Like ``get``, but remove the element if found

    The dotted ``key`` is followed level by level and the final component is
    popped from its parent. If a step fails, ``default`` is returned when one
    was supplied; otherwise the lookup error is raised.

    See Also
    --------
    dask.config.get
    dask.config.set
    """
    parts = key.split(".")
    final = len(parts) - 1
    node = config

    for position, part in enumerate(parts):
        part = canonical_name(part, node)
        try:
            if position < final:
                # still descending towards the parent of the target
                node = node[part]
                continue
            return node.pop(part)
        except (TypeError, IndexError, KeyError):
            if default is no_default:
                raise
            return default

642 

643 

def update_defaults(
    new: Mapping, config: dict = config, defaults: list[Mapping] = defaults
) -> None:
    """Register a new set of defaults and apply it to the configuration

    Two things happen:

    1. ``new`` is appended to ``defaults`` so that ``refresh`` can replay it
       later.
    2. ``config`` is updated with ``new``. Existing values are kept, unless a
       value still equals the previously registered default, in which case it
       is replaced by the new default.
    """
    # Snapshot the defaults as they were *before* adding ``new``; this is what
    # existing values are compared against.
    previous_defaults = merge(*defaults)
    defaults.append(new)
    update(config, new, priority="new-defaults", defaults=previous_defaults)

659 

660 

def expand_environment_variables(config: Any) -> Any:
    """Expand environment variables in a nested config dictionary

    Strings are passed through ``os.path.expandvars``. Mappings are rebuilt as
    plain dicts with expanded values (keys are left alone), and lists, tuples
    and sets are rebuilt as the same type with expanded elements. Any other
    object is returned unchanged.

    Parameters
    ----------
    config : dict, iterable, or str
        Input object to search for environment variables

    Returns
    -------
    config : same type as input

    Examples
    --------
    >>> expand_environment_variables({'x': [1, 2, '$USER']})  # doctest: +SKIP
    {'x': [1, 2, 'my-username']}
    """
    if isinstance(config, Mapping):
        return {
            name: expand_environment_variables(item) for name, item in config.items()
        }
    if isinstance(config, str):
        return os.path.expandvars(config)
    # ``builtins.set`` because the name ``set`` is shadowed in this module
    if isinstance(config, (list, tuple, builtins.set)):
        container = type(config)
        return container(map(expand_environment_variables, config))
    return config

689 

690 

#: Mapping of {deprecated key: new key} for renamed keys, or {deprecated key: None} for
#: removed keys. All deprecated keys must use '-' instead of '_'.
#: This is used in three places:
#: 1. In refresh(), which calls rename() to rename and warn upon loading
#:    from ~/.config/dask.yaml, DASK_ env variables, etc.
#: 2. in distributed/config.py and equivalent modules, where we perform additional
#:    distributed-specific renames for the yaml/env config and enrich this dict
#: 3. from individual calls to dask.config.set(), which internally invoke
#:    check_deprecations()
deprecations: dict[str, str | None] = {
    "fuse-ave-width": "optimization.fuse.ave-width",
    "fuse-max-height": "optimization.fuse.max-height",
    "fuse-max-width": "optimization.fuse.max-width",
    "fuse-rename-keys": "optimization.fuse.rename-keys",
    "fuse-max-depth-new-edges": "optimization.fuse.max-depth-new-edges",
    # See https://github.com/dask/distributed/pull/4916
    "ucx.cuda-copy": "distributed.ucx.cuda_copy",
    "ucx.tcp": "distributed.ucx.tcp",
    "ucx.nvlink": "distributed.ucx.nvlink",
    "ucx.infiniband": "distributed.ucx.infiniband",
    "ucx.rdmacm": "distributed.ucx.rdmacm",
    "ucx.net-devices": "distributed.ucx.net-devices",
    "ucx.reuse-endpoints": "distributed.ucx.reuse-endpoints",
    "rmm.pool-size": "distributed.rmm.pool-size",
    "shuffle": "dataframe.shuffle.algorithm",
    "array.rechunk-threshold": "array.rechunk.threshold",
    "dataframe.shuffle.algorithm": "dataframe.shuffle.method",
    "dataframe.shuffle-compression": "dataframe.shuffle.compression",
    "admin.traceback.shorten.what": "admin.traceback.shorten",  # changed in 2023.9.0
    "array.shuffle.chunksize-tolerance": "array.chunk-size-tolerance",
}

722 

723 

def rename(
    deprecations: Mapping[str, str | None] = deprecations, config: dict = config
) -> None:
    """Move values stored under deprecated keys to their new keys

    Every key of ``deprecations`` that is present in ``config`` is removed,
    and its value is stored again under the name that ``check_deprecations``
    reports for it (which also emits the deprecation warning). This helps
    migrate older configuration versions over time.

    See Also
    --------
    check_deprecations
    """
    for old_key in deprecations:
        try:
            stored = pop(old_key, config=config)
        except (TypeError, IndexError, KeyError):
            # the deprecated key is not in use
            continue

        resolved = canonical_name(old_key, config=config)
        replacement = check_deprecations(resolved, deprecations)
        if not replacement:
            continue
        set({replacement: stored}, config=config)

744 

745 

def check_deprecations(
    key: str, deprecations: Mapping[str, str | None] = deprecations
) -> str:
    """Check if the provided value has been renamed or removed

    The key is looked up in ``deprecations`` after replacing ``_`` with ``-``.
    A renamed key triggers a ``FutureWarning`` and the new name is returned; a
    removed key raises ``ValueError``; any other key is returned unchanged.

    Parameters
    ----------
    key : str
        The configuration key to check
    deprecations : Dict[str, str]
        The mapping of aliases

    Examples
    --------
    >>> deprecations = {"old_key": "new_key", "invalid": None}
    >>> check_deprecations("old_key", deprecations=deprecations)  # doctest: +SKIP
    FutureWarning: Dask configuration key 'old_key' has been deprecated; please use 'new_key' instead

    >>> check_deprecations("invalid", deprecations=deprecations)
    Traceback (most recent call last):
        ...
    ValueError: Dask configuration key 'invalid' has been removed

    >>> check_deprecations("another_key", deprecations=deprecations)
    'another_key'

    Returns
    -------
    new: str
        The proper key, whether the original (if no deprecation) or the aliased
        value

    See Also
    --------
    rename
    """
    hyphenated = key.replace("_", "-")
    if hyphenated not in deprecations:
        return key

    replacement = deprecations[hyphenated]
    if not replacement:
        raise ValueError(f"Dask configuration key {key!r} has been removed")

    warnings.warn(
        f"Dask configuration key {key!r} has been deprecated; "
        f"please use {replacement!r} instead",
        FutureWarning,
    )
    return replacement

796 

797 

def serialize(data: Any) -> str:
    """Encode config data as a string.

    The data is dumped to JSON and then encoded with URL-safe base64.

    Typically used to pass config via the ``DASK_INTERNAL_INHERIT_CONFIG`` environment variable.

    Parameters
    ----------
    data: json-serializable object
        The data to serialize

    Returns
    -------
    serialized_data: str
        The serialized data as a string

    """
    as_json = json.dumps(data)
    encoded = base64.urlsafe_b64encode(as_json.encode())
    return encoded.decode()

815 

816 

def deserialize(data: str) -> Any:
    """Decode a string produced by :func:`dask.config.serialize`.

    The string is decoded from URL-safe base64 and then parsed as JSON.

    Typically when receiving config via the ``DASK_INTERNAL_INHERIT_CONFIG`` environment variable.

    Parameters
    ----------
    data: str
        String serialized by :func:`dask.config.serialize`

    Returns
    -------
    deserialized_data: obj
        The de-serialized data

    """
    decoded = base64.urlsafe_b64decode(data.encode())
    return json.loads(decoded.decode())

834 

835 

def _initialize() -> None:
    """Register the defaults from the ``dask.yaml`` file next to this module."""
    defaults_file = os.path.join(os.path.dirname(__file__), "dask.yaml")

    with open(defaults_file) as handle:
        packaged_defaults = yaml.safe_load(handle)

    update_defaults(packaged_defaults)

843 

844 

# Populate the global config at import time. ``refresh`` runs first, so values
# from the YAML files and ``DASK_*`` environment variables are already in
# place; ``_initialize`` then registers the packaged ``dask.yaml`` defaults
# through ``update_defaults``, which (with no defaults registered before it)
# only fills in keys that are still missing.
refresh()
_initialize()