Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/config.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

269 statements  

1from __future__ import annotations 

2 

3import ast 

4import base64 

5import builtins # Explicitly use builtins.set as 'set' will be shadowed by a function 

6import json 

7import os 

8import pathlib 

9import site 

10import sys 

11import threading 

12import warnings 

13from collections.abc import Iterator, Mapping, Sequence 

14from typing import Any, Literal, overload 

15 

16import yaml 

17 

18from dask.typing import no_default 

19 

20 

21def _get_paths(): 

22 """Get locations to search for YAML configuration files. 

23 

24 This logic exists as a separate function for testing purposes. 

25 """ 

26 

27 paths = [ 

28 os.getenv("DASK_ROOT_CONFIG", "/etc/dask"), 

29 os.path.join(sys.prefix, "etc", "dask"), 

30 *[os.path.join(prefix, "etc", "dask") for prefix in site.PREFIXES], 

31 os.path.join(os.path.expanduser("~"), ".config", "dask"), 

32 ] 

33 if "DASK_CONFIG" in os.environ: 

34 paths.append(os.environ["DASK_CONFIG"]) 

35 

36 # Remove duplicate paths while preserving ordering 

37 paths = list(reversed(list(dict.fromkeys(reversed(paths))))) 

38 

39 return paths 

40 

41 

42paths = _get_paths() 

43 

44if "DASK_CONFIG" in os.environ: 

45 PATH = os.environ["DASK_CONFIG"] 

46else: 

47 PATH = os.path.join(os.path.expanduser("~"), ".config", "dask") 

48 

49 

50config: dict = {} 

51global_config = config # alias 

52 

53 

54config_lock = threading.Lock() 

55 

56 

57defaults: list[Mapping] = [] 

58 

59 

60def canonical_name(k: str, config: dict) -> str: 

61 """Return the canonical name for a key. 

62 

63 Handles user choice of '-' or '_' conventions by standardizing on whichever 

64 version was set first. If a key already exists in either hyphen or 

65 underscore form, the existing version is the canonical name. If neither 

66 version exists the original key is used as is. 

67 """ 

68 try: 

69 if k in config: 

70 return k 

71 except TypeError: 

72 # config is not a mapping, return the same name as provided 

73 return k 

74 

75 altk = k.replace("_", "-") if "_" in k else k.replace("-", "_") 

76 

77 if altk in config: 

78 return altk 

79 

80 return k 

81 

82 

83def update( 

84 old: dict, 

85 new: Mapping, 

86 priority: Literal["old", "new", "new-defaults"] = "new", 

87 defaults: Mapping | None = None, 

88) -> dict: 

89 """Update a nested dictionary with values from another 

90 

91 This is like dict.update except that it smoothly merges nested values 

92 

93 This operates in-place and modifies old 

94 

95 Parameters 

96 ---------- 

97 priority: string {'old', 'new', 'new-defaults'} 

98 If new (default) then the new dictionary has preference. 

99 Otherwise the old dictionary does. 

100 If 'new-defaults', a mapping should be given of the current defaults. 

101 Only if a value in ``old`` matches the current default, it will be 

102 updated with ``new``. 

103 

104 Examples 

105 -------- 

106 >>> a = {'x': 1, 'y': {'a': 2}} 

107 >>> b = {'x': 2, 'y': {'b': 3}} 

108 >>> update(a, b) # doctest: +SKIP 

109 {'x': 2, 'y': {'a': 2, 'b': 3}} 

110 

111 >>> a = {'x': 1, 'y': {'a': 2}} 

112 >>> b = {'x': 2, 'y': {'b': 3}} 

113 >>> update(a, b, priority='old') # doctest: +SKIP 

114 {'x': 1, 'y': {'a': 2, 'b': 3}} 

115 

116 >>> d = {'x': 0, 'y': {'a': 2}} 

117 >>> a = {'x': 1, 'y': {'a': 2}} 

118 >>> b = {'x': 2, 'y': {'a': 3, 'b': 3}} 

119 >>> update(a, b, priority='new-defaults', defaults=d) # doctest: +SKIP 

120 {'x': 1, 'y': {'a': 3, 'b': 3}} 

121 

122 See Also 

123 -------- 

124 dask.config.merge 

125 """ 

