Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/typing.py: 82%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import abc
4from collections.abc import Callable, Hashable, Mapping, Sequence
5from enum import Enum
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Literal,
10 Protocol,
11 TypeAlias,
12 TypeVar,
13 Union,
14 runtime_checkable,
15)
17if TYPE_CHECKING:
18 # IPython import is relatively slow. Avoid if not necessary
19 from IPython.display import DisplayObject
# TypeVar bound to the collection protocol; used where a method returns the
# same collection type it was called on (see ``DaskCollection.persist``).
CollType = TypeVar("CollType", bound="DaskCollection")
# Covariant variant for return-only positions (see ``PostPersistCallable``).
CollType_co = TypeVar("CollType_co", bound="DaskCollection", covariant=True)
# Finalizer returned by ``__dask_postcompute__``; kept as a bare ``Callable``
# because its exact signature is ``finalize(results, *args)`` by convention.
PostComputeCallable = Callable

# A Dask graph key: a str/int/float scalar or an arbitrarily nested tuple of keys.
Key: TypeAlias = str | int | float | tuple["Key", ...]
# FIXME: This type is a little misleading. Low level graphs are often
# MutableMappings but HLGs are not
Graph: TypeAlias = Mapping[Key, Any]
# Potentially nested list of Dask keys
NestedKeys: TypeAlias = list[Union[Key, "NestedKeys"]]
class SchedulerGetCallable(Protocol):
    """Structural type of a ``__dask_scheduler__`` callable."""

    def __call__(self, dsk: Graph, keys: Sequence[Key] | Key, **kwargs: Any) -> Any:
        """Execute ``dsk`` and return the data associated with ``keys``.

        Invoked as the default scheduler for a collection.

        Parameters
        ----------
        dsk :
            The task graph to execute.
        keys :
            Key(s) whose corresponding data should be materialized.
        **kwargs :
            Additional scheduler-specific arguments.

        Returns
        -------
        Any
            Result(s) associated with `keys`

        """
        raise NotImplementedError("Inheriting class must implement this method.")
class PostPersistCallable(Protocol[CollType_co]):
    """Structural type of a ``__dask_postpersist__`` rebuild callable."""

    def __call__(self, dsk: Graph, *args: Any, rename: Mapping[str, str] | None = None) -> CollType_co:
        """Rebuild a persisted collection from ``dsk``.

        Parameters
        ----------
        dsk: Mapping
            Mapping containing at least the output keys returned by
            __dask_keys__().
        *args : Any
            Extra positional arguments; must be an empty tuple when no
            additional arguments are needed.
        rename : Mapping[str, str], optional
            When provided, output keys may be renamed as well: if the
            previous output of :meth:`__dask_keys__` was
            ``[("a", 0), ("a", 1)]``, then after
            ``rebuild(dsk, *extra_args, rename={"a": "b"})`` it must
            become ``[("b", 0), ("b", 1)]``. Collection names absent
            from ``rename`` keep their keys unchanged; entries for
            unexpected names must be ignored.

        Returns
        -------
        Collection
            An equivalent Dask collection with the same keys as
            computed through a different graph.

        """
        raise NotImplementedError("Inheriting class must implement this method.")
