Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/config.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import ast
4import base64
5import builtins # Explicitly use builtins.set as 'set' will be shadowed by a function
6import json
7import os
8import pathlib
9import site
10import sys
11import threading
12import warnings
13from collections.abc import Iterator, Mapping, Sequence
14from typing import Any, Literal, overload
16import yaml
18from dask.typing import no_default
def _get_paths():
    """Return the ordered list of directories searched for YAML config files.

    The search order is: ``$DASK_ROOT_CONFIG`` (default ``/etc/dask``), the
    ``etc/dask`` directory under ``sys.prefix`` and under each entry of
    ``site.PREFIXES``, ``~/.config/dask``, and finally ``$DASK_CONFIG`` when
    that variable is set. Repeated entries are collapsed onto their last
    position in that order.

    Kept as a separate function so tests can recompute the search path.
    """
    search = [os.getenv("DASK_ROOT_CONFIG", "/etc/dask")]
    search.append(os.path.join(sys.prefix, "etc", "dask"))
    search.extend(os.path.join(prefix, "etc", "dask") for prefix in site.PREFIXES)
    search.append(os.path.join(os.path.expanduser("~"), ".config", "dask"))

    user_dir = os.environ.get("DASK_CONFIG")
    if user_dir is not None:
        search.append(user_dir)

    # Walk backwards so only the last occurrence of each entry survives,
    # then flip back to forward order.
    seen = builtins.set()
    unique_reversed = []
    for entry in reversed(search):
        if entry not in seen:
            seen.add(entry)
            unique_reversed.append(entry)
    return unique_reversed[::-1]
paths = _get_paths()

# Default destination directory for ``ensure_file``: ``$DASK_CONFIG`` if set,
# otherwise ``~/.config/dask``.
PATH = os.environ.get(
    "DASK_CONFIG", os.path.join(os.path.expanduser("~"), ".config", "dask")
)

# The live, process-wide configuration. ``global_config`` is a second name for
# the same dict so functions that take a ``config`` parameter can still reach it.
config: dict = {}
global_config = config  # alias

# Held by ``set`` while it mutates a configuration dict.
config_lock = threading.Lock()

# Default mappings registered through ``update_defaults``; ``refresh`` replays
# them before re-reading YAML files and environment variables.
defaults: list[Mapping] = []
def canonical_name(k: str, config: dict) -> str:
    """Return the spelling of ``k`` that ``config`` already uses.

    Users may write keys with either '-' or '_'. Whichever spelling is already
    present in ``config`` wins: ``k`` itself if present, otherwise its
    hyphen/underscore counterpart if present, otherwise ``k`` unchanged.
    If ``config`` does not support membership tests, ``k`` is returned as-is.
    """
    try:
        already_present = k in config
    except TypeError:
        # Not a container (e.g. a scalar reached while walking nested keys)
        return k
    if already_present:
        return k

    if "_" in k:
        alternative = k.replace("_", "-")
    else:
        alternative = k.replace("-", "_")
    return alternative if alternative in config else k
def update(
    old: dict,
    new: Mapping,
    priority: Literal["old", "new", "new-defaults"] = "new",
    defaults: Mapping | None = None,
) -> dict:
    """Update a nested dictionary with values from another

    This is like dict.update except that it smoothly merges nested values

    This operates in-place and modifies old

    Parameters
    ----------
    priority: string {'old', 'new', 'new-defaults'}
        If new (default) then the new dictionary has preference.
        Otherwise the old dictionary does.
        If 'new-defaults', a mapping should be given of the current defaults.
        Only if a value in ``old`` matches the current default, it will be
        updated with ``new``.
    defaults: Mapping, optional
        Current defaults, used only with ``priority='new-defaults'``. A level
        of ``defaults`` that is not a mapping (e.g. a scalar default that
        ``new`` turns into a nested section) is treated as "no defaults".

    Returns
    -------
    dict
        ``old``, after modification.

    Examples
    --------
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'b': 3}}
    >>> update(a, b)  # doctest: +SKIP
    {'x': 2, 'y': {'a': 2, 'b': 3}}

    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'b': 3}}
    >>> update(a, b, priority='old')  # doctest: +SKIP
    {'x': 1, 'y': {'a': 2, 'b': 3}}

    >>> d = {'x': 0, 'y': {'a': 2}}
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'a': 3, 'b': 3}}
    >>> update(a, b, priority='new-defaults', defaults=d)  # doctest: +SKIP
    {'x': 1, 'y': {'a': 3, 'b': 3}}

    See Also
    --------
    dask.config.merge
    """
    # Only a real mapping can be consulted for per-key defaults; anything else
    # (a scalar default reached while recursing) means "no defaults here".
    has_defaults = isinstance(defaults, Mapping)

    for k, v in new.items():
        k = canonical_name(k, old)

        if isinstance(v, Mapping):
            # A nested section replaces any non-dict value currently stored.
            if not isinstance(old.get(k), dict):
                old[k] = {}
            update(
                old[k],
                v,
                priority=priority,
                defaults=defaults.get(k) if has_defaults else None,
            )
        else:
            if (
                priority == "new"
                or k not in old
                or (
                    priority == "new-defaults"
                    and has_defaults
                    and k in defaults
                    and defaults[k] == old[k]
                )
            ):
                old[k] = v

    return old