126 for k, v in new.items(): 

127 k = canonical_name(k, old) 

128 

129 if isinstance(v, Mapping): 

130 if k not in old or old[k] is None or not isinstance(old[k], dict): 

131 old[k] = {} 

132 update( 

133 old[k], 

134 v, 

135 priority=priority, 

136 defaults=defaults.get(k) if defaults else None, 

137 ) 

138 else: 

139 if ( 

140 priority == "new" 

141 or k not in old 

142 or ( 

143 priority == "new-defaults" 

144 and defaults 

145 and k in defaults 

146 and defaults[k] == old[k] 

147 ) 

148 ): 

149 old[k] = v 

150 

151 return old 

152 

153 

154def merge(*dicts: Mapping) -> dict: 

155 """Update a sequence of nested dictionaries 

156 

157 This prefers the values in the latter dictionaries to those in the former 

158 

159 Examples 

160 -------- 

161 >>> a = {'x': 1, 'y': {'a': 2}} 

162 >>> b = {'y': {'b': 3}} 

163 >>> merge(a, b) # doctest: +SKIP 

164 {'x': 1, 'y': {'a': 2, 'b': 3}} 

165 

166 See Also 

167 -------- 

168 dask.config.update 

169 """ 

170 result: dict = {} 

171 for d in dicts: 

172 update(result, d) 

173 return result 

174 

175 

176def _load_config_file(path: str) -> dict | None: 

177 """A helper for loading a config file from a path, and erroring 

178 appropriately if the file is malformed.""" 

179 try: 

180 with open(path) as f: 

181 config = yaml.safe_load(f.read()) 

182 except OSError: 

183 # Ignore permission errors 

184 return None 

185 except Exception as exc: 

186 raise ValueError( 

187 f"A dask config file at {path!r} is malformed, original error " 

188 f"message:\n\n{exc}" 

189 ) from None 

190 if config is not None and not isinstance(config, dict): 

191 raise ValueError( 

192 f"A dask config file at {path!r} is malformed - config files must have " 

193 f"a dict as the top level object, got a {type(config).__name__} instead" 

194 ) 

195 return config 

196 

197 

198@overload 

199def collect_yaml( 

200 paths: Sequence[str], *, return_paths: Literal[False] = False 

201) -> Iterator[dict]: ... 

202 

203 

204@overload 

205def collect_yaml( 

206 paths: Sequence[str], *, return_paths: Literal[True] 

207) -> Iterator[tuple[pathlib.Path, dict]]: ... 

208 

209 

210def collect_yaml( 

211 paths: Sequence[str], *, return_paths: bool = False 

212) -> Iterator[dict | tuple[pathlib.Path, dict]]: 

213 """Collect configuration from yaml files 

214 

215 This searches through a list of paths, expands to find all yaml or json 

216 files, and then parses each file. 

217 """ 

218 # Find all paths 

219 file_paths = [] 

220 for path in paths: 

221 if os.path.exists(path): 

222 if os.path.isdir(path): 

223 try: 

224 file_paths.extend( 

225 sorted( 

226 os.path.join(path, p) 

227 for p in os.listdir(path) 

228 if os.path.splitext(p)[1].lower() 

229 in (".json", ".yaml", ".yml") 

230 ) 

231 ) 

232 except OSError: 

233 # Ignore permission errors 

234 pass 

235 else: 

236 file_paths.append(path) 

237 

238 # Parse yaml files 

239 for path in file_paths: 

240 config = _load_config_file(path) 

241 if config is not None: 

242 if return_paths: 

