Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/order.py: 5%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3r""" Static order of nodes in dask graph
5Dask makes decisions on what tasks to prioritize both
7* Dynamically at runtime
8* Statically before runtime
10Dynamically we prefer to run tasks that were just made available. However when
11several tasks become available at the same time we have an opportunity to break
12ties in an intelligent way
14 d
15 |
16 b c
17 \ /
18 a
20For example after we finish ``a`` we can choose to run either ``b`` or ``c``
21next. Making small decisions like this can greatly affect our performance,
22especially because the order in which we run tasks affects the order in which
23we can release memory, which operationally we find to have a large effect on
24many computations. We want to run tasks in such a way that we keep only a small
25amount of data in memory at any given time.
28Static Ordering
29---------------
31And so we create a total ordering over all nodes to serve as a tie breaker. We
32represent this ordering with a dictionary mapping keys to integer values.
33Lower scores have higher priority. These scores correspond to the order in
34which a sequential scheduler would visit each node.
36 {'a': 0,
37 'c': 1,
38 'd': 2,
39 'b': 3}
41There are several ways in which we might order our keys. This is a nuanced
42process that has to take into account many different kinds of workflows, and
43operate efficiently in linear time. We strongly recommend that readers look at
44the docstrings of tests in dask/tests/test_order.py. These tests usually have
45graph types laid out very carefully to show the kinds of situations that often
46arise, and the order we would like to be determined.
48"""
49from collections import defaultdict, deque, namedtuple
50from collections.abc import Callable, Iterable, Mapping, MutableMapping
51from typing import Any, Literal, NamedTuple, overload
53from dask._task_spec import DataNode, DependenciesMapping
54from dask.core import get_deps, getcycle, istask, reverse_dict
55from dask.typing import Key
class Order(NamedTuple):
    """Ordering information assigned to a single key by :func:`order`.

    Attributes
    ----------
    priority:
        Position of the key in the total order; lower values run earlier.
    critical_path:
        Counter identifying during which critical-path iteration the key was
        assigned. Non-critical ("runnable") tasks are marked with a 0.5
        offset, and tasks removed during graph normalization get -1.
    """

    # Position in the total order (lower runs earlier)
    priority: int
    # Critical-path iteration counter (may carry a 0.5 offset or be -1)
    critical_path: float | int