def merge(*dicts: Mapping) -> dict:
    """Combine several nested mappings into one new dict.

    Mappings are folded left to right with ``update``, so values from later
    mappings take precedence over earlier ones while nested sections are
    merged rather than replaced. None of the inputs is modified.

    Examples
    --------
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'y': {'b': 3}}
    >>> merge(a, b)  # doctest: +SKIP
    {'x': 1, 'y': {'a': 2, 'b': 3}}

    See Also
    --------
    dask.config.update
    """
    merged: dict = {}
    for mapping in dicts:
        # ``update`` works in place with "new" priority: later mappings win.
        update(merged, mapping)
    return merged
def _load_config_file(path: str) -> dict | None:
    """Parse a single YAML/JSON configuration file.

    Returns the parsed top-level dict, or ``None`` when the file cannot be
    opened or read (``OSError``) or parses to ``None``.

    Raises
    ------
    ValueError
        If parsing fails, or the top-level object is not a dict.
    """
    try:
        with open(path) as f:
            loaded = yaml.safe_load(f.read())
    except OSError:
        # Unreadable file (e.g. permissions): skip it rather than fail
        return None
    except Exception as exc:
        raise ValueError(
            f"A dask config file at {path!r} is malformed, original error "
            f"message:\n\n{exc}"
        ) from None

    if loaded is None or isinstance(loaded, dict):
        return loaded
    raise ValueError(
        f"A dask config file at {path!r} is malformed - config files must have "
        f"a dict as the top level object, got a {type(loaded).__name__} instead"
    )
@overload
def collect_yaml(
    paths: Sequence[str], *, return_paths: Literal[False] = False
) -> Iterator[dict]: ...


@overload
def collect_yaml(
    paths: Sequence[str], *, return_paths: Literal[True]
) -> Iterator[tuple[pathlib.Path, dict]]: ...


def collect_yaml(
    paths: Sequence[str], *, return_paths: bool = False
) -> Iterator[dict | tuple[pathlib.Path, dict]]:
    """Yield the parsed contents of every config file found under ``paths``.

    Each entry of ``paths`` may be a file or a directory. A directory
    contributes its ``.json``/``.yaml``/``.yml`` files (extension matched
    case-insensitively), in sorted order. Paths that do not exist, and
    directories that cannot be listed, are skipped. All files are discovered
    before any is parsed. Files that parse to nothing or cannot be read are
    skipped; with ``return_paths=True`` each result is a
    ``(pathlib.Path, dict)`` pair instead of the bare dict.
    """
    config_suffixes = (".json", ".yaml", ".yml")

    candidates: list[str] = []
    for location in paths:
        if not os.path.exists(location):
            continue
        if not os.path.isdir(location):
            candidates.append(location)
            continue
        try:
            entries = os.listdir(location)
        except OSError:
            # Unlistable directory (e.g. permissions): skip it quietly
            continue
        candidates.extend(
            sorted(
                os.path.join(location, name)
                for name in entries
                if os.path.splitext(name)[1].lower() in config_suffixes
            )
        )

    for file_path in candidates:
        loaded = _load_config_file(file_path)
        if loaded is None:
            continue
        yield (pathlib.Path(file_path), loaded) if return_paths else loaded