243 yield pathlib.Path(path), config 

244 else: 

245 yield config 

246 

247 

248def collect_env(env: Mapping[str, str] | None = None) -> dict: 

249 """Collect config from environment variables 

250 

251 This grabs environment variables of the form "DASK_FOO__BAR_BAZ=123" and 

252 turns these into config variables of the form ``{"foo": {"bar-baz": 123}}`` 

253 It transforms the key and value in the following way: 

254 

255 - Lower-cases the key text 

256 - Treats ``__`` (double-underscore) as nested access 

257 - Calls ``ast.literal_eval`` on the value 

258 

259 Any serialized config passed via ``DASK_INTERNAL_INHERIT_CONFIG`` is also set here. 

260 

261 """ 

262 

263 if env is None: 

264 env = os.environ 

265 

266 if "DASK_INTERNAL_INHERIT_CONFIG" in env: 

267 d = deserialize(env["DASK_INTERNAL_INHERIT_CONFIG"]) 

268 else: 

269 d = {} 

270 

271 for name, value in env.items(): 

272 if name.startswith("DASK_"): 

273 varname = name[5:].lower().replace("__", ".") 

274 d[varname] = interpret_value(value) 

275 

276 result: dict = {} 

277 set(d, config=result) 

278 return result 

279 

280 

281def interpret_value(value: str) -> Any: 

282 try: 

283 return ast.literal_eval(value) 

284 except (SyntaxError, ValueError): 

285 pass 

286 

287 # Avoid confusion of YAML vs. Python syntax 

288 hardcoded_map = {"none": None, "null": None, "false": False, "true": True} 

289 return hardcoded_map.get(value.lower(), value) 

290 

291 

292def paths_containing_key( 

293 key: str, 

294 paths: Sequence[str] = paths, 

295) -> Iterator[pathlib.Path]: 

296 """ 

297 Generator yielding paths which contain the given key. 

298 """ 

299 # Check existing config files for any which contains this key. 

300 for path_ in paths: 

301 for path, config in collect_yaml([path_], return_paths=True): 

302 try: 

303 get(key, config=config) 

304 except KeyError: 

305 continue 

306 else: 

307 yield pathlib.Path(path) 

308 

309 

310def ensure_file( 

311 source: str, destination: str | None = None, comment: bool = True 

312) -> None: 

313 """ 

314 Copy file to default location if it does not already exist 

315 

316 This tries to move a default configuration file to a default location if 

317 if does not already exist. It also comments out that file by default. 

318 

319 This is to be used by downstream modules (like dask.distributed) that may 

320 have default configuration files that they wish to include in the default 

321 configuration path. 

322 

323 Parameters 

324 ---------- 

325 source : string, filename 

326 Source configuration file, typically within a source directory. 

327 destination : string, directory 

328 Destination directory. Configurable by ``DASK_CONFIG`` environment 

329 variable, falling back to ~/.config/dask. 

330 comment : bool, True by default 

331 Whether or not to comment out the config file when copying. 

332 """ 

333 if destination is None: 

334 destination = PATH 

335 

336 # destination is a file and already exists, never overwrite 

337 if os.path.isfile(destination): 

338 return 

339 

340 # If destination is not an existing file, interpret as a directory, 

341 # use the source basename as the filename 

342 directory = destination 

343 destination = os.path.join(directory, os.path.basename(source)) 

344 

345 try: 

346 if not os.path.exists(destination): 

347 os.makedirs(directory, exist_ok=True) 

348 

349 # Atomically create destination. Parallel testing discovered 

350 # a race condition where a process can be busy creating the 

351 # destination while another process reads an empty config file. 

352 tmp = "%s.tmp.%d" % (destination, os.getpid()) 

353 with open(source) as f: 

354 lines = list(f) 

355 

356 if comment: 

357 lines = [ 

358 "# " + line if line.strip() and not line.startswith("#") else line 

359 for line in lines 

360 ] 

