Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/config.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import ast
4import base64
5import builtins # Explicitly use builtins.set as 'set' will be shadowed by a function
6import json
7import os
8import pathlib
9import site
10import sys
11import threading
12import warnings
13from collections.abc import Iterator, Mapping, Sequence
14from typing import Any, Literal, overload
16import yaml
18from dask.typing import no_default
21def _get_paths():
22 """Get locations to search for YAML configuration files.
24 This logic exists as a separate function for testing purposes.
25 """
27 paths = [
28 os.getenv("DASK_ROOT_CONFIG", "/etc/dask"),
29 os.path.join(sys.prefix, "etc", "dask"),
30 *[os.path.join(prefix, "etc", "dask") for prefix in site.PREFIXES],
31 os.path.join(os.path.expanduser("~"), ".config", "dask"),
32 ]
33 if "DASK_CONFIG" in os.environ:
34 paths.append(os.environ["DASK_CONFIG"])
36 # Remove duplicate paths while preserving ordering
37 paths = list(reversed(list(dict.fromkeys(reversed(paths)))))
39 return paths
# Search locations for configuration files, computed once at import time.
paths = _get_paths()

# Default destination directory used by ``ensure_file``: ``$DASK_CONFIG``
# when that variable is set, otherwise ``~/.config/dask``.
PATH = (
    os.environ["DASK_CONFIG"]
    if "DASK_CONFIG" in os.environ
    else os.path.join(os.path.expanduser("~"), ".config", "dask")
)

# The live configuration dictionary; ``global_config`` is a second name for
# the very same object.
config: dict = {}
global_config = config  # alias

# Default lock taken by ``set`` while it modifies a configuration.
config_lock = threading.Lock()

# Default mappings registered through ``update_defaults`` and re-applied by
# ``refresh``.
defaults: list[Mapping] = []
def canonical_name(k: str, config: dict) -> str:
    """Return the spelling of ``k`` that ``config`` already uses.

    Keys may be written with ``-`` or ``_``.  If ``k`` itself is present in
    ``config`` it is returned unchanged.  Otherwise the alternative spelling
    is tried (underscores become hyphens when ``k`` contains an underscore,
    else hyphens become underscores) and returned if present.  When neither
    spelling exists, or ``config`` does not support membership tests, ``k``
    is returned as given.
    """
    try:
        already_present = k in config
    except TypeError:
        # config is not a container (e.g. a scalar value); nothing to look up
        return k
    if already_present:
        return k

    if "_" in k:
        alternate = k.replace("_", "-")
    else:
        alternate = k.replace("-", "_")

    return alternate if alternate in config else k
def update(
    old: dict,
    new: Mapping,
    priority: Literal["old", "new", "new-defaults"] = "new",
    defaults: Mapping | None = None,
) -> dict:
    """Recursively merge ``new`` into ``old``, in place

    Works like ``dict.update`` except that nested mappings are merged level
    by level rather than replaced wholesale.  ``old`` is modified and also
    returned.

    Parameters
    ----------
    old: dict
        The dictionary that receives the values.
    new: Mapping
        The values to merge in.
    priority: string {'old', 'new', 'new-defaults'}
        If new (default) then the new dictionary has preference.
        Otherwise the old dictionary does.
        If 'new-defaults', a mapping should be given of the current defaults.
        Only if a value in ``old`` matches the current default, it will be
        updated with ``new``.
    defaults: Mapping, optional
        The current defaults, consulted only for ``priority='new-defaults'``.

    Examples
    --------
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'b': 3}}
    >>> update(a, b)  # doctest: +SKIP
    {'x': 2, 'y': {'a': 2, 'b': 3}}

    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'b': 3}}
    >>> update(a, b, priority='old')  # doctest: +SKIP
    {'x': 1, 'y': {'a': 2, 'b': 3}}

    >>> d = {'x': 0, 'y': {'a': 2}}
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'x': 2, 'y': {'a': 3, 'b': 3}}
    >>> update(a, b, priority='new-defaults', defaults=d)  # doctest: +SKIP
    {'x': 1, 'y': {'a': 3, 'b': 3}}

    See Also
    --------
    dask.config.merge
    """
    for name, incoming in new.items():
        name = canonical_name(name, old)

        if isinstance(incoming, Mapping):
            # A missing, None or non-dict slot is replaced by a fresh dict
            # before descending into it.
            if not isinstance(old.get(name), dict):
                old[name] = {}
            update(
                old[name],
                incoming,
                priority=priority,
                defaults=defaults.get(name) if defaults else None,
            )
            continue

        if priority == "new" or name not in old:
            old[name] = incoming
            continue

        # Under 'new-defaults' an existing value is only overwritten when it
        # still equals the recorded default.
        if (
            priority == "new-defaults"
            and defaults
            and name in defaults
            and defaults[name] == old[name]
        ):
            old[name] = incoming

    return old