# Overload: with return_stats=True every key maps to an ``Order`` tuple.
@overload
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: Literal[True],
) -> dict[Key, Order]: ...
# Overload: with return_stats=False (the default) every key maps to a bare
# integer priority.
@overload
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: Literal[False] = False,
) -> dict[Key, int]: ...
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: bool = False,
) -> dict[Key, Order] | dict[Key, int]:
    """Order nodes in dask graph

    This produces an ordering over our tasks that we use to break ties when
    executing.  We do this ahead of time to reduce a bit of stress on the
    scheduler and also to assist in static analysis.

    This currently traverses the graph as a single-threaded scheduler would
    traverse it.

    Parameters
    ----------
    dsk:
        Dask graph mapping keys to computations.
    dependencies:
        NOTE(review): this parameter is currently ignored; dependencies are
        unconditionally recomputed from ``dsk`` below. Presumably kept for
        API compatibility — confirm this is intended.
    return_stats:
        If True, map each key to an :class:`Order` tuple (priority and
        critical-path counter) instead of a bare integer priority.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'a': 1, 'b': 2, 'c': (inc, 'a'), 'd': (add, 'b', 'c')}
    >>> order(dsk)
    {'a': 0, 'c': 1, 'b': 2, 'd': 3}
    """
    if not dsk:
        return {}

    dsk = dict(dsk)

    # The ``dependencies`` argument is overwritten here — see NOTE in the
    # docstring above.
    dependencies = DependenciesMapping(dsk)
    dependents = reverse_dict(dependencies)

    external_keys = set()
    if len(dependents) != len(dependencies):
        # There are external keys. Let's add a DataNode to ensure the algo below
        # is encountering a complete graph. Those artificial data nodes will be
        # ignored when assigning results
        for k in dependents:
            if k not in dependencies:
                external_keys.add(k)
                dsk[k] = DataNode(k, object())

    expected_len = len(dsk)
    leaf_nodes = {k for k, v in dependents.items() if not v}
    root_nodes = {k for k, v in dependencies.items() if not v}

    result: dict[Key, Order | int] = {}

    # Normalize the graph by removing leaf nodes that are not actual tasks, see
    # for instance da.store where the task is merely an alias
    # to multiple keys, i.e. [key1, key2, ...,]

    # Similarly, we are removing root nodes that are pure data tasks. Those task
    # are embedded in the run_spec of a task and are not runnable. We have to
    # assign a priority but their priority has no impact on performance.
    # The removal of those tasks typically transforms the graph topology in a
    # way that is simpler to handle
    all_tasks = False
    n_removed_leaves = 0
    requires_data_task = defaultdict(set)

    while not all_tasks:
        all_tasks = True
        for leaf in list(leaf_nodes):
            if leaf in root_nodes:
                continue
            if (
                not istask(dsk[leaf])
                # Having a linear chain is fine
                and len(dependencies[leaf]) > 1
            ):
                all_tasks = False
                # Put non-tasks at the very end since they are merely aliases
                # and have no impact on performance at all
                prio = len(dsk) - 1 - n_removed_leaves
                if return_stats:
                    result[leaf] = Order(prio, -1)
                else:
                    result[leaf] = prio
                n_removed_leaves += 1
                leaf_nodes.remove(leaf)
                for dep in dependencies[leaf]:
                    dependents[dep].remove(leaf)
                    if not dependents[dep]:
                        leaf_nodes.add(dep)
                del dsk[leaf]
                del dependencies[leaf]
                del dependents[leaf]

        for root in list(root_nodes):
            if root in leaf_nodes:
                continue
            deps_root = dependents[root]
            if not istask(dsk[root]) and len(deps_root) > 1:
                del dsk[root]
                del dependencies[root]
                root_nodes.remove(root)
                del dependents[root]
                for dep in deps_root:
                    # Remember the removed data task so it can be assigned a
                    # priority right before its dependent runs.
                    requires_data_task[dep].add(root)
                    if not dependencies[dep]:
                        root_nodes.add(dep)

    num_needed, total_dependencies = ndependencies(dependencies, dependents)
    if len(total_dependencies) != len(dsk):
        # A mismatch here means some keys were never reached by the
        # topological sweep, i.e. the graph contains a cycle.
        cycle = getcycle(dsk, None)
        raise RuntimeError(
            "Cycle detected between the following keys:\n  -> %s"
            % "\n  -> ".join(str(x) for x in cycle)
        )
    assert dependencies is not None
    roots_connected, max_dependents = _connecting_to_roots(dependencies, dependents)
    leafs_connected, _ = _connecting_to_roots(dependents, dependencies)
    i = 0

    runnable_hull = set()
    reachable_hull = set()

    runnable: list[Key] = []

    known_runnable_paths: dict[Key, list[list[Key]]] = {}
    known_runnable_paths_pop = known_runnable_paths.pop

    crit_path_counter = 0
    scrit_path: set[Key] = set()
    _crit_path_counter_offset: int | float = 0
    _sort_keys_cache: dict[Key, tuple[int, int, int, int, str]] = {}

    def sort_key(x: Key) -> tuple[int, int, int, int, str]:
        """Cached tie-breaking sort key for a single graph key."""
        try:
            return _sort_keys_cache[x]
        except KeyError:
            assert dependencies is not None
            _sort_keys_cache[x] = rv = (
                total_dependencies[x],
                len(dependencies[x]),
                len(roots_connected[x]),
                -max_dependents[x],
                # Converting to str is actually faster than creating some
                # wrapper class and comparisons that come this far are
                # relatively rare so we prefer fast init over fast comparison
                str(x),
            )
            return rv

    def add_to_result(item: Key) -> None:
        # Earlier versions recursed into this method but this could cause
        # recursion depth errors. This is the only reason for the while loop
        next_items = [item]
        nonlocal i
        nonlocal min_leaf_degree  # type: ignore[misc]
        while next_items:
            item = next_items.pop()
            runnable_hull.discard(item)
            reachable_hull.discard(item)
            leaf_nodes.discard(item)
            if item in result:
                continue

            # Any data tasks that were stripped during normalization and are
            # required by ``item`` must be assigned a priority first.
            while requires_data_task[item]:
                add_to_result(requires_data_task[item].pop())
            if return_stats:
                result[item] = Order(i, crit_path_counter - _crit_path_counter_offset)
            else:
                result[item] = i
            if item not in external_keys:
                i += 1
            if item in root_nodes:
                # Loading this root reduces the "degree" (number of still
                # missing roots) of every leaf connected to it.
                for leaf in leafs_connected[item]:
                    if leaf in leaf_nodes:
                        degree = leafs_degree.pop(leaf)
                        leafs_per_degree[degree].remove(leaf)

                        new_degree = degree - 1
                        if new_degree > 0:
                            if new_degree < min_leaf_degree:
                                min_leaf_degree = new_degree
                            leafs_per_degree[new_degree].add(leaf)
                            leafs_degree[leaf] = new_degree
                        elif not leafs_per_degree[degree]:
                            assert degree == min_leaf_degree
                            # Advance min_leaf_degree past emptied buckets
                            while min_leaf_degree != max_leaf_degree and (
                                min_leaf_degree not in leafs_per_degree
                                or not leafs_per_degree[min_leaf_degree]
                            ):
                                min_leaf_degree += 1
            # Note: This is a `set` and therefore this introduces a certain
            # randomness. However, this randomness should not have any impact on
            # the final result since the `process_runnables` should produce
            # equivalent results regardless of the order in which runnable is
            # populated (not identical but equivalent)
            for dep in dependents.get(item, ()):
                num_needed[dep] -= 1
                reachable_hull.add(dep)
                if not num_needed[dep]:
                    if len(dependents[item]) == 1:
                        next_items.append(dep)
                    else:
                        runnable.append(dep)

    def _with_offset(func: Callable[..., None]) -> Callable[..., None]:
        # This decorator is only used to reduce indentation levels. The offset
        # is purely cosmetical and used for some visualizations and I haven't
        # settled on how to implement this best so I didn't want to have large
        # indentations that make things harder to read

        def wrapper(*args: Any, **kwargs: Any) -> None:
            nonlocal _crit_path_counter_offset
            _crit_path_counter_offset = 0.5
            try:
                func(*args, **kwargs)
            finally:
                _crit_path_counter_offset = 0

        return wrapper

    @_with_offset
    def process_runnables() -> None:
        """Compute all currently runnable paths and either cache or execute them

        This is designed to ensure we are running tasks that are free to execute
        (e.g. the result of a splitter task) not too eagerly. If we executed
        such free tasks too early we'd be walking the graph in a too wide /
        breadth first fashion that is not optimal. If instead we were to only
        execute them once they are needed for a final result, this can cause
        very high memory pressure since valuable reducers are executed too
        late.

        The strategy here is to take all runnable tasks and walk forwards until
        we hit a reducer node (i.e. a node with more than one dependency). We
        will remember/cache the path to this reducer node.
        If this path leads to a leaf or if we find enough runnable paths for a
        reducer to be runnable, we will execute the path.

        If instead of a reducer a splitter is encountered that is runnable, we
        will follow its splitter paths individually and apply the same logic to
        each branch.
        """
        while runnable:
            candidates = runnable.copy()
            runnable.clear()
            while candidates:
                key = candidates.pop()
                if key in runnable_hull or key in result:
                    continue
                if key in leaf_nodes:
                    add_to_result(key)
                    continue
                path = [key]
                branches = deque([(0, path)])

                while branches:
                    nsplits, path = branches.popleft()
                    while True:
                        # Loop invariant. Too expensive to compute at runtime
                        # assert not set(known_runnable_paths).intersection(runnable_hull)
                        current = path[-1]
                        runnable_hull.add(current)
                        deps_downstream = dependents[current]
                        deps_upstream = dependencies[current]
                        if not deps_downstream:
                            # FIXME: The fact that it is possible for
                            # num_needed[current] == 0 means we're doing some
                            # work twice
                            if num_needed[current] <= 1:
                                for k in path:
                                    add_to_result(k)
                            else:
                                runnable_hull.discard(current)
                        elif len(path) == 1 or len(deps_upstream) == 1:
                            if len(deps_downstream) > 1:
                                nsplits += 1
                                for d in sorted(deps_downstream, key=sort_key):
                                    # This ensures we're only considering splitters
                                    # that are genuinely splitting and not
                                    # interleaving
                                    if len(dependencies[d]) == 1:
                                        branch = path.copy()
                                        branch.append(d)
                                        branches.append((nsplits, branch))
                                break
                            path.extend(deps_downstream)
                            continue
                        elif current in known_runnable_paths:
                            known_runnable_paths[current].append(path)
                            runnable_hull.discard(current)
                            if (
                                len(known_runnable_paths[current])
                                >= num_needed[current]
                            ):
                                # Enough runnable paths reach this reducer:
                                # drop stale paths, then either re-cache or run
                                pruned_branches: deque[list[Key]] = deque()
                                for path in known_runnable_paths_pop(current):
                                    if path[-2] not in result:
                                        pruned_branches.append(path)
                                if len(pruned_branches) < num_needed[current]:
                                    known_runnable_paths[current] = list(
                                        pruned_branches
                                    )
                                else:
                                    if nsplits > 1:
                                        path = []
                                        for pruned in pruned_branches:
                                            path.extend(pruned)
                                        branches.append((nsplits - 1, path))
                                        break

                                    while pruned_branches:
                                        path = pruned_branches.popleft()
                                        for k in path:
                                            if num_needed[k]:
                                                pruned_branches.append(path)
                                                break
                                            add_to_result(k)
                        elif (
                            len(dependencies[current]) > 1 and num_needed[current] <= 1
                        ):
                            for k in path:
                                add_to_result(k)
                        else:
                            known_runnable_paths[current] = [path]
                            runnable_hull.discard(current)
                        break

    # Pick strategy
    # Note: We're trying to be smart here by picking a strategy on how to
    # determine the critical path. This is not always clear and we may want to
    # consider just calculating both orderings and picking the one with less
    # pressure. The only concern to this would be performance but at time of
    # writing, the most expensive part of ordering is the prep work (mostly
    # connected roots + sort_key) which can be reused for multiple orderings.

    # Degree in this context is the number root nodes that have to be loaded for
    # this leaf to become accessible. Zero means the leaf is already accessible
    # in which case it _should_ either already be in result or be accessible via
    # process_runnables
    # When picking a new target, we prefer the leafs with the least number of
    # roots that need loading.
    leafs_degree = {}
    leafs_per_degree = defaultdict(set)
    min_leaf_degree = len(dsk)
    max_leaf_degree = len(dsk)
    for leaf in leaf_nodes - root_nodes:
        degree = len(roots_connected[leaf])
        min_leaf_degree = min(min_leaf_degree, degree)
        max_leaf_degree = max(max_leaf_degree, degree)
        leafs_degree[leaf] = degree
        leafs_per_degree[degree].add(leaf)

    def get_target() -> Key:
        # If we're already mid run and there is a runnable_hull we'll attempt to
        # pick the next target in a way that minimizes the number of additional
        # root nodes that are needed
        all_leafs_accessible = min_leaf_degree == max_leaf_degree
        is_trivial_lookup = not reachable_hull or all_leafs_accessible
        if not is_trivial_lookup:
            candidates = reachable_hull & leafs_per_degree[min_leaf_degree]
            if not candidates:
                candidates = leafs_per_degree[min_leaf_degree]
            # Even without reachable hull overlap this should be relatively
            # small so one full pass should be fine
            return min(candidates, key=sort_key)
        else:
            return leaf_nodes_sorted.pop()

    def use_longest_path() -> bool:
        size = 0
        # Heavy reducer / splitter topologies often benefit from a very
        # traditional critical path that expresses the longest chain of
        # tasks.
        if abs(len(root_nodes) - len(leaf_nodes)) / len(root_nodes) < 0.8:
            # If the graph stays about the same, we are checking for symmetry
            # and choose a "quickest path first" approach if the graph appears
            # to be asymmetrical
            for r in root_nodes:
                if not size:
                    size = len(leafs_connected[r])
                elif size != len(leafs_connected[r]):
                    return False

        return True

    # Some topologies benefit if the node with the most dependencies
    # is used as first choice, others benefit from the opposite.
    longest_path = use_longest_path()
    leaf_nodes_sorted = sorted(leaf_nodes, key=sort_key, reverse=not longest_path)

    # *************************************************************************
    # CORE ALGORITHM STARTS HERE
    #
    # 0. Nomenclature
    #
    # - roots: Nodes that have no dependencies (i.e. typically data producers)
    # - leafs: Nodes that have no dependents (i.e. user requested keys)
    # - critical_path: The strategic path through the graph.
    # - walking forwards: Starting from a root node we walk the graph as if we
    #   were to compute the individual nodes, i.e. along dependents
    # - walking backwards: Starting from a leaf node we walk the graph in
    #   reverse direction, i.e. along dependencies
    # - runnable: Nodes that are ready to be computed because all their
    #   dependencies are in result
    # - runnable_hull: Nodes that could be reached and executed without
    #   "walking back". This typically means that these are tasks than can be
    #   executed without loading additional data/executing additional root
    #   nodes
    # - reachable_hull: Nodes that are touching the result, i.e. all nodes in
    #   reachable_hull have at least one dependency in result
    #
    # A. Build the critical path
    #
    # To build the critical path we will use a provided `get_target` function
    # that returns a node that is anywhere in the graph, typically a leaf
    # node. This node is not required to be runnable. We will walk the graph
    # backwards, i.e. from leafs to roots and append nodes to the graph as we
    # go. The critical path is a
    # linear path in the graph. While this is a viable strategy, it is not
    # required for the critical path to be a classical "longest path" but it
    # can define any route through the graph that should be considered as top
    # priority.
    #
    # 1. Determine the target node by calling ``get_target`` and append the
    #    target to the critical path stack
    # 2. Take the _most valuable_ (max given a `sort_key`) of its dependents
    #    and append it to the critical path stack. This key is the new target.
    # 3. Repeat step 2 until we reach a node that has no dependencies and is
    #    therefore runnable
    #
    # B. Walk the critical path
    #
    # Only the first element of the critical path is an actually runnable node
    # and this is where we're starting the sort. Strategically, this is the
    # most important goal to achieve but since not all of the nodes are
    # immediately runnable we have to walk back and compute other nodes first
    # before we can unlock the critical path. This typically requires us also
    # to load more data / run more root tasks.
    # While walking the critical path we will also unlock non-critical tasks
    # that could be run but are not contributing to our strategic goal. Under
    # certain circumstances, those runnable tasks are allowed to be run right
    # away to reduce memory pressure. This is described in more detail in
    # `process_runnables`.
    # Given this, the algorithm is as follows:
    #
    # 1. Pop the first element of the critical path
    # 2a. If the node is already in the result, continue
    # 2b. If the node is not runnable, we will put it back on the stack and
    #     put all its dependencies on the stack and continue with step 1. This
    #     is what we refer to as "walking back"
    # 2c. Else, we add the node to the result
    # 3. If we previously had to walk back we will consider running
    #    non-critical tasks (by calling process_runnables)
    # 4a. If critical path is not empty, repeat step 1
    # 4b. Go back to A.) and build a new critical path given a new target that
    #     accounts for the already computed nodes.
    #
    # *************************************************************************

    critical_path: list[Key] = []
    cpath_append = critical_path.append
    scpath_add = scrit_path.add

    def path_append(item: Key) -> None:
        # Keep the stack and its membership set in sync
        cpath_append(item)
        scpath_add(item)

    scpath_update = scrit_path.update
    cpath_extend = critical_path.extend

    def path_extend(items: Iterable[Key]) -> None:
        cpath_extend(items)
        scpath_update(items)

    cpath_pop = critical_path.pop
    scpath_discard = scrit_path.discard

    def path_pop() -> Key:
        item = cpath_pop()
        scpath_discard(item)
        return item

    while len(result) < expected_len:
        crit_path_counter += 1
        assert not critical_path
        assert not scrit_path

        # A. Build the critical path
        target = get_target()
        next_deps = dependencies[target]
        path_append(target)

        while next_deps:
            item = max(next_deps, key=sort_key)
            path_append(item)
            next_deps = dependencies[item].difference(result)
            path_extend(next_deps)

        # B. Walk the critical path

        walked_back = False
        while critical_path:
            item = path_pop()
            if item in result:
                continue
            if num_needed[item]:
                # Not runnable yet: push it back and walk back along its
                # missing dependencies
                path_append(item)
                deps = dependencies[item].difference(result)
                unknown: list[Key] = []
                known: list[Key] = []
                k_append = known.append
                uk_append = unknown.append
                for d in sorted(deps, key=sort_key):
                    if d in known_runnable_paths:
                        k_append(d)
                    else:
                        uk_append(d)
                if len(unknown) > 1:
                    walked_back = True

                for d in unknown:
                    path_append(d)
                for d in known:
                    for path in known_runnable_paths_pop(d):
                        path_extend(reversed(path))

                del deps
                continue
            else:
                if walked_back and len(runnable) < len(critical_path):
                    process_runnables()
                add_to_result(item)
        process_runnables()

    assert len(result) == expected_len
    # Drop the artificial DataNodes injected for external keys
    for k in external_keys:
        del result[k]
    return result  # type: ignore