361 

362 with open(tmp, "w") as f: 

363 f.write("".join(lines)) 

364 

365 try: 

366 os.rename(tmp, destination) 

367 except OSError: 

368 os.remove(tmp) 

369 except OSError: 

370 pass 

371 

372 

373class set: 

374 """Temporarily set configuration values within a context manager 

375 

376 Parameters 

377 ---------- 

378 arg : mapping or None, optional 

379 A mapping of configuration key-value pairs to set. 

380 **kwargs : 

381 Additional key-value pairs to set. If ``arg`` is provided, values set 

382 in ``arg`` will be applied before those in ``kwargs``. 

383 Double-underscores (``__``) in keyword arguments will be replaced with 

384 ``.``, allowing nested values to be easily set. 

385 

386 Examples 

387 -------- 

388 >>> import dask 

389 

390 Set ``'foo.bar'`` in a context, by providing a mapping. 

391 

392 >>> with dask.config.set({'foo.bar': 123}): 

393 ... pass 

394 

395 Set ``'foo.bar'`` in a context, by providing a keyword argument. 

396 

397 >>> with dask.config.set(foo__bar=123): 

398 ... pass 

399 

400 Set ``'foo.bar'`` globally. 

401 

402 >>> dask.config.set(foo__bar=123) # doctest: +SKIP 

403 

404 See Also 

405 -------- 

406 dask.config.get 

407 """ 

408 

409 config: dict 

410 # [(op, path, value), ...] 

411 _record: list[tuple[Literal["insert", "replace"], tuple[str, ...], Any]] 

412 

413 def __init__( 

414 self, 

415 arg: Mapping | None = None, 

416 config: dict | None = None, 

417 lock: threading.Lock = config_lock, 

418 **kwargs, 

419 ): 

420 if config is None: # Keep Sphinx autofunction documentation clean 

421 config = global_config 

422 

423 with lock: 

424 self.config = config 

425 self._record = [] 

426 

427 if arg is not None: 

428 for key, value in arg.items(): 

429 key = check_deprecations(key) 

430 self._assign(key.split("."), value, config) 

431 if kwargs: 

432 for key, value in kwargs.items(): 

433 key = key.replace("__", ".") 

434 key = check_deprecations(key) 

435 self._assign(key.split("."), value, config) 

436 

437 def __enter__(self): 

438 return self.config 

439 

440 def __exit__(self, type, value, traceback): 

441 for op, path, value in reversed(self._record): 

442 d = self.config 

443 if op == "replace": 

444 for key in path[:-1]: 

445 d = d.setdefault(key, {}) 

446 d[path[-1]] = value 

447 else: # insert 

448 for key in path[:-1]: 

449 try: 

450 d = d[key] 

451 except KeyError: 

452 break 

453 else: 

454 d.pop(path[-1], None) 

455 

456 def _assign( 

457 self, 

458 keys: Sequence[str], 

459 value: Any, 

460 d: dict, 

461 path: tuple[str, ...] = (), 

462 record: bool = True, 

463 ) -> None: 

464 """Assign value into a nested configuration dictionary 

465 

466 Parameters 

467 ---------- 

468 keys : Sequence[str] 

469 The nested path of keys to assign the value. 

470 value : object 

471 d : dict 

472 The part of the nested dictionary into which we want to assign the 

473 value 

474 path : tuple[str], optional 

475 The path history up to this point. 

476 record : bool, optional 

477 Whether this operation needs to be recorded to allow for rollback. 

478 """ 

479 key = canonical_name(keys[0], d) 

480 

481 path = path + (key,) 

482 

483 if len(keys) == 1: 

484 if record: 

485 if key in d: 

486 self._record.append(("replace", path, d[key])) 

487 else: 

488 self._record.append(("insert", path, None)) 

489 d[key] = value 

490 else: 

491 if key not in d: 