def merge(*dicts: Mapping) -> dict:
    """Combine several nested dictionaries into a new one

    The inputs are applied left to right with ``update``, so values from
    later dictionaries take precedence over those from earlier ones.  The
    result always starts from a fresh, empty dict.

    Examples
    --------
    >>> a = {'x': 1, 'y': {'a': 2}}
    >>> b = {'y': {'b': 3}}
    >>> merge(a, b)  # doctest: +SKIP
    {'x': 1, 'y': {'a': 2, 'b': 3}}

    See Also
    --------
    dask.config.update
    """
    merged: dict = {}
    for layer in dicts:
        update(merged, layer)
    return merged
175def _load_config_file(path: str) -> dict | None:
176 """A helper for loading a config file from a path, and erroring
177 appropriately if the file is malformed."""
178 try:
179 with open(path) as f:
180 config = yaml.safe_load(f.read())
181 except OSError:
182 # Ignore permission errors
183 return None
184 except Exception as exc:
185 raise ValueError(
186 f"A dask config file at {path!r} is malformed, original error "
187 f"message:\n\n{exc}"
188 ) from None
189 if config is not None and not isinstance(config, dict):
190 raise ValueError(
191 f"A dask config file at {path!r} is malformed - config files must have "
192 f"a dict as the top level object, got a {type(config).__name__} instead"
193 )
194 return config
@overload
def collect_yaml(
    paths: Sequence[str], *, return_paths: Literal[False] = False
) -> Iterator[dict]: ...


@overload
def collect_yaml(
    paths: Sequence[str], *, return_paths: Literal[True]
) -> Iterator[tuple[pathlib.Path, dict]]: ...


def collect_yaml(
    paths: Sequence[str], *, return_paths: bool = False
) -> Iterator[dict | tuple[pathlib.Path, dict]]:
    """Yield the configuration found in a list of files and directories

    Each existing entry of ``paths`` is either a file, taken as is, or a
    directory, whose ``.json``, ``.yaml`` and ``.yml`` entries (extension
    matched case-insensitively) are taken in sorted order.  Every file is
    then parsed with ``_load_config_file``; files for which that returns
    ``None`` are skipped.

    Parameters
    ----------
    paths : Sequence[str]
        Files and/or directories to inspect.  Missing entries are ignored.
    return_paths : bool
        When True, yield ``(pathlib.Path, config)`` tuples rather than bare
        config dicts.
    """
    config_suffixes = (".json", ".yaml", ".yml")

    # Find all paths
    candidates = []
    for location in paths:
        if not os.path.exists(location):
            continue
        if not os.path.isdir(location):
            candidates.append(location)
            continue
        try:
            entries = os.listdir(location)
        except OSError:
            # Ignore permission errors
            continue
        matching = [
            os.path.join(location, entry)
            for entry in entries
            if os.path.splitext(entry)[1].lower() in config_suffixes
        ]
        matching.sort()
        candidates.extend(matching)

    # Parse yaml files
    for file_path in candidates:
        loaded = _load_config_file(file_path)
        if loaded is None:
            continue
        if return_paths:
            yield pathlib.Path(file_path), loaded
        else:
            yield loaded
