Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/typing.py: 80%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import abc
4from collections.abc import Callable, Hashable, Mapping, Sequence
5from enum import Enum
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Literal,
10 Protocol,
11 TypeVar,
12 Union,
13 runtime_checkable,
14)
16if TYPE_CHECKING:
17 # IPython import is relatively slow. Avoid if not necessary
18 # TODO import from typing (requires Python >=3.10)
19 from typing import TypeAlias
21 from IPython.display import DisplayObject
# TypeVars bound to the DaskCollection protocol below; the covariant form is
# needed for PostPersistCallable's return type.
CollType = TypeVar("CollType", bound="DaskCollection")
CollType_co = TypeVar("CollType_co", bound="DaskCollection", covariant=True)
# Finalizer signature is intentionally loose; see __dask_postcompute__.
PostComputeCallable = Callable

# A Dask graph key: a string/number scalar or an arbitrarily nested tuple of keys.
Key: TypeAlias = Union[str, int, float, tuple["Key", ...]]
# FIXME: This type is a little misleading. Low level graphs are often
# MutableMappings but HLGs are not
Graph: TypeAlias = Mapping[Key, Any]
# Potentially nested list of Dask keys
NestedKeys: TypeAlias = list[Union[Key, "NestedKeys"]]
class SchedulerGetCallable(Protocol):
    """Signature of a ``__dask_scheduler__`` callable (a scheduler ``get``)."""

    def __call__(
        self,
        dsk: Graph,
        keys: Sequence[Key] | Key,
        **kwargs: Any,
    ) -> Any:
        """Execute a graph and materialize the requested keys.

        Parameters
        ----------
        dsk :
            Task graph to execute.
        keys :
            One key, or a sequence of keys, identifying the desired
            outputs.
        **kwargs :
            Extra scheduler-specific options.

        Returns
        -------
        Any
            The materialized result(s) corresponding to ``keys``.
        """
        raise NotImplementedError("Inheriting class must implement this method.")
class PostPersistCallable(Protocol[CollType_co]):
    """Signature of a ``__dask_postpersist__`` rebuild callable."""

    def __call__(
        self,
        dsk: Graph,
        *args: Any,
        rename: Mapping[str, str] | None = None,
    ) -> CollType_co:
        """Reconstruct a collection from a persisted graph.

        Parameters
        ----------
        dsk: Mapping
            Graph containing, at minimum, the keys reported by the
            collection's ``__dask_keys__()``.
        *args : Any
            Extra positional arguments; an empty tuple when none are
            needed.
        rename : Mapping[str, str], optional
            When given, collection names embedded in the output keys are
            remapped: if :meth:`__dask_keys__` previously returned
            ``[("a", 0), ("a", 1)]``, then after
            ``rebuild(dsk, *extra_args, rename={"a": "b"})`` it must
            return ``[("b", 0), ("b", 1)]``. Names absent from
            ``rename`` keep their keys unchanged, and entries for
            unknown names must be silently ignored.

        Returns
        -------
        Collection
            A collection equivalent to the original, with identical keys
            but backed by a different graph.
        """
        raise NotImplementedError("Inheriting class must implement this method.")