492 if record: 

493 self._record.append(("insert", path, None)) 

494 d[key] = {} 

495 # No need to record subsequent operations after an insert 

496 record = False 

497 self._assign(keys[1:], value, d[key], path, record=record) 

498 

499 

500def collect(paths: list[str] = paths, env: Mapping[str, str] | None = None) -> dict: 

501 """ 

502 Collect configuration from paths and environment variables 

503 

504 Parameters 

505 ---------- 

506 paths : list[str] 

507 A list of paths to search for yaml config files 

508 

509 env : Mapping[str, str] 

510 The system environment variables 

511 

512 Returns 

513 ------- 

514 config: dict 

515 

516 See Also 

517 -------- 

518 dask.config.refresh: collect configuration and update into primary config 

519 """ 

520 if env is None: 

521 env = os.environ 

522 

523 configs = [*collect_yaml(paths=paths), collect_env(env=env)] 

524 return merge(*configs) 

525 

526 

527def refresh( 

528 config: dict | None = None, defaults: list[Mapping] = defaults, **kwargs 

529) -> None: 

530 """ 

531 Update configuration by re-reading yaml files and env variables 

532 

533 This mutates the global dask.config.config, or the config parameter if 

534 passed in. 

535 

536 This goes through the following stages: 

537 

538 1. Clearing out all old configuration 

539 2. Updating from the stored defaults from downstream libraries 

540 (see update_defaults) 

541 3. Updating from yaml files and environment variables 

542 4. Automatically renaming deprecated keys (with a warning) 

543 

544 Note that some functionality only checks configuration once at startup and 

545 may not change behavior, even if configuration changes. It is recommended 

546 to restart your python process if convenient to ensure that new 

547 configuration changes take place. 

548 

549 See Also 

550 -------- 

551 dask.config.collect: for parameters 

552 dask.config.update_defaults 

553 """ 

554 if config is None: # Keep Sphinx autofunction documentation clean 

555 config = global_config 

556 

557 config.clear() 

558 

559 for d in defaults: 

560 update(config, d, priority="old") 

561 

562 update(config, collect(**kwargs)) 

563 rename(deprecations, config) 

564 

565 

566def get( 

567 key: str, 

568 default: Any = no_default, 

569 config: dict | None = None, 

570 override_with: Any = None, 

571) -> Any: 

572 """ 

573 Get elements from global config 

574 

575 If ``override_with`` is not None this value will be passed straight back. 

576 Useful for getting kwarg defaults from Dask config. 

577 

578 Use '.' for nested access 

579 

580 Examples 

581 -------- 

582 >>> from dask import config 

583 >>> config.get('foo') # doctest: +SKIP 

584 {'x': 1, 'y': 2} 

585 

586 >>> config.get('foo.x') # doctest: +SKIP 

587 1 

588 

589 >>> config.get('foo.x.y', default=123) # doctest: +SKIP 

590 123 

591 

592 >>> config.get('foo.y', override_with=None) # doctest: +SKIP 

593 2 

594 

595 >>> config.get('foo.y', override_with=3) # doctest: +SKIP 

596 3 

597 

598 See Also 

599 -------- 

600 dask.config.set 

601 """ 

602 if override_with is not None: 

603 return override_with 

604 

605 if config is None: # Keep Sphinx autofunction documentation clean 

606 config = global_config 

607 

608 keys = key.split(".") 

609 result = config 

610 for k in keys: 

611 k = canonical_name(k, result) 

612 try: 

613 result = result[k] 

614 except (TypeError, IndexError, KeyError): 

615 if default is no_default: 

616 raise 

617 return default 

618 

619 return result 

620 

621 

622def pop(key: str, default: Any = no_default, config: dict = config) -> Any: 

623 """Like ``get``, but remove the element if found 

624 

625 See Also 

626 -------- 

627 dask.config.get 

628 dask.config.set 

629 """ 