def collect_env(env: Mapping[str, str] | None = None) -> dict:
    """Build a config dictionary from environment variables

    Every variable whose name starts with ``DASK_``, such as
    ``DASK_FOO__BAR_BAZ=123``, becomes a config entry, here
    ``{"foo": {"bar-baz": 123}}``.  Name and value are transformed like so:

    - The ``DASK_`` prefix is dropped and the remainder is lower-cased
    - ``__`` (double-underscore) is treated as nested access
    - The value is passed through ``interpret_value``, which tries
      ``ast.literal_eval`` first

    Any serialized config passed via ``DASK_INTERNAL_INHERIT_CONFIG`` is
    deserialized first and used as the starting point.

    Parameters
    ----------
    env : Mapping[str, str], optional
        The environment to read; ``os.environ`` when omitted.
    """
    if env is None:
        env = os.environ

    prefix = "DASK_"
    inherit_var = "DASK_INTERNAL_INHERIT_CONFIG"

    flat = deserialize(env[inherit_var]) if inherit_var in env else {}

    for name, raw in env.items():
        if not name.startswith(prefix):
            continue
        dotted = name[len(prefix) :].lower().replace("__", ".")
        flat[dotted] = interpret_value(raw)

    # Expand the dotted keys into a nested dictionary
    nested: dict = {}
    set(flat, config=nested)
    return nested
def interpret_value(value: str) -> Any:
    """Convert the text of an environment variable into a Python value.

    The text is first handed to ``ast.literal_eval``.  If it is not a valid
    Python literal, a few YAML-style spellings (``none``, ``null``,
    ``false``, ``true``, compared case-insensitively) are mapped to their
    Python equivalents; anything else is returned unchanged as a string.

    Parameters
    ----------
    value : str
        The raw text to interpret.

    Returns
    -------
    The evaluated literal, ``None``/``True``/``False`` for the recognised
    YAML spellings, or ``value`` itself.
    """
    try:
        return ast.literal_eval(value)
    except (SyntaxError, ValueError, TypeError, RecursionError):
        # Besides SyntaxError/ValueError, literal_eval raises TypeError for
        # text that parses but cannot be built (e.g. "{[1]: 2}", whose key is
        # unhashable) and RecursionError for very deeply nested input.  None
        # of these should abort config collection; treat the text as a
        # plain string instead.
        pass

    # Avoid confusion of YAML vs. Python syntax
    hardcoded_map = {"none": None, "null": None, "false": False, "true": True}
    return hardcoded_map.get(value.lower(), value)
def paths_containing_key(
    key: str,
    paths: Sequence[str] = paths,
) -> Iterator[pathlib.Path]:
    """
    Generator yielding the config files in which ``key`` is defined.

    Each entry of ``paths`` is expanded with ``collect_yaml``; a file is
    reported when looking ``key`` up in its parsed content does not raise
    ``KeyError``.
    """
    for location in paths:
        for found_at, loaded in collect_yaml([location], return_paths=True):
            try:
                get(key, config=loaded)
            except KeyError:
                # Key is absent from this file, move on to the next one
                continue
            yield pathlib.Path(found_at)
def ensure_file(
    source: str, destination: str | None = None, comment: bool = True
) -> None:
    """
    Copy a file to a default location unless it is already there

    This tries to copy a default configuration file into a configuration
    directory if no file of that name exists there yet.  By default every
    line of the copy is commented out.

    This is to be used by downstream modules (like dask.distributed) that may
    have default configuration files that they wish to include in the default
    configuration path.

    ``OSError`` raised while creating the directory or reading/writing the
    files is swallowed, so the function is best-effort.

    Parameters
    ----------
    source : string, filename
        Source configuration file, typically within a source directory.
    destination : string, directory
        Destination directory. Configurable by ``DASK_CONFIG`` environment
        variable, falling back to ~/.config/dask.
    comment : bool, True by default
        Whether or not to comment out the config file when copying.
    """
    target_dir = PATH if destination is None else destination

    # destination is a file and already exists, never overwrite
    if os.path.isfile(target_dir):
        return

    # Otherwise the destination is interpreted as a directory and the copy
    # keeps the basename of the source file.
    target = os.path.join(target_dir, os.path.basename(source))

    try:
        if os.path.exists(target):
            return

        os.makedirs(target_dir, exist_ok=True)

        # Write to a per-process temporary name and rename it into place.
        # Parallel testing discovered a race condition where a process can
        # be busy creating the destination while another process reads an
        # empty config file.
        scratch = f"{target}.tmp.{os.getpid()}"

        with open(source) as src:
            content = src.readlines()

        if comment:
            # Blank lines and lines that already start with '#' stay as is
            content = [
                line if (not line.strip() or line.startswith("#")) else "# " + line
                for line in content
            ]

        with open(scratch, "w") as out:
            out.writelines(content)

        try:
            os.rename(scratch, target)
        except OSError:
            os.remove(scratch)
    except OSError:
        pass