@runtime_checkable
class DaskCollection(Protocol):
    """Protocol defining the interface of a Dask collection."""

    @abc.abstractmethod
    def __dask_graph__(self) -> Graph:
        """Return the Dask task graph.

        The core collections (Array, DataFrame, Bag, Delayed) represent
        their graphs as a
        :py:class:`~dask.highlevelgraph.HighLevelGraph`; a plain Python
        dictionary (a low level graph) is equally valid.

        Returns
        -------
        Mapping
            The task graph. If a
            :py:class:`dask.highlevelgraph.HighLevelGraph` is returned,
            the collection must also implement ``__dask_layers__`` as
            described by the
            :py:class:`~dask.typing.HLGDaskCollection` protocol.
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_keys__(self) -> NestedKeys:
        """Return the output keys of the task graph.

        Collection keys are more constrained than keys in the general
        :doc:`task graph specification <spec>`: every key is either a
        non-empty string, or a tuple whose first element is a non-empty
        string (the *collection name*) followed by zero or more str,
        bytes, int, or float values, or tuples thereof. The built-in
        dask collections each use exactly one name, though that is not
        required.

        All of the following are valid outputs:

        - ``[]``
        - ``["x", "y"]``
        - ``[[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]``

        Returns
        -------
        list
            A (possibly nested) list of keys naming the graph's outputs.
            After computation, results are delivered in this same
            layout, with each key replaced by its value.
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postcompute__(self) -> tuple[PostComputeCallable, tuple]:
        """Return the finalizer and extra arguments used to assemble the result.

        After computation every output key holds an in-memory value; the
        finalizer combines those per-key values into the collection's
        final in-memory form (e.g. ``dask.array.Array`` concatenates its
        chunk results into one array).

        Returns
        -------
        PostComputeCallable
            Callable invoked with the sequence of per-key results plus
            any extra arguments; a typical signature is
            ``finalize(results: Sequence[Any], *args)``.
        tuple[Any, ...]
            Extra positional arguments appended after the results (the
            ``*args`` part of the ``PostComputeCallable``. An empty
            tuple when nothing extra is needed.
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postpersist__(self) -> tuple[PostPersistCallable, tuple]:
        """Return the rebuild callable and extra arguments for persisting.

        See :py:class:`dask.typing.PostPersistCallable` for the full
        contract.

        Returns
        -------
        PostPersistCallable
            Callable that rebuilds the collection, with signature
            ``rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)``
            (per the :py:class:`~dask.typing.PostPersistCallable`
            protocol). It returns an equivalent collection with the same
            keys as ``self`` but computed through a different graph —
            for :py:func:`dask.persist`, a graph of just the output keys
            holding already-computed values.
        tuple[Any, ...]
            Extra positional arguments for the rebuild callable; an
            empty tuple when none are needed.
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_tokenize__(self) -> Hashable:
        """Return a value that fully represents this object."""
        raise NotImplementedError("Inheriting class must implement this method.")

    __dask_optimize__: Any
    """Given a graph and keys, return a new optimized graph.

    Must be a ``staticmethod`` or ``classmethod``, never an instance
    method; see the ``__dask_optimize__`` definitions on the core
    collections (``dask.array.Array``, ``dask.dataframe.DataFrame``,
    etc.) for examples.

    Graphs and keys are merged before this is called, so the arguments
    may span several collections that share the same optimize method.

    Parameters
    ----------
    dsk : Graph
        Merged graph of every collection sharing this
        ``__dask_optimize__`` method.
    keys : Sequence[Key]
        Concatenated ``__dask_keys__`` outputs of those same
        collections.
    **kwargs : Any
        Keyword arguments forwarded from ``compute`` / ``persist``; may
        be used or ignored.

    Returns
    -------
    MutableMapping
        The optimized Dask graph.
    """

    # FIXME: It is currently not possible to define a staticmethod from a callback protocol
    # Also, the definition in `is_dask_collection` cannot be satisfied by a
    # protocol / typing check
    # staticmethod[SchedulerGetCallable]
    __dask_scheduler__: staticmethod
    """The default scheduler ``get`` to use for this object.

    Usually attached to the class as a staticmethod, e.g.:

    >>> import dask.threaded
    >>> class MyCollection:
    ...     # Use the threaded scheduler by default
    ...     __dask_scheduler__ = staticmethod(dask.threaded.get)
    """

    @abc.abstractmethod
    def compute(self, **kwargs: Any) -> Any:
        """Compute this dask collection.

        Turns the lazy collection into its concrete in-memory
        counterpart — a Dask array becomes a NumPy array, a Dask
        dataframe becomes a Pandas dataframe, and so on. The whole
        result must fit in memory.

        Parameters
        ----------
        scheduler : string, optional
            Scheduler to use, e.g. "threads", "synchronous" or
            "processes". When omitted, global settings are consulted
            first, then the collection's own default.
        optimize_graph : bool, optional
            If True [default], optimize the graph before running it;
            set False (useful for debugging) to run the graph as-is.
        kwargs :
            Additional keywords forwarded to the scheduler function.

        Returns
        -------
        The collection's computed result.

        See Also
        --------
        dask.compute
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def persist(self: CollType, **kwargs: Any) -> CollType:
        """Persist this dask collection into memory

        Returns a collection with identical metadata whose results are
        already computed, or are actively being computed in the
        background.

        Behavior depends heavily on the active task scheduler. With an
        asynchronous scheduler such as dask.distributed, ``persist``
        returns *immediately* and the new graph contains Dask Future
        objects; with a blocking scheduler, the call *blocks* and the
        new graph contains concrete Python results.

        Particularly useful on distributed systems: results stay in
        distributed memory instead of being pulled back to the local
        process as ``compute`` would.

        Parameters
        ----------
        scheduler : string, optional
            Scheduler to use, e.g. "threads", "synchronous" or
            "processes". When omitted, global settings are consulted
            first, then the collection's own default.
        optimize_graph : bool, optional
            If True [default], optimize the graph before running it;
            set False (useful for debugging) to run the graph as-is.
        **kwargs
            Additional keywords forwarded to the scheduler function.

        Returns
        -------
        New dask collections backed by in-memory data

        See Also
        --------
        dask.persist
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def visualize(
        self,
        filename: str = "mydask",
        format: str | None = None,
        optimize_graph: bool = False,
        **kwargs: Any,
    ) -> DisplayObject | None:
        """Render the computation of this object's task graph using graphviz.

        Requires ``graphviz`` to be installed.

        Parameters
        ----------
        filename : str or None, optional
            File to write to disk; '.png' is appended when no extension
            is given. Pass None to skip writing a file and talk to dot
            via pipes only.
        format : {'png', 'pdf', 'dot', 'svg', 'jpeg', 'jpg'}, optional
            Output file format; default 'png'.
        optimize_graph : bool, optional
            If True, render the optimized graph; if False [default],
            render the graph as-is.
        color: {None, 'order'}, optional
            Node coloring options; a ``cmap=`` keyword selects the
            colormap.
        **kwargs
            Additional keyword arguments forwarded to ``to_graphviz``.

        Examples
        --------
        >>> x.visualize(filename='dask.pdf')  # doctest: +SKIP
        >>> x.visualize(filename='dask.pdf', color='order')  # doctest: +SKIP

        Returns
        -------
        result : IPython.display.Image, IPython.display.SVG, or None
            See dask.dot.dot_graph for more information.

        See Also
        --------
        dask.visualize
        dask.dot.dot_graph

        Notes
        -----
        For more information on optimization see here:

        https://docs.dask.org/en/latest/optimize.html
        """
        raise NotImplementedError("Inheriting class must implement this method.")
@runtime_checkable
class HLGDaskCollection(DaskCollection, Protocol):
    """Protocol for Dask collections backed by HighLevelGraphs.

    Identical to :py:class:`~dask.typing.DaskCollection` except that it
    additionally requires ``__dask_layers__``, which collections backed
    by high level graphs must provide.
    """

    @abc.abstractmethod
    def __dask_layers__(self) -> Sequence[str]:
        """Return the names of the HighLevelGraph layers."""
        raise NotImplementedError("Inheriting class must implement this method.")
423class _NoDefault(Enum):
424 """typing-aware constant to detect when the user omits a parameter and you can't use
425 None.
427 Copied from pandas._libs.lib._NoDefault.
429 Usage
430 -----
431 from dask.typing import NoDefault, no_default
433 def f(x: int | None | NoDefault = no_default) -> int:
434 if x is no_default:
435 ...
436 """
438 no_default = "NO_DEFAULT"
440 def __repr__(self) -> str:
441 return "<no_default>"
# Singleton sentinel: compare with ``is no_default`` to detect omission when
# ``None`` is a valid argument value.
no_default = _NoDefault.no_default
# Annotation type for parameters that accept the sentinel (the single enum member).
NoDefault = Literal[_NoDefault.no_default]