1from __future__ import annotations
2
3import math
4import numbers
5from enum import Enum
6
7from dask import config, utils
8from dask._task_spec import GraphNode
9from dask.core import (
10 flatten,
11 get_dependencies,
12 ishashable,
13 istask,
14 reverse_dict,
15 subs,
16 toposort,
17)
18
19
def cull(dsk, keys):
    """Return new dask with only the tasks required to calculate keys.

    In other words, remove unnecessary tasks from dask.
    ``keys`` may be a single key or list of keys.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
    >>> dsk, dependencies = cull(d, 'out')
    >>> dsk  # doctest: +ELLIPSIS
    {'out': (<function add at ...>, 'x', 10), 'x': 1}
    >>> dependencies  # doctest: +ELLIPSIS
    {'out': ['x'], 'x': []}

    Returns
    -------
    dsk: culled dask graph
    dependencies: Dict mapping {key: [deps]}. Useful side effect to accelerate
        other optimizations, notably fuse.
    """
    if not isinstance(keys, (list, set)):
        keys = [keys]

    # Breadth-first walk from the requested keys, keeping only reachable tasks.
    visited = set()
    deps_out = dict()
    culled = {}
    frontier = list(set(flatten(keys)))

    while frontier:
        next_frontier = []
        for key in frontier:
            key_deps = get_dependencies(dsk, key, as_list=True)  # fuse needs lists
            culled[key] = dsk[key]
            deps_out[key] = key_deps
            for dep in key_deps:
                if dep in visited:
                    continue
                visited.add(dep)
                next_frontier.append(dep)
        frontier = next_frontier

    return culled, deps_out
69
70
def default_fused_linear_keys_renamer(keys):
    """Create new keys for fused tasks"""
    # ``keys`` is ordered root-first; keys[0] is the surviving (top-most) key.
    first = keys[0]
    key_type = type(first)
    if key_type is str:
        # Join the short names of the fused keys (deepest first) with the
        # full name of the surviving key at the end.
        parts = [utils.key_split(k) for k in keys[:0:-1]]
        parts.append(first)
        return "-".join(parts)
    if key_type is tuple and len(first) > 0 and isinstance(first[0], str):
        # Tuple keys keep their index suffix; only the name part is fused.
        parts = [utils.key_split(k) for k in keys[:0:-1]]
        parts.append(first[0])
        return ("-".join(parts),) + first[1:]
    # Unrecognized key type: signal "no rename" to the caller.
    return None