class set:
    """Temporarily set configuration values within a context manager

    The values are written as soon as the object is created.  When it is
    used in a ``with`` statement the changes are rolled back on exit;
    otherwise they simply stay in place.

    Parameters
    ----------
    arg : mapping or None, optional
        A mapping of configuration key-value pairs to set.
    config : dict, optional
        The configuration to modify; the global config when omitted.
    lock : threading.Lock, optional
        Lock held while the values are being written.
    **kwargs :
        Additional key-value pairs to set. If ``arg`` is provided, values set
        in ``arg`` will be applied before those in ``kwargs``.
        Double-underscores (``__``) in keyword arguments will be replaced with
        ``.``, allowing nested values to be easily set.

    Examples
    --------
    >>> import dask

    Set ``'foo.bar'`` in a context, by providing a mapping.

    >>> with dask.config.set({'foo.bar': 123}):
    ...     pass

    Set ``'foo.bar'`` in a context, by providing a keyword argument.

    >>> with dask.config.set(foo__bar=123):
    ...     pass

    Set ``'foo.bar'`` globally.

    >>> dask.config.set(foo__bar=123)  # doctest: +SKIP

    See Also
    --------
    dask.config.get
    """

    config: dict
    # Undo log: [(op, path, value), ...]
    _record: list[tuple[Literal["insert", "replace"], tuple[str, ...], Any]]

    def __init__(
        self,
        arg: Mapping | None = None,
        config: dict | None = None,
        lock: threading.Lock = config_lock,
        **kwargs,
    ):
        if config is None:  # Keep Sphinx autofunction documentation clean
            config = global_config

        def apply(dotted_key: str, new_value: Any) -> None:
            # Translate deprecated names, then write the value
            resolved = check_deprecations(dotted_key)
            self._assign(resolved.split("."), new_value, config)

        with lock:
            self.config = config
            self._record = []

            if arg is not None:
                for name, new_value in arg.items():
                    apply(name, new_value)
            for name, new_value in kwargs.items():
                apply(name.replace("__", "."), new_value)

    def __enter__(self):
        return self.config

    def __exit__(self, type, value, traceback):
        # Undo the recorded operations, most recent first
        for op, path, previous in reversed(self._record):
            parents, leaf = path[:-1], path[-1]
            node = self.config

            if op == "replace":
                # Put the previous value back, recreating parents if needed
                for part in parents:
                    node = node.setdefault(part, {})
                node[leaf] = previous
                continue

            # insert: remove the key again, unless one of its parents has
            # already disappeared
            reachable = True
            for part in parents:
                try:
                    node = node[part]
                except KeyError:
                    reachable = False
                    break
            if reachable:
                node.pop(leaf, None)

    def _assign(
        self,
        keys: Sequence[str],
        value: Any,
        d: dict,
        path: tuple[str, ...] = (),
        record: bool = True,
    ) -> None:
        """Assign value into a nested configuration dictionary

        Parameters
        ----------
        keys : Sequence[str]
            The nested path of keys to assign the value.
        value : object
        d : dict
            The part of the nested dictionary into which we want to assign the
            value
        path : tuple[str], optional
            The path history up to this point.
        record : bool, optional
            Whether this operation needs to be recorded to allow for rollback.
        """
        head = canonical_name(keys[0], d)
        here = path + (head,)

        if len(keys) > 1:
            if head not in d:
                if record:
                    self._record.append(("insert", here, None))
                d[head] = {}
                # No need to record subsequent operations after an insert
                record = False
            self._assign(keys[1:], value, d[head], here, record=record)
            return

        # Last key of the path: remember how to undo, then write the value
        if record:
            if head in d:
                self._record.append(("replace", here, d[head]))
            else:
                self._record.append(("insert", here, None))
        d[head] = value