def collect_env(env: Mapping[str, str] | None = None) -> dict:
    """Build a nested config dict from ``DASK_*`` environment variables.

    A variable such as ``DASK_FOO__BAR_BAZ=123`` becomes
    ``{"foo": {"bar_baz": 123}}``, subject to ``set``'s handling of dotted
    keys:

    - the ``DASK_`` prefix is stripped and the rest is lower-cased
    - ``__`` (double underscore) separates nesting levels
    - the value is converted with ``interpret_value``

    If ``DASK_INTERNAL_INHERIT_CONFIG`` is present, its serialized payload is
    deserialized first, and the individual variables are layered on top.
    ``env`` defaults to ``os.environ``.
    """
    if env is None:
        env = os.environ

    flat = (
        deserialize(env["DASK_INTERNAL_INHERIT_CONFIG"])
        if "DASK_INTERNAL_INHERIT_CONFIG" in env
        else {}
    )

    prefix = "DASK_"
    for name, raw in env.items():
        if not name.startswith(prefix):
            continue
        dotted = name[len(prefix):].lower().replace("__", ".")
        flat[dotted] = interpret_value(raw)

    nested: dict = {}
    set(flat, config=nested)
    return nested
def interpret_value(value: str) -> Any:
    """Convert a raw environment-variable string into a Python value.

    The string is first tried as a Python literal (numbers, strings, lists,
    dicts, ``None``, ``True``...). If that fails, the YAML-style spellings
    ``none``/``null``/``true``/``false`` (case-insensitive) are mapped to
    ``None``/``True``/``False``; anything else is returned unchanged as a
    string.
    """
    try:
        return ast.literal_eval(value)
    except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError):
        # ``ast.literal_eval`` raises any of these on malformed input, e.g.
        # TypeError for "{[1]: 2}" (unhashable dict key) or RecursionError /
        # MemoryError for pathologically nested input. Such values are not
        # Python literals, so fall back to treating them as text.
        pass

    # Avoid confusion of YAML vs. Python syntax
    hardcoded_map = {"none": None, "null": None, "false": False, "true": True}
    return hardcoded_map.get(value.lower(), value)
def paths_containing_key(
    key: str,
    paths: Sequence[str] = paths,
) -> Iterator[pathlib.Path]:
    """
    Yield each config file, found under ``paths``, whose contents define ``key``.

    ``key`` uses the same dotted syntax as ``get``. Files are visited in the
    order produced by ``collect_yaml`` for each entry of ``paths``.
    """
    for search_path in paths:
        for found_path, found_config in collect_yaml([search_path], return_paths=True):
            try:
                get(key, config=found_config)
            except KeyError:
                # This file does not define the key
                continue
            yield pathlib.Path(found_path)
def ensure_file(
    source: str, destination: str | None = None, comment: bool = True
) -> None:
    """
    Copy file to default location if it does not already exist

    This tries to move a default configuration file to a default location if
    it does not already exist. It also comments out that file by default.

    This is to be used by downstream modules (like dask.distributed) that may
    have default configuration files that they wish to include in the default
    configuration path.

    The copy is best effort: ``OSError`` (e.g. an unwritable directory) is
    swallowed, and no temporary file is left behind if the copy fails.

    Parameters
    ----------
    source : string, filename
        Source configuration file, typically within a source directory.
    destination : string, directory
        Destination directory. Configurable by ``DASK_CONFIG`` environment
        variable, falling back to ~/.config/dask.
    comment : bool, True by default
        Whether or not to comment out the config file when copying.
    """
    if destination is None:
        destination = PATH

    # destination is a file and already exists, never overwrite
    if os.path.isfile(destination):
        return

    # If destination is not an existing file, interpret as a directory,
    # use the source basename as the filename
    directory = destination
    destination = os.path.join(directory, os.path.basename(source))

    try:
        if not os.path.exists(destination):
            os.makedirs(directory, exist_ok=True)

            # Atomically create destination. Parallel testing discovered
            # a race condition where a process can be busy creating the
            # destination while another process reads an empty config file.
            tmp = "%s.tmp.%d" % (destination, os.getpid())
            with open(source) as f:
                lines = list(f)

            if comment:
                # Prefix every non-blank line that is not already a comment
                lines = [
                    "# " + line if line.strip() and not line.startswith("#") else line
                    for line in lines
                ]

            try:
                with open(tmp, "w") as f:
                    f.write("".join(lines))
                os.rename(tmp, destination)
            except OSError:
                # Either writing the temporary file failed, or the rename lost
                # a race with another process. In both cases drop our
                # (possibly partial) temporary file instead of leaking it.
                if os.path.exists(tmp):
                    os.remove(tmp)
    except OSError:
        # Best effort only: failing to install the file is not an error
        pass