84
85
def fuse_linear(dsk, keys=None, dependencies=None, rename_keys=True):
    """Return new dask graph with linear sequence of tasks fused together.

    If specified, the keys in ``keys`` keyword argument are *not* fused.
    Supply ``dependencies`` from output of ``cull`` if available to avoid
    recomputing dependencies.

    **This function is mostly superseded by ``fuse``**

    Parameters
    ----------
    dsk: dict
    keys: list
    dependencies: dict, optional
        {key: [list-of-keys]}. Must be a list to provide count of each key
        This optional input often comes from ``cull``
    rename_keys: bool or func, optional
        Whether to rename fused keys with ``default_fused_linear_keys_renamer``
        or not. Renaming fused keys can keep the graph more understandable
        and comprehensive, but it comes at the cost of additional processing.
        If False, then the top-most key will be used. For advanced usage, a
        func is also accepted, ``new_key = rename_keys(fused_key_list)``.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dsk, dependencies = fuse(d)
    >>> dsk  # doctest: +SKIP
    {'a-b-c': (inc, (inc, 1)), 'c': 'a-b-c'}
    >>> dsk, dependencies = fuse(d, rename_keys=False)
    >>> dsk  # doctest: +ELLIPSIS
    {'c': (<function inc at ...>, (<function inc at ...>, 1))}
    >>> dsk, dependencies = fuse(d, keys=['b'], rename_keys=False)
    >>> dsk  # doctest: +ELLIPSIS
    {'b': (<function inc at ...>, 1), 'c': (<function inc at ...>, 'b')}

    Returns
    -------
    dsk: output graph with keys fused
    dependencies: dict mapping dependencies after fusion. Useful side effect
        to accelerate other downstream optimizations.
    """
    # Normalize ``keys`` to a flat set so membership tests below are O(1).
    if keys is not None and not isinstance(keys, set):
        if not isinstance(keys, list):
            keys = [keys]
        keys = set(flatten(keys))

    if dependencies is None:
        dependencies = {k: get_dependencies(dsk, k, as_list=True) for k in dsk}

    # locate all members of linear chains
    # child2parent maps a key to its *sole* dependent; a key that turns out to
    # have multiple dependents (or is protected by ``keys``) goes to
    # ``unfusible`` instead and can never rejoin a chain.
    child2parent = {}
    unfusible = set()
    for parent in dsk:
        deps = dependencies[parent]
        has_many_children = len(deps) > 1
        for child in deps:
            if keys is not None and child in keys:
                unfusible.add(child)
            elif child in child2parent:
                # Second dependent found: this child is a branch point.
                del child2parent[child]
                unfusible.add(child)
            elif has_many_children:
                unfusible.add(child)
            elif child not in unfusible:
                child2parent[child] = parent

    # construct the chains from ancestor to descendant
    chains = []
    # parent2child is the inverse mapping, used to extend a chain downwards.
    parent2child = dict(map(reversed, child2parent.items()))
    while child2parent:
        child, parent = child2parent.popitem()
        chain = [child, parent]
        # Walk upwards (towards dependents) as far as the chain continues.
        while parent in child2parent:
            parent = child2parent.pop(parent)
            del parent2child[parent]
            chain.append(parent)
        chain.reverse()
        # Walk downwards (towards dependencies) to complete the chain.
        while child in parent2child:
            child = parent2child.pop(child)
            del child2parent[child]
            chain.append(child)
        # Each chain is ordered root (dependent) first, leaf last.
        chains.append(chain)

    # Switch to sets: list counts are no longer needed past this point.
    dependencies = {k: set(v) for k, v in dependencies.items()}

    if rename_keys is True:
        key_renamer = default_fused_linear_keys_renamer
    elif rename_keys is False:
        key_renamer = None
    else:
        key_renamer = rename_keys

    # create a new dask with fused chains
    rv = {}
    fused = set()
    aliases = set()
    is_renamed = False
    for chain in chains:
        if key_renamer is not None:
            new_key = key_renamer(chain)
            # Only rename when the new key does not collide with existing keys.
            is_renamed = (
                new_key is not None and new_key not in dsk and new_key not in rv
            )
        # Fold the chain leaf-to-root, substituting each child's task into
        # its parent and merging dependency sets as we go.
        child = chain.pop()
        val = dsk[child]
        while chain:
            parent = chain.pop()
            dependencies[parent].update(dependencies.pop(child))
            dependencies[parent].remove(child)
            val = subs(dsk[parent], child, val)
            fused.add(child)
            child = parent
        fused.add(child)
        if is_renamed:
            # Keep the original root key as an alias pointing at the new key.
            rv[new_key] = val
            rv[child] = new_key
            dependencies[new_key] = dependencies[child]
            dependencies[child] = {new_key}
            aliases.add(child)
        else:
            rv[child] = val
    # Copy over every task that was not part of any fused chain.
    for key, val in dsk.items():
        if key not in fused:
            rv[key] = val
    if aliases:
        # Redirect consumers of aliased keys straight to the renamed key.
        for key, deps in dependencies.items():
            for old_key in deps & aliases:
                new_key = rv[old_key]
                deps.remove(old_key)
                deps.add(new_key)
                rv[key] = subs(rv[key], old_key, new_key)
        if keys is not None:
            # Aliases not explicitly requested as outputs can be dropped.
            for key in aliases - keys:
                del rv[key]
                del dependencies[key]
    return rv, dependencies