def collect(paths: list[str] = paths, env: Mapping[str, str] | None = None) -> dict:
    """
    Gather configuration from yaml files and environment variables

    The yaml configs are merged in the order they are found, and the config
    derived from the environment is merged last so that it takes precedence.

    Parameters
    ----------
    paths : list[str]
        A list of paths to search for yaml config files

    env : Mapping[str, str]
        The system environment variables

    Returns
    -------
    config: dict

    See Also
    --------
    dask.config.refresh: collect configuration and update into primary config
    """
    if env is None:
        env = os.environ

    layers = list(collect_yaml(paths=paths))
    layers.append(collect_env(env=env))
    return merge(*layers)
def refresh(
    config: dict | None = None, defaults: list[Mapping] = defaults, **kwargs
) -> None:
    """
    Rebuild the configuration from defaults, yaml files and env variables

    This mutates the global dask.config.config, or the config parameter if
    passed in.

    This goes through the following stages:

    1.  Clearing out all old configuration
    2.  Updating from the stored defaults from downstream libraries
        (see update_defaults)
    3.  Updating from yaml files and environment variables
    4.  Automatically renaming deprecated keys (with a warning)

    Note that some functionality only checks configuration once at startup and
    may not change behavior, even if configuration changes.  It is recommended
    to restart your python process if convenient to ensure that new
    configuration changes take place.

    Extra keyword arguments are forwarded to ``collect``.

    See Also
    --------
    dask.config.collect: for parameters
    dask.config.update_defaults
    """
    # Keep Sphinx autofunction documentation clean
    target = global_config if config is None else config

    target.clear()

    for layer in defaults:
        update(target, layer, priority="old")

    update(target, collect(**kwargs))
    rename(deprecations, target)
def get(
    key: str,
    default: Any = no_default,
    config: dict | None = None,
    override_with: Any = None,
) -> Any:
    """
    Look up a value in the configuration

    If ``override_with`` is not None this value will be passed straight back.
    Useful for getting kwarg defaults from Dask config.

    Use '.' for nested access.  Each component is matched in either its
    hyphen or underscore spelling.  If the lookup fails, ``default`` is
    returned when one was given; otherwise the lookup error propagates.

    Examples
    --------
    >>> from dask import config
    >>> config.get('foo')  # doctest: +SKIP
    {'x': 1, 'y': 2}

    >>> config.get('foo.x')  # doctest: +SKIP
    1

    >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
    123

    >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
    2

    >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
    3

    See Also
    --------
    dask.config.set
    """
    if override_with is not None:
        return override_with

    # Keep Sphinx autofunction documentation clean
    node = global_config if config is None else config

    for part in key.split("."):
        part = canonical_name(part, node)
        try:
            node = node[part]
        except (TypeError, IndexError, KeyError):
            if default is not no_default:
                return default
            raise

    return node
def pop(key: str, default: Any = no_default, config: dict = config) -> Any:
    """Like ``get``, but remove the element if found

    The dotted ``key`` is walked down to its parent container, from which
    the final component is popped.  If any step fails, ``default`` is
    returned when one was given; otherwise the lookup error propagates.

    See Also
    --------
    dask.config.get
    dask.config.set
    """
    *parents, leaf = key.split(".")
    node = config

    # Descend to the container that holds the final component
    for part in parents:
        part = canonical_name(part, node)
        try:
            node = node[part]
        except (TypeError, IndexError, KeyError):
            if default is not no_default:
                return default
            raise

    leaf = canonical_name(leaf, node)
    try:
        return node.pop(leaf)
    except (TypeError, IndexError, KeyError):
        if default is not no_default:
            return default
        raise