class set:
    """Temporarily set configuration values within a context manager

    Parameters
    ----------
    arg : mapping or None, optional
        A mapping of configuration key-value pairs to set.
    config : dict or None, optional
        The configuration dict to modify; the global config when omitted.
    lock : threading.Lock, optional
        Lock held while values are applied and again while they are rolled
        back on context exit.
    **kwargs :
        Additional key-value pairs to set. If ``arg`` is provided, values set
        in ``arg`` will be applied before those in ``kwargs``.
        Double-underscores (``__``) in keyword arguments will be replaced with
        ``.``, allowing nested values to be easily set.

    Examples
    --------
    >>> import dask

    Set ``'foo.bar'`` in a context, by providing a mapping.

    >>> with dask.config.set({'foo.bar': 123}):
    ...     pass

    Set ``'foo.bar'`` in a context, by providing a keyword argument.

    >>> with dask.config.set(foo__bar=123):
    ...     pass

    Set ``'foo.bar'`` globally.

    >>> dask.config.set(foo__bar=123)  # doctest: +SKIP

    See Also
    --------
    dask.config.get
    """

    config: dict
    # [(op, path, value), ...]
    _record: list[tuple[Literal["insert", "replace"], tuple[str, ...], Any]]
    # The lock given to __init__; __exit__ reuses it so that rollback is
    # serialized with other ``set`` calls sharing the same lock.
    _lock: threading.Lock

    def __init__(
        self,
        arg: Mapping | None = None,
        config: dict | None = None,
        lock: threading.Lock = config_lock,
        **kwargs,
    ):
        if config is None:  # Keep Sphinx autofunction documentation clean
            config = global_config

        with lock:
            self.config = config
            self._lock = lock
            self._record = []

            if arg is not None:
                for key, value in arg.items():
                    key = check_deprecations(key)
                    self._assign(key.split("."), value, config)
            if kwargs:
                for key, value in kwargs.items():
                    key = key.replace("__", ".")
                    key = check_deprecations(key)
                    self._assign(key.split("."), value, config)

    def __enter__(self):
        return self.config

    def __exit__(self, type, value, traceback):
        # Undo the recorded operations newest-first, holding the same lock
        # that was held while they were applied.
        with self._lock:
            for op, path, previous in reversed(self._record):
                d = self.config
                if op == "replace":
                    # Restore the overwritten value, re-creating intermediate
                    # dicts if they have disappeared in the meantime.
                    for key in path[:-1]:
                        d = d.setdefault(key, {})
                    d[path[-1]] = previous
                else:  # insert
                    # Remove the key we added, but only if its parent still exists.
                    for key in path[:-1]:
                        try:
                            d = d[key]
                        except KeyError:
                            break
                    else:
                        d.pop(path[-1], None)

    def _assign(
        self,
        keys: Sequence[str],
        value: Any,
        d: dict,
        path: tuple[str, ...] = (),
        record: bool = True,
    ) -> None:
        """Assign value into a nested configuration dictionary

        Parameters
        ----------
        keys : Sequence[str]
            The nested path of keys to assign the value.
        value : object
        d : dict
            The part of the nested dictionary into which we want to assign the
            value
        path : tuple[str], optional
            The path history up to this point.
        record : bool, optional
            Whether this operation needs to be recorded to allow for rollback.
        """
        key = canonical_name(keys[0], d)

        path = path + (key,)

        if len(keys) == 1:
            if record:
                if key in d:
                    self._record.append(("replace", path, d[key]))
                else:
                    self._record.append(("insert", path, None))
            d[key] = value
        else:
            if key not in d:
                if record:
                    self._record.append(("insert", path, None))
                d[key] = {}
                # No need to record subsequent operations after an insert
                record = False
            self._assign(keys[1:], value, d[key], path, record=record)