229
230
231def _flat_set(x):
232 if x is None:
233 return set()
234 elif isinstance(x, set):
235 return x
236 elif not isinstance(x, (list, set)):
237 x = [x]
238 return set(x)
239
240
def inline(dsk, keys=None, inline_constants=True, dependencies=None):
    """Return new dask with the given keys inlined with their values.

    Inlines all constants if ``inline_constants`` keyword is True. Note that
    the constant keys will remain in the graph, to remove them follow
    ``inline`` with ``cull``.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}
    >>> inline(d)  # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, 'y')}

    >>> inline(d, keys='y')  # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, (<function inc at ...>, 1))}

    >>> inline(d, keys='y', inline_constants=False)  # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 'x'), 'z': (<function add at ...>, 'x', (<function inc at ...>, 'x'))}
    """
    # ``dependencies`` may map to lists (e.g. when passed on from ``cull``);
    # normalize to sets so the ``keys & dependencies[key]`` intersections work.
    if dependencies and isinstance(next(iter(dependencies.values())), list):
        dependencies = {k: set(v) for k, v in dependencies.items()}

    keys = _flat_set(keys)

    if dependencies is None:
        dependencies = {k: get_dependencies(dsk, k) for k in dsk}

    if inline_constants:
        # A "constant" is either an alias of another key, or a value with no
        # dependencies that is not itself a task.
        keys.update(
            k
            for k, v in dsk.items()
            if (ishashable(v) and v in dsk) or (not dependencies[k] and not istask(v))
        )

    # Keys may depend on other keys, so determine replace order with toposort.
    # The values stored in `keysubs` do not include other keys.
    replaceorder = toposort(
        {k: dsk[k] for k in keys if k in dsk}, dependencies=dependencies
    )
    keysubs = {}
    for key in replaceorder:
        val = dsk[key]
        for dep in keys & dependencies[key]:
            # Prefer the already-inlined value when this dep was itself inlined.
            if dep in keysubs:
                replace = keysubs[dep]
            else:
                replace = dsk[dep]
            val = subs(val, dep, replace)
        keysubs[key] = val

    # Make new dask with substitutions
    dsk2 = keysubs.copy()
    for key, val in dsk.items():
        if key not in dsk2:
            for item in keys & dependencies[key]:
                val = subs(val, item, keysubs[item])
            dsk2[key] = val
    return dsk2
305
306
def inline_functions(
    dsk, output, fast_functions=None, inline_constants=False, dependencies=None
):
    """Inline cheap functions into larger operations

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> double = lambda x: x * 2
    >>> dsk = {'out': (add, 'i', 'd'),  # doctest: +SKIP
    ...        'i': (inc, 'x'),
    ...        'd': (double, 'y'),
    ...        'x': 1, 'y': 1}
    >>> inline_functions(dsk, [], [inc])  # doctest: +SKIP
    {'out': (add, (inc, 'x'), 'd'),
     'd': (double, 'y'),
     'x': 1, 'y': 1}

    Protect output keys.  In the example below ``i`` is not inlined because it
    is marked as an output key.

    >>> inline_functions(dsk, ['i', 'out'], [inc, double])  # doctest: +SKIP
    {'out': (add, 'i', (double, 'y')),
     'i': (inc, 'x'),
     'x': 1, 'y': 1}
    """
    # Nothing to do without a set of "fast" functions to inline.
    if not fast_functions:
        return dsk

    output = set(output)

    fast_functions = set(fast_functions)

    if dependencies is None:
        dependencies = {k: get_dependencies(dsk, k) for k in dsk}
    dependents = reverse_dict(dependencies)

    def inlinable(key, task):
        # A task is inlinable when it is a plain tuple-task (not a GraphNode),
        # is not a protected output, has at least one dependent, and every
        # function it calls is in ``fast_functions``. Dependents that are
        # GraphNodes are excluded since substitution into them is unsupported.
        if (
            not isinstance(task, GraphNode)
            and istask(task)
            and key not in output
            and dependents[key]
        ):
            try:
                if functions_of(task).issubset(fast_functions) and not any(
                    isinstance(dsk[d], GraphNode) for d in dependents[key]
                ):
                    return True
            except TypeError:
                # presumably raised for unhashable functions in the task;
                # treat such tasks as not inlinable — TODO confirm
                pass
        return False

    keys = [k for k, v in dsk.items() if inlinable(k, v)]

    if keys:
        dsk = inline(
            dsk, keys, inline_constants=inline_constants, dependencies=dependencies
        )
        # ``inline`` keeps the inlined keys in the graph; drop them here.
        for k in keys:
            del dsk[k]
    return dsk
370
371
def unwrap_partial(func):
    """Return the innermost callable of (possibly nested) partial-like wrappers.

    Follows the ``func`` attribute (as used by ``functools.partial``) until
    reaching an object without one.
    """
    inner = func
    while hasattr(inner, "func"):
        inner = inner.func
    return inner