617def _connecting_to_roots(
618 dependencies: Mapping[Key, set[Key]], dependents: Mapping[Key, set[Key]]
619) -> tuple[dict[Key, frozenset[Key]], dict[Key, int]]:
620 """Determine for every node which root nodes are connected to it (i.e.
621 ancestors). If arguments of dependencies and dependents are switched, this
622 can also be used to determine which leaf nodes are connected to which node
623 (i.e. descendants).
625 Also computes a weight that is defined as (cheaper to compute here)
627 `max(len(dependents[k]) for k in connected_roots[key])`
629 """
630 result = {}
631 current = []
632 num_needed = {k: len(v) for k, v in dependencies.items() if v}
633 max_dependents = {}
634 roots = set()
635 for k, v in dependencies.items():
636 if not v:
637 # Note: Hashing the full keys is relatively expensive. Hashing
638 # integers would be much faster so this could be sped up by just
639 # introducing a counter here. However, the order algorithm is also
640 # sometimes interested in the actual keys and the only way to
641 # benefit from the speedup of using integers would be to convert
642 # this back on demand which makes the code very hard to read.
643 roots.add(k)
644 result[k] = frozenset({k})
645 deps = dependents[k]
646 max_dependents[k] = len(deps)
647 for child in deps:
648 num_needed[child] -= 1
649 if not num_needed[child]:
650 current.append(child)
652 dedup_mapping: dict[frozenset[Key], frozenset[Key]] = {}
653 while current:
654 key = current.pop()
655 if key in result:
656 continue
657 for parent in dependents[key]:
658 num_needed[parent] -= 1
659 if not num_needed[parent]:
660 current.append(parent)
661 # At some point, all the roots are the same, particularly for dense
662 # graphs. We don't want to create new sets over and over again
663 transitive_deps = []
664 transitive_deps_ids = set()
665 max_dependents_key = list()
666 for child in dependencies[key]:
667 r_child = result[child]
668 if id(r_child) in transitive_deps_ids:
669 continue
670 transitive_deps.append(r_child)
671 transitive_deps_ids.add(id(r_child))
672 max_dependents_key.append(max_dependents[child])
674 max_dependents[key] = max(max_dependents_key)
675 if len(transitive_deps_ids) == 1:
676 result[key] = transitive_deps[0]
677 else:
678 d = transitive_deps[0]
679 if all(tdeps.issubset(d) for tdeps in transitive_deps[1:]):
680 result[key] = d
681 else:
682 res = set(d)
683 for tdeps in transitive_deps[1:]:
684 res.update(tdeps)
685 # frozenset is unfortunately triggering a copy. In the event of
686 # a cache hit, this is wasted time but we can't hash the set
687 # otherwise (unless we did it manually) and can therefore not
688 # deduplicate without this copy
689 frozen_res = frozenset(res)
690 del res, tdeps
691 try:
692 result[key] = dedup_mapping[frozen_res]
693 except KeyError:
694 dedup_mapping[frozen_res] = frozen_res
695 result[key] = frozen_res
696 del dedup_mapping
698 empty_set: frozenset[Key] = frozenset()
699 for r in roots:
700 result[r] = empty_set
701 return result, max_dependents
def ndependencies(
    dependencies: Mapping[Key, set[Key]], dependents: Mapping[Key, set[Key]]
) -> tuple[dict[Key, int], dict[Key, int]]:
    """Number of total data elements on which this key depends

    For each key we return the number of tasks that must be run for us to run
    this task.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> dsk = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dependencies, dependents = get_deps(dsk)
    >>> num_dependencies, total_dependencies = ndependencies(dependencies, dependents)
    >>> sorted(total_dependencies.items())
    [('a', 1), ('b', 2), ('c', 3)]

    Returns
    -------
    num_dependencies: Dict[key, int]
    total_dependencies: Dict[key, int]
    """
    # Countdown of unresolved dependencies; also the direct dependency count.
    remaining = {key: len(deps) for key, deps in dependencies.items()}
    num_dependencies = remaining.copy()

    # Roots contribute a total of one: themselves.
    totals: dict[Key, int] = {
        key: 1 for key, deps in dependencies.items() if not deps
    }

    # Kahn-style sweep: a node's total is 1 plus the sum over its direct
    # dependencies, computed once all of those dependencies have totals.
    stack: list[Key] = []
    for key in totals:
        for parent in dependents[key]:
            remaining[parent] -= 1
            if not remaining[parent]:
                stack.append(parent)
    while stack:
        key = stack.pop()
        totals[key] = 1 + sum(totals[child] for child in dependencies[key])
        for parent in dependents[key]:
            remaining[parent] -= 1
            if not remaining[parent]:
                stack.append(parent)
    return num_dependencies, totals