def collect(paths: list[str] = paths, env: Mapping[str, str] | None = None) -> dict:
    """
    Gather configuration from YAML files and environment variables.

    Every YAML/JSON file found under ``paths`` is merged in order, and the
    ``DASK_*`` environment variables from ``env`` (``os.environ`` by default)
    are merged last, so they take precedence.

    Parameters
    ----------
    paths : list[str]
        A list of paths to search for yaml config files

    env : Mapping[str, str]
        The system environment variables

    Returns
    -------
    config: dict

    See Also
    --------
    dask.config.refresh: collect configuration and update into primary config
    """
    environment = os.environ if env is None else env

    sources = list(collect_yaml(paths=paths))
    sources.append(collect_env(env=environment))
    return merge(*sources)
def refresh(
    config: dict | None = None, defaults: list[Mapping] = defaults, **kwargs
) -> None:
    """
    Rebuild configuration from scratch by re-reading yaml files and env variables

    Operates in place on the global dask.config.config, or on ``config`` when
    one is given.

    The rebuild happens in this order:

    1. Clearing out all old configuration
    2. Updating from the stored defaults from downstream libraries
       (see update_defaults)
    3. Updating from yaml files and environment variables
    4. Automatically renaming deprecated keys (with a warning)

    Note that some functionality only checks configuration once at startup and
    may not change behavior, even if configuration changes. It is recommended
    to restart your python process if convenient to ensure that new
    configuration changes take place.

    See Also
    --------
    dask.config.collect: for parameters
    dask.config.update_defaults
    """
    target = global_config if config is None else config
    target.clear()

    for layer in defaults:
        # "old" priority: the first registered default for a key is kept
        update(target, layer, priority="old")

    update(target, collect(**kwargs))
    rename(deprecations, target)
def get(
    key: str,
    default: Any = no_default,
    config: dict | None = None,
    override_with: Any = None,
) -> Any:
    """
    Look up a value in the global config (or ``config`` if given)

    If ``override_with`` is not None it is returned immediately, which makes
    this handy for resolving keyword-argument defaults from Dask config.

    Nested levels are separated by '.'; each level accepts either the '-' or
    '_' spelling. If a level is missing, ``default`` is returned when given,
    otherwise the lookup error (``KeyError``, ``TypeError`` or ``IndexError``)
    propagates.

    Examples
    --------
    >>> from dask import config
    >>> config.get('foo')  # doctest: +SKIP
    {'x': 1, 'y': 2}

    >>> config.get('foo.x')  # doctest: +SKIP
    1

    >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
    123

    >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
    2

    >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
    3

    See Also
    --------
    dask.config.set
    """
    if override_with is not None:
        return override_with

    node = global_config if config is None else config
    for part in key.split("."):
        part = canonical_name(part, node)
        try:
            node = node[part]
        except (TypeError, IndexError, KeyError):
            if default is no_default:
                raise
            return default
    return node
def pop(key: str, default: Any = no_default, config: dict = config) -> Any:
    """Remove and return a (possibly nested) configuration value.

    Like ``get``: '.' separates nested levels and the '-'/'_' spelling of
    each level is matched via ``canonical_name``. Unlike ``get``, the final
    key is removed from its parent. If any level is missing, ``default`` is
    returned when given; otherwise the lookup error (``KeyError``,
    ``TypeError`` or ``IndexError``) propagates.

    See Also
    --------
    dask.config.get
    dask.config.set
    """
    *parents, leaf = key.split(".")

    node = config
    for part in parents:
        part = canonical_name(part, node)
        try:
            node = node[part]
        except (TypeError, IndexError, KeyError):
            if default is no_default:
                raise
            return default

    leaf = canonical_name(leaf, node)
    try:
        return node.pop(leaf)
    except (TypeError, IndexError, KeyError):
        if default is no_default:
            raise
        return default