630 keys = key.split(".") 

631 result = config 

632 for i, k in enumerate(keys): 

633 k = canonical_name(k, result) 

634 try: 

635 if i == len(keys) - 1: 

636 return result.pop(k) 

637 else: 

638 result = result[k] 

639 except (TypeError, IndexError, KeyError): 

640 if default is no_default: 

641 raise 

642 return default 

643 

644 

645def update_defaults( 

646 new: Mapping, config: dict = config, defaults: list[Mapping] = defaults 

647) -> None: 

648 """Add a new set of defaults to the configuration 

649 

650 It does two things: 

651 

652 1. Add the defaults to a global collection to be used by refresh later 

653 2. Updates the global config with the new configuration. 

654 Old values are prioritized over new ones, unless the current value 

655 is the old default, in which case it's updated to the new default. 

656 """ 

657 current_defaults = merge(*defaults) 

658 defaults.append(new) 

659 update(config, new, priority="new-defaults", defaults=current_defaults) 

660 

661 

662def expand_environment_variables(config: Any) -> Any: 

663 """Expand environment variables in a nested config dictionary 

664 

665 This function will recursively search through any nested dictionaries 

666 and/or lists. 

667 

668 Parameters 

669 ---------- 

670 config : dict, iterable, or str 

671 Input object to search for environment variables 

672 

673 Returns 

674 ------- 

675 config : same type as input 

676 

677 Examples 

678 -------- 

679 >>> expand_environment_variables({'x': [1, 2, '$USER']}) # doctest: +SKIP 

680 {'x': [1, 2, 'my-username']} 

681 """ 

682 if isinstance(config, Mapping): 

683 return {k: expand_environment_variables(v) for k, v in config.items()} 

684 elif isinstance(config, str): 

685 return os.path.expandvars(config) 

686 elif isinstance(config, (list, tuple, builtins.set)): 

687 return type(config)(expand_environment_variables(v) for v in config) 

688 else: 

689 return config 

690 

691 

692#: Mapping of {deprecated key: new key} for renamed keys, or {deprecated key: None} for 

693#: removed keys. All deprecated keys must use '-' instead of '_'. 

694#: This is used in three places: 

695#: 1. In refresh(), which calls rename() to rename and warn upon loading 

696#: from ~/.config/dask.yaml, DASK_ env variables, etc. 

697#: 2. in distributed/config.py and equivalent modules, where we perform additional 

698#: distributed-specific renames for the yaml/env config and enrich this dict 

699#: 3. from individual calls to dask.config.set(), which internally invoke 

700# check_deprecations() 

701deprecations: dict[str, str | None] = { 

702 "fuse-ave-width": "optimization.fuse.ave-width", 

703 "fuse-max-height": "optimization.fuse.max-height", 

704 "fuse-max-width": "optimization.fuse.max-width", 

705 "fuse-rename-keys": "optimization.fuse.rename-keys", 

706 "fuse-max-depth-new-edges": "optimization.fuse.max-depth-new-edges", 

707 # See https://github.com/dask/distributed/pull/4916 

708 "ucx.cuda-copy": "distributed.ucx.cuda_copy", 

709 "ucx.tcp": "distributed.ucx.tcp", 

710 "ucx.nvlink": "distributed.ucx.nvlink", 

711 "ucx.infiniband": "distributed.ucx.infiniband", 

712 "ucx.rdmacm": "distributed.ucx.rdmacm", 

713 "ucx.net-devices": "distributed.ucx.net-devices", 

714 "ucx.reuse-endpoints": "distributed.ucx.reuse-endpoints", 

715 "rmm.pool-size": "distributed.rmm.pool-size", 

716 "shuffle": "dataframe.shuffle.algorithm", 

717 "array.rechunk-threshold": "array.rechunk.threshold", 

718 "dataframe.shuffle.algorithm": "dataframe.shuffle.method", 

719 "dataframe.shuffle-compression": "dataframe.shuffle.compression", 

720 "admin.traceback.shorten.what": "admin.traceback.shorten", # changed in 2023.9.0 

721 "array.shuffle.chunksize-tolerance": "array.chunk-size-tolerance", 

722} 