376
377
def functions_of(task):
    """Set of functions contained within nested task

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> mul = lambda x, y: x * y
    >>> task = (add, (mul, 1, 2), (inc, 3))  # doctest: +SKIP
    >>> functions_of(task)  # doctest: +SKIP
    set([add, mul, inc])
    """
    funcs = set()
    # Depth-first traversal over nested lists/tuples; visiting order does not
    # matter since the result is a set.
    stack = [task]
    container_types = {list, tuple}

    while stack:
        item = stack.pop()
        if type(item) not in container_types:
            continue
        if istask(item):
            # First element of a task tuple is the callable (maybe a partial).
            funcs.add(unwrap_partial(item[0]))
            stack.extend(item[1:])
        else:
            stack.extend(item)

    return funcs
407
408
def default_fused_keys_renamer(keys, max_fused_key_length=120):
    """Create new keys for ``fuse`` tasks.

    The optional parameter `max_fused_key_length` is used to limit the maximum string length for each renamed key.
    If this parameter is set to `None`, there is no limit.
    """
    # ``keys`` is ordered root-last internally; the surviving key comes first
    # when iterating in reverse.
    rest = reversed(keys)
    first_key = next(rest)
    key_type = type(first_key)

    budget = max_fused_key_length
    if budget:  # Take into account size of hash suffix
        budget -= 5

    def _truncate(name):
        # Cap overly long names, appending a short hash for uniqueness.
        if budget and len(name) > budget:
            suffix = f"{hash(name):x}"[:4]
            name = f"{name[:budget]}-{suffix}"
        return name

    if key_type is str:
        head = utils.key_split(first_key)
        tail_names = {utils.key_split(k) for k in rest}
        tail_names.discard(head)
        parts = sorted(tail_names)
        parts.append(first_key)
        return _truncate("-".join(parts))
    elif key_type is tuple and len(first_key) > 0 and isinstance(first_key[0], str):
        head = utils.key_split(first_key)
        tail_names = {utils.key_split(k) for k in rest}
        tail_names.discard(head)
        parts = sorted(tail_names)
        parts.append(first_key[0])
        return (_truncate("-".join(parts)),) + first_key[1:]
    # Other key types: implicitly return None, signalling "no rename".
444
445
# PEP-484 compliant singleton constant
# https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions
class Default(Enum):
    """Sentinel type for "no argument supplied" defaults in ``fuse``."""

    # The single member; compare with ``is`` against ``_default``.
    token = 0

    def __repr__(self) -> str:
        return "<default>"