# Per-key runtime metrics produced by ``diagnostics`` (see its docstring for
# the field semantics).
OrderInfo = namedtuple(
    "OrderInfo",
    (
        # the order in which the node is run
        "order",
        # how long the output of the node is held in memory
        "age",
        # number of outputs held in memory when the node is run
        "num_data_when_run",
        # number of outputs held in memory when the output is released
        "num_data_when_released",
        # number of dependencies freed by running the node
        "num_dependencies_freed",
    ),
)
def diagnostics(
    dsk: MutableMapping[Key, Any],
    o: Mapping[Key, int] | None = None,
    dependencies: MutableMapping[Key, set[Key]] | None = None,
) -> tuple[dict[Key, OrderInfo], list[int]]:
    """Simulate runtime metrics as though running tasks one at a time in order.

    These diagnostics can help reveal behaviors of and issues with ``order``.

    Returns a dict of `namedtuple("OrderInfo")` and a list of the number of outputs held over time.

    OrderInfo fields:
    - order : the order in which the node is run.
    - age : how long the output of a node is held.
    - num_data_when_run : the number of outputs held in memory when a node is run.
    - num_data_when_released : the number of outputs held in memory when the output is released.
    - num_dependencies_freed : the number of dependencies freed by running the node.
    """
    # Fill in whichever of the optional inputs were not supplied.
    if dependencies is None:
        dependencies, dependents = get_deps(dsk)
    else:
        dependents = reverse_dict(dependencies)
    assert dependencies is not None
    if o is None:
        o = order(dsk, dependencies=dependencies, return_stats=False)

    pressure: list[int] = []
    in_memory = 0
    age: dict[Key, int] = {}
    runpressure: dict[Key, int] = {}
    releasepressure: dict[Key, int] = {}
    freed: dict[Key, int] = {}
    # How many dependents still need each key's output before it can be freed.
    waiting = {key: len(vals) for key, vals in dependents.items()}

    for step, key in enumerate(sorted(dsk, key=o.__getitem__)):
        pressure.append(in_memory)
        runpressure[key] = in_memory
        n_released = 0
        for dep in dependencies[key]:
            waiting[dep] -= 1
            if waiting[dep] == 0:
                # Last consumer ran: the dependency's output is released now.
                age[dep] = step - o[dep]
                releasepressure[dep] = in_memory
                n_released += 1
        freed[key] = n_released
        if dependents[key]:
            # We dropped ``n_released`` outputs but produced one of our own.
            in_memory -= n_released - 1
        else:
            # Terminal node: its own output is released immediately.
            age[key] = 0
            releasepressure[key] = in_memory
            in_memory -= n_released

    summary = {
        key: OrderInfo(
            priority, age[key], runpressure[key], releasepressure[key], freed[key]
        )
        for key, priority in o.items()
    }
    return summary, pressure
825def _f() -> None: ...
def sanitize_dsk(dsk: MutableMapping[Key, Any]) -> dict:
    """Take a dask graph and replace callables with a dummy function and remove
    payload data like numpy arrays, dataframes, etc.
    """
    from dask._task_spec import Task, TaskRef

    # Keep only the graph topology: each key becomes a dummy task referencing
    # the same dependencies as the original.
    return {
        key: Task(key, _f, *(TaskRef(dep) for dep in deps))
        for key, deps in DependenciesMapping(dsk).items()
    }