Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/dask/typing.py: 20%
Shortcuts on this page:
r m x — toggle line displays
j k — next/previous highlighted chunk
0 (zero) — top of page
1 (one) — first highlighted chunk
1from __future__ import annotations
3import abc
4from collections.abc import Callable, Hashable, Mapping, Sequence
5from enum import Enum
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Literal,
10 Protocol,
11 TypeVar,
12 Union,
13 runtime_checkable,
14)
16if TYPE_CHECKING:
17 # IPython import is relatively slow. Avoid if not necessary
18 from IPython.display import DisplayObject
20 # TODO import from typing (requires Python >=3.10)
21 from typing_extensions import TypeAlias
# TypeVars bound to the DaskCollection protocol. CollType_co is covariant so a
# callable declared to return a collection may return a more specific subtype.
CollType = TypeVar("CollType", bound="DaskCollection")
CollType_co = TypeVar("CollType_co", bound="DaskCollection", covariant=True)
# Signature of a ``__dask_postcompute__`` finalizer (left as a bare Callable).
PostComputeCallable = Callable

# A Dask key: str, bytes, int, float, or an arbitrarily nested tuple thereof.
Key: TypeAlias = Union[str, bytes, int, float, tuple["Key", ...]]
# FIXME: This type is a little misleading. Low level graphs are often
# MutableMappings but HLGs are not
Graph: TypeAlias = Mapping[Key, Any]
# Potentially nested list of Dask keys
NestedKeys: TypeAlias = list[Union[Key, "NestedKeys"]]
class SchedulerGetCallable(Protocol):
    """Signature required of a ``__dask_scheduler__`` ``get`` function."""

    def __call__(
        self,
        dsk: Graph,
        keys: Sequence[Key] | Key,
        **kwargs: Any,
    ) -> Any:
        """Execute ``dsk`` and return the data associated with ``keys``.

        Invoked as the default scheduler for a collection.

        Parameters
        ----------
        dsk :
            Task graph to execute.
        keys :
            A single key, or a sequence of keys, naming the desired outputs.
        **kwargs :
            Scheduler-specific options.

        Returns
        -------
        Any
            Result(s) associated with `keys`
        """
        raise NotImplementedError("Inheriting class must implement this method.")
class PostPersistCallable(Protocol[CollType_co]):
    """Signature required of a ``__dask_postpersist__`` rebuild callable."""

    def __call__(
        self,
        dsk: Graph,
        *args: Any,
        rename: Mapping[str, str] | None = None,
    ) -> CollType_co:
        """Rebuild a collection from a persisted graph.

        Parameters
        ----------
        dsk: Mapping
            A graph containing at least the output keys returned by
            __dask_keys__().
        *args : Any
            Extra optional positional arguments; must be an empty tuple
            when no extra arguments are needed.
        rename : Mapping[str, str], optional
            When given, output keys may change name as well: if
            :meth:`__dask_keys__` previously returned
            ``[("a", 0), ("a", 1)]``, then after
            ``rebuild(dsk, *extra_args, rename={"a": "b"})`` it must
            return ``[("b", 0), ("b", 1)]``. Collection names absent
            from ``rename`` keep their keys unchanged; unexpected names
            present in ``rename`` must be ignored.

        Returns
        -------
        Collection
            An equivalent Dask collection with the same keys, computed
            through a different graph.
        """
        raise NotImplementedError("Inheriting class must implement this method.")