723 

724 

725def rename( 

726 deprecations: Mapping[str, str | None] = deprecations, config: dict = config 

727) -> None: 

728 """Rename old keys to new keys 

729 

730 This helps migrate older configuration versions over time 

731 

732 See Also 

733 -------- 

734 check_deprecations 

735 """ 

736 for key in deprecations: 

737 try: 

738 value = pop(key, config=config) 

739 except (TypeError, IndexError, KeyError): 

740 continue 

741 key = canonical_name(key, config=config) 

742 new = check_deprecations(key, deprecations) 

743 if new: 

744 set({new: value}, config=config) 

745 

746 

747def check_deprecations( 

748 key: str, deprecations: Mapping[str, str | None] = deprecations 

749) -> str: 

750 """Check if the provided value has been renamed or removed 

751 

752 Parameters 

753 ---------- 

754 key : str 

755 The configuration key to check 

756 deprecations : Dict[str, str] 

757 The mapping of aliases 

758 

759 Examples 

760 -------- 

761 >>> deprecations = {"old_key": "new_key", "invalid": None} 

762 >>> check_deprecations("old_key", deprecations=deprecations) # doctest: +SKIP 

763 FutureWarning: Dask configuration key 'old_key' has been deprecated; please use "new_key" instead 

764 

765 >>> check_deprecations("invalid", deprecations=deprecations) 

766 Traceback (most recent call last): 

767 ... 

768 ValueError: Dask configuration key 'invalid' has been removed 

769 

770 >>> check_deprecations("another_key", deprecations=deprecations) 

771 'another_key' 

772 

773 Returns 

774 ------- 

775 new: str 

776 The proper key, whether the original (if no deprecation) or the aliased 

777 value 

778 

779 See Also 

780 -------- 

781 rename 

782 """ 

783 old = key.replace("_", "-") 

784 if old in deprecations: 

785 new = deprecations[old] 

786 if new: 

787 warnings.warn( 

788 f"Dask configuration key {key!r} has been deprecated; " 

789 f"please use {new!r} instead", 

790 FutureWarning, 

791 ) 

792 return new 

793 else: 

794 raise ValueError(f"Dask configuration key {key!r} has been removed") 

795 else: 

796 return key 

797 

798 

799def serialize(data: Any) -> str: 

800 """Serialize config data into a string. 

801 

802 Typically used to pass config via the ``DASK_INTERNAL_INHERIT_CONFIG`` environment variable. 

803 

804 Parameters 

805 ---------- 

806 data: json-serializable object 

807 The data to serialize 

808 

809 Returns 

810 ------- 

811 serialized_data: str 

812 The serialized data as a string 

813 

814 """ 

815 return base64.urlsafe_b64encode(json.dumps(data).encode()).decode() 

816 

817 

818def deserialize(data: str) -> Any: 

819 """De-serialize config data into the original object. 

820 

821 Typically when receiving config via the ``DASK_INTERNAL_INHERIT_CONFIG`` environment variable. 

822 

823 Parameters 

824 ---------- 

825 data: str 

826 String serialized by :func:`dask.config.serialize` 

827 

828 Returns 

829 ------- 

830 deserialized_data: obj 

831 The de-serialized data 

832 

833 """ 

834 return json.loads(base64.urlsafe_b64decode(data.encode()).decode()) 

835 

836 

837def _initialize() -> None: 

838 fn = os.path.join(os.path.dirname(__file__), "dask.yaml") 

839 

840 with open(fn) as f: 

841 _defaults = yaml.safe_load(f) 

842 

843 update_defaults(_defaults) 

844 

845 

846refresh() 

847_initialize()