def update_defaults(
    new: Mapping, config: dict = config, defaults: list[Mapping] = defaults
) -> None:
    """Register an additional set of default values

    It does two things:

    1.  Add the defaults to a global collection to be used by refresh later
    2.  Updates the global config with the new configuration.
        Old values are prioritized over new ones, unless the current value
        is the old default, in which case it's updated to the new default.
    """
    # Snapshot of the defaults as they were *before* ``new`` is registered;
    # this is what existing values are compared against.
    previous_defaults = merge(*defaults)
    defaults.append(new)
    update(config, new, priority="new-defaults", defaults=previous_defaults)
def expand_environment_variables(config: Any) -> Any:
    """Substitute environment variables throughout a nested config object

    Mappings, lists, tuples and sets are traversed recursively and every
    string found is passed through ``os.path.expandvars``.  Mappings come
    back as plain dicts (keys untouched); lists, tuples and sets are rebuilt
    with their own type.  Any other object is returned unchanged.

    Parameters
    ----------
    config : dict, iterable, or str
        Input object to search for environment variables

    Returns
    -------
    config : same type as input

    Examples
    --------
    >>> expand_environment_variables({'x': [1, 2, '$USER']})  # doctest: +SKIP
    {'x': [1, 2, 'my-username']}
    """
    if isinstance(config, Mapping):
        return {
            name: expand_environment_variables(item) for name, item in config.items()
        }
    if isinstance(config, str):
        return os.path.expandvars(config)
    # ``set`` is shadowed by a class in this module, hence ``builtins.set``
    if isinstance(config, (list, tuple, builtins.set)):
        rebuild = type(config)
        return rebuild(expand_environment_variables(item) for item in config)
    return config
#: Mapping of {deprecated key: new key} for renamed keys, or {deprecated key: None} for
#: removed keys. All deprecated keys must use '-' instead of '_', because
#: check_deprecations() converts '_' to '-' before looking a key up here.
#: This is used in three places:
#: 1. In refresh(), which calls rename() to rename and warn upon loading
#:    from ~/.config/dask.yaml, DASK_ env variables, etc.
#: 2. in distributed/config.py and equivalent modules, where we perform additional
#:    distributed-specific renames for the yaml/env config and enrich this dict
#: 3. from individual calls to dask.config.set(), which internally invoke
#:    check_deprecations()
#: A new key may itself be listed as deprecated (e.g. "shuffle" ->
#: "dataframe.shuffle.algorithm" -> "dataframe.shuffle.method").
deprecations: dict[str, str | None] = {
    "fuse-ave-width": "optimization.fuse.ave-width",
    "fuse-max-height": "optimization.fuse.max-height",
    "fuse-max-width": "optimization.fuse.max-width",
    "fuse-rename-keys": "optimization.fuse.rename-keys",
    "fuse-max-depth-new-edges": "optimization.fuse.max-depth-new-edges",
    # See https://github.com/dask/distributed/pull/4916
    "ucx.cuda-copy": "distributed.ucx.cuda_copy",
    "ucx.tcp": "distributed.ucx.tcp",
    "ucx.nvlink": "distributed.ucx.nvlink",
    "ucx.infiniband": "distributed.ucx.infiniband",
    "ucx.rdmacm": "distributed.ucx.rdmacm",
    "ucx.net-devices": "distributed.ucx.net-devices",
    "ucx.reuse-endpoints": "distributed.ucx.reuse-endpoints",
    "rmm.pool-size": "distributed.rmm.pool-size",
    "shuffle": "dataframe.shuffle.algorithm",
    "array.rechunk-threshold": "array.rechunk.threshold",
    "dataframe.shuffle.algorithm": "dataframe.shuffle.method",
    "dataframe.shuffle-compression": "dataframe.shuffle.compression",
    "admin.traceback.shorten.what": "admin.traceback.shorten",  # changed in 2023.9.0
    "array.shuffle.chunksize-tolerance": "array.chunk-size-tolerance",
}
def rename(
    deprecations: Mapping[str, str | None] = deprecations, config: dict = config
) -> None:
    """Move values stored under deprecated keys to their new keys

    This helps migrate older configuration versions over time.  Every
    deprecated key that is present in ``config`` is popped and, when
    ``check_deprecations`` gives back a name for it, its value is written
    again under that name.

    See Also
    --------
    check_deprecations
    """
    for old_key in deprecations:
        try:
            value = pop(old_key, config=config)
        except (TypeError, IndexError, KeyError):
            # Deprecated key is not present in this config
            continue

        old_key = canonical_name(old_key, config=config)
        # Warns about the rename, or raises if the key has been removed
        replacement = check_deprecations(old_key, deprecations)
        if replacement:
            set({replacement: value}, config=config)