@runtime_checkable
class DaskCollection(Protocol):
    """Structural interface that every Dask collection implements."""

    @abc.abstractmethod
    def __dask_graph__(self) -> Graph:
        """Return the task graph backing this collection.

        The built-in collections (Array, DataFrame, Bag, and Delayed)
        use a :py:class:`~dask.highlevelgraph.HighLevelGraph`; a plain
        Python dictionary (a low level graph) is also acceptable.

        Returns
        -------
        Mapping
            The task graph. If a
            :py:class:`dask.highlevelgraph.HighLevelGraph` is returned,
            the collection must also implement ``__dask_layers__``, as
            described by the :py:class:`~dask.typing.HLGDaskCollection`
            protocol.
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_keys__(self) -> NestedKeys:
        """Return the output keys of the task graph.

        Collection keys are more constrained than keys in the :doc:`task
        graph specification documentation <spec>`: every key must be
        either a non-empty string, or a tuple whose first element is a
        non-empty string (commonly known as the *collection name*)
        followed by zero or more arbitrary str, bytes, int, float, or
        tuples thereof. Every collection embedded in the dask package
        has exactly one name, but that is not a requirement.

        All of these are valid outputs:

        - ``[]``
        - ``["x", "y"]``
        - ``[[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]``

        Returns
        -------
        list
            A possibly nested list of output keys. After computation the
            results are returned in the same nested layout, with each
            key replaced by its corresponding output.
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postcompute__(self) -> tuple[PostComputeCallable, tuple]:
        """Return the finalizer and its extra arguments for building the final result.

        After computation every key holds an in-memory result; the
        postcompute function combines those per-key results into the
        collection's final in-memory representation (for example,
        dask.array.Array concatenates the per-chunk arrays into one
        final in-memory array).

        Returns
        -------
        PostComputeCallable
            Called with the sequence of per-key results plus the
            optional extra arguments; an example signature is
            ``finalize(results: Sequence[Any], *args)``.
        tuple[Any, ...]
            Optional arguments passed after the key results (the
            ``*args`` part of the ``PostComputeCallable``); must be an
            empty tuple when no extra arguments are needed.
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postpersist__(self) -> tuple[PostPersistCallable, tuple]:
        """Return the rebuild callable and its extra arguments for persisting.

        See also :py:class:`dask.typing.PostPersistCallable`.

        Returns
        -------
        PostPersistCallable
            Callable that rebuilds the collection, with signature
            ``rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)``
            (as defined by the
            :py:class:`~dask.typing.PostPersistCallable` protocol). It
            should return an equivalent Dask collection with the same
            keys as `self`, computed through a different graph — for
            :py:func:`dask.persist`, a graph holding just the output
            keys with their already-computed values.
        tuple[Any, ...]
            Optional arguments passed to the rebuild callable; must be
            an empty tuple when no extra arguments are needed.
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_tokenize__(self) -> Hashable:
        """Return a value that fully represents the object."""
        raise NotImplementedError("Inheriting class must implement this method.")

    __dask_optimize__: Any
    """Return a new optimized graph given a graph and keys.

    Must be a ``staticmethod`` or a ``classmethod``, never an
    instance method. For example implementations see
    ``__dask_optimize__`` in the core Dask collections:
    ``dask.array.Array``, ``dask.dataframe.DataFrame``, etc.

    Graphs and keys are merged before ``__dask_optimize__`` is
    called, so the arguments may span several collections that
    share the same optimize method.

    Parameters
    ----------
    dsk : Graph
        The merged graphs from all collections sharing the same
        ``__dask_optimize__`` method.
    keys : Sequence[Key]
        Outputs of ``__dask_keys__`` from all collections sharing
        the same ``__dask_optimize__`` method.
    **kwargs : Any
        Extra keyword arguments forwarded from the call to
        ``compute`` or ``persist``; may be used or ignored as
        needed.

    Returns
    -------
    MutableMapping
        The optimized Dask graph.
    """

    __dask_scheduler__: staticmethod[SchedulerGetCallable]
    """The default scheduler ``get`` to use for this object.

    Usually attached to the class as a staticmethod, e.g.:

    >>> import dask.threaded
    >>> class MyCollection:
    ...     # Use the threaded scheduler by default
    ...     __dask_scheduler__ = staticmethod(dask.threaded.get)
    """

    @abc.abstractmethod
    def compute(self, **kwargs: Any) -> Any:
        """Materialize this dask collection in memory.

        Converts the lazy collection into its concrete in-memory
        equivalent (a Dask array becomes a NumPy array, a Dask dataframe
        becomes a Pandas dataframe, and so on). The entire dataset must
        fit into memory before calling this operation.

        Parameters
        ----------
        scheduler : string, optional
            Scheduler to use, like "threads", "synchronous" or
            "processes". If not provided, the global settings are
            checked first, then the collection defaults.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before running;
            otherwise it is run as is (useful for debugging).
        kwargs :
            Extra keywords forwarded to the scheduler function.

        Returns
        -------
        The collection's computed result.

        See Also
        --------
        dask.compute
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def persist(self: CollType, **kwargs: Any) -> CollType:
        """Persist this dask collection into memory

        Produces a Dask collection with the same metadata whose results
        are fully computed, or actively computing in the background.
        The effect depends heavily on the active task scheduler: with a
        scheduler that supports asynchronous computing, such as the
        dask.distributed scheduler, persist returns *immediately* and
        the returned collection's graph contains Dask Future objects;
        with a scheduler that only supports blocking computation, the
        call *blocks* and the returned collection's graph contains
        concrete Python results.

        Particularly useful on distributed systems, where results are
        kept in distributed memory rather than returned to the local
        process as with compute.

        Parameters
        ----------
        scheduler : string, optional
            Scheduler to use, like "threads", "synchronous" or
            "processes". If not provided, the global settings are
            checked first, then the collection defaults.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before running;
            otherwise it is run as is (useful for debugging).
        **kwargs
            Extra keywords forwarded to the scheduler function.

        Returns
        -------
        New dask collections backed by in-memory data

        See Also
        --------
        dask.persist
        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def visualize(
        self,
        filename: str = "mydask",
        format: str | None = None,
        optimize_graph: bool = False,
        **kwargs: Any,
    ) -> DisplayObject | None:
        """Render the computation of this object's task graph using graphviz.

        Requires ``graphviz`` to be installed.

        Parameters
        ----------
        filename : str or None, optional
            Name of the file to write to disk; '.png' is used by default
            when `filename` has no extension. If `filename` is None, no
            file is written and dot is driven purely through pipes.
        format : {'png', 'pdf', 'dot', 'svg', 'jpeg', 'jpg'}, optional
            Output file format. Default is 'png'.
        optimize_graph : bool, optional
            If True, render the optimized graph; otherwise render the
            graph as is. Default is False.
        color: {None, 'order'}, optional
            Node coloring option; provide a ``cmap=`` keyword for an
            additional colormap.
        **kwargs
            Additional keyword arguments forwarded to ``to_graphviz``.

        Examples
        --------
        >>> x.visualize(filename='dask.pdf')  # doctest: +SKIP
        >>> x.visualize(filename='dask.pdf', color='order')  # doctest: +SKIP

        Returns
        -------
        result : IPython.display.Image, IPython.display.SVG, or None
            See dask.dot.dot_graph for more information.

        See Also
        --------
        dask.visualize
        dask.dot.dot_graph

        Notes
        -----
        For more information on optimization see
        https://docs.dask.org/en/latest/optimize.html
        """
        raise NotImplementedError("Inheriting class must implement this method.")
@runtime_checkable
class HLGDaskCollection(DaskCollection, Protocol):
    """Dask collection backed by a HighLevelGraph.

    Nearly identical to :py:class:`~dask.typing.DaskCollection`, but
    additionally requires the ``__dask_layers__`` method, which any
    collection backed by a high level graph must provide.
    """

    @abc.abstractmethod
    def __dask_layers__(self) -> Sequence[str]:
        """Names of the HighLevelGraph layers."""
        raise NotImplementedError("Inheriting class must implement this method.")
419class _NoDefault(Enum):
420 """typing-aware constant to detect when the user omits a parameter and you can't use
421 None.
423 Copied from pandas._libs.lib._NoDefault.
425 Usage
426 -----
427 from dask.typing import NoDefault, no_default
429 def f(x: int | None | NoDefault = no_default) -> int:
430 if x is no_default:
431 ...
432 """
434 no_default = "NO_DEFAULT"
436 def __repr__(self) -> str:
437 return "<no_default>"
# Module-level sentinel instance; always compare with ``is no_default``.
no_default = _NoDefault.no_default
# Type of the sentinel, for annotating parameters that may receive it.
NoDefault = Literal[_NoDefault.no_default]