from __future__ import annotations

r""" Static order of nodes in dask graph

Dask makes decisions on what tasks to prioritize both

* Dynamically at runtime
* Statically before runtime

Dynamically we prefer to run tasks that were just made available. However when
several tasks become available at the same time we have an opportunity to break
ties in an intelligent way

        d
        |
    b   c
     \ /
      a

For example after we finish ``a`` we can choose to run either ``b`` or ``c``
next. Making small decisions like this can greatly affect our performance,
especially because the order in which we run tasks affects the order in which
we can release memory, which operationally we find to have a large effect on
many computations. We want to run tasks in such a way that we keep only a
small amount of data in memory at any given time.


Static Ordering
---------------

And so we create a total ordering over all nodes to serve as a tie breaker. We
represent this ordering with a dictionary mapping keys to integer values.
Lower scores have higher priority. These scores correspond to the order in
which a sequential scheduler would visit each node.

    {'a': 0,
     'c': 1,
     'd': 2,
     'b': 3}
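
As an illustration (a minimal sketch; ``priorities`` below is just the mapping
from above, not an API), such a mapping can serve as the tie breaker when
sorting tasks that become ready at the same time:

    >>> priorities = {'a': 0, 'c': 1, 'd': 2, 'b': 3}
    >>> sorted(['b', 'c'], key=priorities.get)  # both became ready after 'a'
    ['c', 'b']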

There are several ways in which we might order our keys. This is a nuanced
process that has to take into account many different kinds of workflows, and
operate efficiently in linear time. We strongly recommend that readers look at
the docstrings of tests in dask/tests/test_order.py. These tests usually have
graph types laid out very carefully to show the kinds of situations that often
arise, and the order we would like to be determined.

"""
from collections import defaultdict, deque, namedtuple
from collections.abc import Callable, Iterable, Mapping, MutableMapping
from typing import Any, Literal, NamedTuple, overload

from dask._task_spec import DataNode, DependenciesMapping
from dask.core import get_deps, getcycle, istask, reverse_dict
from dask.typing import Key


class Order(NamedTuple):
    priority: int
    critical_path: float | int
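    # Descriptive note (inferred from the code below, not from upstream docs):
    # priority is the position in the produced toposort (lower runs earlier);
    # critical_path records which critical-path iteration scheduled the key,
    # offset by 0.5 for tasks scheduled via process_runnables and -1 for
    # stripped alias leafs.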


@overload
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: Literal[True],
) -> dict[Key, Order]: ...


@overload
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: Literal[False] = False,
) -> dict[Key, int]: ...


def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: bool = False,
) -> dict[Key, Order] | dict[Key, int]:
    """Order nodes in dask graph

    This produces an ordering over our tasks that we use to break ties when
    executing. We do this ahead of time to reduce a bit of stress on the
    scheduler and also to assist in static analysis.

    This currently traverses the graph as a single-threaded scheduler would
    traverse it.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'a': 1, 'b': 2, 'c': (inc, 'a'), 'd': (add, 'b', 'c')}
    >>> order(dsk)
    {'a': 0, 'c': 1, 'b': 2, 'd': 3}
    """
    if not dsk:
        return {}

    dsk = dict(dsk)

    dependencies = DependenciesMapping(dsk)
    dependents = reverse_dict(dependencies)

    external_keys = set()
    if len(dependents) != len(dependencies):
        # There are external keys. Let's add a DataNode to ensure the algo
        # below is encountering a complete graph. Those artificial data nodes
        # will be ignored when assigning results
        for k in dependents:
            if k not in dependencies:
                external_keys.add(k)
                dsk[k] = DataNode(k, object())

    expected_len = len(dsk)
    leaf_nodes = {k for k, v in dependents.items() if not v}
    root_nodes = {k for k, v in dependencies.items() if not v}

    result: dict[Key, Order | int] = {}

    # Normalize the graph by removing leaf nodes that are not actual tasks,
    # see for instance da.store where the task is merely an alias to multiple
    # keys, i.e. [key1, key2, ...]

    # Similarly, we are removing root nodes that are pure data tasks. Those
    # tasks are embedded in the run_spec of a task and are not runnable. We
    # have to assign a priority, but their priority has no impact on
    # performance. The removal of those tasks typically transforms the graph
    # topology in a way that is simpler to handle.
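    # Illustrative (comment only, key names made up): a da.store-style alias
    # leaf looks like {"save": ["x", "y"], ...}; "save" is not a task, so it
    # is pushed to the very end of the ordering right away.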
    all_tasks = False
    n_removed_leaves = 0
    requires_data_task = defaultdict(set)

    while not all_tasks:
        all_tasks = True
        for leaf in list(leaf_nodes):
            if leaf in root_nodes:
                continue
            if (
                not istask(dsk[leaf])
                # Having a linear chain is fine
                and len(dependencies[leaf]) > 1
            ):
                all_tasks = False
                # Put non-tasks at the very end since they are merely aliases
                # and have no impact on performance at all
                prio = len(dsk) - 1 - n_removed_leaves
                if return_stats:
                    result[leaf] = Order(prio, -1)
                else:
                    result[leaf] = prio
                n_removed_leaves += 1
                leaf_nodes.remove(leaf)
                for dep in dependencies[leaf]:
                    dependents[dep].remove(leaf)
                    if not dependents[dep]:
                        leaf_nodes.add(dep)
                del dsk[leaf]
                del dependencies[leaf]
                del dependents[leaf]

        for root in list(root_nodes):
            if root in leaf_nodes:
                continue
            deps_root = dependents[root]
            if not istask(dsk[root]) and len(deps_root) > 1:
                del dsk[root]
                del dependencies[root]
                root_nodes.remove(root)
                del dependents[root]
                for dep in deps_root:
                    requires_data_task[dep].add(root)
                    if not dependencies[dep]:
                        root_nodes.add(dep)

    num_needed, total_dependencies = ndependencies(dependencies, dependents)
    if len(total_dependencies) != len(dsk):
        cycle = getcycle(dsk, None)
        raise RuntimeError(
            "Cycle detected between the following keys:\n -> %s"
            % "\n -> ".join(str(x) for x in cycle)
        )
    assert dependencies is not None
    roots_connected, max_dependents = _connecting_to_roots(dependencies, dependents)
    leafs_connected, _ = _connecting_to_roots(dependents, dependencies)
    i = 0

    runnable_hull = set()
    reachable_hull = set()

    runnable: list[Key] = []

    known_runnable_paths: dict[Key, list[list[Key]]] = {}
    known_runnable_paths_pop = known_runnable_paths.pop

    crit_path_counter = 0
    scrit_path: set[Key] = set()
    _crit_path_counter_offset: int | float = 0
    _sort_keys_cache: dict[Key, tuple[int, int, int, int, str]] = {}

    def sort_key(x: Key) -> tuple[int, int, int, int, str]:
        try:
            return _sort_keys_cache[x]
        except KeyError:
            assert dependencies is not None
            _sort_keys_cache[x] = rv = (
                total_dependencies[x],
                len(dependencies[x]),
                len(roots_connected[x]),
                -max_dependents[x],
                # Converting to str is actually faster than creating some
                # wrapper class and comparisons that come this far are
                # relatively rare so we prefer fast init over fast comparison
                str(x),
            )
            return rv

    def add_to_result(item: Key) -> None:
        # Earlier versions recursed into this method but this could cause
        # recursion depth errors. This is the only reason for the while loop
        next_items = [item]
        nonlocal i
        nonlocal min_leaf_degree  # type: ignore[misc]
        while next_items:
            item = next_items.pop()
            runnable_hull.discard(item)
            reachable_hull.discard(item)
            leaf_nodes.discard(item)
            if item in result:
                continue

            while requires_data_task[item]:
                add_to_result(requires_data_task[item].pop())
            if return_stats:
                result[item] = Order(i, crit_path_counter - _crit_path_counter_offset)
            else:
                result[item] = i
            if item not in external_keys:
                i += 1
            if item in root_nodes:
                for leaf in leafs_connected[item]:
                    if leaf in leaf_nodes:
                        degree = leafs_degree.pop(leaf)
                        leafs_per_degree[degree].remove(leaf)

                        new_degree = degree - 1
                        if new_degree > 0:
                            if new_degree < min_leaf_degree:
                                min_leaf_degree = new_degree
                            leafs_per_degree[new_degree].add(leaf)
                            leafs_degree[leaf] = new_degree
                        elif not leafs_per_degree[degree]:
                            assert degree == min_leaf_degree
                            while min_leaf_degree != max_leaf_degree and (
                                min_leaf_degree not in leafs_per_degree
                                or not leafs_per_degree[min_leaf_degree]
                            ):
                                min_leaf_degree += 1

            # Note: This is a `set` and therefore this introduces a certain
            # randomness. However, this randomness should not have any impact
            # on the final result since `process_runnables` should produce
            # equivalent results regardless of the order in which runnable is
            # populated (not identical but equivalent)
            for dep in dependents.get(item, ()):
                num_needed[dep] -= 1
                reachable_hull.add(dep)
                if not num_needed[dep]:
                    if len(dependents[item]) == 1:
                        next_items.append(dep)
                    else:
                        runnable.append(dep)

    def _with_offset(func: Callable[..., None]) -> Callable[..., None]:
        # This decorator is only used to reduce indentation levels. The offset
        # is purely cosmetic and used for some visualizations and I haven't
        # settled on how to implement this best so I didn't want to have large
        # indentations that make things harder to read

        def wrapper(*args: Any, **kwargs: Any) -> None:
            nonlocal _crit_path_counter_offset
            _crit_path_counter_offset = 0.5
            try:
                func(*args, **kwargs)
            finally:
                _crit_path_counter_offset = 0

        return wrapper

    @_with_offset
    def process_runnables() -> None:
        """Compute all currently runnable paths and either cache or execute them

        This is designed to ensure we are running tasks that are free to
        execute (e.g. the result of a splitter task) not too eagerly. If we
        executed such free tasks too early we'd be walking the graph in a too
        wide / breadth-first fashion that is not optimal. If instead we were
        to only execute them once they are needed for a final result, this
        could cause very high memory pressure since valuable reducers are
        executed too late.

        The strategy here is to take all runnable tasks and walk forwards
        until we hit a reducer node (i.e. a node with more than one
        dependency). We will remember/cache the path to this reducer node.
        If this path leads to a leaf, or if we find enough runnable paths for
        a reducer to become runnable, we will execute the path.

        If instead of a reducer a splitter is encountered that is runnable, we
        will follow its splitter paths individually and apply the same logic
        to each branch.
        """
        while runnable:
            candidates = runnable.copy()
            runnable.clear()
            while candidates:
                key = candidates.pop()
                if key in runnable_hull or key in result:
                    continue
                if key in leaf_nodes:
                    add_to_result(key)
                    continue
                path = [key]
                branches = deque([(0, path)])

                while branches:
                    nsplits, path = branches.popleft()
                    while True:
                        # Loop invariant. Too expensive to compute at runtime
                        # assert not set(known_runnable_paths).intersection(runnable_hull)
                        current = path[-1]
                        runnable_hull.add(current)
                        deps_downstream = dependents[current]
                        deps_upstream = dependencies[current]
                        if not deps_downstream:
                            # FIXME: The fact that it is possible for
                            # num_needed[current] == 0 means we're doing some
                            # work twice
                            if num_needed[current] <= 1:
                                for k in path:
                                    add_to_result(k)
                            else:
                                runnable_hull.discard(current)
                        elif len(path) == 1 or len(deps_upstream) == 1:
                            if len(deps_downstream) > 1:
                                nsplits += 1
                                for d in sorted(deps_downstream, key=sort_key):
                                    # This ensures we're only considering
                                    # splitters that are genuinely splitting
                                    # and not interleaving
                                    if len(dependencies[d]) == 1:
                                        branch = path.copy()
                                        branch.append(d)
                                        branches.append((nsplits, branch))
                                break
                            path.extend(deps_downstream)
                            continue
                        elif current in known_runnable_paths:
                            known_runnable_paths[current].append(path)
                            runnable_hull.discard(current)
                            if (
                                len(known_runnable_paths[current])
                                >= num_needed[current]
                            ):
                                pruned_branches: deque[list[Key]] = deque()
                                for path in known_runnable_paths_pop(current):
                                    if path[-2] not in result:
                                        pruned_branches.append(path)
                                if len(pruned_branches) < num_needed[current]:
                                    known_runnable_paths[current] = list(
                                        pruned_branches
                                    )
                                else:
                                    if nsplits > 1:
                                        path = []
                                        for pruned in pruned_branches:
                                            path.extend(pruned)
                                        branches.append((nsplits - 1, path))
                                        break

                                    while pruned_branches:
                                        path = pruned_branches.popleft()
                                        for k in path:
                                            if num_needed[k]:
                                                pruned_branches.append(path)
                                                break
                                            add_to_result(k)
                        else:
                            if (
                                len(dependencies[current]) > 1
                                and num_needed[current] <= 1
                            ):
                                for k in path:
                                    add_to_result(k)
                            else:
                                known_runnable_paths[current] = [path]
                                runnable_hull.discard(current)
                        break

    # Pick strategy
    # Note: We're trying to be smart here by picking a strategy on how to
    # determine the critical path. This is not always clear and we may want to
    # consider just calculating both orderings and picking the one with less
    # pressure. The only concern would be performance but at the time of
    # writing, the most expensive part of ordering is the prep work (mostly
    # connected roots + sort_key) which can be reused for multiple orderings.

    # Degree in this context is the number of root nodes that have to be
    # loaded for this leaf to become accessible. Zero means the leaf is
    # already accessible, in which case it _should_ either already be in
    # result or be accessible via process_runnables.
    # When picking a new target, we prefer the leafs with the least number of
    # roots that need loading.
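    # Illustrative (comment only): in the diamond graph from the module
    # docstring, both leafs ``b`` and ``d`` have degree 1 since each needs
    # only the single root ``a`` to be loaded.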
    leafs_degree = {}
    leafs_per_degree = defaultdict(set)
    min_leaf_degree = len(dsk)
    max_leaf_degree = len(dsk)
    for leaf in leaf_nodes - root_nodes:
        degree = len(roots_connected[leaf])
        min_leaf_degree = min(min_leaf_degree, degree)
        max_leaf_degree = max(max_leaf_degree, degree)
        leafs_degree[leaf] = degree
        leafs_per_degree[degree].add(leaf)

    def get_target() -> Key:
        # If we're already mid run and there is a runnable_hull we'll attempt
        # to pick the next target in a way that minimizes the number of
        # additional root nodes that are needed
        all_leafs_accessible = min_leaf_degree == max_leaf_degree
        is_trivial_lookup = not reachable_hull or all_leafs_accessible
        if not is_trivial_lookup:
            candidates = reachable_hull & leafs_per_degree[min_leaf_degree]
            if not candidates:
                candidates = leafs_per_degree[min_leaf_degree]
            # Even without reachable hull overlap this should be relatively
            # small so one full pass should be fine
            return min(candidates, key=sort_key)
        else:
            return leaf_nodes_sorted.pop()

    def use_longest_path() -> bool:
        size = 0
        # Heavy reducer / splitter topologies often benefit from a very
        # traditional critical path that expresses the longest chain of
        # tasks.
        if abs(len(root_nodes) - len(leaf_nodes)) / len(root_nodes) < 0.8:
            # If the graph stays about the same, we are checking for symmetry
            # and choose a "quickest path first" approach if the graph appears
            # to be asymmetrical
            for r in root_nodes:
                if not size:
                    size = len(leafs_connected[r])
                elif size != len(leafs_connected[r]):
                    return False

        return True
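    # Illustrative (comment only): with 10 roots and 10 leafs the ratio above
    # is 0 (< 0.8), so the longest-path strategy is only chosen if every root
    # reaches the same number of leafs, i.e. the graph looks symmetric.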
    # Some topologies benefit if the node with the most dependencies
    # is used as first choice, others benefit from the opposite.
    longest_path = use_longest_path()
    leaf_nodes_sorted = sorted(leaf_nodes, key=sort_key, reverse=not longest_path)

    # *************************************************************************
    # CORE ALGORITHM STARTS HERE
    #
    # 0. Nomenclature
    #
    # - roots: Nodes that have no dependencies (i.e. typically data producers)
    # - leafs: Nodes that have no dependents (i.e. user requested keys)
    # - critical_path: The strategic path through the graph.
    # - walking forwards: Starting from a root node we walk the graph as if we
    #   were to compute the individual nodes, i.e. along dependents
    # - walking backwards: Starting from a leaf node we walk the graph in
    #   reverse direction, i.e. along dependencies
    # - runnable: Nodes that are ready to be computed because all their
    #   dependencies are in result
    # - runnable_hull: Nodes that could be reached and executed without
    #   "walking back". This typically means that these are tasks that can be
    #   executed without loading additional data/executing additional root
    #   nodes
    # - reachable_hull: Nodes that are touching the result, i.e. all nodes in
    #   reachable_hull have at least one dependency in result
    #
    # A. Build the critical path
    #
    # To build the critical path we will use a provided `get_target` function
    # that returns a node that is anywhere in the graph, typically a leaf
    # node. This node is not required to be runnable. We will walk the graph
    # backwards, i.e. from leafs to roots, and append nodes to the critical
    # path as we go. The critical path is a linear path in the graph. While
    # this is a viable strategy, it is not required for the critical path to
    # be a classical "longest path"; it can define any route through the graph
    # that should be considered as top priority.
    #
    # 1. Determine the target node by calling ``get_target`` and append the
    #    target to the critical path stack
    # 2. Take the _most valuable_ (max given a `sort_key`) of its dependencies
    #    and append it to the critical path stack. This key is the new target.
    # 3. Repeat step 2 until we reach a node that has no dependencies and is
    #    therefore runnable
    #
    # B. Walk the critical path
    #
    # Only the first element of the critical path is an actually runnable node
    # and this is where we're starting the sort. Strategically, this is the
    # most important goal to achieve but since not all of the nodes are
    # immediately runnable we have to walk back and compute other nodes first
    # before we can unlock the critical path. This typically requires us also
    # to load more data / run more root tasks.
    # While walking the critical path we will also unlock non-critical tasks
    # that could be run but are not contributing to our strategic goal. Under
    # certain circumstances, those runnable tasks are allowed to be run right
    # away to reduce memory pressure. This is described in more detail in
    # `process_runnables`.
    # Given this, the algorithm is as follows:
    #
    # 1. Pop the first element of the critical path
    # 2a. If the node is already in the result, continue
    # 2b. If the node is not runnable, we will put it back on the stack and
    #     put all its dependencies on the stack and continue with step 1. This
    #     is what we refer to as "walking back"
    # 2c. Else, we add the node to the result
    # 3. If we previously had to walk back we will consider running
    #    non-critical tasks (by calling process_runnables)
    # 4a. If the critical path is not empty, repeat step 1
    # 4b. Go back to A.) and build a new critical path given a new target that
    #     accounts for the already computed nodes.
    #
    # *************************************************************************
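    # Illustrative walk-through (comment only) for the diamond graph in the
    # module docstring: if get_target() picks ``d``, phase A builds the stack
    # [d, c, a]. Phase B pops and computes a, c, d (each runnable in turn),
    # and the trailing process_runnables() then schedules the remaining
    # runnable leaf ``b``, reproducing {'a': 0, 'c': 1, 'd': 2, 'b': 3}.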
    critical_path: list[Key] = []
    cpath_append = critical_path.append
    scpath_add = scrit_path.add

    def path_append(item: Key) -> None:
        cpath_append(item)
        scpath_add(item)

    scpath_update = scrit_path.update
    cpath_extend = critical_path.extend

    def path_extend(items: Iterable[Key]) -> None:
        cpath_extend(items)
        scpath_update(items)

    cpath_pop = critical_path.pop
    scpath_discard = scrit_path.discard

    def path_pop() -> Key:
        item = cpath_pop()
        scpath_discard(item)
        return item

    while len(result) < expected_len:
        crit_path_counter += 1
        assert not critical_path
        assert not scrit_path

        # A. Build the critical path
        target = get_target()
        next_deps = dependencies[target]
        path_append(target)

        while next_deps:
            item = max(next_deps, key=sort_key)
            path_append(item)
            next_deps = dependencies[item].difference(result)
            path_extend(next_deps)

        # B. Walk the critical path

        walked_back = False
        while critical_path:
            item = path_pop()
            if item in result:
                continue
            if num_needed[item]:
                path_append(item)
                deps = dependencies[item].difference(result)
                unknown: list[Key] = []
                known: list[Key] = []
                k_append = known.append
                uk_append = unknown.append
                for d in sorted(deps, key=sort_key):
                    if d in known_runnable_paths:
                        k_append(d)
                    else:
                        uk_append(d)
                if len(unknown) > 1:
                    walked_back = True

                for d in unknown:
                    path_append(d)
                for d in known:
                    for path in known_runnable_paths_pop(d):
                        path_extend(reversed(path))

                del deps
                continue
            else:
                if walked_back and len(runnable) < len(critical_path):
                    process_runnables()
                add_to_result(item)
        process_runnables()

    assert len(result) == expected_len
    for k in external_keys:
        del result[k]
    return result  # type: ignore


def _connecting_to_roots(
    dependencies: Mapping[Key, set[Key]], dependents: Mapping[Key, set[Key]]
) -> tuple[dict[Key, frozenset[Key]], dict[Key, int]]:
    """Determine for every node which root nodes are connected to it (i.e.
    ancestors). If the dependencies and dependents arguments are switched,
    this can also be used to determine which leaf nodes are connected to which
    node (i.e. descendants).

    Also computes a weight that is defined as (cheaper to compute here)

        `max(len(dependents[k]) for k in connected_roots[key])`
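
    For illustration, a minimal sketch on a three-node chain (the output shown
    was traced by hand from the code below, so treat it as an assumption
    rather than an authoritative doctest; note that root nodes map to an
    empty set):

    >>> deps = {'a': set(), 'b': {'a'}, 'c': {'b'}}
    >>> roots_connected, max_dependents = _connecting_to_roots(
    ...     deps, reverse_dict(deps)
    ... )  # doctest: +SKIP
    >>> roots_connected  # doctest: +SKIP
    {'a': frozenset(), 'b': frozenset({'a'}), 'c': frozenset({'a'})}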
631 """
632 result = {}
633 current = []
634 num_needed = {k: len(v) for k, v in dependencies.items() if v}
635 max_dependents = {}
636 roots = set()
637 for k, v in dependencies.items():
638 if not v:
639 # Note: Hashing the full keys is relatively expensive. Hashing
640 # integers would be much faster so this could be sped up by just
641 # introducing a counter here. However, the order algorithm is also
642 # sometimes interested in the actual keys and the only way to
643 # benefit from the speedup of using integers would be to convert
644 # this back on demand which makes the code very hard to read.
645 roots.add(k)
646 result[k] = frozenset({k})
647 deps = dependents[k]
648 max_dependents[k] = len(deps)
649 for child in deps:
650 num_needed[child] -= 1
651 if not num_needed[child]:
652 current.append(child)
654 dedup_mapping: dict[frozenset[Key], frozenset[Key]] = {}
655 while current:
656 key = current.pop()
657 if key in result:
658 continue
659 for parent in dependents[key]:
660 num_needed[parent] -= 1
661 if not num_needed[parent]:
662 current.append(parent)
663 # At some point, all the roots are the same, particularly for dense
664 # graphs. We don't want to create new sets over and over again
665 transitive_deps = []
666 transitive_deps_ids = set()
667 max_dependents_key = list()
668 for child in dependencies[key]:
669 r_child = result[child]
670 if id(r_child) in transitive_deps_ids:
671 continue
672 transitive_deps.append(r_child)
673 transitive_deps_ids.add(id(r_child))
674 max_dependents_key.append(max_dependents[child])
676 max_dependents[key] = max(max_dependents_key)
677 if len(transitive_deps_ids) == 1:
678 result[key] = transitive_deps[0]
679 else:
680 d = transitive_deps[0]
681 if all(tdeps.issubset(d) for tdeps in transitive_deps[1:]):
682 result[key] = d
683 else:
684 res = set(d)
685 for tdeps in transitive_deps[1:]:
686 res.update(tdeps)
687 # frozenset is unfortunately triggering a copy. In the event of
688 # a cache hit, this is wasted time but we can't hash the set
689 # otherwise (unless we did it manually) and can therefore not
690 # deduplicate without this copy
691 frozen_res = frozenset(res)
692 del res, tdeps
693 try:
694 result[key] = dedup_mapping[frozen_res]
695 except KeyError:
696 dedup_mapping[frozen_res] = frozen_res
697 result[key] = frozen_res
698 del dedup_mapping
700 empty_set: frozenset[Key] = frozenset()
701 for r in roots:
702 result[r] = empty_set
703 return result, max_dependents


def ndependencies(
    dependencies: Mapping[Key, set[Key]], dependents: Mapping[Key, set[Key]]
) -> tuple[dict[Key, int], dict[Key, int]]:
    """Number of total data elements on which this key depends

    For each key we return the number of tasks that must be run for us to run
    this task.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> dsk = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dependencies, dependents = get_deps(dsk)
    >>> num_dependencies, total_dependencies = ndependencies(dependencies, dependents)
    >>> sorted(total_dependencies.items())
    [('a', 1), ('b', 2), ('c', 3)]

    Returns
    -------
    num_dependencies: Dict[key, int]
    total_dependencies: Dict[key, int]
    """
    num_needed = {}
    result = {}
    for k, v in dependencies.items():
        num_needed[k] = len(v)
        if not v:
            result[k] = 1

    num_dependencies = num_needed.copy()
    current: list[Key] = []
    current_pop = current.pop
    current_append = current.append

    for key in result:
        for parent in dependents[key]:
            num_needed[parent] -= 1
            if not num_needed[parent]:
                current_append(parent)
    while current:
        key = current_pop()
        result[key] = 1 + sum(result[child] for child in dependencies[key])
        for parent in dependents[key]:
            num_needed[parent] -= 1
            if not num_needed[parent]:
                current_append(parent)
    return num_dependencies, result


OrderInfo = namedtuple(
    "OrderInfo",
    (
        "order",
        "age",
        "num_data_when_run",
        "num_data_when_released",
        "num_dependencies_freed",
    ),
)


def diagnostics(
    dsk: MutableMapping[Key, Any],
    o: Mapping[Key, int] | None = None,
    dependencies: MutableMapping[Key, set[Key]] | None = None,
) -> tuple[dict[Key, OrderInfo], list[int]]:
    """Simulate runtime metrics as though running tasks one at a time in order.

    These diagnostics can help reveal behaviors of and issues with ``order``.

    Returns a dict of `namedtuple("OrderInfo")` and a list of the number of
    outputs held over time.

    OrderInfo fields:
    - order : the order in which the node is run.
    - age : how long the output of a node is held.
    - num_data_when_run : the number of outputs held in memory when a node is run.
    - num_data_when_released : the number of outputs held in memory when the output is released.
    - num_dependencies_freed : the number of dependencies freed by running the node.
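
    For illustration, a minimal usage sketch (the tiny graph below is made up
    and the exact numbers depend on the chosen ordering, so no output is
    asserted):

    >>> dsk = {'a': 1, 'b': (sum, ['a', 'a'])}
    >>> info, pressure = diagnostics(dsk)  # doctest: +SKIP
    >>> info['a'].age  # how many steps the output of 'a' was held  # doctest: +SKIP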
784 """
785 if dependencies is None:
786 dependencies, dependents = get_deps(dsk)
787 else:
788 dependents = reverse_dict(dependencies)
789 assert dependencies is not None
790 if o is None:
791 o = order(dsk, dependencies=dependencies, return_stats=False)
793 pressure = []
794 num_in_memory = 0
795 age = {}
796 runpressure = {}
797 releasepressure = {}
798 freed = {}
799 num_needed = {key: len(val) for key, val in dependents.items()}
800 for i, key in enumerate(sorted(dsk, key=o.__getitem__)):
801 pressure.append(num_in_memory)
802 runpressure[key] = num_in_memory
803 released = 0
804 for dep in dependencies[key]:
805 num_needed[dep] -= 1
806 if num_needed[dep] == 0:
807 age[dep] = i - o[dep]
808 releasepressure[dep] = num_in_memory
809 released += 1
810 freed[key] = released
811 if dependents[key]:
812 num_in_memory -= released - 1
813 else:
814 age[key] = 0
815 releasepressure[key] = num_in_memory
816 num_in_memory -= released
818 rv = {
819 key: OrderInfo(
820 val, age[key], runpressure[key], releasepressure[key], freed[key]
821 )
822 for key, val in o.items()
823 }
824 return rv, pressure


def _f() -> None: ...


def sanitize_dsk(dsk: MutableMapping[Key, Any]) -> dict:
    """Take a dask graph and replace callables with a dummy function and remove
    payload data like numpy arrays, dataframes, etc.
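
    For illustration, a minimal sketch assuming a graph in the classic tuple
    format (no output asserted; the resulting values are ``Task`` objects
    wrapping the dummy ``_f``):

    >>> dsk = {'a': 1, 'b': (sum, ['a', 'a'])}
    >>> sanitized = sanitize_dsk(dsk)  # doctest: +SKIP
    >>> sorted(sanitized)  # doctest: +SKIP
    ['a', 'b']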
833 """
834 from dask._task_spec import Task, TaskRef
836 new = {}
837 deps = DependenciesMapping(dsk)
838 for key, values in deps.items():
839 new[key] = Task(key, _f, *(TaskRef(k) for k in values))
840 return new