def check_deprecations(
    key: str, deprecations: Mapping[str, str | None] = deprecations
) -> str:
    """Check if the provided key has been renamed or removed

    Underscores in ``key`` are converted to hyphens before the lookup, so
    the entries of ``deprecations`` must be spelled with hyphens.

    Parameters
    ----------
    key : str
        The configuration key to check
    deprecations : Dict[str, str]
        The mapping of aliases

    Examples
    --------
    >>> deprecations = {"old-key": "new-key", "invalid": None}
    >>> check_deprecations("old_key", deprecations=deprecations)  # doctest: +SKIP
    FutureWarning: Dask configuration key 'old_key' has been deprecated; please use 'new-key' instead
    'new-key'

    >>> check_deprecations("invalid", deprecations=deprecations)
    Traceback (most recent call last):
        ...
    ValueError: Dask configuration key 'invalid' has been removed

    >>> check_deprecations("another_key", deprecations=deprecations)
    'another_key'

    Returns
    -------
    new: str
        The proper key, whether the original (if no deprecation) or the aliased
        value

    Raises
    ------
    ValueError
        If the key is listed as removed (it maps to a false value).

    See Also
    --------
    rename
    """
    normalized = key.replace("_", "-")
    if normalized not in deprecations:
        return key

    replacement = deprecations[normalized]
    if not replacement:
        raise ValueError(f"Dask configuration key {key!r} has been removed")

    warnings.warn(
        f"Dask configuration key {key!r} has been deprecated; "
        f"please use {replacement!r} instead",
        FutureWarning,
    )
    return replacement
def serialize(data: Any) -> str:
    """Encode config data as a URL-safe base64 string.

    The data is dumped to JSON and the JSON text is then base64-encoded.
    Typically used to pass config via the ``DASK_INTERNAL_INHERIT_CONFIG``
    environment variable.

    Parameters
    ----------
    data: json-serializable object
        The data to serialize

    Returns
    -------
    serialized_data: str
        The serialized data as a string

    """
    as_json = json.dumps(data)
    encoded = base64.urlsafe_b64encode(as_json.encode())
    return encoded.decode()
def deserialize(data: str) -> Any:
    """Decode a string produced by ``serialize`` back into an object.

    The string is base64-decoded and the resulting text is parsed as JSON.
    Typically used when receiving config via the
    ``DASK_INTERNAL_INHERIT_CONFIG`` environment variable.

    Parameters
    ----------
    data: str
        String serialized by :func:`dask.config.serialize`

    Returns
    -------
    deserialized_data: obj
        The de-serialized data

    """
    raw = base64.urlsafe_b64decode(data.encode())
    return json.loads(raw.decode())
def _initialize() -> None:
    """Load the ``dask.yaml`` file that sits next to this module and register
    its content through ``update_defaults``."""
    package_dir = os.path.dirname(__file__)

    with open(os.path.join(package_dir, "dask.yaml")) as stream:
        packaged_defaults = yaml.safe_load(stream)

    update_defaults(packaged_defaults)
# Import-time setup.  refresh() first fills the global config from the yaml
# files and DASK_* environment variables; _initialize() then registers the
# packaged dask.yaml through update_defaults(), which merges with
# priority="new-defaults".
refresh()
_initialize()