# Module-level sentinel instance used as the default for ``fuse`` parameters.
_default = Default.token
456
457
def fuse(
    dsk,
    keys=None,
    dependencies=None,
    ave_width=_default,
    max_width=_default,
    max_height=_default,
    max_depth_new_edges=_default,
    rename_keys=_default,
):
    """Fuse tasks that form reductions; more advanced than ``fuse_linear``

    This trades parallelism opportunities for faster scheduling by making tasks
    less granular.  It can replace ``fuse_linear`` in optimization passes.

    This optimization applies to all reductions--tasks that have at most one
    dependent--so it may be viewed as fusing "multiple input, single output"
    groups of tasks into a single task.  There are many parameters to fine
    tune the behavior, which are described below.  ``ave_width`` is the
    natural parameter with which to compare parallelism to granularity, so
    it should always be specified.  Reasonable values for other parameters
    will be determined using ``ave_width`` if necessary.

    Parameters
    ----------
    dsk: dict
        dask graph
    keys: list or set, optional
        Keys that must remain in the returned dask graph
    dependencies: dict, optional
        {key: [list-of-keys]}.  Must be a list to provide count of each key
        This optional input often comes from ``cull``
    ave_width: float (default 1)
        Upper limit for ``width = num_nodes / height``, a good measure of
        parallelizability.
        dask.config key: ``optimization.fuse.ave-width``
    max_width: int (default infinite)
        Don't fuse if total width is greater than this.
        dask.config key: ``optimization.fuse.max-width``
    max_height: int or None (default None)
        Don't fuse more than this many levels. Set to None to dynamically
        adjust to ``1.5 + ave_width * log(ave_width + 1)``.
        dask.config key: ``optimization.fuse.max-height``
    max_depth_new_edges: int or None (default None)
        Don't fuse if new dependencies are added after this many levels.
        Set to None to dynamically adjust to ave_width * 1.5.
        dask.config key: ``optimization.fuse.max-depth-new-edges``
    rename_keys: bool or func, optional (default True)
        Whether to rename the fused keys with ``default_fused_keys_renamer``
        or not.  Renaming fused keys can keep the graph more understandable
        and comprehensive, but it comes at the cost of additional processing.
        If False, then the top-most key will be used.  For advanced usage, a
        function to create the new name is also accepted.
        dask.config key: ``optimization.fuse.rename-keys``

    Returns
    -------
    dsk
        output graph with keys fused
    dependencies
        dict mapping dependencies after fusion.  Useful side effect to accelerate other
        downstream optimizations.
    """

    # Perform low-level fusion unless the user has
    # specified False explicitly.
    if config.get("optimization.fuse.active") is False:
        return dsk, dependencies

    # Normalize ``keys`` to a flat set for O(1) membership checks.
    if keys is not None and not isinstance(keys, set):
        if not isinstance(keys, list):
            keys = [keys]
        keys = set(flatten(keys))

    # Read defaults from dask.yaml and/or user-defined config file
    if ave_width is _default:
        ave_width = config.get("optimization.fuse.ave-width")
        assert ave_width is not _default
    if max_height is _default:
        max_height = config.get("optimization.fuse.max-height")
        assert max_height is not _default
    if max_depth_new_edges is _default:
        max_depth_new_edges = config.get("optimization.fuse.max-depth-new-edges")
        assert max_depth_new_edges is not _default
    if max_depth_new_edges is None:
        max_depth_new_edges = ave_width * 1.5
    if max_width is _default:
        max_width = config.get("optimization.fuse.max-width")
        assert max_width is not _default
    if max_width is None:
        max_width = 1.5 + ave_width * math.log(ave_width + 1)

    # Zero/None ave_width or max_height disables fusion entirely.
    if not ave_width or not max_height:
        return dsk, dependencies

    if rename_keys is _default:
        rename_keys = config.get("optimization.fuse.rename-keys")
        assert rename_keys is not _default
    if rename_keys is True:
        key_renamer = default_fused_keys_renamer
    elif rename_keys is False:
        key_renamer = None
    elif not callable(rename_keys):
        raise TypeError("rename_keys must be a boolean or callable")
    else:
        key_renamer = rename_keys
    rename_keys = key_renamer is not None

    # Build ``deps`` as {key: list-of-dependencies}; the list form (with
    # duplicate counts) is needed before the set conversion below.
    if dependencies is None:
        deps = {k: get_dependencies(dsk, k, as_list=True) for k in dsk}
    else:
        deps = {
            k: v if isinstance(v, list) else get_dependencies(dsk, k, as_list=True)
            for k, v in dependencies.items()
        }

    # ``rdeps`` is the reverse mapping {key: [dependents]}; ``deps`` values
    # are converted to sets in the same pass.
    rdeps = {}
    for k, vals in deps.items():
        for v in vals:
            if v not in rdeps:
                rdeps[v] = [k]
            else:
                rdeps[v].append(k)
        deps[k] = set(vals)

    # ``reducible``: keys with exactly one dependent that are candidates for
    # fusion into that dependent. GraphNode tasks are excluded since they do
    # not support tuple-style substitution.
    reducible = set()
    for k, vals in rdeps.items():
        if (
            len(vals) == 1
            and k not in (keys or ())
            and k in dsk
            and not isinstance(dsk[k], GraphNode)
            and (type(dsk[k]) is tuple or isinstance(dsk[k], (numbers.Number, str)))
            and not any(isinstance(dsk[v], GraphNode) for v in vals)
        ):
            reducible.add(k)

    if not reducible and (all(len(set(v)) != 1 for v in rdeps.values())):
        # Quick return if there's nothing to do. Only progress if there's tasks
        # fusible by the main `fuse`
        return dsk, deps

    rv = dsk.copy()
    # fused_trees maps a surviving root key to the ordered list of keys fused
    # into it (used by ``key_renamer`` at the end).
    fused_trees = {}
    # These are the stacks we use to store data as we traverse the graph
    # info_stack holds per-subtree metric tuples:
    # (key, task, fused_keys, height, width, num_nodes, fudge, set of edges)
    info_stack = []
    children_stack = []
    # For speed
    deps_pop = deps.pop
    reducible_add = reducible.add
    reducible_pop = reducible.pop
    reducible_remove = reducible.remove
    fused_trees_pop = fused_trees.pop
    info_stack_append = info_stack.append
    info_stack_pop = info_stack.pop
    children_stack_append = children_stack.append
    children_stack_extend = children_stack.extend
    children_stack_pop = children_stack.pop
    while reducible:
        parent = reducible_pop()
        reducible_add(parent)
        while parent in reducible:
            # Go to the top
            parent = rdeps[parent][0]
        children_stack_append(parent)
        children_stack_extend(reducible & deps[parent])
        while True:
            child = children_stack[-1]
            if child != parent:
                children = reducible & deps[child]
                while children:
                    # Depth-first search
                    children_stack_extend(children)
                    parent = child
                    child = children_stack[-1]
                    children = reducible & deps[child]
                children_stack_pop()
                # This is a leaf node in the reduction region
                # key, task, fused_keys, height, width, number of nodes, fudge, set of edges
                info_stack_append(
                    (
                        child,
                        rv[child],
                        [child] if rename_keys else None,
                        1,
                        1,
                        1,
                        0,
                        deps[child] - reducible,
                    )
                )
            else:
                children_stack_pop()
                # Calculate metrics and fuse as appropriate
                deps_parent = deps[parent]
                # ``edges`` are external dependencies; ``children`` are the
                # reducible subtrees feeding this parent.
                edges = deps_parent - reducible
                children = deps_parent - edges
                num_children = len(children)

                if num_children == 1:
                    # Single-child case: consider a linear fuse of child into parent.
                    (
                        child_key,
                        child_task,
                        child_keys,
                        height,
                        width,
                        num_nodes,
                        fudge,
                        children_edges,
                    ) = info_stack_pop()
                    num_children_edges = len(children_edges)

                    # Clamp fudge so boundary parallelism is not over-counted.
                    if fudge > num_children_edges - 1 >= 0:
                        fudge = num_children_edges - 1
                    edges |= children_edges
                    no_new_edges = len(edges) == num_children_edges
                    if not no_new_edges:
                        fudge += 1
                    if (
                        (num_nodes + fudge) / height <= ave_width
                        and
                        # Sanity check; don't go too deep if new levels introduce new edge dependencies
                        (no_new_edges or height < max_depth_new_edges)
                        and (
                            not isinstance(dsk[parent], GraphNode)
                            # TODO: substitute can be implemented with GraphNode.inline
                            # or isinstance(dsk[child_key], GraphNode)
                        )
                    ):
                        # Perform substitutions as we go
                        val = subs(dsk[parent], child_key, child_task)
                        deps_parent.remove(child_key)
                        deps_parent |= deps_pop(child_key)
                        del rv[child_key]
                        reducible_remove(child_key)
                        if rename_keys:
                            child_keys.append(parent)
                            fused_trees[parent] = child_keys
                            fused_trees_pop(child_key, None)

                        if children_stack:
                            if no_new_edges:
                                # Linear fuse
                                info_stack_append(
                                    (
                                        parent,
                                        val,
                                        child_keys,
                                        height,
                                        width,
                                        num_nodes,
                                        fudge,
                                        edges,
                                    )
                                )
                            else:
                                info_stack_append(
                                    (
                                        parent,
                                        val,
                                        child_keys,
                                        height + 1,
                                        width,
                                        num_nodes + 1,
                                        fudge,
                                        edges,
                                    )
                                )
                        else:
                            rv[parent] = val
                            break
                    else:
                        # Not fused: restore the child's task.
                        rv[child_key] = child_task
                        reducible_remove(child_key)
                        if children_stack:
                            # Allow the parent to be fused, but only under strict circumstances.
                            # Ensure that linear chains may still be fused.
                            if fudge > int(ave_width - 1):
                                fudge = int(ave_width - 1)
                            # This task *implicitly* depends on `edges`
                            info_stack_append(
                                (
                                    parent,
                                    rv[parent],
                                    [parent] if rename_keys else None,
                                    1,
                                    width,
                                    1,
                                    fudge,
                                    edges,
                                )
                            )
                        else:
                            break
                else:
                    # Multi-child (reduction) case: aggregate the metrics of
                    # all child subtrees before deciding to fuse.
                    child_keys = []
                    height = 1
                    width = 0
                    num_single_nodes = 0
                    num_nodes = 0
                    fudge = 0
                    children_edges = set()
                    max_num_edges = 0
                    children_info = info_stack[-num_children:]
                    del info_stack[-num_children:]
                    for (
                        _,
                        _,
                        _,
                        cur_height,
                        cur_width,
                        cur_num_nodes,
                        cur_fudge,
                        cur_edges,
                    ) in children_info:
                        if cur_height == 1:
                            num_single_nodes += 1
                        elif cur_height > height:
                            height = cur_height
                        width += cur_width
                        num_nodes += cur_num_nodes
                        fudge += cur_fudge
                        if len(cur_edges) > max_num_edges:
                            max_num_edges = len(cur_edges)
                        children_edges |= cur_edges
                    # Fudge factor to account for possible parallelism with the boundaries
                    num_children_edges = len(children_edges)
                    fudge += min(
                        num_children - 1, max(0, num_children_edges - max_num_edges)
                    )

                    if fudge > num_children_edges - 1 >= 0:
                        fudge = num_children_edges - 1
                    edges |= children_edges
                    no_new_edges = len(edges) == num_children_edges
                    if not no_new_edges:
                        fudge += 1
                    if (
                        (num_nodes + fudge) / height <= ave_width
                        and num_single_nodes <= ave_width
                        and width <= max_width
                        and height <= max_height
                        and
                        # Sanity check; don't go too deep if new levels introduce new edge dependencies
                        (no_new_edges or height < max_depth_new_edges)
                        and (
                            not isinstance(dsk[parent], GraphNode)
                            and not any(
                                isinstance(dsk[child_key], GraphNode)
                                for child_key in children
                            )
                            # TODO: substitute can be implemented with GraphNode.inline
                            # or all(
                            #     isintance(dsk[child], GraphNode) for child in children
                        )
                    ):
                        # Perform substitutions as we go
                        val = dsk[parent]
                        children_deps = set()
                        for child_info in children_info:
                            cur_child = child_info[0]
                            val = subs(val, cur_child, child_info[1])
                            del rv[cur_child]
                            children_deps |= deps_pop(cur_child)
                            reducible_remove(cur_child)
                            if rename_keys:
                                fused_trees_pop(cur_child, None)
                                child_keys.extend(child_info[2])
                        deps_parent -= children
                        deps_parent |= children_deps

                        if rename_keys:
                            child_keys.append(parent)
                            fused_trees[parent] = child_keys

                        if children_stack:
                            info_stack_append(
                                (
                                    parent,
                                    val,
                                    child_keys,
                                    height + 1,
                                    width,
                                    num_nodes + 1,
                                    fudge,
                                    edges,
                                )
                            )
                        else:
                            rv[parent] = val
                            break
                    else:
                        # Not fused: restore all child tasks.
                        for child_info in children_info:
                            rv[child_info[0]] = child_info[1]
                            reducible_remove(child_info[0])
                        if children_stack:
                            # Allow the parent to be fused, but only under strict circumstances.
                            # Ensure that linear chains may still be fused.
                            if width > max_width:
                                width = max_width
                            if fudge > int(ave_width - 1):
                                fudge = int(ave_width - 1)
                            # key, task, height, width, number of nodes, fudge, set of edges
                            # This task *implicitly* depends on `edges`
                            info_stack_append(
                                (
                                    parent,
                                    rv[parent],
                                    [parent] if rename_keys else None,
                                    1,
                                    width,
                                    1,
                                    fudge,
                                    edges,
                                )
                            )
                        else:
                            break
                # Traverse upwards
                parent = rdeps[parent][0]

    if key_renamer:
        # Install aliases: old root key points at the renamed fused key.
        for root_key, fused_keys in fused_trees.items():
            alias = key_renamer(fused_keys)
            if alias is not None and alias not in rv:
                rv[alias] = rv[root_key]
                rv[root_key] = alias
                deps[alias] = deps[root_key]
                deps[root_key] = {alias}

    return rv, deps