def update_defaults(
    new: Mapping, config: dict = config, defaults: list[Mapping] = defaults
) -> None:
    """Register ``new`` as an additional set of defaults and apply it.

    ``new`` is appended to ``defaults`` so that a later ``refresh`` replays
    it. It is then merged into ``config`` with "new-defaults" priority:
    existing values are kept, unless a value still equals the previous
    default, in which case it moves to the new default. Keys that are absent
    are simply added.
    """
    # Snapshot the defaults as they were *before* ``new`` is registered
    previous_defaults = merge(*defaults)
    defaults.append(new)
    update(config, new, priority="new-defaults", defaults=previous_defaults)
def expand_environment_variables(config: Any) -> Any:
    """Recursively expand ``$VAR`` / ``${VAR}`` references in a config value

    Mappings are walked value by value (keys are left untouched) and returned
    as plain dicts. Lists, tuples and sets are rebuilt with the same type
    around their expanded items. Strings go through ``os.path.expandvars``.
    Any other object is returned unchanged.

    Parameters
    ----------
    config : dict, iterable, or str
        Input object to search for environment variables

    Returns
    -------
    config : same type as input (mappings come back as dict)

    Examples
    --------
    >>> expand_environment_variables({'x': [1, 2, '$USER']})  # doctest: +SKIP
    {'x': [1, 2, 'my-username']}
    """
    if isinstance(config, Mapping):
        return {key: expand_environment_variables(item) for key, item in config.items()}
    if isinstance(config, str):
        return os.path.expandvars(config)
    if isinstance(config, (list, tuple, builtins.set)):
        container_type = type(config)
        return container_type(map(expand_environment_variables, config))
    return config
#: Mapping of {deprecated key: new key} for renamed keys, or {deprecated key: None} for
#: removed keys. All deprecated keys must use '-' instead of '_', because
#: check_deprecations() converts '_' to '-' in the key before consulting this mapping.
#: This is used in three places:
#: 1. In refresh(), which calls rename() to rename and warn upon loading
#:    from ~/.config/dask.yaml, DASK_ env variables, etc.
#: 2. in distributed/config.py and equivalent modules, where we perform additional
#:    distributed-specific renames for the yaml/env config and enrich this dict
#: 3. from individual calls to dask.config.set(), which internally invoke
#:    check_deprecations()
deprecations: dict[str, str | None] = {
    "fuse-ave-width": "optimization.fuse.ave-width",
    "fuse-max-height": "optimization.fuse.max-height",
    "fuse-max-width": "optimization.fuse.max-width",
    "fuse-rename-keys": "optimization.fuse.rename-keys",
    "fuse-max-depth-new-edges": "optimization.fuse.max-depth-new-edges",
    # See https://github.com/dask/distributed/pull/4916
    "ucx.cuda-copy": "distributed.ucx.cuda_copy",
    "ucx.tcp": "distributed.ucx.tcp",
    "ucx.nvlink": "distributed.ucx.nvlink",
    "ucx.infiniband": "distributed.ucx.infiniband",
    "ucx.rdmacm": "distributed.ucx.rdmacm",
    "ucx.net-devices": "distributed.ucx.net-devices",
    "ucx.reuse-endpoints": "distributed.ucx.reuse-endpoints",
    "rmm.pool-size": "distributed.rmm.pool-size",
    # NOTE: this target is itself deprecated (see "dataframe.shuffle.algorithm"
    # below), so resolving "shuffle" takes two renames.
    "shuffle": "dataframe.shuffle.algorithm",
    "array.rechunk-threshold": "array.rechunk.threshold",
    "dataframe.shuffle.algorithm": "dataframe.shuffle.method",
    "dataframe.shuffle-compression": "dataframe.shuffle.compression",
    "admin.traceback.shorten.what": "admin.traceback.shorten",  # changed in 2023.9.0
    "array.shuffle.chunksize-tolerance": "array.chunk-size-tolerance",
}
def rename(
    deprecations: Mapping[str, str | None] = deprecations, config: dict = config
) -> None:
    """Move values stored under deprecated keys to their new keys

    For every deprecated key present in ``config``, the value is popped and
    re-applied under the replacement name via ``set``; this goes through
    ``check_deprecations``, so a warning is emitted. Keys not present in
    ``config`` are skipped.

    See Also
    --------
    check_deprecations
    """
    for old_key in deprecations:
        try:
            value = pop(old_key, config=config)
        except (TypeError, IndexError, KeyError):
            # Deprecated key not used in this config
            continue
        replacement = check_deprecations(
            canonical_name(old_key, config=config), deprecations
        )
        if replacement:
            set({replacement: value}, config=config)