@runtime_checkable
class DaskCollection(Protocol):
    """Protocol describing the interface shared by all Dask collections."""

    @abc.abstractmethod
    def __dask_graph__(self) -> Graph:
        """Return the task graph backing this collection.

        The core Dask collections (Array, DataFrame, Bag, and Delayed)
        represent their graphs with a
        :py:class:`~dask.highlevelgraph.HighLevelGraph`; a low level
        graph expressed as a plain Python dictionary is equally valid.

        Returns
        -------
        Mapping
            The Dask task graph. When the returned value is a
            :py:class:`dask.highlevelgraph.HighLevelGraph`, the
            collection must also implement :py:func:`__dask_layers__`,
            as defined by the
            :py:class:`~dask.typing.HLGDaskCollection` protocol.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_keys__(self) -> NestedKeys:
        """Return the output keys of this collection's task graph.

        Dask collections constrain keys beyond the rules in the
        :doc:`task graph specification documentation <spec>`: every
        key must be either a non-empty string, or a tuple whose first
        element is a non-empty string followed by zero or more
        arbitrary str, bytes, int, float values or tuples thereof.
        The leading non-empty string is commonly known as the
        *collection name*. Every collection embedded in the dask
        package has exactly one name, though that is not a general
        requirement.

        Examples of valid return values:

        - ``[]``
        - ``["x", "y"]``
        - ``[[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]]``

        Returns
        -------
        list
            A possibly nested list of keys naming the outputs of the
            graph. After computation, results are delivered in this
            same layout with each key replaced by its corresponding
            output.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postcompute__(self) -> tuple[PostComputeCallable, tuple]:
        """Return the finalizer and extra arguments that build the final result.

        After computation every key holds an in-memory result; the
        postcompute function combines those per-key results into the
        final in-memory representation. For example, dask.array.Array
        concatenates the chunk arrays into one in-memory array.

        Returns
        -------
        PostComputeCallable
            Callable receiving the sequence of results of each final
            key plus optional arguments; an example signature is
            ``finalize(results: Sequence[Any], *args)``.
        tuple[Any, ...]
            Optional arguments passed after the key results (the
            ``*args`` part of the ``PostComputeCallable``). Must be an
            empty tuple when no extra arguments are needed.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postpersist__(self) -> tuple[PostPersistCallable, tuple]:
        """Return the rebuild callable and extra arguments used after persisting.

        See also the documentation for :py:class:`dask.typing.PostPersistCallable`.

        Returns
        -------
        PostPersistCallable
            Callable that rebuilds the collection, with signature
            ``rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)``
            (as defined by the
            :py:class:`~dask.typing.PostPersistCallable` protocol).
            It should return an equivalent Dask collection with the
            same keys as `self`, whose results are computed through a
            different graph. For :py:func:`dask.persist`, that new
            graph holds just the output keys with their already
            computed values.
        tuple[Any, ...]
            Optional arguments passed to the rebuild callable; must be
            an empty tuple when no extra arguments are needed.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_tokenize__(self) -> Hashable:
        """Value that must fully represent the object."""
        raise NotImplementedError("Inheriting class must implement this method.")

    __dask_optimize__: Any
    """Given a graph and keys, return a new optimized graph.

    May be attached as either a ``staticmethod`` or a ``classmethod``,
    but never an instance method. See the ``__dask_optimize__``
    definitions on the core Dask collections (``dask.array.Array``,
    ``dask.dataframe.DataFrame``, etc.) for example implementations.

    Graphs and keys of all collections sharing the same optimize
    method are merged before this method is called, so one invocation
    may cover more than one collection.

    Parameters
    ----------
    dsk : Graph
        The merged graphs from all collections sharing the same
        ``__dask_optimize__`` method.
    keys : Sequence[Key]
        A list of the outputs from ``__dask_keys__`` from all
        collections sharing the same ``__dask_optimize__`` method.
    **kwargs : Any
        Extra keyword arguments forwarded from the call to ``compute``
        or ``persist``; may be used or ignored as needed.

    Returns
    -------
    MutableMapping
        The optimized Dask graph.

    """

    # FIXME: It is currently not possible to define a staticmethod from a callback protocol
    # Also, the definition in `is_dask_collection` cannot be satisfied by a
    # protocol / typing check
    # staticmethod[SchedulerGetCallable]
    __dask_scheduler__: staticmethod
    """The default scheduler ``get`` to use for this object.

    Usually attached to the class as a staticmethod, e.g.:

    >>> import dask.threaded
    >>> class MyCollection:
    ...     # Use the threaded scheduler by default
    ...     __dask_scheduler__ = staticmethod(dask.threaded.get)

    """

    @abc.abstractmethod
    def compute(self, **kwargs: Any) -> Any:
        """Compute this dask collection.

        Turns a lazy Dask collection into its concrete in-memory
        equivalent — a Dask array becomes a NumPy array, a Dask
        dataframe becomes a Pandas dataframe, and so on. The entire
        dataset must fit into memory before calling this operation.

        Parameters
        ----------
        scheduler : string, optional
            Which scheduler to use like "threads", "synchronous" or
            "processes". If not provided, the default is to check the
            global settings first, and then fall back to the
            collection defaults.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before
            computation. Otherwise the graph is run as is. This can be
            useful for debugging.
        kwargs :
            Extra keywords to forward to the scheduler function.

        Returns
        -------
        The collection's computed result.

        See Also
        --------
        dask.compute

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def persist(self: CollType, **kwargs: Any) -> CollType:
        """Persist this dask collection into memory

        Produces a Dask collection with the same metadata, whose
        results are fully computed or actively computing in the
        background.

        Behavior depends heavily on the active task scheduler. With a
        scheduler that supports asynchronous computing, such as the
        dask.distributed scheduler, persist returns *immediately* and
        the returned collection's task graph contains Dask Future
        objects. With a scheduler that only supports blocking
        computation, the call *blocks* and the returned collection's
        task graph contains concrete Python results.

        Particularly useful on distributed systems: results are kept
        in distributed memory rather than returned to the local
        process as with compute.

        Parameters
        ----------
        scheduler : string, optional
            Which scheduler to use like "threads", "synchronous" or
            "processes". If not provided, the default is to check the
            global settings first, and then fall back to the
            collection defaults.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before
            computation. Otherwise the graph is run as is. This can be
            useful for debugging.
        **kwargs
            Extra keywords to forward to the scheduler function.

        Returns
        -------
        New dask collections backed by in-memory data

        See Also
        --------
        dask.persist

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def visualize(
        self,
        filename: str = "mydask",
        format: str | None = None,
        optimize_graph: bool = False,
        **kwargs: Any,
    ) -> DisplayObject | None:
        """Render the computation of this object's task graph using graphviz.

        Requires ``graphviz`` to be installed.

        Parameters
        ----------
        filename : str or None, optional
            The name of the file to write to disk. If the provided
            `filename` doesn't include an extension, '.png' will be
            used by default. If `filename` is None, no file will be
            written, and we communicate with dot using only pipes.
        format : {'png', 'pdf', 'dot', 'svg', 'jpeg', 'jpg'}, optional
            Format in which to write output file. Default is 'png'.
        optimize_graph : bool, optional
            If True, the graph is optimized before rendering.
            Otherwise, the graph is displayed as is. Default is False.
        color: {None, 'order'}, optional
            Options to color nodes. Provide ``cmap=`` keyword for
            additional colormap
        **kwargs
            Additional keyword arguments to forward to ``to_graphviz``.

        Examples
        --------
        >>> x.visualize(filename='dask.pdf')  # doctest: +SKIP
        >>> x.visualize(filename='dask.pdf', color='order')  # doctest: +SKIP

        Returns
        -------
        result : IPython.display.Image, IPython.display.SVG, or None
            See dask.dot.dot_graph for more information.

        See Also
        --------
        dask.visualize
        dask.dot.dot_graph

        Notes
        -----
        For more information on optimization see here:

        https://docs.dask.org/en/latest/optimize.html

        """
        raise NotImplementedError("Inheriting class must implement this method.")
@runtime_checkable
class HLGDaskCollection(DaskCollection, Protocol):
    """Protocol for a Dask collection backed by a HighLevelGraph.

    Nearly identical to :py:class:`~dask.typing.DaskCollection`; the
    one addition is the ``__dask_layers__`` method that collections
    backed by high level graphs must provide.

    """

    @abc.abstractmethod
    def __dask_layers__(self) -> Sequence[str]:
        """Names of the HighLevelGraph layers."""
        raise NotImplementedError("Inheriting class must implement this method.")
421class _NoDefault(Enum):
422 """typing-aware constant to detect when the user omits a parameter and you can't use
423 None.
425 Copied from pandas._libs.lib._NoDefault.
427 Usage
428 -----
429 from dask.typing import NoDefault, no_default
431 def f(x: int | None | NoDefault = no_default) -> int:
432 if x is no_default:
433 ...
434 """
436 no_default = "NO_DEFAULT"
438 def __repr__(self) -> str:
439 return "<no_default>"
# Module-level sentinel instance; compare against it with ``is no_default``.
no_default = _NoDefault.no_default
# Annotation type for parameters that accept the sentinel (see the usage
# example in ``_NoDefault``'s docstring).
NoDefault = Literal[_NoDefault.no_default]