def check_deprecations(
    key: str, deprecations: Mapping[str, str | None] = deprecations
) -> str:
    """Check if the provided value has been renamed or removed

    Parameters
    ----------
    key : str
        The configuration key to check
    deprecations : Mapping[str, str | None]
        The mapping of aliases. Its keys use '-' rather than '_'; ``key`` is
        normalised the same way before lookup. A value of ``None`` marks a
        removed key.

    Examples
    --------
    >>> deprecations = {"old-key": "new-key", "invalid": None}
    >>> check_deprecations("old_key", deprecations=deprecations)  # doctest: +SKIP
    FutureWarning: Dask configuration key 'old_key' has been deprecated; please use 'new-key' instead

    >>> check_deprecations("invalid", deprecations=deprecations)
    Traceback (most recent call last):
        ...
    ValueError: Dask configuration key 'invalid' has been removed

    >>> check_deprecations("another_key", deprecations=deprecations)
    'another_key'

    Renames are followed transitively, so a key whose replacement has itself
    been renamed resolves to the newest name:

    >>> check_deprecations("a", deprecations={"a": "b", "b": "c"})  # doctest: +SKIP
    'c'

    Returns
    -------
    new: str
        The proper key, whether the original (if no deprecation) or the aliased
        value

    Raises
    ------
    ValueError
        If ``key`` itself has been removed.

    See Also
    --------
    rename
    """
    lookup = key.replace("_", "-")
    if lookup not in deprecations:
        return key

    new = deprecations[lookup]
    if not new:
        raise ValueError(f"Dask configuration key {key!r} has been removed")

    # Follow chains of renames, e.g. "shuffle" -> "dataframe.shuffle.algorithm"
    # -> "dataframe.shuffle.method". Callers such as ``set`` check a key only
    # once, so without this they would store the value under a name that is
    # itself deprecated. The hop count is bounded by the mapping size to stay
    # safe against accidental cycles.
    for _ in range(len(deprecations)):
        newer = deprecations.get(new.replace("_", "-"))
        if not newer:
            break
        new = newer

    warnings.warn(
        f"Dask configuration key {key!r} has been deprecated; "
        f"please use {new!r} instead",
        FutureWarning,
    )
    return new
def serialize(data: Any) -> str:
    """Encode JSON-serializable config data as a URL-safe base64 string.

    Typically used to pass config via the ``DASK_INTERNAL_INHERIT_CONFIG`` environment variable.

    Parameters
    ----------
    data: json-serializable object
        The data to serialize

    Returns
    -------
    serialized_data: str
        The serialized data as a string

    """
    as_json = json.dumps(data)
    encoded = base64.urlsafe_b64encode(as_json.encode())
    return encoded.decode()
def deserialize(data: str) -> Any:
    """Decode a string produced by :func:`dask.config.serialize`.

    Typically when receiving config via the ``DASK_INTERNAL_INHERIT_CONFIG`` environment variable.

    Parameters
    ----------
    data: str
        String serialized by :func:`dask.config.serialize`

    Returns
    -------
    deserialized_data: obj
        The de-serialized data

    """
    raw_json = base64.urlsafe_b64decode(data.encode())
    return json.loads(raw_json.decode())
def _initialize() -> None:
    """Register the defaults from the ``dask.yaml`` file next to this module."""
    fn = os.path.join(os.path.dirname(__file__), "dask.yaml")

    # Binary mode lets PyYAML detect the encoding itself (UTF-8, or UTF-16 via
    # a BOM) instead of decoding with the platform's locale encoding, which
    # can mis-decode non-ASCII text on some systems.
    with open(fn, "rb") as f:
        _defaults = yaml.safe_load(f)

    update_defaults(_defaults)
# Build the global configuration at import time. refresh() first loads the
# YAML config files and DASK_* environment variables; _initialize() then
# merges in the defaults shipped in dask.yaml via update_defaults(). Because
# update_defaults() uses "new-defaults" priority, and nothing in this module
# registers defaults before this point, scalar values already set by files or
# environment variables are kept rather than overwritten by the packaged
# defaults.
refresh()
_initialize()